对大多数用户来说,ETL的核心价值在“T”所代表的转换部分。这个阶段要做很多工作,数据清洗就是其中一项重点任务。数据清洗是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。
数据仓库中的数据是面向某一主题数据的集合,这些数据从多个业务系统中抽取而来,并且包含历史变化,因此就不可避免地出现某些数据是错误的,或者数据相互之间存在冲突的情况。这些错误的或有冲突的数据显然不是我们想要的,被称为“脏数据”。要按照一定的规则处理脏数据,这个过程就是数据清洗。数据清洗的任务是过滤那些不符合要求的数据,将过滤的结果交给业务部门,确认是直接删除掉,还是修正之后再进行抽取。不符合要求的数据主要包括不完整的数据、错误的数据、重复的数据、不一致的数据四大类。
保障数据清洗处理顺利进行的原则是优先对数据清洗处理流程进行分析和系统化的设计,针对数据的主要问题和特征,设计一系列数据对照表和数据清洗程序库的有效组合,以便面对不断变化的、形形色色的数据清洗问题。数据清洗流程通常包括如下内容:
身份证号码格式校验是很多系统在数据集成时的一个常见需求,这里以18位身份证为例,使用一个Kettle转换实现身份证号码的合法性验证。该转换执行的结果是将所有合规与不合规的身份证号码写入相应的输出文件。按以下身份证号码的定义规则建立转换。
身份证18位分别代表的含义,从左到右方分别表示:1-2省级行政区代码。3-4地级行政区划分代码。5-6县区行政区分代码。7-1011-1213-14出生年、月、日。15-17顺序码,同一地区同年、同月、同日出生人的编号,奇数是男性,偶数是女性。18校验码,如果是0-9则用0-9表示,如果是10则用X(罗马数字10)表示。
身份证校验码的计算方法:
总的Kettle转换如图6-1所示。
图6-1校验身份证号码的Kettle转换
这是本专题到目前为止步骤最多的一个转换。虽然有些复杂,但条理还比较清楚。下面具体说明每个步骤的定义和作用。
“自定义常量数据”定义8条身份证号码模拟数据作为输入,其中包括各种不合规的情况及一条完全合规的号码:
cardid110102197203270816110102197203270816111010219720327081a00102197203270816000102197203270816110102197302290816110102197202290816110102197202300816第一层的四个步骤校验号码位数,18位的数据流向第二层,其它数据输出到一个错误文件。“字符串操作”步骤作用是去除字符串两边空格(Trimtype选择both),并将字符串转为大写(Lower/Upper选择upper)。“计算器”步骤返回一个表示字符串长度的新字段length(“计算”选择“ReturnthelengthofastringA”,“字段A”选择“cardid”,“值类型”选择“String”)。“过滤记录”步骤中的“条件”为“length=18”,为真时流向下面的步骤,为假时输出错误文件。
第二层的四个步骤校验号码的前17位均为数字,合规的数据流向第三层,其它数据输出到一个错误文件。“剪切字符串”步骤将18位字符串分隔成21个字段,字符串下标从0开始,如图6-2所示。province取前两位,用于后面验证省份代码。year取第7到第10位,用于后面计算是否闰年。p17取前17位,用于判断是否纯数字。s1-s18表示每一位,用于后面计算检验位。
图6-2剪切字符串
“JavaScript代码”步骤中只有一行脚本,用isNaN函数判断前17位的字符串是否纯数字,并返回Boolean类型的字段“notnumber”:
notnumber=isNaN(p17)“过滤记录2”步骤中的“条件”为“notnumber=N”,为真时流向下面的步骤,为假时输出错误文件。
第三层的两个步骤校验两位省份代码,合规的数据流向第四层,其它数据输出到一个错误文件。“过滤记录3”步骤中的“条件”为:
provinceINLIST11;12;13;14;15;21;22;23;31;32;33;34;35;36;37;41;42;43;44;45;46;50;51;52;53;54;61;62;63;64;65;71;81;82;91“计算器2”步骤定义如图6-3所示,先定义三个常数400、100、4,然后计算year除以这三个常数的余数,用于后面判断是否闰年。
图6-3计算闰年
“JavaScript代码2”步骤中的脚本如下,按闰年定义进行判断,返回Boolean类型的字段isleapyear表示year是否为闰年。
varisleapyear;if(y1==0||y2>0&&y3==0){isleapyear=true;}else{isleapyear=false;}“Switch/case”定义如图6-4所示,根据isleapyear的值,闰年与平年走不同数据校验分支。
图6-4闰年与平年走不同分支
“数据校验”步骤验证闰年的日期规则,“要检验的字段名”选择cardid,在“合法数据的正则表达式”中填写:
^[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|[1-2][0-9]))[0-9]{3}[0-9X]$“数据校验2”步骤验证平年的日期规则,“要检验的字段名”选择cardid,在“合法数据的正则表达式”中填写:
^[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|1[0-9]|2[0-8]))[0-9]{3}[0-9X]$没有通过两个校验步骤的数据分别输出到一个错误文件,通过的数据流向“公式”步骤。该步骤输出一个新字段s,公式如下,即将前17位数字和系数相乘的结果相加。
([s1]*7+[s2]*9+[s3]*10+[s4]*5+[s5]*8+[s6]*4+[s7]*2+[s8]*1+[s9]*6+[s10]*3+[s11]*7+[s12]*9+[s13]*10+[s14]*5+[s15]*8+[s16]*4+[s17]*2)“计算器3”步骤定义如图6-5所示,计算s除以11的余数。
图6-5计算余数
“JavaScript代码3”步骤中的脚本如下,计算校验位,返回Boolean类型的字段valid表示校验位是否正确。
执行转换后,各错误文件和正确输出文件内容如下:
[root@localhost6]#caterr1.txtcardid;length1101021972032708161;1911010219720327081;17[root@localhost6]#caterr2.txtcardid;p17A00102197203270816;A0010219720327081[root@localhost6]#caterr3.txtcardid;province000102197203270816;00[root@localhost6]#caterr4.txtcardid110102197202300816[root@localhost6]#caterr5.txtcardid110102197302290816[root@localhost6]#caterr6.txtcardid110102197202290816[root@localhost6]#catvalid.txtcardid110102197203270816[root@localhost6]#(2)去除重复数据有两种意义上的重复记录,一是完全重复的记录,即所有字段均都重复,二是部分字段重复的记录。发生第一种重复的原因主要是表设计不周,通过给表增加主键或唯一索引列即可避免。对于第二类重复问题,通常要求查询出重复记录中的任一条记录。Kettle转换中有“去除重复记录”和“唯一行(哈希值)”两个步骤用于实现去重操作。“去除重复记录”步骤前,应该按照去除重列进行排序,否则可能返回错误的结果。“唯一行(哈希值)”步骤则不需要事先对数据进行排序。图6-6所示为一个Kettle去重的例子。
图6-6Kettle去除重复数据
“自定义常量数据”步骤定义5条记录:
idname1a2b1b3a3b“去除重复记录”步骤中“用于比较的字段”选择id,即按id字段去重。因为没有排序,该步骤输出为4条记录,id=1仍然有两条记录:
idname1a2b1b3a“去除重复记录2”步骤的定义与“去除重复记录”步骤相同,但前置了一个“排序记录”步骤,在其中定义按id和name字段排序,因此去重输出为:
idname1a2b3a“唯一行(哈希值)”步骤的输出同上,该步骤不需先排序即可按预期去重。
DW条目名称
DW标准值
业务系统
源值
员工编号
101
HR
HR库.表名.列名
OA
OA库.表名.列名
102
考勤
考勤库.表名.列名
103
绩效
绩效库.表名.列名
104
表6-1标准值映像表
这张表建立在数据仓库模式中,人员数据从各个系统抽取来以后,与标准值映像表关联,从而形成统一的标准数据。映像表被其它源数据引用,是数据一致性的关键,其维护应该与HR系统同步。因此在ETL过程中应该首先处理HR表和映像表。
Hive是Hadoop生态圈的数据仓库软件,使用类似于SQL的语言读、写、管理分布式存储上的大数据集。它建立在Hadoop之上,具有以下功能和特点:
Hive提供标准的SQL功能,包括2003以后的标准和2011标准中的分析特性。Hive中的SQL还可以通过用户定义的函数(UDFs)、用户定义的聚合函数(UDAFs)、用户定义的表函数(UDTFs)进行扩展。Hive内建连接器支持CSV文本文件、Parquet、ORC等多种数据格式,用户也可以扩展支持其它格式的连接器。Hive被设计成一个可扩展的、高性能的、容错的、与输入数据格式松耦合的系统,适合于数据仓库中的汇总、分析、批处理查询等任务,而不适合联机事务处理的工作场景。Hive包括HCatalog和WebHCat两个组件。HCatalog是Hadoop的表和存储管理层,允许使用Pig和MapReduce等数据处理工具的用户更容易读写集群中的数据。WebHCat提供了一个服务,可以使用HTTP接口执行MapReduce(或YARN)、Pig、Hive作业或元数据操作。
Hive的体系结构如图6-7所示。
图6-7Hive体系结构
Hive建立在Hadoop的分布式文件系统(HDFS)和MapReduce之上。上图中显示了Hadoop1和Hadoop2中的两种MapReduce组件。在Hadoop1中,Hive查询被转化成MapReduce代码,并且使用第一版的MapReduce框架执行,如JobTracker和TaskTracker。在Hadoop2中,YARN将资源管理和调度从MapReduce框架中解耦。Hive查询仍然被转化为MapReduce代码并执行,但使用的是YARN框架和第二版的MapReduce。
为了更好地理解Hive如何与Hadoop的基本组件一起协同工作,可以把Hadoop看做一个操作系统,HDFS和MapReduce是这个操作系统的组成部分,而象Hive、HBase这些组件,则是操作系统的上层应用或功能。Hadoop生态圈的通用底层架构是,HDFS提供分布式存储,MapReduce为上层功能提供并行处理能力。
在HDFS和MapReduce之上,图中显示了Hive驱动程序和元数据存储。Hive驱动程序及其编译器负责编译、优化和执行HiveQL。依赖于具体情况,Hive驱动程序可能选择在本地执行Hive语句或命令,也可能是产生一个MapReduce作业。Hive驱动程序把元数据存储在数据库中。
缺省配置下,Hive在内建的Derby关系数据库系统中存储元数据,这种方式被称为嵌入模式。在这种模式下,Hive驱动程序、元数据存储和Derby全部运行在同一个Java虚拟机中(JVM)。这种配置适合于学习目的,它只支持单一Hive会话,所以不能用于多用户的生产环境。Hive还允许将元数据存储于本地或远程的外部数据库中,这种设置可以更好地支持Hive的多会话生产环境。并且,可以配置任何与JDBCAPI兼容的关系数据库系统存储元数据,如MySQL、Oracle等。
对应用支持的关键组件是HiveThrift服务,它允许一个富客户端集访问Hive,开源的SQuirreLSQL客户端被作为示例包含其中。任何与JDBC兼容的应用,都可以通过绑定的JDBC驱动访问Hive。与ODBC兼容的客户端,如Linux下典型的unixODBC和isql应用程序,可以从远程Linux客户端访问Hive。如果在客户端安装了相应的ODBC驱动,甚至可以从微软的Excel访问Hive。通过Thrift还可以用Java以外的程序语言,如PHP或Python访问Hive。就像JDBC、ODBC一样,Thrift客户端通过Thrift服务器访问Hive。
架构图的最上面包括一个命令行接口(CLI),可以在Linux终端窗口向Hive驱动程序直接发出查询或管理命令。还有一个简单的Web界面,通过它可以从浏览器访问Hive管理表及其数据。
从接收到发自命令行或是应用程序的查询命令,到把结果返回给用户,期间Hive的工作流程(第一版的MapReduce)如图6-8所示。
图6-8Hive工作流程
表6-2说明Hive如何与Hadoop的基本组件进行交互。从中不难看出,Hive的执行过程与关系数据库的非常相似,只不过是使用分布式计算框架来实现。
步骤
操作
1
执行查询从Hive的CLI或WebUI发查询命令给驱动程序(任何JDBC、ODBC数据库驱动)执行。
2
获得计划驱动程序请求查询编译器解析查询、检查语法、生成查询计划或者查询所需要的资源。
3
获取元数据编译器向元数据存储数据库发送元数据请求。
4
发送元数据作为响应,元数据存储发向编译器发送元数据。
5
发送计划编译器检查需要的资源,并把查询计划发送给驱动程序。至此,查询解析完成。
6
执行计划驱动程序向执行引擎发送执行计划。
7
执行作业执行计划的处理是一个MapReduce作业。执行引擎向Namenode上的JobTracker进程发送作业,JobTracker把作业分配给Datanode上的TaskTracker进程。此时,查询执行MapReduce作业。
7.1
操作元数据执行作业的同时,执行引擎可能会执行元数据操作,如DDL语句等。
8
取回结果执行引擎从Datanode接收结果。
9
发送结果执行引擎向驱动程序发送合成的结果值。
10
发送结果驱动程序向Hive接口(CLI或WebUI)发送结果。
表6-2Hive执行流程
HiveServer2(后面简称HS2)是从Hive0.11版本开始引入的,它提供了一个服务器接口,允许客户端在Hive中执行查询并取回查询结果。当前的实现是一个HiveServer的改进版本,它基于ThriftRPC,支持多客户端身份认证和并发操作,其设计对JDBC、ODBC这样的开放API客户端提供了更好的支持。
HS2使用单一进程提供两种服务,分别是基于Thrift的Hive服务和一个JettyWeb服务器。基于Thrift的Hive服务是HS2的核心,它对Hive查询,例如从Beeline里发出的查询语句做出响应。Hive通过Thrift提供Hive元数据存储的服务。通常来说,用户不能够调用元数据存储方法来直接对元数据进行修改,而应该通过HiveQL语言让Hive来执行这样的操作。用户应该只能通过只读方式来获取表的元数据信息。
不同版本的HS2,配置属性可能会有所不同。最基本的配置是在hive-site.xml文件中设置如下属性:
HS2支持通过HTTP协议传输ThriftRPC消息(Hive0.13以后的版本),这种方式特别用于支持客户端和服务器之间存在代理层的情况。当前HS2可以运行在TCP模式或HTTP模式下,但是不能同时使用两种模式。使用下面的属性设置启用HTTP模式:
可以配置hive.server2.global.init.file.location属性指定一个全局初始化文件的位置(Hive0.14以后版本),它或者是初始化文件本身的路径,或者是一个名为“.hiverc”的文件所在的目录。在这个初始化文件中可以包含的一系列命令,这些命令会在HS2实例中运行,例如注册标准的JAR包或函数等。
如下参数配置HS2的操作日志:
缺省情况下,HS2以连接服务器的用户的身份处理查询,但是如果将下面的属性设置为false,那么查询将以运行HS2进程的用户身份执行。当遇到无法创建临时表一类的错误时,可以尝试设置此属性。
为了避免不安全的内存溢出,可以通过将以下参数设置为true,禁用文件系统缓存。
HS2的Web界面提供配置、日志、度量和活跃会话等信息,其使用的缺省端口是10002。可以设置hive-site.xml文件中的hive.server2.webui.host、hive.server2.webui.port、hive.server2.webui.max.threads等属性配置Web接口。Web界面如图6-9所示。
图6-9HiveServer2的Web界面
可以使用两种方法查看Hive版本。
Hive的执行依赖于底层的MapReduce作业,因此对Hadoop作业的优化或者对MapReduce作业的调整是提高Hive性能的基础。大多数情况下,用户不需要了解Hive内部是如何工作的。但是当对Hive具有越来越多的经验后,学习一些Hive的底层实现细节和优化知识,会让用户更加高效地使用Hive。如果没有适当的调整,那么即使查询Hive中的一个小表,有时也会耗时数分钟才得到结果。也正是因为这个原因,Hive对于OLAP类型的应用有很大的局限性,它不适合需要立即返回查询结果的场景。然而,通过实施下面一系列的调优方法,Hive查询的性能会有大幅提高。
(1)启用压缩
压缩可以使磁盘上存储的数据量变小,例如,文本文件格式能够压缩40%甚至更高比例,这样可以通过降低I/O来提高查询速度。除非产生的数据用于外部系统,或者存在格式兼容性问题,建议总是启用压缩。压缩与解压缩会消耗CPU资源,但Hive产生的MadReduce作业往往是I/O密集型的,因此CPU开销通常不是问题。为了启用压缩,需要查出所使用的Hive版本支持的压缩编码方式,下面的set命令列出可用的编解码器(CDH5.7.0中的Hive)。
hive>setio.compression.codecs;io.compression.codecs=org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codechive>一个复杂的Hive查询在提交后,通常被转换为一系列中间阶段的MapReduce作业,Hive引擎将这些作业串联起来完成整个查询。可以将这些中间数据进行压缩。这里所说的中间数据指的是上一个MapReduce作业的输出,这些输出将被下一个MapReduce作业作为输入数据使用。我们可以在hive-site.xml文件中设置hive.exec.compress.intermediate属性以启用中间数据压缩。
hive>sethive.exec.compress.intermediate=true;hive>sethive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;hive>sethive.intermediate.compression.type=BLOCK;当Hive将输出写入到表中时,输出内容同样可以进行压缩。我们可以设置hive.exec.compress.output属性启用最终输出压缩。
两个大表连接时,会先基于连接键分别对两个表进行排序,然后连接它们。Mapper将特定键值的所有行发送给同一个Reducer。例如,表A的id列有1、2、3、4四个值,表B的id列有1、2、3三个值。查询语句如下:
如果连接中使用的表是按特定列分桶的,可以开启桶Map连接提升性能。
(3)避免使用orderby全局排序Hive中使用orderby子句实现全局排序。orderby只用一个Reducer产生结果,对于大数据集,这种做法效率很低。如果不需要全局有序,则可以使用sortby子句,该子句为每个reducer生成一个排好序的文件。如果需要控制一个特定数据行流向哪个reducer,可以使用distributeby子句,例如:
selectid,name,salary,deptfromemployeedistributebydeptsortbyidasc,namedesc;属于一个dept的数据会分配到同一个reducer进行处理,同一个dept的所有记录按照id、name列排序。最终的结果集是全局有序的。
(4)启用Tez执行引擎使用Tez执行引擎代替传统的MapReduce引擎会大幅提升Hive查询的性能。在安装好Tez后,配置hive.execution.engine属性指定执行引擎。
(6)启用并行执行每条HiveQL语句都被转化成一个或多个执行阶段,可能是一个MapReduce阶段、采样阶段、归并阶段、限制阶段等。缺省时,Hive在任意时刻只能执行其中一个阶段。如果组成一个特定作业的多个执行阶段是彼此独立的,那么它们可以并行执行,从而整个作业得以更快完成。通过设置下面的属性启用并行执行。
(7)启用MapReduce严格模式Hive提供了一个严格模式,可以防止用户执行那些可能产生负面影响的查询。通过设置下面的属性启用MapReduce严格模式。
(8)使用单一Reduce执行多个GroupBy通过为groupby操作开启单一reduce任务属性,可以将一个查询中的多个groupby操作联合在一起发送给单一MapReduce作业。
(10)启用向量化向量化特性在Hive0.13.1版本中被首次引入。通过查询执行向量化,使Hive从单行处理数据改为批量处理方式,具体来说是一次处理1024行而不是原来的每次只处理一行,这大大提升了指令流水线和缓存的利用率,从而提高了表扫描、聚合、过滤和连接等操作的性能。可以设置下面的属性启用查询执行向量化。
(11)启用基于成本的优化器Hive0.14版本开始提供基于成本优化器(CBO)特性。使用过Oracle数据库的读者对CBO一定不会陌生。与Oracle类似,Hive的CBO也可以根据查询成本制定执行计划,例如,确定表连接的顺序,以何种方式执行连接,使用的并行度等等。设置下面的属性启用基于成本优化器。
(12)使用ORC文件格式ORC文件格式可以有效提升Hive查询的性能。图6-10由Hortonworks公司提供,显示了Hive不同文件格式的大小对比。
图6-10Hive文件格式与大小对比
设计开发初始装载步骤前需要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据,还要了解数据源的特性,例如文件类型、记录结构和可访问性等。表8-3显示的是销售订单示例数据仓库需要的源数据的关键信息,包括源数据表、对应的数据仓库目标表等属性。这类表格通常称作数据源对应图,因为它反应了每个从源数据到目标数据的对应关系。在本示例中,客户和产品的源数据直接与其数据仓库里的目标表,customer_dim和product_dim表相对应,而销售订单事务表是多个数据仓库表的数据源。
源数据
源数据类型
文件名/表名
数据仓库中的目标表
客户
MySQL表
customer
customer_dim
产品
product
product_dim
销售订单
sales_order
order_dim、sales_order_fact
表8-3销售订单数据源映射
同一个维度表中的不同字段可以有不同的变化处理方式。在本示例中,客户维度历史的客户名称使用SCD1,客户地址使用SCD2,产品维度的两个属性,产品名称和产品类型都使用SCD2保存历史变化数据。
多维数据仓库中的维度表和事实表一般都需要有一个代理键,作为这些表的主键,代理键一般由单列的自增数字序列构成。Hive没有关系数据库中的自增列,但它也有一些对自增序列的支持,通常有两种方法生成代理键:使用row_number()窗口函数或者使用一个名为UDFRowSequence的用户自定义函数(UDF)。假设有维度表tbl_dim和过渡表tbl_stg,现在要将tbl_stg的数据装载到tbl_dim,装载的同时生成维度表的代理键。
insertintotbl_dimselectrow_number()over(orderbytbl_stg.id)+t2.sk_max,tbl_stg.*fromtbl_stgcrossjoin(selectcoalesce(max(sk),0)sk_maxfromtbl_dim)t2;上面语句中,先查询维度表中已有记录最大的代理键值,如果维度表中还没有记录,利用coalesce函数返回0。然后使用crossjoin连接生成过渡表和最大代理键值的笛卡尔集,最后使用row_number()函数生成行号,并将行号与最大代理键值相加的值,作为新装载记录的代理键。
因为窗口函数的方法比较通用,而且无需引入额外的JAR包,所以我们在示例中使用row_number()函数生成代理键。初始装载Kettle作业如图6-11所示。
图6-11初始装载作业
初始装载作业流程描述如下:
系统初始化部分包括“SQL_init_cdc_time”和“设置系统日期”两个作业项。“SQL_init_cdc_time”作业项中执行的SQL语句如下,用于数据初始化,以便测试或排错后重复执行,实现幂等操作。
图6-12设置系统日期转换
“自定义常量步骤”设置一个Date类型的常量max_date,格式为yyyy-MM-dd,数据为2200-01-01。该值用于设置渐变维的初始过期日期。“获取系统信息”步骤中用两个字段cur_date和pre_date表示当前日期和前一天的日期。当前日期用于获得需要处理的数据,前一天日期用于设置变量,在后续步骤中构成文件名。该步骤定义如下,两个字段将被以复制方式发送到“字段选择”和“插入/更新”步骤。
名称类型cur_date今天00:00:00pre_date昨天00:00:00“字段选择”步骤用于将pre_date字段格式化为“yyyy-MM-dd”形式。在该步骤的“元数据”标签中进行如下定义:
字段名称类型格式pre_dateDateyyyy-MM-dd“设置变量”步骤设置两个变量PRE_DATE、MAX_DATE,变量值从pre_date和max_date数据流字段获得。“变量活动类型”选择“Validintherootjob”,使得作业中涉及的所有子作业或转换都可以使用这两个变量。
“插入/更新”步骤定义如图6-13所示。该步骤的功能类似于SQL中replaceinto或mergeinto。当rds.cdc_time表字段current_load为NULL时执行插入操作,否则更新该字段的值,插入或更新的值为数据流字段cur_date的值。
图6-13更新rds.cdc_time表字段current_load的值
“装载过渡区”作业项调用的是一个子作业,如图6-14所示。
图6-14装载过渡区作业
该作业包括“Sqoopimportcustomer”、“Sqoopimportproduct”、“load_sales_order”三个作业项。前两个Sqoop作业的命令行定义如下,其含义与功能在前一篇中已经详细讲解,这里不再赘述。
--connectjdbc:mysql://node3:3306/source--delete-target-dir--password123456--tablecustomer--target-dir/user/hive/warehouse/rds.db/customer--usernameroot--connectjdbc:mysql://node3:3306/source--delete-target-dir--password123456--tableproduct--target-dir/user/hive/warehouse/rds.db/product--usernameroot“load_sales_order”作业项调用的是一个装载事实表的转换,如图6-15所示。
图6-15初始装载rds.sales_order表
“表输入”步骤执行下面的SQL,查询出当前日期与最后装载日期,本例中分别为“2020-10-07”和“1971-01-01”。
selectid,last_load,current_loadfromrds.cdc_time;“数据库连接”步骤的定义如图6-16所示。该步骤将前一步骤输出的last_load和current_load字段作为参数,查询出源数据中sales_order表的全部数据。
图6-16查询source.sales_order表的全部数据
最后的“Hadoopfileoutput”步骤将sales_order源数据以文本文件的形式,存储到rds.sales_order表对应的HDFS目录下。在该步骤的“文件”标签页中,“Folder/File”属性输入“/user/hive/warehouse/rds.db/sales_order/sales_order”,“扩展名”属性输入“txt”。“内容”标签页中,“分隔符”为“,”,“编码”选择“UTF-8”。字段标签页的定义表6-4所示。注意由于性能原因,对于Hive表不能使用普通的“表输出”步骤为其装载数据。
名称
类型
格式
长度
精度
order_number
Integer
0
customer_number
product_code
order_date
Date
yyyy-MM-ddHH:mm:ss
entry_date
order_amount
Number
00000000.00
表6-4sales_order.txt文件字段定义
“装载维度表”作业项调用一个如图6-17所示的转换。
图6-17初始装载维度表的转换
“装载客户维度”执行下面的SQL语句:
订单维度表的装载当然也可以使用类似的“执行SQL语句”步骤,但订单维度与客户维度或产品维度不同。在前一篇中曾提到,它的数据是单向递增的,不涉及数据更新,因此这里使用“表输入”、“增加序列”、“ORCoutput”三个步骤装载订单维度数据。
“表输入”步骤中执行以下查询:
图6-18用“ORCoutput”步骤装载dw.order_dim表
“装载事实表”作业项调用一个如图6-19所示的转换。
图6-19初始装载事实表的转换
selectorder_sk,customer_sk,product_sk,date_sk,order_amountfromrds.sales_ordera,dw.order_dimb,dw.customer_dimc,dw.product_dimd,dw.date_dimewherea.order_number=b.order_numberanda.customer_number=c.customer_numberanda.product_code=d.product_codeandto_date(a.order_date)=e.dt;“ORCoutput”与上一步装载dw.order_dim表的步骤相同,只是将“Folder/Filename”属性值改为:
updaterds.cdc_timesetlast_load=current_load;成功执行初始装载作业后,可以在Hive中执行下面的查询验证数据正确性。
usedw;selectorder_number,customer_name,product_name,dt,order_amountamountfromsales_order_facta,customer_dimb,product_dimc,order_dimd,date_dimewherea.customer_sk=b.customer_skanda.product_sk=c.product_skanda.order_sk=d.order_skanda.order_date_sk=e.date_skorderbyorder_number;四、定期装载初始装载只在开始数据仓库使用前执行一次,然而,必须要按时调度定期执行装载源数据的过程。与初始装载不同,定期装载一般都是增量的,并且需要捕获和记录数据的变化历史。本节说明执行定期装载的步骤,包括识别源数据与装载类型、创建Kettle作业和转换实现定期增量装载过程并执行验证。
定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。表6-5汇总了本示例的这些信息。
数据源
源数据存储
数据仓库
抽取模式
维度历史装载类型
整体、拉取
address列上SCD2,name列上SCD1
所有属性均为SCD2
order_dim
CDC(每天)、拉取
唯一订单号
sales_order_fact
N/A
date_dim
预装载
表6-5销售订单定期装载
图6-20定期装载作业
定期装载作业流程描述如下:
“装载过渡区”作业项调用的子作业与图6-14所示的初始装载过渡区只有一点不同:“load_sales_order”作业项调用的转换中,“Hadoopfileoutput”步骤生成的文件,其文件名中带有装载日期,这通过在“Folder/File”属性输入/user/hive/warehouse/rds.db/sales_order/sales_order_{PRE_DATE}实现。{PRE_DATE}引用的是前一作业项“设置系统日期”中所设置的变量,值为当前日期前一天。过渡区的rds.sales_order表存储全部销售订单数据,因此需要向表所对应的HDFS目录中新增文件,而不能覆盖已有文件。
“装载维度表”作业项调用一个如图6-21所示的转换。
图6-21定期装载维度表的转换
这个转换貌似很简单,只有三个执行SQL脚本的步骤。正如你所想到的,实现渐变维使用的就是Hive提供的行级更新功能。与单纯用shell执行SQL相比,Kettle转换一个明显的好处是这三个步骤可以并行以提高性能。“装载客户维度表”步骤中的SQL脚本如下:
第二句的insert语句处理customer_street_addresses列上scd2的新增行。这条语句插入SCD2的新增版本行。子查询中用innerjoin获取当期版本号和源数据信息。leftjoin连接是必要的,否则如果多次执行该语句,会生成多条重复的记录。最后用row_number()方法生成新纪录的代理键。新记录的版本号加1,开始日期为执行时的前一天,过期日期为‘2200-01-01’。
后面的四条SQL语句处理customer_name列上的scd1,因为scd1本身就不保存历史数据,所以这里更新维度表里的所有customer_name改变的记录,而不是仅仅更新当前版本的记录。在关系数据库中,SCD1非常好处理,如在MySQL中使用类似如下的语句即可:
updatecustomer_dima,customer_stgbseta.customer_name=b.customer_namewherea.customer_number=b.customer_numberanda.customer_name<>b.customer_name;但是hive里不能在update后跟多个表,也不支持在set子句中使用子查询,它只支持SETcolumn=value的形式,其中value只能是一个具体的值或者是一个标量表达式。所以这里使用了一个临时表存储需要更新的记录,然后将维度表和这个临时表关联,用先delete再insert代替update。为简单起见也不考虑并发问题(典型数据仓库应用的并发操作基本都是只读的,很少并发写,而且ETL通常是一个单独在后台运行的程序,如果用SQL实现,并不存在并发执行的情况,所以并发导致的问题并不像OLTP那样严重)。
最后的insert语句处理新增的customer记录。内层子查询使用rds.customer和dw.customer_dim的左外链接获取新增的数据。新数据的版本号为1,开始日期为执行时的前一天,过期日期为‘2200-01-01’。同样使用row_number()方法生成代理键。到这里,客户维度表的装载处理代码已完成。“装载产品维度表”步骤中的SQL脚本如下:
“装载事实表”作业项调用一个如图6-22所示的转换。
图6-22定期装载事实表的转换
selectlast_load,current_loadfromrds.cdc_time;“销售订单事务数据”是一个数据库连接步骤,定义如图6-23所示,输出过渡区中销售订单表的增量数据。
图6-23查询增量数据的数据库连接步骤
“获取日期代理键”使用的数据库查询步骤定义如图6-24所示。该步骤关联表字段date_dim.dt与流字段order_date查询出日期代理键date_dim.date_sk。
图6-24使用数据库查询步骤获取日期代理键
“获取客户代理键”、“获取产品代理键”、“获取订单代理键”使用的都是“维度查询/更新”步骤,它们的定义除表名和字段名外完全相同。例如“获取客户代理键”中的“目标模式”选择dw,“目标表”选择customer_dim,其定义如图6-25所示。
图6-25使用维度查询/更新步骤获取代理键
该步骤通过关联维度表和数据流中的业务主键字段customer_number,查询出订单日期order_date在生效日期effective_date与过期日期expiry_date区间内的客户维度代理键customer_sk,功能等价于下面的SQL查询:
selectcustomer_skfromrds.sales_ordera,customer_dimcwherea.customer_number=c.customer_numberanda.order_date>=c.effective_dateanda.order_date 最后的“ORCoutput”步骤定义如图6-26所示,将事实表数据以文件形式存储到相应的HDFS目录中,文件名中带有日期。 图6-26使用ORCoutput步骤增量装载事实表数据 与初始装载一样,最后一个“SQL”作业项执行下面的语句,将最后装载日期更新为当前装载日期。 updaterds.cdc_timesetlast_load=current_load;下面进行一些测试,验证数据装载的正确性。测试步骤:1.在MySQL的source源数据库中准备客户、产品和销售订单测试数据。 3.验证结果。 usedw;select*fromcustomer_dim;查询的部分结果如下: ...66loyalclients7070ritterrd.17055pittsburghpa12020-03-012020-10-0786loyalclients7777ritterrd.17055pittsburghpa22020-10-072200-01-0177distinguishedagencies9999scottst.17050mechanicsburgpa12020-03-012200-01-0198subsidiaries10000wetlineblvd.17055pittsburghpa12020-10-072200-01-01可以看到,客户6因为地址变更新增了一个版本,而客户7的姓名变更直接覆盖了原来的值,新增了客户8。注意客户6第一个版本的到期日期和第二个版本的生效日期同为‘2020-10-07’,这是因为任何一个SCD的有效期是一个“左闭右开”的区间,以客户6为例,其第一个版本的有效期大于等于‘2020-03-01’,小于‘2020-10-07’,即为‘2020-03-01’到‘2020-10-06’。 select*fromproduct_dim;查询的部分结果如下: ...33lcdpanelmonitor12020-03-012020-10-0743flatpanelmonitor22020-10-072200-01-0154keyboardperipheral12020-10-072200-01-01可以看到,产品3的名称变更使用SCD2增加了一个版本,新增了产品4的记录。 select*fromorder_dim;查询的部分结果如下: ...11111112020-10-072200-01-0111211212020-10-072200-01-0111311312020-10-072200-01-0111411412020-10-072200-01-0111511512020-10-072200-01-0111611612020-10-072200-01-0111711712020-10-072200-01-0111811812020-10-072200-01-01Timetaken:0.146seconds,Fetched:118row(s)现在有118个订单,102个是“初始导入”装载的,16个是本次定期装载的。 select*fromsales_order_fact;查询的部分结果如下: ...1108510117791.001113110116711.001127110115570.001131210114722.001141510117330.001153110117214.001169410119160.001179510118382.001183110114956.00Timetaken:0.135seconds,Fetched:118row(s)可以看到,2020年10月7日的16个销售订单被添加,产品3的代理键是4而不是3,客户6的代理键是8而不是6。 select*fromrds.cdc_time;查询结果如下: 12020-10-082020-10-08Timetaken:0.117seconds,Fetched:1row(s)可以看到,两个字段值都已更新为当前日期。 查看销售订单过渡区表和事实表所对应的的HDFS文件如下,不带日期的文件是初始装载作业所生成,带日期的文件为定期装载作业所生成。 [hdfs@manager~]$hdfsdfs-ls/user/hive/warehouse/rds.db/sales_order/*-rw-r--r--3roothive60122020-10-0820:28/user/hive/warehouse/rds.db/sales_order/sales_order.txt-rw-r--r--3roothive9602020-10-0820:39/user/hive/warehouse/rds.db/sales_order/sales_order_2020-10-07.txt[hdfs@manager~]$hdfsdfs-ls/user/hive/warehouse/dw.db/sales_order_fact/*-rw-r--r--3roothive16252020-10-0820:31/user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact-rw-r--r--3roothive7702020-10-0821:06/user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-10-07[hdfs@manager~]$以上示例说明了如何用Kettle实现Hadoop数据仓库的初始装载和定期装载。需要指出的一点是,就本示例的环境和数据量而言装载执行速度很慢,一次定期装载就需要二十多分钟,比关系数据库慢多了。但考虑到Hadoop本身就只适合大数据量的批处理任务,再加上Hive的性能问题一直就被诟病,也就不必再吐槽了。至此,ETL过程已经实现,下一篇将介绍如何定期自动执行这个过程。 数据清洗是转换过程的一个重要步骤,是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。Hive是Hadoop生态圈的数据仓库软件,使用类似于SQL的语言读、写、管理分布式存储上的大数据集。使用row_number()窗口函数或者使用一个名为UDFRowSequence的用户自定义函数可以生成代理键。Kettle作业和转换能够实现Hadoop数据仓库的初始装载和定期装载。