源码公众号后台回复1.13.2最全flinksql获取。
ApacheFlink提供了两种关系型API用于统一流和批处理,Table和SQLAPI。
importorg.apache.flink.table.api.*;importstaticorg.apache.flink.table.api.Expressions.*;EnvironmentSettingssettings=EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironmenttEnv=TableEnvironment.create(settings);//下面就是TableAPI的案例,其语义等同于//selecta,count(b)ascnt//fromOrders//groupbyaDataSet
TableAPI和SQLAPI也与DataStreamAPI做到了无缝集成。可以轻松地在三种API之间灵活切换。例如,可以使用SQL的MATCH_RECOGNIZE子句匹配出异常的数据,然后使用再转为DataStreamAPI去灵活的构建针对于异常数据的自定义报警机制。
在xdm大体了解了这两个API是干啥的之后,我们就可以直接来看看,怎么使用这两个API了。
根据小伙伴们使用的编程语言的不同(Java或Scala),需要将对应的依赖包添加到项目中。
在小伙伴萌看下文之前,先看一下2.2节整体的思路,跟着博主思路走,会更清晰:
无论是对于SQLAPI来说还是对于TableAPI来说,都是使用TableEnvironment接口承载我们的业务查询逻辑的。只是在用户的使用接口的方式上有区别,以上述的Java代码为例,TableAPI其实就是模拟SQL的查询方式封装了Java语言的lambda强类型API,SQL就是纯SQL查询。Table和SQL很多时候都是掺杂在一起的,大家理解的时候就可以直接将Table和SQLAPI直接按照SQL进行理解,不用强行做特殊的区分。
而且博主推荐的话,直接上SQLAPI就行,其实TableAPI在企业实战中用的不是特别多。你说TableAPI方便吧,它确实比DataStreamAPI方便,但是又比SQL复杂。一般生产使用不多。
注意:由于Table和SQLAPI基本上属于一回事,后续如果没有特别介绍的话,博主就直接按照SQLAPI进行介绍了。
如果xdm想直接上手运行一段FlinkSQL的代码。
可以直接在公众号后台回复1.13.2最全flinksql获取源代码。所有的源码都开源到github上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。
TableEnvironment是使用SQLAPI永远都离不开的一个接口。其是SQLAPI使用的入口(上下文),就像是你要使用JavaDataStreamAPI去写一个Flink任务需要使用到StreamExecutionEnvironment一样。
可以认为TableEnvironment在SQLAPI中的地位和StreamExecutionEnvironment在DataStream中的地位是一样的,都是包含了一个Flink任务运行时的所有上下文环境信息。大家这样对比学习会比较好理解。
TableEnvironment包含的功能如下:
接下来介绍如何创建一个TableEnvironment。案例为Java。easygame。
如果你是inStreamingMode,则最终创建出来的TableEnvironment实例为StreamTableEnvironmentImpl。
如果你是inBatchMode,则最终创建出来的TableEnvironment实例为TableEnvironmentImpl。
它两虽然都继承了TableEnvironment接口,但是StreamTableEnvironmentImpl支持的功能更多一些。大家可以直接去看看接口实验一下,这里就不进行详细介绍。
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);2.2.3.SQL中表的概念一个表的全名(标识)会由三个部分组成:Catalog名称.数据库名称.表名称。如果Catalog名称或者数据库名称没有指明,就会使用当前默认值default。
举个例子,下面这个SQL创建的Table的全名为default.default.table1。
注意:这里有不同的地方就是,离线HiveMetaStore中不会有Catalog这个概念,其标识都是数据库.数据表。
表(视图、外部表)可以是临时的,并与单个Flinksession(可以理解为Flink任务运行一次就是一个session)的生命周期绑定。
表(视图、外部表)也可以是永久的,并且对多个Flinksession都生效。
xdm,是不是又和Hive一样?惊不惊喜意不意外。对比学习+1。
上文已经说了,一个VIEW其实就是一段SQL逻辑的查询结果。
视图VIEW在TableAPI中的体现就是:一个Table的Java对象,其封装了一段查询逻辑。如下案例所示:
这种创建方式是不是贼熟悉,和离线Hive一样+1~
注意:在TableAPI中的一个Table对象被后续的多个查询使用的场景下:Table对象不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个Table的结果,小伙伴萌可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个Table的逻辑执行多次。类似于withtmpas(DML)的语法
首先,如果xdm想直接上手运行一段FlinkSQL的代码。
来看看一个SQL查询案例。
博主举一个案例:在pdd这种发补贴券的场景下,希望可以在发的补贴券总金额超过1w元时,及时报警出来,来帮助控制预算,防止发的太多。
对应的解决方案,我们可以想到使用SQL计算补贴券发放的结果,但是SQL的问题在于无法做到报警。所以我们可以将SQL的查询的结果(即Table对象)转为DataStream,然后就可以在DataStream后自定义报警逻辑的算子。
我们直接上SQL和DataStreamAPI互相转化的案例:
在介绍完一些基本概念之后,我们来认识一下,FlinkSQL中的数据类型。
FlinkSQL内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。
总共包含3部分:
博主认为读完本节你应该掌握:
在流式SQL诞生之前,所有的基于SQL的数据查询都是基于批数据的,没有将SQL应用到流数据处理这一说法。
那么如果我们想将SQL应用到流处理中,必然要站在巨人的肩膀(批数据处理的流程)上面进行,那么具体的分析思路如下:
博主下文就会根据上述三个步骤来一步一步介绍动态表诞生的背景以及这个概念是如何诞生的。
首先对比一下常见的批处理和流处理中数据源(输入表)、处理逻辑、数据汇(结果表)的异同点。
-
输入表
处理逻辑
结果表
批处理
静态表:输入数据有限、是有界集合
批式计算:每次执行查询能够访问到完整的输入数据,然后计算,输出完整的结果数据
静态表:数据有限
流处理
动态表:输入数据无限,数据实时增加,并且源源不断
流式计算:执行时不能够访问到完整的输入数据,每次计算的结果都是一个中间结果
动态表:数据无限
对比上述流批处理之后,我们得到了要将SQL应用于流式任务的三个要解决的核心点:
将上面3个点总结一下,也就引出了本节的动态表和连续查询两种技术方案:
动态表。这里的动态其实是相比于批处理的静态(有界)来说的。
来看一个具体的案例,下图显示了点击事件流(左侧)如何转换为动态表(右侧)。当数据源生成更多的点击事件记录时,映射出来的动态表也会不断增长,这就是动态表的概念:
DynamicTable
连续查询。
部分高级关系数据库系统提供了一个称为物化视图(MaterializedViews)的特性。
物化视图其实就是一条SQL查询,就像常规的虚拟视图VIEW一样。但与虚拟视图不同的是,物化视图会缓存查询的结果,因此在请求访问视图时不需要对查询进行重新计算,可以直接获取物化视图的结果,小伙伴萌可以认为物化视图其实就是把结果缓存了下来。
举个例子:批处理中,如果以Hive天级别的物化视图来说,其实就是每天等数据源ready之后,调度物化视图的SQL执行然后产生新的结果提供服务。那么就可以认为一条表示了输入、处理、输出的SQL就是一个构建物化视图的过程。
映射到我们的流任务中,输入、处理逻辑、输出这一套流程也是一个物化视图的概念。相比批处理来说,流处理中,我们的数据源表的数据是源源不断的。那么从输入、处理、输出的整个物化视图的维护流程也必须是实时的。
因此我们就需要引入一种实时视图维护(EagerViewMaintenance)的技术去做到:一旦更新了物化视图的数据源表就立即更新视图的结果,从而保证输出的结果也是最新的。
这种实时视图维护(EagerViewMaintenance)的技术就叫做连续查询。
注意:
总结前两节,动态表&连续查询两项技术在一条流SQL中的执行流程总共包含了三个步骤,如下图及总结所示:
Query
我们实际介绍一个案例来看看其运行方式,以上文介绍到的点击事件流为例,点击事件流数据的字段如下:
下面介绍两个查询的案例:
第一个查询:一个简单的GROUP-BYCOUNT聚合查询,写过SQL的都不会陌生吧,这种应该都是最基础,最常用的对数据按照类别分组的方法。
如下图所示groupby聚合的常用案例。
time
那么本案例中呢,是基于clicks表中user字段对clicks表(点击事件流)进行分组,来统计每一个user的访问的URL的数量。下面的图展示了当clicks输入表来了新数据(即表更新时),连续查询(ContinuousQuery)的计算逻辑。
groupagg
当查询开始,clicks表(左侧)是空的。
注意上述特殊标记出来的字体,可以看到连续查询对于结果的数据输出方式有两种:
大家对于插入(insert)结果表这件事都比较好理解,因为离线数据都只有插入这个概念。
但是更新(update)结果表就是离线处理中没有概念了。这就是连续查询中中比较重要一个概念。后文会介绍。
接下来介绍第二条查询语句。
第二条查询与第一条类似,但是groupby中除了user字段之外,还groupby了tumble,其代表开了个滚动窗口(后面会详细说明滚动窗口的作用),然后计算url数量。
tumblewindow
而这个查询只有插入(insert)结果表这个行为。
虽然前一节的两个查询看起来非常相似(都计算分组进行计数聚合),但它们在一个重要方面不同:
上面是FlinkSQL连续查询处理机制上面的两类查询方式。我们可以发现连续查询的处理机制不一样,产出到结果表中的结果数据也是不一样的。针对上面两种结果表的更新方式,FlinkSQL提出了changelog表的概念来进行兼容。
changelog表这个概念其实就和MySQLbinlog是一样的。会包含INSERT、UPDATE、DELETE三种数据,通过这三种数据的处理来描述实时处理技术对于动态表的变更:
可以看到我们的标题都是随着一个SQL的生命周期的。从输入流映射为SQL动态输入表、实时处理底层技术-SQL连续查询到本小节的SQL动态输出表转化为输出数据。都是有逻辑关系的。
我们上面介绍到了连续查询(ContinuousQuery)的输出结果表是一个changelog。其可以像普通数据库表一样通过INSERT、UPDATE和DELETE来不断修改。
它可能是一个只有一行、不断更新changelog表,也可能是一个insert-only的changelog表,没有UPDATE和DELETE修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些不同状态的数据进行编码。Flink的TableAPI和SQLAPI支持三种方式来编码一个动态表的变化:
retract
upsert
小伙伴萌会问到,关系代数是啥东西?
其实关系代数就是对于数据集(即表)的一系列的操作(即查询语句)。常见关系代数有:
RelationalAlgebra
那么SQL和关系代数是啥关系呢?
SQL就是能够表示关系代数一种面向用户的接口:即用户能使用SQL表达关系代数的处理逻辑,也就是我们可以用SQL去在表(数据集)上执行我们的业务逻辑操作(关系代数操作)。
在小伙伴萌看下文之前,先看一下2.5节整体的思路,跟着博主思路走:
小伙伴萌要注意到:
解决方案必须要有啊。如下。
至少博主目前没有碰到过,因为这个问题在底层的数据集成系统都已经给解决了,小伙伴萌拿到手的ODS层表都是已经按照所在地区的时区给格式化好的了。
举个例子:小伙伴萌看到日期分区为2022-01-01的Hive表时,可以默认认为该分区中的数据就对应到你所在地区的时区的2022-01-01日的数据。
而本节SQL时区旨在帮助大家了解到以下两个场景的问题:
而以上两个场景就会导致:
因此充分了解本节的知识内容可以很好的帮你避免时区问题错误。
在FlinkSQLclient中执行结果如下:
>nc-lk9999A,1.1,2021-04-1514:01:00B,1.2,2021-04-1514:02:00A,1.8,2021-04-1514:03:00B,2.5,2021-04-1514:04:00C,3.8,2021-04-1514:05:00C,3.8,2021-04-1514:11:00