此套面试题来自于各大厂的真实面试题及常问的知识点,如果能理解吃透这些问题,你的大数据能力将会大大提升,进入大厂指日可待
复习大数据面试题,看这一套就够了!
本文目录:
一、Hadoop二、Hive三、Spark四、Kafka五、HBase六、Flink七、Clickhouse八、Doris九、数据仓库十、数据湖十一、必备SQL题十二、必备算法十三、大数据算法设计题
版本更新如下:
Hadoop中常问的就三块,第一:分布式存储(HDFS);第二:分布式计算框架(MapReduce);第三:资源调度框架(YARN)。
这个问题虽然见过无数次,面试官问过无数次,还是有不少面试者不能完整的说出来,所以请务必记住。并且很多问题都是从HDFS读写流程中引申出来的。
HDFS写流程:
注:Hadoop在设计时考虑到数据的安全与高效,数据文件默认在HDFS上存放三份,存储策略为本地一份,同机架内其它某一节点上一份,不同机架的某一节点上一份。
HDFS读流程:
客户端读取完DataNode上的块之后会进行checksum验证,也就是把客户端读取到本地的块与HDFS上的原始块进行校验,如果发现校验结果不一致,客户端会通知NameNode,然后再从下一个拥有该block副本的DataNode继续读。
客户端上传文件时与DataNode建立pipeline管道,管道的正方向是客户端向DataNode发送的数据包,管道反向是DataNode向客户端发送ack确认,也就是正确接收到数据包之后发送一个已确认接收到的应答。
当DataNode突然挂掉了,客户端接收不到这个DataNode发送的ack确认,客户端会通知NameNode,NameNode检查该块的副本与规定的不符,NameNode会通知DataNode去复制副本,并将挂掉的DataNode作下线处理,不再让它参与文件上传与下载。
NameNode数据存储在内存和本地磁盘,本地磁盘数据存储在fsimage镜像文件和edits编辑日志文件。
首次启动NameNode:
第二次启动NameNode:
SecondaryNameNode是合并NameNode的editlogs到fsimage文件中;
它的具体工作机制:
所以如果NameNode中的元数据丢失,是可以从SecondaryNameNode恢复一部分元数据信息的,但不是全部,因为NameNode正在写的edits日志还没有拷贝到SecondaryNameNode,这部分恢复不了。
这个问题就要说NameNode的高可用了,即NameNodeHA。
一个NameNode有单点故障的问题,那就配置双NameNode,配置有两个关键点,一是必须要保证这两个NameNode的元数据信息必须要同步的,二是一个NameNode挂掉之后另一个要立马补上。
如果面试官再问HA中的共享存储是怎么实现的知道吗?可以进行解释下:NameNode共享存储方案有很多,比如LinuxHA,VMwareFT,QJM等,目前社区已经把由Clouderea公司实现的基于QJM(QuorumJournalManager)的方案合并到HDFS的trunk之中并且作为默认的共享存储实现。基于QJM的共享存储系统主要用于保存EditLog,并不保存FSImage文件。FSImage文件还是在NameNode的本地磁盘上。QJM共享存储的基本思想来自于Paxos算法,采用多个称为JournalNode的节点组成的JournalNode集群来存储EditLog。每个JournalNode保存同样的EditLog副本。每次NameNode写EditLog的时候,除了向本地磁盘写入EditLog之外,也会并行地向JournalNode集群之中的每一个JournalNode发送写请求,只要大多数的JournalNode节点返回成功就认为向JournalNode集群写入EditLog成功。如果有2N+1台JournalNode,那么根据大多数的原则,最多可以容忍有N台JournalNode节点挂掉。
假设NameNode1当前为Active状态,NameNode2当前为Standby状态。如果某一时刻NameNode1对应的ZKFailoverController进程发生了“假死”现象,那么Zookeeper服务端会认为NameNode1挂掉了,根据前面的主备切换逻辑,NameNode2会替代NameNode1进入Active状态。但是此时NameNode1可能仍然处于Active状态正常运行,这样NameNode1和NameNode2都处于Active状态,都可以对外提供服务。这种情况称为脑裂。
脑裂对于NameNode这类对数据一致性要求非常高的系统来说是灾难性的,数据会发生错乱且无法恢复。zookeeper社区对这种问题的解决方法叫做fencing,中文翻译为隔离,也就是想办法把旧的ActiveNameNode隔离起来,使它不能正常对外提供服务。
在进行fencing的时候,会执行以下的操作:
Hadoop上大量HDFS元数据信息存储在NameNode内存中,因此过多的小文件必定会压垮NameNode的内存。
每个元数据对象约占150byte,所以如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储1亿个文件,则NameNode需要20G空间。
显而易见的解决这个问题的方法就是合并小文件,可以选择在客户端上传时执行一定的策略先合并,或者是使用Hadoop的CombineFileInputFormat\
简单概述:
inputFile通过split被切割为多个split文件,通过Record按行读取内容给map(自己写的处理逻辑的方法),数据被map处理完之后交给OutputCollect收集器,对其结果key进行分区(默认使用的hashPartitioner),然后写入buffer,每个maptask都有一个内存缓冲区(环形缓冲区),存放着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式溢写到磁盘,当整个maptask结束后再对磁盘中这个maptask产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reducetask的拉取。
详细步骤:
简单描述:
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。
copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了。
开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
shuffle阶段分为四个步骤:依次为:分区,排序,规约,分组,其中前三个步骤在map阶段完成,最后一个步骤在reduce阶段完成。
shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M
在shuffle阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。
hadoop当中支持的压缩算法:gzip、bzip2、LZO、LZ4、Snappy,这几种压缩算法综合压缩和解压缩的速率,谷歌的Snappy是最优的,一般都选择Snappy压缩。谷歌出品,必属精品。
规约(combiner)是不能够影响任务的运行结果的局部汇总,适用于求和类,不适用于求平均值,如果reduce的输入参数类型和输出参数的类型是一样的,则规约的类可以使用reduce类,只需要在驱动类中指明规约的类即可。
YARN的基本设计思想是将MapReduceV1中的JobTracker拆分为两个独立的服务:ResourceManager和ApplicationMaster。
ResourceManager负责整个系统的资源管理和分配,ApplicationMaster负责单个应用程序的的管理。
调度器根据容量、队列等限制条件,将系统中的资源分配给正在运行的应用程序,在保证容量、公平性和服务等级的前提下,优化集群资源利用率,让所有的资源都被充分利用应用程序管理器负责管理整个系统中的所有的应用程序,包括应用程序的提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重启它。
当jobclient向YARN提交一个应用程序后,YARN将分两个阶段运行这个应用程序:一是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,监控运行直到结束。具体步骤如下:
在Yarn中有三种调度器可以选择:FIFOScheduler,CapacityScheduler,FairScheduler。
Apache版本的hadoop默认使用的是CapacityScheduler调度方式。CDH版本的默认使用的是FairScheduler调度方式
FIFOScheduler(先来先服务):
FIFOScheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
FIFOScheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞,比如有个大任务在执行,占用了全部的资源,再提交一个小任务,则此小任务会一直被阻塞。
CapacityScheduler(能力调度器):
FairScheduler(公平调度器):
在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。
比如:当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
需要注意的是,在Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。
未被external修饰的是内部表,被external修饰的为外部表。
区别:
Hive支持索引(3.0版本之前),但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。并且Hive索引提供的功能很有限,效率也并不高,因此Hive索引很少使用。
适用于不更新的静态字段。以免总是重建索引数据。每次建立、更新数据后,都要重建索引以构建索引表。
hive在指定列上建立索引,会产生一张索引表(Hive的一张物理表),里面的字段包括:索引列的值、该值对应的HDFS文件路径、该值在文件中的偏移量。
注意:Hive中每次有数据时需要及时更新索引,相当于重建一个新表,否则会影响数据查询的效率和准确性,Hive官方文档已经明确表示Hive的索引不推荐被使用,在新版本的Hive中已经被废弃了。
扩展:Hive是在0.7版本之后支持索引的,在0.8版本后引入bitmap索引处理器,在3.0版本开始移除索引的功能,取而代之的是2.3版本开始的物化视图,自动重写的物化视图替代了索引的功能。
ORC和Parquet都是高性能的存储方式,这两种存储格式总会带来存储和性能上的提升。
Parquet:
ORC:
星形模式(StarSchema)是最常用的维度建模方式。星型模式是以事实表为中心,所有的维度表直接连接在事实表上,像星星一样。星形模式的维度建模由一个事实表和一组维表成,且具有以下特点:
a.维表只和事实表关联,维表之间没有关联;
b.每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键;
c.以事实表为核心,维表围绕核心呈星形分布。
雪花模式(SnowflakeSchema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能比星型模型要低。
星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。前面介绍的两种维度建模方法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务发展后期,绝大部分维度建模都采用的是星座模式。
Hive处理json数据总体来说有两个方向的路走:
sortby不是全局排序,其在数据进入reducer前完成排序.因此,如果用sortby进行排序,并且设置mapred.reduce.tasks>1,则sortby只保证每个reducer的输出有序,不保证全局有序。
数据倾斜问题主要有以下几种:
注意:对于leftjoin或者rightjoin来说,不会对关联的字段自动去除null值,对于innerjoin来说,会对关联的字段自动去除null值。
使用方法:
#对于非分区表altertableAconcatenate;#对于分区表altertableBpartition(day=20201224)concatenate;注意:1、concatenate命令只支持RCFILE和ORC文件类型。2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。3、当多次使用concatenate后文件数量不在变化,这个跟参数mapreduce.input.fileinputformat.split.minsize=256mb的设置有关,可设定每个文件的最小size。
在mapper中将多个文件合成一个split作为输入(CombineHiveInputFormat底层是Hadoop的CombineFileInputFormat方法):
sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;--默认每个Map最大输入大小(这个值决定了合并后文件的数量):
setmapred.max.split.size=256000000;--256M一个节点上split的至少大小(这个值决定了多个DataNode上的文件是否需要合并):
setmapred.min.split.size.per.node=100000000;--100M一个交换机下split的至少大小(这个值决定了多个交换机上的文件是否需要合并):
setmapred.min.split.size.per.rack=100000000;--100M3.减少Reduce的数量reduce的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量。
hive中的分区函数distributeby正好是控制MR中partition分区的,可以通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可:
#设置reduce的数量有两种方式,第一种是直接设置reduce个数setmapreduce.job.reduces=10;#第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数sethive.exec.reducers.bytes.per.reducer=5120000000;--默认是1G,设置为5G#执行以下语句,将数据均衡的分配到reduce中setmapreduce.job.reduces=10;insertoverwritetableApartition(dt)select*fromBdistributebyrand();对于上述语句解释:如设置reduce数量为10,使用rand(),随机生成一个数x%10,这样数据就会随机进入reduce中,防止出现有的文件过大或过小。
HadoopArchive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。
#用来控制归档是否可用sethive.archive.enabled=true;#通知Hive在创建归档时是否可以设置父目录sethive.archive.har.parentdir.settable=true;#控制需要归档文件的大小sethar.partfile.size=1099511627776;使用以下命令进行归档:ALTERTABLEAARCHIVEPARTITION(dt='2021-05-07',hr='12');对已归档的分区恢复为原文件:ALTERTABLEAUNARCHIVEPARTITION(dt='2021-05-07',hr='12');注意:归档的分区可以查看不能insertoverwrite,必须先unarchive
针对hive中表的存储格式通常有orc和parquet,压缩格式一般使用snappy。相比与textfile格式表,orc占有更少的存储。因为hive底层使用MR计算架构,数据流是hdfs到磁盘再到hdfs,而且会有很多次,所以使用orc数据格式和snappy压缩策略可以降低IO读写,还能降低网络传输量,这样在一定程度上可以节省存储,还能提升hql任务执行效率;
并行执行,调节parallel参数;
调节jvm参数,重用jvm;
设置map、reduce的参数;开启strictmode模式;
关闭推测执行设置。
Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。
Mr/tez/spark区别:
Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。
Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘DAG有向无环图。兼顾了可靠性和效率。一般处理天指标。
Tez引擎:完全基于内存。注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。
具体运行流程如下:
rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。从物理的角度来看rdd存储的是block和node之间的映射。
RDD是spark提供的核心抽象,全称为弹性分布式数据集。
RDD在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作(分布式数据集)
RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。比如某结点内存只能处理20W数据,那么这20W数据就会放入内存中计算,剩下10W放到磁盘中。RDD的弹性体现在于RDD上自动进行内存和磁盘之间权衡和切换的机制。
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。
cogroup:对多个(2~4)RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
与reduceByKey不同的是:reduceByKey针对一个RDD中相同的key进行合并。而cogroup针对多个RDD中相同的key的元素进行合并。
cogroup的函数实现:这个实现根据要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例,这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作,最后返回的RDD的value是一个Pair的实例,这个实例包含两个Iterable的值,第一个值表示的是RDD1中相同KEY的值,第二个值表示的是RDD2中相同key的值。
由于做cogroup的操作,需要通过partitioner进行重新分区的操作,因此,执行这个流程时,需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd,同时他们对应的partitioner相同时,就不需要执行shuffle)。
场景:表关联查询或者处理重复的key。
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖;
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)。
DAG(DirectedAcyclicGraph有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程);原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程)。
并行计算。
一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行。
对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算)。
对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage。
核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本Stage,遇见宽依赖进行Stage切分。
Spark内核会从触发Action操作的那个RDD开始从后往前推,首先会为最后一个RDD创建一个Stage,然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的Stage,那个RDD就是新的Stage的最后一个RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行Stage的划分,直到所有的RDD全部遍历完成为止。
每个数据分片都对应具体物理位置,数据的位置是被blockManager管理,无论数据是在磁盘,内存还是tacyan,都是由blockManager管理。
针对sparksql的task数量:spark.sql.shuffle.partitions=50
非sparksql程序设置生效:spark.default.parallelism=10
这道题常考,这里只是给大家一个思路,简单说下!面试之前还需做更多准备。
join其实常见的就分为两类:map-sidejoin和reduce-sidejoin。
当大表和小表join时,用map-sidejoin能显著提高效率。
将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的join操作一般会将所有数据根据key发送到所有的reduce分区中去,也就是shuffle的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为reduce-side-join。
在大数据量的情况下,join是一中非常昂贵的操作,需要在join之前应尽可能的先缩小数据量。
对于缩小数据量,有以下几条建议:
这个问题如果深挖还挺复杂的,这里简单介绍下总体流程:
SQLConf中的spark.sql.variable.substitute,默认是可用的;参考SparkSqlParser
重点部分就是DAG和Lingae
Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;
Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;
spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,两个阶段完了就结束了,所以在一个job里面能做的处理很有限;spark计算模型是基于内存的迭代式计算模型,可以分为n个阶段,根据用户编写的RDD算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是spark也有劣势,由于spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完。
Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。
spark非常重要的一个功能特性就是可以将RDD持久化在内存中。
调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。
原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在sparkstreaming中用来保障容错性的主要机制,它可以使sparkstreaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低
Sparkstreaming是sparkcoreAPI的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。
它支持从多种数据源读取数据,比如Kafka、Flume、Twitter和TCPSocket,并且能够使用算子比如map、reduce、join和window等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。
Sparkstreaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。
DStream是sparkstreaming提供的一种高级抽象,代表了一个持续不断的数据流。
DStream可以通过输入数据源来创建,比如Kafka、flume等,也可以通过其他DStream的高阶函数来创建,比如map、reduce、join和window等。
Master实际上可以配置两个,Spark原生的standalone模式是支持Master主备切换的。当ActiveMaster节点挂掉以后,我们可以将StandbyMaster切换为ActiveMaster。
SparkMaster主备切换可以基于两种机制,一种是基于文件系统的,一种是基于ZooKeeper的。
基于文件系统的主备切换机制,需要在ActiveMaster挂掉之后手动切换到StandbyMaster上;
而基于Zookeeper的主备切换机制,可以实现自动切换Master。
如下算子会导致shuffle操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;
这个问题的宗旨是问你sparksql中dataframe和sql的区别,从执行原理、操作方便程度和自定义程度来分析这个问题。
不会的。
因为程序在运行之前,已经申请过资源了,driver和Executors通讯,不需要和master进行通讯的。
spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application以及Executors。standby节点要从zk中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。
注:Master切换需要注意2点:1、在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为SparkApplication在运行前就已经通过ClusterManager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系。2、在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有ActiveMaster才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因Action操作触发新的Job的提交请求。
可以这样说:
kafka消费消息的offset是定义在zookeeper中的,如果想重复消费kafka的消息,可以在redis中自己记录offset的checkpoint点(n个),当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offset重设,这样就可以达到重复消费消息的目的了
kafka使用的是磁盘存储。
速度快是因为:
注:
分三个点说,一个是生产者端,一个消费者端,一个broker端。
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。
如果是同步模式:ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。
注:ack=0:producer不等待broker同步完成的确认,继续发送下一条(批)信息。ack=1(默认):producer要等待leader成功收到数据并得到确认,才发送下一条message。ack=-1:producer得到follwer确认,才发送下一条数据。
通过offsetcommit来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于offset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置KafkaSpoutConfig.bulider.setGroupid的时候设置成了一样的groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。为了保证每个组都独享一份消息数据,groupid一定不要重复才行。
每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
采集层主要可以使用Flume,Kafka等技术。
Flume:Flume是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。Kafka是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。
相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。
所以,Cloudera建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
kafka宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。
在Kafka中,生产者写入消息、消费者读取消息的操作都是与leader副本进行交互的,从而实现的是一种主写主读的生产消费模型。Kafka并不支持主写从读,因为主写从读有2个很明显的缺点:
而kafka的主写主读的优点就很多了:
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
kafka只能保证partition内是有序的,但是partition间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同个partition。
kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中,常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据,这时我们就要对kafka进行以下配置:server.properties
replica.fetch.max.bytes:1048576broker可复制的消息的最大字节数,默认为1Mmessage.max.bytes:1000012kafka会接收单个消息size的最大限制,默认为1M左右注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。
Client写入->存入MemStore,一直到MemStore满->Flush成一个StoreFile,直至增长到一定阈值->触发Compact合并操作->多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除->当StoreFilesCompact后,逐步形成越来越大的StoreFile->单个StoreFile大小超过一定阈值后(默认10G),触发Split操作,把当前RegionSplit成2个Region,Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上
由此过程可知,HBase只是增加数据,没有更新和删除操作,用户的更新和删除都是逻辑层面的,在物理层面,更新只是追加操作,删除只是标记操作。
用户写操作只需要进入到内存即可立即返回,从而保证I/O高性能。
首先一点需要明白:Hbase是基于HDFS来存储的。
HDFS:
HBase:
Hbase中的每张表都通过行键(rowkey)按照一定的范围被分割成多个子表(HRegion),默认一个HRegion超过256M就要被分割成两个,由HRegionServer管理,管理哪些HRegion由Hmaster分配。HRegion存取一个子表时,会创建一个HRegion对象,然后对表的每个列族(ColumnFamily)创建一个store实例,每个store都会有0个或多个StoreFile与之对应,每个StoreFile都会对应一个HFile,HFile就是实际的存储文件,一个HRegion还拥有一个MemStore实例。
热点现象:
某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。
热点现象出现的原因:
热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。
热点现象解决办法:
为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。常见的方法有以下这些:
长度原则:100字节以内,8的倍数最好,可能的情况下越短越好。因为HFile是按照keyvalue存储的,过长的rowkey会影响存储效率;其次,过长的rowkey在memstore中较大,影响缓冲效果,降低检索效率。最后,操作系统大多为64位,8的倍数,充分利用操作系统的最佳性能。
唯一原则:分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。
原则:在合理范围内能尽量少的减少列簇就尽量减少列簇,因为列簇是共享region的,每个列簇数据相差太大导致查询效率低下。
在hbase中每当有memstore数据flush到磁盘之后,就形成一个storefile,当storeFile的数量达到一定程度后,就需要将storefile文件来进行compaction操作。
Compact的作用:
Flink是一个面向流处理和批处理的分布式数据计算引擎,能够基于同一个Flink运行,可以提供流处理和批处理两种类型的功能。在Flink的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界流。
Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
Flink运行时由两种类型的进程组成:一个JobManager和一个或者多个TaskManager。
Client不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行Java/Scala程序的一部分运行,也可以在命令行进程./bin/flinkrun...中运行。
可以通过多种方式启动JobManager和TaskManager:直接在机器上作为standalone集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager连接到JobManagers,宣布自己可用,并被分配工作。
JobManager:
JobManager具有许多与协调Flink应用程序的分布式执行有关的职责:它决定何时调度下一个task(或一组task)、对完成的task或执行失败做出反应、协调checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
ResourceManager负责Flink集群中的资源提供、回收、分配,管理taskslots。
Dispatcher提供了一个REST接口,用来提交Flink应用程序执行,并为每个提交的作业启动一个新的JobMaster。它还运行FlinkWebUI用来提供作业执行信息。
JobMaster负责管理单个JobGraph的执行。Flink集群中可以同时运行多个作业,每个作业都有自己的JobMaster。
TaskManagers:
TaskManager(也称为worker)执行作业流的task,并且缓存和交换数据流。
必须始终至少有一个TaskManager。在TaskManager中资源调度的最小单位是taskslot。TaskManager中taskslot的数量表示并发处理task的数量。请注意一个taskslot中可以执行多个算子。
1.架构模型
SparkStreaming在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink在运行时主要包含:Jobmanager、Taskmanager和Slot。
2.任务调度
SparkStreaming连续不断的生成微小的数据批次,构建有向无环图DAG,SparkStreaming会依次创建DStreamGraph、JobGenerator、JobScheduler。
Flink根据用户提交的代码生成StreamGraph,经过优化生成JobGraph,然后提交给JobManager进行处理,JobManager会根据JobGraph生成ExecutionGraph,ExecutionGraph是Flink调度最核心的数据结构,JobManager根据ExecutionGraph对Job进行调度。
4.容错机制
对于SparkStreaming任务,我们可以设置checkpoint,然后假如发生故障并重启,我们可以从上次checkpoint之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。
Flink则使用两阶段提交协议来解决这个问题。
Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamportalgorithm”算法。
每个需要Checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
sparkstreaming的checkpoint仅仅是针对driver的故障恢复做了数据和元数据的checkpoint。而flink的checkpoint机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:
开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面
预提交(preCommit)将内存中缓存的数据写入文件并关闭
正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟
丢弃(abort)丢弃临时文件
若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。
端到端的exactly-once对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。
幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。
如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统。
分两部分:
在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
Flink内部是基于producer-consumer模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink使用了高效有界的分布式阻塞队列,就像Java通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
1.反压出现的场景
它们有一个共同的特点:数据的消费速度小于数据的生产速度。
2.反压监控方法
通过FlinkWebUI发现反压问题。
Flink的TaskManager会每隔50ms触发一次反压状态监测,共监测100次,并将计算结果反馈给JobManager,最后由JobManager进行计算反压的比例,然后进行展示。
这个比例展示逻辑如下:
OK:0<=Ratio<=0.10,表示状态良好正;
LOW:0.10 HIGH:0.5 0.01,代表100次中有一次阻塞在内部调用。 3.flink反压的实现方式 Flink任务的组成由基本的“流”和“算子”构成,“流”中的数据在“算子”间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,则会降低生产者的数据生产速度 4.反压问题定位和处理 Flink会因为数据堆积和处理速度变慢导致checkpoint超时,而checkpoint是Flink保证数据一致性的关键所在,最终会导致数据的不一致发生。 数据倾斜:可以在Flink的后台管理页面看到每个Task处理数据的大小。当数据倾斜出现时,通常是简单地使用类似KeyBy等分组聚合函数导致的,需要用户将热点Key进行预处理,降低或者消除热点Key的影。 GC:不合理的设置TaskManager的垃圾回收参数会导致严重的GC问题,我们可以通过-XX:+PrintGCDetails参数查看GC的日志。 代码本身:开发者错误地使用Flink算子,没有深入了解算子的实现机制导致性能问题。我们可以通过查看运行机器节点的CPU和内存情况定位问题。 Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和checkpoint交互。Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。 为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。 Flink并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink为了直接操作二进制数据实现了自己的序列化框架。 1.flink数据倾斜的表现: 任务节点频繁出现反压,增加并行度也不能解决问题; 部分节点出现OOM异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。 2.数据倾斜产生的原因: 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区; 技术上大量使用了KeyBy、GroupBy等操作,错误的使用了分组Key,人为产生数据热点。 3.解决问题的思路: 业务上要尽量避免热点key的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理; 技术上出现热点时,要调整方案打散原来的key,避免直接聚合;此外Flink还提供了大量的功能可以避免数据倾斜。 保存延迟数据则是通过sideOutputLateData(outputTag:OutputTag[T])保存 获取延迟数据是通过DataStream.getSideOutput(tag:OutputTag[X])获取 window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决: 在流式处理中,CEP当然是要支持EventTime的,那么相对应的也要支持数据的迟到现象,也就是watermark的处理逻辑。CEP对未匹配成功的事件序列的处理,和迟到数据是类似的。在FlinkCEP的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个Map数据结构中,也就是说,如果我们限定判断事件序列的时长为5分钟,那么内存中就会存储5分钟的数据,这在我看来,也是对内存的极大损伤之一。 们在实际生产环境中可以从四个不同层面设置并行度: .map(newRollingAdditionMapper()).setParallelism(10)//将操作算子设置并行度$FLINK_HOME/bin/flink的-p参数修改并行度env.setParallelism(10)全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:可以设置默认值大一点 需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。 在一个FlinkJob中,数据需要在不同的task中进行交换,整个数据交换是有TaskManager负责的,TaskManager的网络组件首先从缓冲buffer中收集records,然后再发送。Records并不是一个一个被发送的,是积累一个批次再发送,batch技术可以更加高效的利用网络资源。 Flink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。 TypeInformation是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。 TypeInformation支持以下几种类型: 构建抽象语法树的事情交给了Calcite去做。SQLquery会经过Calcite解析器转变成SQL节点树,通过验证后构建成Calcite的抽象语法树(也就是图中的LogicalPlan)。另一边,TableAPI上的调用会构建成TableAPI的抽象语法树,并通过Calcite提供的RelBuilder转变成Calcite的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。 在提交任务后会分发到各个TaskManager中运行,在运行时会使用Janino编译器编译代码后运行。 优点: 缺点: 分区和索引 分区粒度根据业务特点决定,不宜过粗或过细。般选择按天分区,也可指定为tuple();以单表1亿数据为例,分区控制在10-30个为最佳。 必须指定索引列,clickhouse中的索引列即排序列,通过orderby指定,般在查询条件中经常被来充当筛选条件的属性被纳进来;可以是单维度,也可以是组合维度的索引;通常需要满级列在前、查询频率的在前原则;还有基数特别的不适合做索引列,如户表的userid字段;通常筛选后的数据满在百万以内为最佳。 数据采样策略 通过采运算可极提升数据分析的性能。 数据量太时应避免使select*操作,查询的性能会与查询的字段和数量成线性变换;字段越少,消耗的IO资源就越少,性能就会越。 千万以上数据集orderby查询时需要搭配where条件和limit语句起使。如必须不要在结果集上构建虚拟列,虚拟列常消耗资源浪费性能,可以考虑在前端进处理,或者在表中构造实际字段进额外存储。 不建议在基列上执distinct去重查询,改为近似去重uniqCombined。 多表Join时要满表在右的原则,右表关联时被加载到内存中与左表进较。 存储 ClickHouse不持设置多数据录,为了提升数据io性能,可以挂载虚拟券组,个券组绑定多块物理磁盘提升读写性能;多数查询场景SSD盘会普通机械硬盘快2-3倍。 ClickHouse提供了量的数据引擎,分为数据库引擎、表引擎,根据数据特点及使场景选择合适的引擎关重要。 ClickHouse引擎分类: 在以下种情况下,ClickHouse使的数据库引擎: 在所有的表引擎中,最为核的当属MergeTree系列表引擎,这些表引擎拥有最为强的性能和最泛的使场合。对于MergeTree系列的其他引擎,主要于特殊途,场景相对有限。MergeTree系列表引擎是官主推的存储引擎,持乎所有ClickHouse核功能。 MergeTree作为家族系列最基础的表引擎,主要有以下特点: orderby是MergeTree中唯个必填项,甚primarykey还重要,因为当户不设置主键的情况,很多处理会依照orderby的字段进处理。 要求:主键必须是orderby字段的前缀字段。 如果ORDERBY与PRIMARYKEY不同,PRIMARYKEY必须是ORDERBY的前缀(为了保证分区内数据和主键的有序性)。 ORDERBY决定了每个分区中数据的排序规则; PRIMARYKEY决定了级索引(primary.idx); 分区录:MergeTree是以列件+索引件+表定义件组成的,但是如果设定了分区那么这些件就会保存到不同的分区录中。 首先Doris是一个有着MPP架构的分析型数据库产品。对于PB数量级、结构化数据可以做到亚秒级查询响应。使用上兼容MySQL协议,语法是标准的SQL。Doris本身不依赖任何其他系统,相比Hadoop生态产品更易于运维。 应用场景包括:固定历史报表分析、实时数据分析、交互式数据分析等。 一般情况下,用户的原始数据,比如日志或者在事务型数据库中的数据,经过流式系统或离线处理后,导入到Doris中以供上层的报表工具或者数据分析师查询使用。 Doris的架构很简洁,只设FE(Frontend)、BE(Backend)两种角色、两个进程。 FE主要有有三个角色,一个是Leader,一个是Follower,还有一个Observer。Leader跟Follower,主要是用来达到元数据的高可用,保证单节点宕机的情况下,元数据能够实时地在线恢复,而不影响整个服务。Observer只是用来扩展查询节点,就是说如果在发现集群压力非常大的情况下,需要去扩展整个查询的能力,那么可以加Observer的节点。Observer不参与任何的写入,只参与读取。 在使用接口方面,Doris采用MySQL协议,高度兼容MySQL语法,支持标准SQL,用户可以通过各类客户端工具来访问Doris,并支持与BI工具的无缝对接。 Doris的数据模型主要分为3类: Aggregate聚合模型: 聚合模型需要用户在建表时显式的将列分为Key列和Value列。该模型会自动的对Key相同的行,在Value列上进行聚合操作。 当我们导入数据时,对于Key列相同的行会聚合成一行,而Value列会按照设置的AggregationType进行聚合。AggregationType目前有以下四种聚合方式: 例如: 这是一个典型的用户基础信息表。这类数据没有聚合需求,只需保证主键唯一性。(这里的主键为user_id+username)。那么我们的建表语句如下: Duplicate模型: 在某些多维分析场景下,数据既没有主键,也没有聚合需求。因此,我们引入Duplicate数据模型来满足这类需求。 ROLLUP ROLLUP在多维分析中是“上卷”的意思,即将数据按某种指定的粒度进行进一步聚合。 在Doris中,我们将用户通过建表语句创建出来的表称为Base表(BaseTable)。在Base表之上,我们可以创建任意多个ROLLUP表。这些ROLLUP的数据是基于Base表产生的,并且在物理上是独立存储的。 ROLLUP表的基本作用,在于在Base表的基础上,获得更粗粒度的聚合数据。 Duplicate模型中的ROLLUP 因为Duplicate模型没有聚合的语意。所以该模型中的ROLLUP,已经失去了“上卷”这一层含义。而仅仅是作为调整列顺序,以命中前缀索引的作用。 不同于传统的数据库设计,Doris不支持在任意列上创建索引。Doris这类MPP架构的OLAP数据库,通常都是通过提高并发,来处理大量数据的。 本质上,Doris的数据存储在类似SSTable(SortedStringTable)的数据结构中。该结构是一种有序的数据结构,可以按照指定的列进行排序存储。在这种数据结构上,以排序列作为条件进行查找,会非常的高效。 在Aggregate、Uniq和Duplicate三种数据模型中。底层的数据存储,是按照各自建表语句中,AGGREGATEKEY、UNIQKEY和DUPLICATEKEY中指定的列进行排序存储的。 而前缀索引,即在排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。 在建表时,正确的选择列顺序,能够极大地提高查询效率。 ROLLUP调整前缀索引 因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。因此,我们可以通过创建ROLLUP来人为的调整列顺序,以获得更好的查询效率。 物化视图是将预先计算(根据定义好的SELECT语句)好的数据集,存储在Doris中的一个特殊的表。 物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询。 使用场景(物化视图主要针对Duplicate明细模型做聚合操作) 优势 在没有物化视图功能之前,用户一般都是使用Rollup功能通过预聚合方式提升查询效率的。但是Rollup具有一定的局限性,他不能基于明细模型做预聚合。 物化视图则在覆盖了Rollup的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是Rollup的一个超集。 物化视图的局限性 压缩采用Snappy,存储采用orc,压缩比是100g数据压缩完10g左右。 事实表有:事务事实表、周期快照事实表、累积快照事实表、事实事实表。 事务事实表记录的是事务层的事实,保存的是最原的数据,也称“原事实表”。事务事实表中的数据在事务事件发后产,数据的粒度通常是每个事务记录条记录。 这个与上三个有所不同。事实表中通常要保留度量事实和多个维度外键,度量事实是事实表的关键所在。 事实表中没有这些度量事实,只有多个维度外键。事实型事实表通常来跟踪些事件或说明某些活动的范围。 第类事实型事实表是来跟踪事件的事实表。例如:学注册事件 第类事实型事实表是来说明某些活动范围的事实表。例如:促销范围事实表。 星形模式(StarSchema)是最常用的维度建模方式。星型模式是以事实表为中心,所有的维度表直接连接在事实表上,像星星一样。星形模式的维度建模由一个事实表和一组维表成,且具有以下特点:a.维表只和事实表关联,维表之间没有关联;b.每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键;c.以事实表为核心,维表围绕核心呈星形分布; 雪花模式(SnowflakeSchema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能也比星型模型要低。所以一般不是很常用 3.星座模式 通常是指ods表的同个业务期数据中包含了前天或后天凌晨附近的数据或者丢失当天变更的数据,这种现象就叫做漂移,且在部分公司中都会遇到的场景。 通常有两种解决案: 第种案较暴,这不做过多解释,主要来讲解下第种解决案。(这种解决案在数据之路这本书有体现)。 通常都是根据以上的某个字段来切分ODS表,这就产了数据漂移。具体场景如下: 第种解决案: 通常数据建模有以下个流程: 那么范式建模,即3NF模型具有以下特点: 基于以上三个特点,3NF的最终的就是为了降低数据冗余,保障数据致性;同时也有了数据关联逻辑复杂的缺点。 常的维度模型类型主要有: 狭义来讲就是来描述数据的数据。 义来看,除了业务逻辑直接读写处理的业务数据,所有其他来维护整个系统运转所需要的数据,都可以较为元数据。 定义:元数据metadata是关于数据的数据。在数仓系统中,元数据可以帮助数据仓库管理员和数据仓库开发员便的找到他们所关的数据;元数据是描述数据仓库内部数据的结构和建法的数据。按照途可分为:技术元数据、业务元数据。 技术元数据 存储关于数据仓库技术细节的数据,于开发和管理数据仓库使的数据。 业务元数据 从业务度描述了数据仓库中的数据,他提供了介于使者和实际系统之间的语义层,使不懂计算机技术的业务员也能读懂数仓中的数据。 主题 主题是在较层次上将数据进综合、归类和分析利的个抽象概念,每个主题基本对应个宏观的分析领域。在逻辑意义上,它是对企业中某宏观分析领域所涉及的分析对象。 向主题的数据组织式,就是在较层次上对分析对象数据的个完整并且致的描述,能刻画各个分析对象所涉及的企业各项数据,以及数据之间的联系。 主题是根据分析的要求来确定的。 主题域 主题域是对某个主题进分析后确定的主题的边界。 数仓建设过程中,需要对主题进分析,确定主题所涉及到的表、字段、维度等界限。 数仓建设中,最重要的是数据准确性,数据的真正价值在于数据驱动决策,通过数据指导运营,在一个不准确的数据驱动下,得到的一定是错误的数据分析,影响的是公司的业务发展决策,最终导致公司的策略调控失败。 单表数据量监控 一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值 单表空值检测 某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内 单表重复值检测 一个或多个字段是否满足某些规则 跨表数据量对比 主要针对同步流程,监控两张表的数据量是否一致 数据商业分析的目标是利用大数据为所有职场人员做出迅捷,高质,高效的决策提供可规模化的解决方案。商业分析是创造价值的数据科学。 数据商业分析中会存在很多判断: 比如想知道线上渠道A、B各自带来了多少流量,新上线的产品有多少用户喜欢,新注册流中注册的人数有多少。这些都需要通过数据来展示结果。 我们需要知道渠道A为什么比渠道B好,这些是要通过数据去发现的。也许某个关键字带来的流量转化率比其他都要低,这时可以通过信息、知识、数据沉淀出发生的原因是什么。 在对渠道A、B有了判断之后,根据以往的知识预测未来会发生什么。在投放渠道C、D的时候,猜测渠道C比渠道D好,当上线新的注册流、新的优化,可以知道哪一个节点比较容易出问题,这些都是通过数据进行预测的过程。 所有工作中最有意义的还是商业决策,通过数据来判断应该做什么。这是商业分析最终的目的。 用架构图能很快说明白,用阿里的数据架构图来说: 简单来说,数据湖的定义就是原始数据保存区.虽然这个概念国内谈的少,但绝大部分互联网公司都已经有了。国内一般把整个HDFS叫做数仓(广义),即存放所有数据的地方。 数据湖最早是2011年由Pentaho的首席技术官JamesDixon提出的一个概念,他认为诸如数据集市,数据仓库由于其有序性的特点,势必会带来数据孤岛效应,而数据湖可以由于其开放性的特点可以解决数据孤岛问题。 为什么不是数据河? 因为,数据要能存,而不是一江春水向东流。 为什么不是数据池? 因为,要足够大,大数据太大,一池存不下。 为什么不是数据海? 因为,企业的数据要有边界,可以流通和交换,但更注重隐私和安全,“海到无边天作岸”,那可不行。 所以数据要能“存”,数据要够“存”,数据要有边界地“存”。企业级的数据是需要长期积淀的,因此是“数据湖”。 同时湖水天然会进行分层,满足不同的生态系统要求,这与企业建设统一数据中心,存放管理数据的需求是一致的。热数据在上层方便流通应用,温数据、冷数据位于数据中心的不同存储介质之中,达到数据存储容量与成本的平衡。 需要具备把各种数据源接入集成到数据湖中的能力。数据湖的存储也应该是多样的,比如HDFS、HIVE、HBASE等等。 如果把整个互联网想象成一个巨大的数据湖。那么,之所以人们可以这么有效的利用这个湖中的数据,就是因为有了Google这样的搜索引擎。人们可以通过搜索,方便地找到他们想要的数据,进而进行分析。搜索能力是数据湖的十分重要的能力。 对数据的使用权限进行管控,对敏感数据进行脱敏或加密处理,也是数据湖能商用所必须具备的能力。 数据质量是分析正确的关键。因此必须对进入数据湖中的数据的质量情况进行检验。及时发现数据湖中数据质量的问题。为有效的数据探索提供保障。 应该具备一系列好用的数据分析工具,以便各类用户可以对数据湖中的数据进行自助探索。包括: 数据湖刚提出来时,只是一个朴素的理念。而从理念变成一个可以落地的系统,就面临着许多不得不考虑的现实问题: 首先,把所有原始数据都存储下来的想法,要基于一个前提,就是存储成本很低。而今数据产生的速度越来越快、产生的量越来越大的情况下,把所有原始数据,不分价值大小,都存储下来,这个成本在经济上能不能接受,可能需要打一个问号。 其次,数据湖中存放这各类最原始的明细数据,包括交易数据、用户数据等敏感数据,这些数据的安全怎么保证?用户访问的权限如何控制? 再次,湖中的数据怎么治理?谁对数据的质量、数据的定义、数据的变更负责?如何确保数据的定义、业务规则的一致性? 数据湖的理念很好,但是它现在还缺乏像数据仓库那样,有一整套方法论为基础,有一系列具有可操作性的工具和生态为支撑。正因如此,目前把Hadoop用来对特定的、高价值的数据进行处理,构建数据仓库的模式,取得了较多的成功;而用来落实数据湖理念的模式,遭遇了一系列的失败。这里,总结一些典型的数据湖失败的原因: 数据仓库,准确说,是面向历史数据沉淀和分析使用的,有三大特点: 数据湖,准确说,其出发点是补全数据仓库实时处理能力、交互式分析能力等新技术缺失的情况。其最重要的特点,就是丰富的计算引擎:批处理、流式、交互式、机器学习,该有的,应有尽有,企业需要什么,就用什么。数据湖也有三个特征: 数据湖和数仓,就是原始数据和数仓模型的区别。因为数仓(狭义)中的表,主要是事实表-维度表,主要用于BI、出报表,和原始数据是不一样的。 为什么要强调数据湖呢? 真正的原因在于,datascience和machinelearning进入主流了,需要用原始数据做分析,而数仓的维度模型则通常用于聚合。 但数据湖背后其实还有更大的区别: 也就是组织架构和分工的差别——传统企业的数据团队可能被当做IT,整天要求提数,而在新型的互联网/科技团队,数据团队负责提供简单易用的工具,业务部门直接进行数据的使用。 从传统集中式的数仓转为开放式的数据湖,并不简单,会碰到许多问题 这也是目前各大互联网公司都在改进的方向! 2020年,大数据DataBricks公司首次提出了湖仓一体(DataLakehouse)概念,希望将数据湖和数据仓库技术合而为一,此概念一出各路云厂商纷纷跟进。 DataLakehouse(湖仓一体)是新出现的一种数据架构,它同时吸收了数据仓库和数据湖的优势,数据分析师和数据科学家可以在同一个数据存储中对数据进行操作,同时它也能为公司进行数据治理带来更多的便利性。 一直以来,我们都在使用两种数据存储方式来架构数据: 现在许多的公司往往同时会搭建数仓、数据湖这两种存储架构,一个大的数仓和多个小的数据湖。这样,数据在这两种存储中就会有一定的冗余。 DataLakehouse的出现试图去融合数仓和数据湖这两者之间的差异,通过将数仓构建在数据湖上,使得存储变得更为廉价和弹性,同时lakehouse能够有效地提升数据质量,减小数据冗余。在lakehouse的构建中,ETL起了非常重要的作用,它能够将未经规整的数据湖层数据转换成数仓层结构化的数据。 下面详细解释下: 湖仓一体(DataLakehouse): 依据DataBricks公司对Lakehouse的定义:一种结合了数据湖和数据仓库优势的新范式,解决了数据湖的局限性。Lakehouse使用新的系统设计:直接在用于数据湖的低成本存储上实现与数据仓库中类似的数据结构和数据管理功能。 解释拓展: 湖仓一体,简单理解就是把面向企业的数据仓库技术与数据湖存储技术相结合,为企业提供一个统一的、可共享的数据底座。 避免传统的数据湖、数据仓库之间的数据移动,将原始数据、加工清洗数据、模型化数据,共同存储于一体化的“湖仓”中,既能面向业务实现高并发、精准化、高性能的历史数据、实时数据的查询服务,又能承载分析报表、批处理、数据挖掘等分析型业务。 湖仓一体方案的出现,帮助企业构建起全新的、融合的数据平台。通过对机器学习和AI算法的支持,实现数据湖+数据仓库的闭环,提升业务的效率。数据湖和数据仓库的能力充分结合,形成互补,同时对接上层多样化的计算生态。 目前开源的数据湖有江湖人称“数据湖三剑客”的Hudi、DeltaLake和IceBerg。 ApacheHudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。 Hudi支持如下两种表类型: 使用Parquet格式存储数据。CopyOnWrite表的更新操作需要通过重写实现。 使用列式文件格式(Parquet)和行式文件格式(Avro)混合的方式来存储数据。MergeOnRead使用列式格式存放Base数据,同时使用行式格式存放增量数据。最新写入的增量数据存放至行式文件中,根据可配置的策略执行COMPACTION操作合并增量数据至列式文件中。 应用场景 Hudi支持插入、更新和删除数据的能力。可以实时摄取消息队列(Kafka)和日志服务SLS等日志数据至Hudi中,同时也支持实时同步数据库Binlog产生的变更数据。 Hudi优化了数据写入过程中产生的小文件。因此,相比其他传统的文件格式,Hudi对HDFS文件系统更加的友好。 Hudi支持多种数据分析引擎,包括Hive、Spark、Presto和Impala。Hudi作为一种文件格式,不需要依赖额外的服务进程,在使用上也更加的轻量化。 Hudi支持IncrementalQuery查询类型,可以通过SparkStreaming查询给定COMMIT后发生变更的数据。Hudi提供了一种消费HDFS变化数据的能力,可以用来优化现有的系统架构。 DeltaLake是Spark计算框架和存储系统之间带有Schema信息数据的存储中间层。它给Spark带来了三个最主要的功能: 第一,DeltaLake使得Spark能支持数据更新和删除功能; 第二,DeltaLake使得Spark能支持事务; 第三,支持数据版本管理,运行用户查询历史数据快照。 核心特性 Iceberg官网定义:Iceberg是一个通用的表格式(数据组织格式),它可以适配Presto,Spark等引擎提供高性能的读写和元数据管理功能。 数据湖相比传统数仓而言,最明显的便是优秀的T+0能力,这个解决了Hadoop时代数据分析的顽疾。传统的数据处理流程从数据入库到数据处理通常需要一个较长的环节、涉及许多复杂的逻辑来保证数据的一致性,由于架构的复杂性使得整个流水线具有明显的延迟。 Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟。降低数据修正的成本。传统Hive/Spark在修正数据时需要将数据读取出来,修改后再写入,有极大的修正成本。Iceberg所具有的修改、删除能力能够有效地降低开销,提升效率。 随着flink等技术的不断发展,流批一体生态不断完善,但在流批一体数据存储方面一直是个空白,直到Iceberg等数据湖技术的出现,这片空白被慢慢填补。 Iceberg提供ACID事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了ETL; Iceberg提供了upsert、mergeinto能力,可以极大地缩小数据入库延迟; Iceberg提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg支持隐藏分区和分区进化,方便业务进行数据分区策略更新。 Iceberg屏蔽了底层数据存储格式的差异,提供对于Parquet,ORC和Avro格式的支持。Iceberg起到了中间桥梁的能力,将上层引擎的能力传导到下层的存储格式。 Iceberg的架构和实现并未绑定于某一特定引擎,它实现了通用的数据组织格式,利用此格式可以方便地与不同引擎对接,目前Iceberg支持的计算引擎有Spark、Flink、Presto以及Hive。 相比于Hudi、DeltaLake,Iceberg的架构实现更为优雅,同时对于数据格式、类型系统有完备的定义和可进化的设计;面向对象存储的优化。Iceberg在数据组织方式上充分考虑了对象存储的特性,避免耗时的listing和rename操作,使其在基于对象存储的数据湖架构适配上更有优势。 Iceberg支持通过流式方式读取增量数据,支持StructedStreaming以及FlinktableSource。 Deltalake 由于ApacheSpark在商业化上取得巨成功,所以由其背后商业公司Databricks推出的Deltalake也显得格外亮眼。在没有delta数据湖之前,Databricks的客户般会采经典的lambda架构来构建他们的流批处理场景。 Hudi Iceberg Netflix的数据湖原先是借助Hive来构建,但发现Hive在设计上的诸多缺陷之后,开始转为研Iceberg,并最终演化成Apache下个度抽象通的开源数据湖案。 三者均为DataLake的数据存储中间层,其数据管理的功能均是基于系列的meta件。Meta件的类似于数据库的catalog\wal,起到schema管理、事务管理和数据管理的功能。与数据库不同的是,这些meta件是与数据件起存放在存储引擎中的,户可以直接看到。这个做法直接继承了数据分析中数据对户可见的传统,但是形中也增加了数据被不破坏的风险。旦删了meta录,表就被破坏了,恢复难度很。 Meta包含有表的schema信息。因此系统可以掌握schema的变动,提供schema演化的持。Meta件也有transactionlog的功能(需要件系统有原性和致性的持)。所有对表的变更都会成份新的meta件,于是系统就有了ACID和多版本的持,同时可以提供访问历史的功能。在这些,三者是相同的。 Hudi的设计标正如其名,HadoopUpsertsDeletesandIncrementals(原为HadoopUpsertsanDIncrementals),强调了其主要持Upserts、Deletes和Incremental数据处理,其主要提供的写具是SparkHudiDataSourceAPI和提供的HoodieDeltaStreamer,均持三种数据写式:UPSERT,INSERT和BULK_INSERT。其对Delete的持也是通过写时指定定的选项持的,并不持纯粹的delete接。 在查询,Hudi持Hive、Spark、Presto。 在性能,Hudi设计了HoodieKey,个类似于主键的东西。对于查询性能,般需求是根据查询谓词成过滤条件下推datasource。Hudi这没怎么做作,其性能完全基于引擎带的谓词下推和partitionprune功能。 Hudi的另特是持CopyOnWrite和MergeOnRead。前者在写时做数据的merge,写性能略差,但是读性能更些。后者读的时候做merge,读性能差,但是写数据会较及时,因后者可以提供近实时的数据分析能。最后,Hudi提供了个名为run_sync_tool的脚本同步数据的schema到Hive表。Hudi还提供了个命令具于管理Hudi表。 Iceberg没有类似的HoodieKey设计,其不强调主键。没有主键,做update/delete/merge等操作就要通过Join来实现,Join需要有个类似SQL的执引擎。 Iceberg在查询性能做了量的作。值得提的是它的hiddenpartition功能。Hiddenpartition意思是说,对于户输的数据,户可以选取其中某些列做适当的变换(Transform)形成个新的列作为partition列。这个partition列仅仅为了将数据进分区,并不直接体现在表的schema中。 Delta的定位是流批体的DataLake存储层,持update/delete/merge。由于出Databricks,spark的所有数据写式,包括基于dataframe的批式、流式,以及SQL的Insert、InsertOverwrite等都是持的(开源的SQL写暂不持,EMR做了持)。不强调主键,因此其update/delete/merge的实现均是基于spark的join功能。在数据写,Delta与Spark是强绑定的,这点Hudi是不同的:Hudi的数据写不绑定Spark(可以Spark,也可以使Hudi的写具写)。 在查询,开源Delta前持Spark与Presto,但是,Spark是不可或缺的,因为deltalog的处理需要到Spark。这意味着如果要Presto查询Delta,查询时还要跑个Spark作业。更为难受的是,Presto查询是基于SymlinkTextInputFormat。在查询之前,要运Spark作业成这么个Symlink件。如果表数据是实时更新的,意味着每次在查询之前先要跑个SparkSQL,再跑Presto。为此,EMR在这做了改进可以不必事先启动个Spark任务。 在查询性能,开源的Delta乎没有任何优化。 Delta在数据merge性能不如Hudi,在查询性能不如Iceberg,是不是意味着Delta是处了呢?其实不然。Delta的优点就是与Spark的整合能,尤其是其流批体的设计,配合multi-hop的datapipeline,可以持分析、Machinelearning、CDC等多种场景。使灵活、场景持完善是它相Hudi和Iceberg的最优点。另外,Delta号称是Lambda架构、Kappa架构的改进版,需关流批,需关架构。这点上Hudi和Iceberg是所不及的。 三个引擎的初衷场景并不完全相同,Hudi为了incremental的upserts,Iceberg定位于性能的分析与可靠的数据管理,Delta定位于流批体的数据处理。这种场景的不同也造成了三者在设计上的差别。尤其是Hudi,其设计与另外两个相差别更为明显。因此后是趋同还筑起各专长优势壁垒未可知。 Delta、Hudi、Iceberg三个开源项中,Delta和Hudi跟Spark的代码深度绑定,尤其是写路径。这两个项设计之初,都基本上把Spark作为他们的默认计算引擎了。ApacheIceberg的向常坚定,宗旨就是要做个通化设计的TableFormat。 它完美的解耦了计算引擎和底下的存储系统,便于多样化计算引擎和件格式,很好的完成了数据湖架构中的TableFormat这层的实现,因此也更容易成为TableFormat层的开源事实标准。 另,ApacheIceberg也在朝着流批体的数据存储层发展,manifest和snapshot的设计,有效地隔离不同transaction的变更,常便批处理和增量计算。并且,ApacheFlink已经是个流批体的计算引擎,者都可以完美匹配,合打造流批体的数据湖架构。 ApacheIceberg这个项背后的社区资源常丰富。在国外,Netflix、Apple、Linkedin、Adobe等公司都有PB级别的产数据运在ApacheIceberg上;在国内,腾讯这样的巨头也有常庞的数据跑在ApacheIceberg之上,最的业务每天有T的增量数据写。 编写一个SQL查询,获取Employee表中第二高的薪水(Salary)。 +----+--------+|Id|Salary|+----+--------+|1|100||2|200||3|300|+----+--------+例如上述Employee表,SQL查询应该返回200作为第二高的薪水。如果不存在第二高的薪水,那么查询应返回null。 +---------------------+|SecondHighestSalary|+---------------------+|200|+---------------------+SELECTIFNULL((SELECTDISTINCTSalaryFROMEmployeeORDERBYSalaryDESCLIMIT1OFFSET1),NULL)ASSecondHighestSalary2.分数排名编写一个SQL查询来实现分数排名。 如果两个分数相同,则两个分数排名(Rank)相同。请注意,平分后的下一个名次应该是下一个连续的整数值。换句话说,名次之间不应该有“间隔”。 +----+-------+|Id|Score|+----+-------+|1|3.50||2|3.65||3|4.00||4|3.85||5|4.00||6|3.65|+----+-------+例如,根据上述给定的Scores表,你的查询应该返回(按分数从高到低排列): +-------+------+|Score|Rank|+-------+------+|4.00|1||4.00|1||3.85|2||3.65|3||3.65|3||3.50|4|+-------+------+selectScore,dense_rank()over(orderbyScoredesc)`rank`fromScores3.连续出现的数字编写一个SQL查询,查找所有至少连续出现三次的数字。 +----+-----+|Id|Num|+----+-----+|1|1||2|1||3|1||4|2||5|1||6|2||7|2|+----+-----+