在2019年之前,汽车之家的大部分实时业务都是运行在Storm之上的。Storm作为早期主流的实时计算引擎,凭借简单的Spout和Bolt编程模型以及集群本身的稳定性,俘获了大批用户,我们在2016年搭建了Storm平台。
随着实时计算的需求日渐增多,数据规模逐步增大,Storm在开发及维护成本上都凸显了不足,这里列举几个痛点:
在这个阶段,我们支持了最基本的实时计算需求,因为开发门槛比较高,很多实时业务都是由我们平台开发来完成,既做平台,又做数据开发,精力分散很严重。
2.第二阶段
我们从2018年开始调研Flink引擎,其相对完备的SQL支持,天生对状态的支持吸引了我们,在经过学习调研后,2019年初开始设计开发FlinkSQL平台,并于2019年中上线了AutoStream1.0平台。平台上线之初就在仓库团队、监控团队和运维团队得以应用,能够快速被用户主要得益于以下几点:
痛点:
在AutoStream1.0平台这个阶段,基于SQL开发的方式极大地降低了实时开发的门槛,各业务方可以自己实现实时业务的开发,同时数仓同学经过简单的学习后,就开始对接实时业务,将我们平台方从大量的业务需求中释放出来,让我们可以专心做平台方面的工作。
3.当前阶段
针对上面的几个方面,我们有针对性行的做了以下几点升级:
目前用户对平台的使用已经趋于熟悉,同时自助健康检查和自助诊断等功能的上线,我们平台方的日常on-call频率在逐步降低,开始逐渐进入平台建设的良性循环阶段。
4.应用场景
汽车之家用于做实时计算的数据主要分为三类:
以上这三类数据都会实时写入Kafka集群,在Flink集群中针对不同场景进行计算,结果数据写入到Redis、MySQL、Elasticsearch、HBase、Kafka、Kylin等引擎中,用于支持上层应用。
下面列举了一些应用场景:
5.集群规模
目前Flink集群服务器400+,部署模式为YARN(80%)和Kubernetes,运行作业数800+,日计算量1万亿,峰值每秒处理数据2000万条。
1.平台架构
上面是AutoStream平台目前的整体架构,主要是以下几部分内容:
2.基于SQL的开发流程
在平台提供以上功能的基础上,用户可以快速的实现SQL作业的开发:
平台默认会保存SQL每一次的变更记录,用户可以在线查看历史版本,同时我们会记录针对作业的各种操作,在作业维护阶段可以帮助用户追溯变更历史,定位问题。
下面是一个Demo,用于统计当天的PV、UV数据:
3.基于Catalog的元数据管理
元数据管理的主要内容:
下面是几个模块和Metastore交互的示意图:
4.UDXF管理
我们引入了JarService服务用来管理各种Jar,包括用户自定义作业、平台内部SDK组件、UDXF等,在JarService基础上我们可以很容易的实现UDXF的自助管理,在Onk8s的场景下,我们提供了统一的镜像,Pod启动后会从JarService下载对应的Jar到容器内部,用于支持作业的启动。
用户提交的SQL中如果包含FunctionDDL,我们会在JobClientService中会解析DDL,下载对应的Jar到本地。
为了避免和其他作业有依赖冲突,我们每次都会单独启动一个子进程来完成作业提交的操作。UDXFJar会被并加入到classpath中,我们对Flink做了一些修改,作业提交时会把这个Jar一并上传到HDFS中;同时AutoSQLSDK会根据函数名称和类名为当前作业注册UDF。
5.监控报警及日志收集
得益于Flink完善的Metric机制,我们可以方便的添加Metric,针对Connector,我们内嵌了丰富的Metric,并配置了默认的监控看板,通过看板可以查看CPU、内存、JVM、网络传输、Checkpoint、各种Connector的监控图表。同时平台和公司的云监控系统对接,自动生成默认的报警策略,监控存活状态、消费延迟等关键指标。同时用户可以在云监控系统修改默认的报警策略,添加新的报警项实现个性化监控报警。
日志通过云Filebeat组件写入到Elasticsearch集群,同时开放Kibana供用户查询。
整体的监控报警及日志收集架构如下:
6.健康检查机制
随着作业数的高速增长,出现了很多资源使用不合理的情况,比如前面提到的资源浪费的情况。用户大多时候都是在对接新需求,支持新业务,很少回过头来评估作业的资源配置是否合理,优化资源使用。所以平台规划了一版成本评估的模型,也就是现在说的健康检查机制,平台每天会针对作业做多维度的健康评分,用户可以随时在平台上查看单个作业的得分情况及最近30天的得分变化曲线。
我们引入了多维度,基于权重的评分策略,针对CPU、内存使用率、是否存在空闲Slot、GC情况、Kafka消费延迟、单核每秒处理数据量等多个维度的指标结合计算拓补图进行分析评估,最终产生一个综合分。
每个低分项都会显示低分的原因及参考范围,并显示一些指导建议,辅助用户进行优化。
我们新增了一个Metric,用一个0%~100%的数字体现TaskManagnerCPU利用率。这样用户可以直观的评估CPU是否存在浪费的情况。
下面是作业评分的大致流程:首先我们会收集和整理运行作业的基本信息和Metrics信息。然后应用我们设定好的规则,得到基本评分和基础建议信息。最后将得分信息和建议整合,综合评判,得出综合得分和最终的报告。用户可以通过平台查看报告。对于得分较低的作业,我们会发送报警给作业的归属用户。
7.自助诊断
如之前提到的痛点,用户定位线上问题时,只能求助于我们平台方,造成我们on-call工作量很大,同时用户体验也不好,鉴于此,所以我们上线了以下功能:
8.基于Checkpoint复制的快速容灾
当实时计算应用在重要业务场景时,单个Yarn集群一旦出现故障且短期内不可恢复,那么可能会对业务造成较大影响。
在此背景下,我们建设了Yarn多集群架构,两个独立的Yarn各自对应一套独立的HDFS环境,checkpoint数据定期在两个HDFS间相互复制。目前checkpoint复制的延迟稳定在20分钟内。
同时,在平台层面,我们把切换集群的功能直接开放给用户,用户可以在线查看checkpoint的复制情况,选择合适的checkpoint后(当然也可以选择不从checkpoint恢复)进行集群切换,然后重启作业,实现作业在集群间的相对平滑的迁移。
AutoStream平台的核心场景是支持实时计算开发人员的使用,使实时计算开发变得简单高效、可监控、易运维。同时随着平台的逐步完善,我们开始摸索如何对AutoStream平台进行重用,如何让Flink应用在更多场景下。重用AutoStream有以下几点优势:
基于以上几点,我们在建设其他系统时,优先重用AutoStream平台,以接口调用的方式进行对接,将Flink作业全流程的生命周期,完全托管给AutoStream平台,各系统优先考虑实现自身的业务逻辑即可。
我们团队内的AutoDTS(接入及分发任务)和AutoKafka(Kafka集群复制)系统目前就是依托于AutoStream建设的。简单介绍一下集成的方式,以AutoDTS为例:
1.AutoDTS数据接入分发平台
AutoDTS系统主要包含两部分功能:
1.1AutoDTS数据接入
下面是数据接入的架构图:
数据接入到KafkaTopic的同时,Topic会自动注册为一张AutoStream平台上的流表,方便用户使用。
数据接入基于Flink建设还有一个额外的好处,就是可以基于Flink的精确一次语义,低成本的实现精确一次数据接入,这对支持数据准确性要求很高的业务来说,是一个必要条件。
目前我们在做把业务表中的全量数据接入KafkaTopic中,基于Kafka的compact模式,可以实现Topic中同时包含存量数据和增量数据。这对于数据分发场景来说是十分友好的,目前如果想把数据实时同步到其他存储引擎中,需要先基于调度系统,接入一次全量数据,然后再开启实时分发任务,进行变更数据的实时分发。有了CompactTopic后,可以省去全量接入的操作。Flink1.12版本已经对CompactTopic做支持,引入upsert-kafkaConnector[1]
下面是一条样例数据:
下面是使用流表的示例:
1.2AutoDTS数据分发
我们已经知道,接入到Kafka中的数据是可以当做一张流表来使用的,而数据分发任务本质上是把这个流表的数据写入到其他存储引擎,鉴于AutoStream平台已经支持多种TableSink(Connector),我们只需要根据用户填写的下游存储的类型和地址等信息,就可以通过拼装SQL来实现数据的分发。
通过直接重用Connector的方式,最大化的避免了重复开发的工作。
下面是一个分发任务对应的SQL示例:
2.Kaka多集群架构
Kafka在实际应用中,有些场景是需要做Kafka多集群架构支持的,下面列举几个常见的场景:
2.1整体架构
先来看一下Kafka集群间的数据复制,这是建设多集群架构的基础。我们是使用MirrorMaker2来实现数据复制的,我们把MirrorMaker2改造成普通的Flink作业,运行在Flink集群中。
我们引入了RouteService和KafkaSDK,实现客户端快速切换访问的Kafka集群。
SDK会监听路由规则的变化,当需要切换集群时,只需要在RouteService后台切换路由规则,SDK发现路由集群发生变化时,会重启Producer/Consumer实例,切换到新集群。
如果是消费者发生了集群切换,由于Cluster1和Cluster2中Topic的offset是不同的,需要通过OffsetMappingService来获取当前ConsumerGroup在Cluster2中的offset,然后从这些Offset开始消费,实现相对平滑的集群切换。
2.2Kafka集群间的数据复制
我们使用MirrorMaker2来实现集群间的数据复制,MirrorMaker2是Kafka2.4版本引入的,具体以下特性:
clusters=primary,backup
primary.bootstrap.servers=vip1:9091
backup.bootstrap.servers=vip2:9092
primary->backup.enabled=true
backup->primary.enabled=true
这段配置完成primary到backup集群的双向数据复制,primary集群中的topic1中的数据会复制到backup集群中的primary.topic1这个Topic中,目标集群的Topic命名规则是sourceCluster.sourceTopicName,可以通过实现ReplicationPolicy接口来自定义命名策略。
上面三个用的都是connectruntime模块中的KafkaBasedLog工具类,这个工具类可以读写一个compact模式的topic数据,此时MirrorMaker2把topic当作KV存储使用。
sourceCluster.checkpoints.internal:记录sourceClusterconsumergroup在当前集群对应的offset,mm2会定期从源kafka集群读取topic对应的consumergroup提交的offset,并写到目标集群的sourceCluster.checkpoints.internaltopic中。
2.4MirrorMaker2的部署
下面是MirrorMaker2作业运行的流程,在AutoKafka平台上创建一个数据复制作业,会调用AutoStream平台接口,相应的创建一个MM2类型的作业。启动作业时,会调用AutoStream平台的接口把MM2作业提交到Flink集群中运行。
2.5路由服务
RouteService负责处理客户端的路由请求,根据客户端的信息匹配合适的路由规则,将最终路由结果,也就是集群信息返回给客户端。
支持基于集群名称、Topic、Group、ClientID以及客户端自定义的参数灵活配置路由规则。
下面的例子就是将Flink作业ID为1234的消费者,路由到cluster_a1集群。
2.6KafkaSDK
使用原生的kafka-clients是无法和RouteService进行通信的,客户端需要依赖我们提供的KafkaSDK(汽车之家内部开发的SDK)能和RouteService通信,实现动态路由的效果。
KafkaSDK实现了Producer、Consumer接口,本质是kafka-clients的代理,业务做较少的改动就可以引入KafkaSDK。
业务依赖KafkaSDK后,KafkaSDK会负责和RouteService通信,监听路由变化,当发现路由的集群发生变化时,会close当前的Producer/Consumer,创建新的Producer/Consumer,访问新的集群。
此外KafkaSDK还负责将Producer、Consumer的metric统一上报到云监控系统的prometheus,通过查看平台预先配置好的仪表盘,可以清晰的看到业务的生产、消费情况。
同时SDK会收集一些信息,比如应用名称、IP端口、进程号等,这些信息可以在AutoKafka平台上查到,方便我们和用户共同定位问题。
2.7OffsetMappingService
当Consumer的路由发生变化并切换集群时,情况有一些复杂,因为目前MirrorMaker2是先把数据从源集群消费出来,再写入到目标集群的,同一条数据可以确保写入到目标topic的相同分区,但是offset和源集群是不同的。
针对这种offset不一致的情况,MirrorMaker2会消费源集群的__consumer_offsets数据,加上目标集群对应的offset,写入到目标集群的sourceCluster.checkpoints.internaltopic中。
同时,源集群的mm2-offset-syncs.targetCluster.internaltopic记录了源集群和目标集群offset的映射关系,结合这两个topic,我们建设了OffsetMappingService来完成目标集群的offset的转换工作。
所以当Consumer需要切换集群时,会调用OffsetMappingService的接口,获取到目标集群的offsets,然后主动seek到这些位置开始消费,这样实现相对平滑的集群切换工作。
2.8Flink与Kafka多集群架构的集成
由于KafkaSDK兼容kafka-clients的用法,用户只需要更换依赖,然后设置cluster.code、Flink.id等参数即可。
当Producer/Consumer发生集群切换后,由于创建了新的Producer/Consumer实例,Kafka的metric数据没有重新注册,导致metric数据无法正常上报。我们在AbstractMetricGroup类中增加了unregister方法,在监听Producer/Consumer的切换事件时,重新注册kafkametrics就可以了。