数据库CloudCanal2.0自定义代码实时加工能力(自定义实时ETL)说明与介绍个人文章

自定义代码实时加工是一种非常灵活的实时数据加工手段,在自定义代码中用户可以进行跨实例查询、微服务调用、缓存查询等各种操作,然后对实时接收到的数据行进行编辑。数据编辑支持用户自定义新增行、修改行、删除行。其中修改行支持用户新增列、修改列、删除列。自定义代码实时加工可用于以下场景:

构建实时数仓、数据湖以及进行数据治理时都需要对数据进行清洗,涉及数据过滤、加工、标准化。在这个过程中,用户可以上传自定义代码,引入自己的一些企业内部数据标准化处理的二方包或者调用一些微服务或者反查数据库对收到的实时数据进行编辑。这样同步到对端后,就是直接已经清洗好的数据。

数据同步到数仓、数据湖中时,必然面临的一个问题就是处理多个源端关联表的关系,引入自定义代码,可以很容易的来处理这些关系,生成对端数据源需要的宽表行。在CloudCanal平台上,我们会订阅一张主表,也就是多个关系中的主体,作为驱动表。这张表内的数据行在同步的时候是不完整的。构建一个写入到对端的宽表行,需要补足其中缺失的列,这些列也就是其他表中的数据。在自定义代码中,用户可以反查自己的数据库取到这些数据,组装成一个完整的宽表行,返回给CloudCanal,然后再写入对端。下面我们以一个具体的MySQL->ElasticSearch的同步来解释如何采用CloudCanal自定义代码功能完成宽表行构建。

其中Fieldproduct_detail的类型为nested,保存订单和商品的一对多关系

具体的JSONArray信息如下,保存一个订单关联的所有商品的明细信息

{"detail":[{"product_id":1,"product_name":"洗发液","expire_data":"2025-01-01"},{"product_id":2,"product_name":"沐浴露","expire_data":"2025-01-01"}]}宽表行构建操作步骤

宽表行的构建过程可以参考下图:

CloudCanal写入ES的时候允许用户选择源端某些列作为对端自定义_id列。在创建任务数据处理阶段,在上传自定义代码包之后,在对端映射列的下拉选项中可以选择。对于ES而言,相同_id上的doc,再次写入的时候会以upsert的方式写入。例如_id=1的文档在ES中已经存在,则当一个新的文档(id=1)再次写入时,会将对应field执行update操作。这个特性主要用于确保:从表的更新也能及时反馈到对端的索引上。用户创建MySQL->ES的任务时,只需要设置_id为join的关联列。这样后续从表中的字段有更新时,可以根据join列在对端索引上进行更新。

自定义代码有时候需要依赖源端的列来进行计算,但是实际上不需要同步到对端。这时候可以在创建任务数据处理阶段,

答:CloudCanal宽表构建是基于主表触发的,一般而言,关联表也需要及时反馈的目标端有如下方式:新建一个任务,把原来任务反查的表(从表)作为主表,全部订阅。创建任务的时候指定自定义_id列为参与宽表构建的关联列,这样从表的更新都会在对端索引对应的文档上进行update。

数据汇聚是当前用户构建数据中台、实时数仓等都会面临的问题。这里面主要涉及数据标准化、数据清理等工作。结合自定义代码,用户可以结合自己的数据标准化要求,自由的加工、拼接来自多个源端表内的数据,完成数据汇集的工作。

SDK提供的接口比较简洁,用户在自定义代码中实现CloudCanalProcessor接口完成自定义逻辑即可。其中的process方法会将CloudCanal实时同步的一批数据吐给用户,由用户自定义的处理这些行。

publicinterfaceCloudCanalProcessor{Listprocess(ListcustomRecordList,CustomProcessorContextcustomProcessorContext);}用户自定义处理上下文CustomProcessContext顶级接口process包含一个入参CustomProcessContext,这个对象中包含了一个Map,保存CloudCanal传递个用户自定义处理器的上下文信息。其中的Key由SDK中的ContextKey的实现类指定。

当前支持的Key主要如下,允许用户从context中直接获取CloudCanal帮助初始化好的源端和目标端的DataSource。针对关系型数据库,这个对象实际上是一个DruidDataSource,并且常驻在CloudCanal内存中,用户不需要自己close改数据源,改数据源会在运行时被重用。

publicclassRdbContextKeyimplementsContextKey{publicstaticfinalStringSOURCE_DATASOURCE="srcDataSource";publicstaticfinalStringTARGET_DATASOURCE="dstDataSource";publicRdbContextKey(){}}SDK支持的数据处理操作核心数据结构与元数据用户需要处理的数据行对应的核心数据结构是CustomRecord。其中包含的内容主要如下。

privateintopsFlag=0;//操作标记位0表示不做任何处理,-1删除行,1新增行,2修改行,用户无需感知privateSetcustomAddFields=newLinkedHashSet();//记录新增列的列名,内部元数据,用户无需感知privateCustomRecord.Coordinationcoordination;//记录CustomRecord关联的内部Record的坐标,用户无需感知privateMaprecordMetaMap=newLinkedHashMap();//保存了数据行的元数据信息,例如消息来自源端哪个库哪个表privateMapfieldMapAfter=newLinkedHashMap();//记录变更后的列值privateMapfieldMapBefore=newLinkedHashMap();//记录变更前的列值用户在具体使用时只要关心其中的recordMetaMap即可,其中的key值由SDK提供的类来给出。例如关系型数据库可以获取的meta信息均记录在RdbMetaKeys

他的接口定义如下:

/***创建一个新的数据行,Map的key为列名,value为具体的值。*/RecordBuildercreateRecord(MapfieldValueMap);/***删除当前RecordBuilder关联的数据行*/RecordBuilderdeleteRecord();/***新增一个列*/RecordBuilderaddField(StringaddFieldName,ObjectaddFieldValue);/***新增多个列*/RecordBuilderaddField(MapfieldValueMap);/***删除一个列*/RecordBuilderdropField(StringdropFieldName);/***删除多个列*/RecordBuilderdropField(ListdropFieldNames);/***更新已有的列的列值*/RecordBuilderupdateField(MapfieldValueMap);新增列的映射关系在自定义代码中,用户往往会新增列。CloudCanal默认使用“同名映射规则”。因为新增的列是没有在创建任务的时候指定映射关系的,所以新增的列默认均使用同名映射规则。假设我新增的列为name,则CloudCanal认为该列也应该被写到对端的name字段中。因此,在自定义代码处理中,为了确保能正确写入对端,新增列的列名可以使用对端用户自己想要映射的列名,这样会直接写入对端。

使用RecordBuilder提供的方法,可以完成列的增删改、行的新增与删除。

使用RecordBuilder完成一系列操作后,可以调用build,生成最终的CustomRecord对象

建议部署测试环境和生产环境的CloudCanal,仅在测试环境开启debug模式,调试自定义代码。

用户自定义代码中使用如下方式定义logger后,可以在指定路径下查看日志。日志路径为:/home/clougence/logs/cloudcanal/tasks/${taskName}/custom_processor.log

privatestaticfinalStringLOG_NAME="custom_processor";privatestaticfinalLoggercustomLogger=LoggerFactory.getLogger(LOG_NAME);debug日志在任务详情页可以打开参数设置,开启自定义代码的debug日志,这样代码处理前后的数据内容会进行完整的打印,这会占用较多磁盘空间和影响性能,线上环境慎用。日志会打印在任务日志路径下的custom_process.log中

样例工程中的resource目录下的内容是打包自定义处理插件必备的。如果需要修改类名,可以修改resource/META-INF/cloudcanal/plugin.properties中的类名,需要使用全限定名称。

在创建任务的第四步,数据处理,可以选择配置数据处理插件然后选择上传代码包

任务完成创建后,可以在页面管理自己的代码包

Tips:

本次我们以MySQL->MySQL的数据同步为例,包含结构迁移、全量迁移和增量实时同步。

准备的表结构如下:

/*--学生表--*/CREATETABLE`student`(`id`int(4)NOTNULLAUTO_INCREMENTCOMMENT'学号',`name`varchar(20)DEFAULTNULLCOMMENT'名字',`score`int(3)NOTNULLCOMMENT'成绩',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=27DEFAULTCHARSET=utf8mb4COLLATE=utf8mb4_0900_ai_ciCOMMENT='学生信息表'我们的数据加工需求如下:

针对过来的所有数据,都进行新增列的操作。新增的列名字分别为col_varchar和col_int,列值分别为null和9999

针对来自源端integration_src库和源端student表的行,才进行修改列的操作。把分数为0的同学改为分数99

针对来自源端integration_src库和源端student表的行,才进行删除列的操作。针对这一批的所有数据,删除name这一列

每一批数据过来时,满足业务判断条件,则新增两行新的数据,主键采用自增id。新增的两行同时包含两个自定义的新增列,但是不包含name列。

针对来自源端integration_src库和源端student表的行,如果分数小于60的行则会被删除

THE END
1.python数据清洗案例keyerror:'sepallengthpython数据清洗案例 获取数据: 检查缺失值 首先第一步,我们先检查一下数据集中是否存在空值,可以用pandas中的isnull、nonull、info方法来检查,我们都来试一遍 data.isnull() 1 可以看到,因为数据太多,没有办法全部找出来,这个时候可以用到sum方法来进行统计每一列有多少个缺失值https://blog.csdn.net/weixin_44941795/article/details/100836001
2.独家为数据分析而清洗数据——Python的21个案例和代码(下)本文介绍了为数据分析而准备的数据清洗的另外11个Python案例及代码。数据清洗是识别和纠正错误以及数据集不一致性的过程,以便于数据可以进行分析。在此过程中,数据专家可以更清楚地了解他们的业务中正在发生的事情,提供任何用户都可以利用的可靠分https://mp.weixin.qq.com/s?__biz=MzI1MjQ2OTQ3Ng==&mid=2247635700&idx=1&sn=c882563ba5a67b47e357af3d16bd1391&chksm=e8117d3d3879282281cabd67a5e3d93be0250a7862942d19801d29b5983732c5b1f81c939238&scene=27
3.数据清洗案例分析袋鼠社区数据清洗案例分析 - 在大数据时代,数据已经成为企业的重要资产。然而,原始数据往往存在许多问题,如缺失值、异常值、重复值等,这些问题会影响数据分析的准确性和可靠性。因此,数据清洗成为了数据处理过程中不可或缺的一步。本文将通过一个实际的数据清洗案例,详细介绍https://www.dtstack.com/bbs/article/12691
4.数据清洗案例数据清洗案例: 1、导入各种包 2、将表格导入系统:这里使用了将一个表格的多个sheet同时导入 # 将一张表里的3个sheet都导入系统 table=[pd.read_excel("/Volumes/台电酷闪/数据分析/python学习/202010Python数据清理/meal_order_detail.xlsx",sheet_name=i) for i in range(0,3)] https://www.jianshu.com/p/84d02414b04e
5.求数据清洗的案例分析资料本人第一次做数据清洗,虽然之前有学过一些数据清洗的方法,但是仍然对手头噪音很大,数量很多的数据感觉到无从下手。现征求各位高手看过的好的关于数据清洗的案例书或其他资料,或者关于如何对大量原始数据一步步分析建模的,要求讲的越具体越好。 谢谢大家帮忙! https://bbs.pinggu.org/jg/huiji_huijiku_3640882_1.html
6.MapReduce综合应用案例—招聘数据清洗MapReduce是Hadoop的核心功能之一,掌握它对学习Hadoop至关重要。 Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 任务关卡 第1关数据清洗 https://hnjdzy.educoder.net/shixuns/2lvmz89x/challenges
7.数据分享基于PythonHadoop零售交易数据的Spark数据处理与E案例数据集是在线零售业务的交易数据,采用Python为编程语言,采用Hadoop存储数据,采用Spark对数据进行处理分析,并使用Echarts做数据可视化。由于案例公司商业模式类似新零售,或者说有向此方向发展利好的趋势,所以本次基于利于公司经营与发展的方向进行数据分析。 https://developer.aliyun.com/article/1493639
8.书单想学PowerBI吗?来看看这些书吧!本书是Power BI 快速入门工具书,笔者将Power BI 的知识点做了系统整理,并以案例的方式呈现出来,使读者学习起来更轻松。全书共7 章,包括Power BI Desktop 初体验、数据清洗的革命、数据统计和呈现、建立表关联、交互式分析、使用DAX 函数、数据可视化等,其中重点介绍了Power BI 在数据清洗和数据可视化方面的应用。 http://www.broadview.com.cn/article/419989
9::侯晓焱邢永杰:我国证人证言排除的刑事司法实务观察1.数据清洗的考量因素 数据整理中误入的不属于当事人申请排除非法证据的数据主要包含几种情况: 一是文书记载了法院告知被告人享有申请回避、非法证据排除等权利,文书故此被命中,但案件本身不涉及非法证据问题的争议。二是文书在评析某项具体证据时,主动宣布该项证据中不存在非法证据排除的情形。三是二审裁判文书中记载http://iolaw.cssn.cn/fxyjdt/201907/t20190722_4936908.shtml
10.聊聊如何清理数据案例和步骤数据清理包括发现和解决潜在的数据不一致或错误以提高数据质量。错误是任何不反映所测量的真实值(例如,实际重量)的值(例如,记录的重量)。在此过程中,审查、分析、检测、修改或删除“脏”数据以使数据集“干净”。数据清理也称为数据清洗。一 为什么数据清理很重要在定量研究中,收集数据并使用统计分析来回答研究问题。http://www.360doc.com/content/23/0301/09/78237952_1069924279.shtml
11.大数据应用导论Chapter02大数据的采集与清洗2、Python清洗案例 # 载入必要库 # numpy是一个数值计算库,能够快速的进行矩阵计算 importnumpyasnp # pandas基于numpy的一种数据分析工具,能够快速的进行数据分析、可视化 importpandasaspd # matplotlib是一个2D绘图库,能够跨平台的快速绘制图表 importmatplotlib.pyplotasplt https://blog.51cto.com/14683590/5236225