(上)史上最全干货!FlinkSQL成神之路(全文18万字138个案例42张图)腾讯云开发者社区

源码公众号后台回复1.13.2最全flinksql获取。

ApacheFlink提供了两种关系型API用于统一流和批处理,Table和SQLAPI。

importorg.apache.flink.table.api.*;importstaticorg.apache.flink.table.api.Expressions.*;EnvironmentSettingssettings=EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironmenttEnv=TableEnvironment.create(settings);//下面就是TableAPI的案例,其语义等同于//selecta,count(b)ascnt//fromOrders//groupbyaDataSetresult=tEnv.from("Orders").groupBy($("a")).select($("a"),$("b").count().as("cnt")).toDataSet(counts,Row.class);result.print();代码语言:javascript复制insertintotargetselecta,count(b)ascntfromOrdersgroupbya注意:无论输入是连续(流处理)还是有界(批处理),在Table和SQL任一API中同一条查询语句是具有相同的语义并且会产出相同的结果的。这就是说为什么FlinkSQL和TableAPI可以做到在用户接口层面的流批统一。xdm,用一套SQL既能跑流任务,也能跑批任务,它不香嘛?

TableAPI和SQLAPI也与DataStreamAPI做到了无缝集成。可以轻松地在三种API之间灵活切换。例如,可以使用SQL的MATCH_RECOGNIZE子句匹配出异常的数据,然后使用再转为DataStreamAPI去灵活的构建针对于异常数据的自定义报警机制。

在xdm大体了解了这两个API是干啥的之后,我们就可以直接来看看,怎么使用这两个API了。

根据小伙伴们使用的编程语言的不同(Java或Scala),需要将对应的依赖包添加到项目中。

在小伙伴萌看下文之前,先看一下2.2节整体的思路,跟着博主思路走,会更清晰:

无论是对于SQLAPI来说还是对于TableAPI来说,都是使用TableEnvironment接口承载我们的业务查询逻辑的。只是在用户的使用接口的方式上有区别,以上述的Java代码为例,TableAPI其实就是模拟SQL的查询方式封装了Java语言的lambda强类型API,SQL就是纯SQL查询。Table和SQL很多时候都是掺杂在一起的,大家理解的时候就可以直接将Table和SQLAPI直接按照SQL进行理解,不用强行做特殊的区分。

而且博主推荐的话,直接上SQLAPI就行,其实TableAPI在企业实战中用的不是特别多。你说TableAPI方便吧,它确实比DataStreamAPI方便,但是又比SQL复杂。一般生产使用不多。

注意:由于Table和SQLAPI基本上属于一回事,后续如果没有特别介绍的话,博主就直接按照SQLAPI进行介绍了。

如果xdm想直接上手运行一段FlinkSQL的代码。

可以直接在公众号后台回复1.13.2最全flinksql获取源代码。所有的源码都开源到github上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。

TableEnvironment是使用SQLAPI永远都离不开的一个接口。其是SQLAPI使用的入口(上下文),就像是你要使用JavaDataStreamAPI去写一个Flink任务需要使用到StreamExecutionEnvironment一样。

可以认为TableEnvironment在SQLAPI中的地位和StreamExecutionEnvironment在DataStream中的地位是一样的,都是包含了一个Flink任务运行时的所有上下文环境信息。大家这样对比学习会比较好理解。

TableEnvironment包含的功能如下:

接下来介绍如何创建一个TableEnvironment。案例为Java。easygame。

如果你是inStreamingMode,则最终创建出来的TableEnvironment实例为StreamTableEnvironmentImpl。

如果你是inBatchMode,则最终创建出来的TableEnvironment实例为TableEnvironmentImpl。

它两虽然都继承了TableEnvironment接口,但是StreamTableEnvironmentImpl支持的功能更多一些。大家可以直接去看看接口实验一下,这里就不进行详细介绍。

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);2.2.3.SQL中表的概念一个表的全名(标识)会由三个部分组成:Catalog名称.数据库名称.表名称。如果Catalog名称或者数据库名称没有指明,就会使用当前默认值default。

举个例子,下面这个SQL创建的Table的全名为default.default.table1。

注意:这里有不同的地方就是,离线HiveMetaStore中不会有Catalog这个概念,其标识都是数据库.数据表。

表(视图、外部表)可以是临时的,并与单个Flinksession(可以理解为Flink任务运行一次就是一个session)的生命周期绑定。

表(视图、外部表)也可以是永久的,并且对多个Flinksession都生效。

xdm,是不是又和Hive一样?惊不惊喜意不意外。对比学习+1。

上文已经说了,一个VIEW其实就是一段SQL逻辑的查询结果。

视图VIEW在TableAPI中的体现就是:一个Table的Java对象,其封装了一段查询逻辑。如下案例所示:

这种创建方式是不是贼熟悉,和离线Hive一样+1~

注意:在TableAPI中的一个Table对象被后续的多个查询使用的场景下:Table对象不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个Table的结果,小伙伴萌可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个Table的逻辑执行多次。类似于withtmpas(DML)的语法

首先,如果xdm想直接上手运行一段FlinkSQL的代码。

来看看一个SQL查询案例。

博主举一个案例:在pdd这种发补贴券的场景下,希望可以在发的补贴券总金额超过1w元时,及时报警出来,来帮助控制预算,防止发的太多。

对应的解决方案,我们可以想到使用SQL计算补贴券发放的结果,但是SQL的问题在于无法做到报警。所以我们可以将SQL的查询的结果(即Table对象)转为DataStream,然后就可以在DataStream后自定义报警逻辑的算子。

我们直接上SQL和DataStreamAPI互相转化的案例:

在介绍完一些基本概念之后,我们来认识一下,FlinkSQL中的数据类型。

FlinkSQL内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。

总共包含3部分:

博主认为读完本节你应该掌握:

在流式SQL诞生之前,所有的基于SQL的数据查询都是基于批数据的,没有将SQL应用到流数据处理这一说法。

那么如果我们想将SQL应用到流处理中,必然要站在巨人的肩膀(批数据处理的流程)上面进行,那么具体的分析思路如下:

博主下文就会根据上述三个步骤来一步一步介绍动态表诞生的背景以及这个概念是如何诞生的。

首先对比一下常见的批处理和流处理中数据源(输入表)、处理逻辑、数据汇(结果表)的异同点。

-

输入表

处理逻辑

结果表

批处理

静态表:输入数据有限、是有界集合

批式计算:每次执行查询能够访问到完整的输入数据,然后计算,输出完整的结果数据

静态表:数据有限

流处理

动态表:输入数据无限,数据实时增加,并且源源不断

流式计算:执行时不能够访问到完整的输入数据,每次计算的结果都是一个中间结果

动态表:数据无限

对比上述流批处理之后,我们得到了要将SQL应用于流式任务的三个要解决的核心点:

将上面3个点总结一下,也就引出了本节的动态表和连续查询两种技术方案:

动态表。这里的动态其实是相比于批处理的静态(有界)来说的。

来看一个具体的案例,下图显示了点击事件流(左侧)如何转换为动态表(右侧)。当数据源生成更多的点击事件记录时,映射出来的动态表也会不断增长,这就是动态表的概念:

DynamicTable

连续查询。

部分高级关系数据库系统提供了一个称为物化视图(MaterializedViews)的特性。

物化视图其实就是一条SQL查询,就像常规的虚拟视图VIEW一样。但与虚拟视图不同的是,物化视图会缓存查询的结果,因此在请求访问视图时不需要对查询进行重新计算,可以直接获取物化视图的结果,小伙伴萌可以认为物化视图其实就是把结果缓存了下来。

举个例子:批处理中,如果以Hive天级别的物化视图来说,其实就是每天等数据源ready之后,调度物化视图的SQL执行然后产生新的结果提供服务。那么就可以认为一条表示了输入、处理、输出的SQL就是一个构建物化视图的过程。

映射到我们的流任务中,输入、处理逻辑、输出这一套流程也是一个物化视图的概念。相比批处理来说,流处理中,我们的数据源表的数据是源源不断的。那么从输入、处理、输出的整个物化视图的维护流程也必须是实时的。

因此我们就需要引入一种实时视图维护(EagerViewMaintenance)的技术去做到:一旦更新了物化视图的数据源表就立即更新视图的结果,从而保证输出的结果也是最新的。

这种实时视图维护(EagerViewMaintenance)的技术就叫做连续查询。

注意:

总结前两节,动态表&连续查询两项技术在一条流SQL中的执行流程总共包含了三个步骤,如下图及总结所示:

Query

我们实际介绍一个案例来看看其运行方式,以上文介绍到的点击事件流为例,点击事件流数据的字段如下:

下面介绍两个查询的案例:

第一个查询:一个简单的GROUP-BYCOUNT聚合查询,写过SQL的都不会陌生吧,这种应该都是最基础,最常用的对数据按照类别分组的方法。

如下图所示groupby聚合的常用案例。

time

那么本案例中呢,是基于clicks表中user字段对clicks表(点击事件流)进行分组,来统计每一个user的访问的URL的数量。下面的图展示了当clicks输入表来了新数据(即表更新时),连续查询(ContinuousQuery)的计算逻辑。

groupagg

当查询开始,clicks表(左侧)是空的。

注意上述特殊标记出来的字体,可以看到连续查询对于结果的数据输出方式有两种:

大家对于插入(insert)结果表这件事都比较好理解,因为离线数据都只有插入这个概念。

但是更新(update)结果表就是离线处理中没有概念了。这就是连续查询中中比较重要一个概念。后文会介绍。

接下来介绍第二条查询语句。

第二条查询与第一条类似,但是groupby中除了user字段之外,还groupby了tumble,其代表开了个滚动窗口(后面会详细说明滚动窗口的作用),然后计算url数量。

tumblewindow

而这个查询只有插入(insert)结果表这个行为。

虽然前一节的两个查询看起来非常相似(都计算分组进行计数聚合),但它们在一个重要方面不同:

上面是FlinkSQL连续查询处理机制上面的两类查询方式。我们可以发现连续查询的处理机制不一样,产出到结果表中的结果数据也是不一样的。针对上面两种结果表的更新方式,FlinkSQL提出了changelog表的概念来进行兼容。

changelog表这个概念其实就和MySQLbinlog是一样的。会包含INSERT、UPDATE、DELETE三种数据,通过这三种数据的处理来描述实时处理技术对于动态表的变更:

可以看到我们的标题都是随着一个SQL的生命周期的。从输入流映射为SQL动态输入表、实时处理底层技术-SQL连续查询到本小节的SQL动态输出表转化为输出数据。都是有逻辑关系的。

我们上面介绍到了连续查询(ContinuousQuery)的输出结果表是一个changelog。其可以像普通数据库表一样通过INSERT、UPDATE和DELETE来不断修改。

它可能是一个只有一行、不断更新changelog表,也可能是一个insert-only的changelog表,没有UPDATE和DELETE修改,或者介于两者之间的其他表。

在将动态表转换为流或将其写入外部系统时,需要对这些不同状态的数据进行编码。Flink的TableAPI和SQLAPI支持三种方式来编码一个动态表的变化:

retract

upsert

小伙伴萌会问到,关系代数是啥东西?

其实关系代数就是对于数据集(即表)的一系列的操作(即查询语句)。常见关系代数有:

RelationalAlgebra

那么SQL和关系代数是啥关系呢?

SQL就是能够表示关系代数一种面向用户的接口:即用户能使用SQL表达关系代数的处理逻辑,也就是我们可以用SQL去在表(数据集)上执行我们的业务逻辑操作(关系代数操作)。

在小伙伴萌看下文之前,先看一下2.5节整体的思路,跟着博主思路走:

小伙伴萌要注意到:

解决方案必须要有啊。如下。

至少博主目前没有碰到过,因为这个问题在底层的数据集成系统都已经给解决了,小伙伴萌拿到手的ODS层表都是已经按照所在地区的时区给格式化好的了。

举个例子:小伙伴萌看到日期分区为2022-01-01的Hive表时,可以默认认为该分区中的数据就对应到你所在地区的时区的2022-01-01日的数据。

而本节SQL时区旨在帮助大家了解到以下两个场景的问题:

而以上两个场景就会导致:

因此充分了解本节的知识内容可以很好的帮你避免时区问题错误。

在FlinkSQLclient中执行结果如下:

>nc-lk9999A,1.1,2021-04-1514:01:00B,1.2,2021-04-1514:02:00A,1.8,2021-04-1514:03:00B,2.5,2021-04-1514:04:00C,3.8,2021-04-1514:05:00C,3.8,2021-04-1514:11:00

THE END
1.26道家常菜菜谱大全,勤劳下厨的人先享受美食,每一道菜都很美味鸡腿26道家常菜菜谱大全,勤劳下厨的人先享受美食,每一道菜都很美味,美食,鸡腿,蒜香,咖喱,超级,手撕鸡,咕噜肉,酸菜鱼,家常菜https://www.163.com/dy/article/JIAKV2AK05568JTI.html
2.家常菜谱家常菜谱1000例菜谱大全家常菜谱大全做法家常菜谱1000例图片,又名"家常菜谱大全带图片"顾名思义,就是有超过1000例以上并带有图片的家庭饭桌上常见的一些家常菜谱的菜式。家常菜谱1000例图片做法中常见的食材有家禽、海产、蔬菜、水果、熟食、谷类、中式早点等。 有了食材,调味品也是不可少的,家常菜谱大全带图片中介绍的调料主要有盐、糖、花椒粉、味精、http://www.360doc.com/content/16/1112/19/15294959_606036616.shtml
3.RuoYi框架RuoYi框架学习超简单案例新闻管理系统(附源码)做一个简单的新闻系统。 每一篇新闻有:所属菜单、标题、内容(内容包含图片和文字)、创建日期、作者。 新闻菜单有(树表):菜单名称、菜单类型、菜单图标、菜单是否展示 二、数据库设计 注释不要忘记,代码生成的时候会用到注释!(注释时请不要加上我括号里面解释的内容) https://blog.csdn.net/weixin_44034328/article/details/104081504
4.超详细超简单的fullPage.js插件API实例mingjixiaohuifullPage简单例子 *{ margin:0; padding:0; } body{ font-size:40px; color:#000; } .slide{ font-size:40px; text-align:center; } #nav{ position:fixed; left:0; top: 0; width:100%; height:50px; background:rgba(200,200,200,0.5); z-index:99; } #nav ul{ height:100%; listhttps://www.cnblogs.com/mingjixiaohui/p/5424494.html
5.2024-接着我会引入本节课的主题:“今天我们将学习如何炒土豆丝,这是一道非常常见且营养丰富的家常菜。” 2.知识讲解 -我会简要介绍土豆的营养价值,以及炒土豆丝的步骤和要点。 -通过展示教材中的图片和文字说明,我会详细讲解切土豆丝的技巧,如何保持土豆丝的爽脆口感。 3.示范操作 -在讲台上,我会现场示范切土豆丝https://m.book118.com/html/2024/1128/7120050015010003.shtm
6.新濠超防滑大理石瓷砖潮居案例:138m2在质感中享受高级品味极简并不是简单、简约的廉价,而是一种轻奢,简约到极致的设计,是一种简于外形,而轻奢在内心的设计,是高度自信的诠释。新濠超防滑大理石瓷砖打造的江苏丹阳-中南二期,就是以极简实用为前提,佐以纯粹质感为载体,呈现一种岁月静好的生活状态。 首先映入眼帘的是过道地面,这里设计的是素雅的空间写意与线性的高质感,当光https://digi.china.com/digi/20241202/202412021611086.html
7.shchijie.cn/xxxr94184621.htm白萝卜的吃法大全家常菜的做法 欧美日韩抽插轮奸视频 五级黄色毛片 老太太性爱俱乐部 《清纯女主被大鸡巴帅哥操在线阅读全文》 草久久黄色网站 丰满少妇免费国语电影 中国一级黄色视频 美女奶黄网站在线观看免费 巧妇美穴AV 男人和女人在床上污污的软件 国语国产自制在线小视频 亚州一区H视频 A级黄在线观看 http://shchijie.cn/xxxr94184621.htm
8.海宁4月27日0时起解除全市所有三区!4.2疫情终于完结了!4月7日18时到24时,海宁市在开展核酸筛查时,新增9例初筛阳性人员,经复核结果为阳性,均为集中隔离人员。本轮疫情发生以来,海宁市累计报告本土新冠肺炎阳性病例133例,新增的9例主要活动轨迹后续将向社会公布。 病例125:徐某某,男,65岁,现住地址: 硖石街道水月亭14幢2单元,工作地址:硖石街道锦霞家常菜(太平弄1-1https://jiaxing.19lou.com/forum-778-thread-96191648930031480-1-1.html
9.67277777.com/xxxr94100823.htm新增本土确诊病例43例:外省返川闭环转运人员2例,成都7例,内江11例,宜宾5例,18例由既往无症状感染者51.79MB 69%好评138人) 熟女掰穴 国产一级内谢视频 精品人妻少妇一级毛片免费桃色 95.54MB 71无码人妻一区二区三区在线神菜美 32.23MB 14%好评2816人) 黑人大屌插b 女生和男生一起努力生http://67277777.com/xxxr94100823.htm
10.日本广岛17日新增确诊138例创新高医院等多场所发生群聚传染人民网东京12月18日电(李沐航) 据日本《朝日新闻》网站报道,日本广岛县公布,17日新增确诊病例138例,超过本月11日和14日的110例,再创单日确诊病例最高值。当日,死亡病例1例,累计死亡病例达13例。截至目前,广岛县确诊病例已达2027例。 据了解,截至16日,广岛市立广岛市民医院已有11人确诊感染新冠病毒。其中9人为http://m.people.cn/n4/2020/1218/c58-14634755.html
11.www.shrjd.com/xxxr98235139关小雨被?超污网站 国产欧美一级内谢\ 国产男女又大又粗又长网站下载 鬼父第一季在线观看免费完整番外翻译 黄色毛片A级操逼免费看 欧美日大鸡巴干美女哦哦 特级毛片黄色网站 骚逼爆操 淫乱”肉肏毛片国产 uygur porin 撕开内衣挠奶头和屁股视频 91国人群交 小yoyo?交精品 麻豆色资源 美女秘无http://www.shrjd.com/xxxr98235139