自定义代码实时加工是一种非常灵活的实时数据加工手段,在自定义代码中用户可以进行跨实例查询、微服务调用、缓存查询等各种操作,然后对实时接收到的数据行进行编辑。数据编辑支持用户自定义新增行、修改行、删除行。其中修改行支持用户新增列、修改列、删除列。自定义代码实时加工可用于以下场景:
构建实时数仓、数据湖以及进行数据治理时都需要对数据进行清洗,涉及数据过滤、加工、标准化。在这个过程中,用户可以上传自定义代码,引入自己的一些企业内部数据标准化处理的二方包或者调用一些微服务或者反查数据库对收到的实时数据进行编辑。这样同步到对端后,就是直接已经清洗好的数据。
数据同步到数仓、数据湖中时,必然面临的一个问题就是处理多个源端关联表的关系,引入自定义代码,可以很容易的来处理这些关系,生成对端数据源需要的宽表行。在CloudCanal平台上,我们会订阅一张主表,也就是多个关系中的主体,作为驱动表。这张表内的数据行在同步的时候是不完整的。构建一个写入到对端的宽表行,需要补足其中缺失的列,这些列也就是其他表中的数据。在自定义代码中,用户可以反查自己的数据库取到这些数据,组装成一个完整的宽表行,返回给CloudCanal,然后再写入对端。下面我们以一个具体的MySQL->ElasticSearch的同步来解释如何采用CloudCanal自定义代码功能完成宽表行构建。
其中Fieldproduct_detail的类型为nested,保存订单和商品的一对多关系
具体的JSONArray信息如下,保存一个订单关联的所有商品的明细信息
{"detail":[{"product_id":1,"product_name":"洗发液","expire_data":"2025-01-01"},{"product_id":2,"product_name":"沐浴露","expire_data":"2025-01-01"}]}宽表行构建操作步骤
宽表行的构建过程可以参考下图:
CloudCanal写入ES的时候允许用户选择源端某些列作为对端自定义_id列。在创建任务数据处理阶段,在上传自定义代码包之后,在对端映射列的下拉选项中可以选择。对于ES而言,相同_id上的doc,再次写入的时候会以upsert的方式写入。例如_id=1的文档在ES中已经存在,则当一个新的文档(id=1)再次写入时,会将对应field执行update操作。这个特性主要用于确保:从表的更新也能及时反馈到对端的索引上。用户创建MySQL->ES的任务时,只需要设置_id为join的关联列。这样后续从表中的字段有更新时,可以根据join列在对端索引上进行更新。
自定义代码有时候需要依赖源端的列来进行计算,但是实际上不需要同步到对端。这时候可以在创建任务数据处理阶段,
答:CloudCanal宽表构建是基于主表触发的,一般而言,关联表也需要及时反馈的目标端有如下方式:新建一个任务,把原来任务反查的表(从表)作为主表,全部订阅。创建任务的时候指定自定义_id列为参与宽表构建的关联列,这样从表的更新都会在对端索引对应的文档上进行update。
数据汇聚是当前用户构建数据中台、实时数仓等都会面临的问题。这里面主要涉及数据标准化、数据清理等工作。结合自定义代码,用户可以结合自己的数据标准化要求,自由的加工、拼接来自多个源端表内的数据,完成数据汇集的工作。
SDK提供的接口比较简洁,用户在自定义代码中实现CloudCanalProcessor接口完成自定义逻辑即可。其中的process方法会将CloudCanal实时同步的一批数据吐给用户,由用户自定义的处理这些行。
publicinterfaceCloudCanalProcessor{List
当前支持的Key主要如下,允许用户从context中直接获取CloudCanal帮助初始化好的源端和目标端的DataSource。针对关系型数据库,这个对象实际上是一个DruidDataSource,并且常驻在CloudCanal内存中,用户不需要自己close改数据源,改数据源会在运行时被重用。
publicclassRdbContextKeyimplementsContextKey{publicstaticfinalStringSOURCE_DATASOURCE="srcDataSource";publicstaticfinalStringTARGET_DATASOURCE="dstDataSource";publicRdbContextKey(){}}SDK支持的数据处理操作核心数据结构与元数据用户需要处理的数据行对应的核心数据结构是CustomRecord。其中包含的内容主要如下。
privateintopsFlag=0;//操作标记位0表示不做任何处理,-1删除行,1新增行,2修改行,用户无需感知privateSet
他的接口定义如下:
/***创建一个新的数据行,Map的key为列名,value为具体的值。*/RecordBuildercreateRecord(Map
使用RecordBuilder提供的方法,可以完成列的增删改、行的新增与删除。
使用RecordBuilder完成一系列操作后,可以调用build,生成最终的CustomRecord对象
建议部署测试环境和生产环境的CloudCanal,仅在测试环境开启debug模式,调试自定义代码。
用户自定义代码中使用如下方式定义logger后,可以在指定路径下查看日志。日志路径为:/home/clougence/logs/cloudcanal/tasks/${taskName}/custom_processor.log
privatestaticfinalStringLOG_NAME="custom_processor";privatestaticfinalLoggercustomLogger=LoggerFactory.getLogger(LOG_NAME);debug日志在任务详情页可以打开参数设置,开启自定义代码的debug日志,这样代码处理前后的数据内容会进行完整的打印,这会占用较多磁盘空间和影响性能,线上环境慎用。日志会打印在任务日志路径下的custom_process.log中
样例工程中的resource目录下的内容是打包自定义处理插件必备的。如果需要修改类名,可以修改resource/META-INF/cloudcanal/plugin.properties中的类名,需要使用全限定名称。
在创建任务的第四步,数据处理,可以选择配置数据处理插件然后选择上传代码包
任务完成创建后,可以在页面管理自己的代码包
Tips:
本次我们以MySQL->MySQL的数据同步为例,包含结构迁移、全量迁移和增量实时同步。
准备的表结构如下:
/*--学生表--*/CREATETABLE`student`(`id`int(4)NOTNULLAUTO_INCREMENTCOMMENT'学号',`name`varchar(20)DEFAULTNULLCOMMENT'名字',`score`int(3)NOTNULLCOMMENT'成绩',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=27DEFAULTCHARSET=utf8mb4COLLATE=utf8mb4_0900_ai_ciCOMMENT='学生信息表'我们的数据加工需求如下:
针对过来的所有数据,都进行新增列的操作。新增的列名字分别为col_varchar和col_int,列值分别为null和9999
针对来自源端integration_src库和源端student表的行,才进行修改列的操作。把分数为0的同学改为分数99
针对来自源端integration_src库和源端student表的行,才进行删除列的操作。针对这一批的所有数据,删除name这一列
每一批数据过来时,满足业务判断条件,则新增两行新的数据,主键采用自增id。新增的两行同时包含两个自定义的新增列,但是不包含name列。
针对来自源端integration_src库和源端student表的行,如果分数小于60的行则会被删除