学习一个东西之前先要知道这个东西是什么。
Spark是一个开源的大数据处理引擎,它提供了一整套开发API,包括流计算和机器学习。它支持批处理和流处理。
Spark提供了6大核心组件:
(1)SparkCore
SparkCore是Spark的基础,它提供了内存计算的能力,是分布式处理大数据集的基础。它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的上层组件提供API。所有Spark的上层组件都建立在SparkCore的基础之上。
(2)SparkSQL
SparkSQL是一个用于处理结构化数据的Spark组件。它允许使用SQL语句查询数据。Spark支持多种数据源,包括Hive表、Parquet和JSON等。
(3)SparkStreaming
(4)SparkMLlib
SparkMLlib是Spark的机器学习库。它提供了常用的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维等。MLlib还提供了一些底层优化原语和高层流水线API,可以帮助开发人员更快地创建和调试机器学习流水线。
(5)SparkGraphX
SparkGraphX是Spark的图形计算库。它提供了一种分布式图形处理框架,可以帮助开发人员更快地构建和分析大型图形。
Spark有许多优势,其中一些主要优势包括:
上手写一个简单的代码例子,下面是一个WordCount的Spark程序:
接下来,程序创建了一个包含两个字符串的列表,并使用parallelize方法将其转换为一个RDD。然后,它使用flatMap方法将每一行文本拆分成单词,并使用map方法将每个单词映射为一个键值对(key-valuepair),其中键是单词,值是1。
最后,程序使用reduceByKey方法将具有相同键的键值对进行合并,并对它们的值进行求和。最终结果是一个包含每个单词及其出现次数的RDD。程序使用collect方法将结果收集到驱动程序,并使用foreach方法打印出来。
Spark的理论较多,为了更有效地学习Spark,首先来理解下其基本概念。
Application指的就是用户编写的Spark应用程序。
如下,"WordCount"就是该应用程序的名字。
importorg.apache.spark.sql.SparkSessionobjectWordCount{defmain(args:Array[String]){//创建SparkSession对象,它是SparkApplication的入口valspark=SparkSession.builder.appName("WordCount").getOrCreate()//读取文本文件并创建DatasetvaltextFile=spark.read.textFile("hdfs://...")//使用flatMap转换将文本分割为单词,并使用reduceByKey转换计算每个单词的数量valcounts=textFile.flatMap(line=>line.split("")).groupByKey(identity).count()//将结果保存到文本文件中counts.write.text("hdfs://...")//停止SparkSessionspark.stop()}}2.DriverDriver是运行SparkApplication的进程,它负责创建SparkSession和SparkContext对象,并将代码转换和操作。
它还负责创建逻辑和物理计划,并与集群管理器协调调度任务。
简而言之,SparkApplication是使用SparkAPI编写的程序,而SparkDriver是负责运行该程序并与集群管理器协调的进程。
可以将Driver理解为运行SparkApplicationmain方法的进程。
driver的内存大小可以进行设置,配置如下:
#设置driver内存大小driver-memory1024m3.Master&Worker在Spark中,Master是独立集群的控制者,而Worker是工作者。
一个Spark独立集群需要启动一个Master和多个Worker。Worker就是物理节点,Worker上面可以启动Executor进程。
在每个Worker上为某应用启动的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。
每个任务都有各自独立的Executor。Executor是一个执行Task的容器。实际上它是一组计算资源(cpu核心、memory)的集合。
一个Worker节点可以有多个Executor。一个Executor可以运行多个Task。
Executor创建成功后,在日志文件会显示如下信息:
INFOExecutor:StartingexecutorID[executorId]onhost[executorHostname]5.RDDRDD(ResilientDistributedDataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的Partition是指数据集的分区。它是数据集中元素的集合,这些元素被分区到集群的节点上,可以并行操作。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPUCore的数目。
一个函数会被作用在每一个分区。Spark中RDD的计算是以分片为单位的,compute函数会被作用到每个分区上。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
一个Job包含多个RDD及作用于相应RDD上的各种操作,每个Action的触发就会生成一个job。用户提交的Job会提交给DAGScheduler,Job会被分解成Stage,Stage会被细化成Task。
被发送到Executor上的工作单元。每个Task负责计算一个分区的数据。
在Spark中,一个作业(Job)会被划分为多个阶段(Stage)。同一个Stage可以有多个Task并行执行(Task数=分区数)。
阶段之间的划分是根据数据的依赖关系来确定的。当一个RDD的分区依赖于另一个RDD的分区时,这两个RDD就属于同一个阶段。当一个RDD的分区依赖于多个RDD的分区时,这些RDD就属于不同的阶段。
上图中,Stage表示一个可以顺滑完成的阶段。曲线表示Shuffle过程。
如果Stage能够复用前面的Stage的话,那么会显示灰色。
在Spark中,Shuffle是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如reduceByKey或groupByKey等操作。
在Shuffle过程中,Spark会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。
Stage的划分,简单来说是以宽依赖来划分的。
对于窄依赖,Partition的转换处理在Stage中完成计算,不划分(将窄依赖尽量放在在同一个Stage中,可以实现流水线计算)。
对于宽依赖,由于有Shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要划分Stage。
Spark会根据Shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的Stage阶段中。
至于什么是窄依赖和宽依赖,下文马上就会提及。
(1)窄依赖
父RDD的一个分区只会被子RDD的一个分区依赖。比如:map,filter和union,这种依赖称之为「窄依赖」。
窄依赖的多个分区可以并行计算,并且窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
(2)宽依赖
指子RDD的分区依赖于父RDD的所有分区,称之为「宽依赖」。
对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
有向无环图,其实说白了就是RDD之间的依赖关系图。
Spark的执行流程大致如下:
RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。
RDD是“ResilientDistributedDataset”的缩写,从全称就可以了解到RDD的一些典型特性:
RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。
RDD支持两种操作:
(1)转换操作(Transformation)
转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()、Filter()这些都属于转换操作。
转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:「窄依赖」和「宽依赖」。
下面是一些常见的转换操作:
转换操作
描述
map
将函数应用于RDD中的每个元素,并返回一个新的RDD
filter
返回一个新的RDD,其中包含满足给定谓词的元素
flatMap
将函数应用于RDD中的每个元素,并将返回的迭代器展平为一个新的RDD
union
返回一个新的RDD,其中包含两个RDD的元素
distinct
返回一个新的RDD,其中包含原始RDD中不同的元素
groupByKey
将键值对RDD中具有相同键的元素分组到一起,并返回一个新的RDD
reduceByKey
将键值对RDD中具有相同键的元素聚合到一起,并返回一个新的RDD
sortByKey
返回一个新的键值对RDD,其中元素按照键排序
(2)行动操作(Action)
Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。
Action操作
reduce
通过函数聚合RDD中的所有元素
collect
将RDD中的所有元素返回到驱动程序
count
返回RDD中的元素个数
first
返回RDD中的第一个元素
take
返回RDD中的前n个元素
takeOrdered
返回RDD中的前n个元素,按照自然顺序或指定的顺序排序
saveAsTextFile
将RDD中的元素保存到文本文件中
foreach
将函数应用于RDD中的每个元素
创建RDD有3种不同方式:
(1)从外部存储系统
由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等:
valrdd1=sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")(2)从其他RDD
通过已有的RDD经过算子转换生成新的RDD:
valrdd2=rdd1.flatMap(_.split(""))(3)由一个已经存在的Scala集合创建
valrdd3=sc.parallelize(Array(1,2,3,4,5,6,7,8))或者valrdd4=sc.makeRDD(List(1,2,3,4,5,6,7,8))其实makeRDD方法底层调用了parallelize方法:
RDD缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
valrdd1=sc.textFile("hdfs://node01:8020/words.txt")valrdd2=rdd1.flatMap(x=>x.split("")).map((_,1)).reduceByKey(_+_)rdd2.cache//缓存/持久化rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了需要注意的是,在触发action的时候,才会去执行持久化。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中去除缓存,那么可以使用unpersist()方法。
rdd.persist(StorageLevel.MEMORY_ONLY)rdd.unpersist()5.存储级别RDD存储级别主要有以下几种。
级别
使用空间
是否在内存中
是否在磁盘上
备注
MEMORY_ONLY
高
低
是
否
使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。
MEMORY_ONLY_2
数据存2份
MEMORY_ONLY_SER
基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存
MEMORY_ONLY_SER_2
数据序列化,数据存2份
MEMORY_AND_DISK
中等
部分
如果数据在内存中放不下,则溢写到磁盘
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化
MEMORY_AND_DISK_SER_2
DISK_ONLY
使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
DISK_ONLY_2
OFF_HEAP
这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。
这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
血缘关系是指RDD之间的依赖关系。当你对一个RDD执行转换操作时,Spark会生成一个新的RDD,并记录这两个RDD之间的依赖关系。这种依赖关系就是血缘关系。
血缘关系可以帮助Spark在发生故障时恢复数据。当一个分区丢失时,Spark可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个RDD。
血缘关系还可以帮助Spark优化计算过程。Spark可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。
我们可以执行toDebugString打印RDD的依赖关系。
下面是一个简单的例子:
valconf=newSparkConf().setAppName("LineageExample").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(List(1,2,3,4,5))valmappedData=data.map(x=>x+1)valfilteredData=mappedData.filter(x=>x%2==0)println(filteredData.toDebugString)在这个例子中,我们首先创建了一个包含5个元素的RDD,并对它执行了两个转换操作:map和filter。然后,我们使用toDebugString方法打印了最终RDD的血缘关系。
运行这段代码后,你会看到类似下面的输出:
(2)MapPartitionsRDD[2]atfilterat
CheckPoint可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等),即它可以将数据和元数据保存到检查指向目录中。因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。
CheckPoint分为两类:
开发人员可以使用RDD.checkpoint()方法来设置检查点。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory:String)方法设置检查点目录。
importorg.apache.spark.{SparkConf,SparkContext}objectCheckpointExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CheckpointExample").setMaster("local")valsc=newSparkContext(conf)//设置checkpoint目录sc.setCheckpointDir("/tmp/checkpoint")valdata=sc.parallelize(List(1,2,3,4,5))valmappedData=data.map(x=>x+1)valfilteredData=mappedData.filter(x=>x%2==0)//对RDD进行checkpointfilteredData.checkpoint()//触发checkpointfilteredData.count()}}RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复。通过对RDD启动检查点机制可以实现容错和高可用。
参数名
参数说明
—master
master的地址,提交任务到哪里执行,例如spark://host:port,yarn,local。具体指可参考下面关于Master_URL的列表
—deploy-mode
在本地(client)启动driver或在cluster上启动,默认是client
—class
应用程序的主类,仅针对java或scala应用
—name
应用程序的名称
—jars
用逗号分隔的本地jar包,设置后,这些jar将包含在driver和executor的classpath下
—packages
包含在driver和executor的classpath中的jar的maven坐标
—exclude-packages
为了避免冲突而指定不包含的package
—repositories
远程repository
—confPROP=VALUE
指定spark配置属性的值,例如-confspark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m”
—properties-file
加载的配置文件,默认为conf/spark-defaults.conf
—driver-memory
Driver内存,默认1G
—driver-java-options
传给driver的额外的Java选项
—driver-library-path
传给driver的额外的库路径
—driver-class-path
传给driver的额外的类路径
—driver-cores
Driver的核数,默认是1。在yarn或者standalone下使用
—executor-memory
每个executor的内存,默认是1G
—total-executor-cores
所有executor总共的核数。仅仅在mesos或者standalone下使用
—num-executors
启动的executor数量。默认为2。在yarn下使用
—executor-core
每个executor的核数。在yarn或者standalone下使用
MasterURL
含义
local
使用1个worker线程在本地运行Spark应用程序
local[K]
使用K个worker线程在本地运行Spark应用程序
local[*]
使用所有剩余worker线程在本地运行Spark应用程序
spark://HOST:PORT
连接到SparkStandalone集群,以便在该集群上运行Spark应用程序
mesos://HOST:PORT
连接到Mesos集群,以便在该集群上运行Spark应用程序
yarn-client
以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。
yarn-cluster
以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。
这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcastvariable)」和「累加器(accumulator)」。
如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
importorg.apache.spark.{SparkConf,SparkContext}objectBroadcastExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("BroadcastExample").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(List(1,2,3,4,5))//创建一个广播变量valfactor=sc.broadcast(2)//使用广播变量valresult=data.map(x=>x*factor.value)result.collect().foreach(println)}}广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。
一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。
示例代码如下:
importorg.apache.spark.{SparkConf,SparkContext}objectAccumulatorExample{defmain(args:Array[String]){valconf=newSparkConf().setAppName("AccumulatorExample")valsc=newSparkContext(conf)valaccum=sc.longAccumulator("MyAccumulator")sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))println(accum.value)//输出10}}这个示例中,我们创建了一个名为MyAccumulator的累加器,并使用sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))来对其进行累加。最后,我们使用println(accum.value)来输出累加器的值,结果为10。
我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0值”(zerovalue),addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:
objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{defzero(initialValue:Vector):Vector={Vector.zeros(initialValue.size)}defaddInPlace(v1:Vector,v2:Vector):Vector={v1+=v2}}//Then,createanAccumulatorofthistype:valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam)九、SparkSQLSpark为结构化数据处理引入了一个称为SparkSQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。
数字类型包括:
字符串类型包括:
二进制类型包括:
布尔类型包括:
区间类型包括:
复合类型包括:
DataFrame是Spark中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。
DataFrame支持多种数据源,包括结构化数据文件、Hive表、外部数据库和现有的RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。
DataFrame的优点在于它提供了一种高级的抽象,使得用户可以使用类似于SQL的语言进行数据处理,而无需关心底层的实现细节。此外,Spark会自动对DataFrame进行优化,以提高查询性能。
下面是一个使用DataFrame的代码例子:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DataFrameExample").getOrCreate()importspark.implicits._valdata=Seq(("Alice",25),("Bob",30),("Charlie",35))valdf=data.toDF("name","age")df.show()在这个示例中,我们首先创建了一个SparkSession对象,然后使用toDF方法将一个序列转换为DataFrame。最后,我们使用show方法来显示DataFrame的内容。
在Scala中,可以通过以下几种方式创建DataFrame:
从现有的RDD转换而来。例如:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valrdd=spark.sparkContext.parallelize(Seq(Person("Alice",25),Person("Bob",30)))valdf=rdd.toDF()df.show()从外部数据源读取。例如,从JSON文件中读取数据并创建DataFrame:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()valdf=spark.read.json("path/to/json/file")df.show()通过编程方式创建。例如,使用createDataFrame方法:
importorg.apache.spark.sql.{Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}valspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()valschema=StructType(List(StructField("name",StringType,nullable=true),StructField("age",IntegerType,nullable=true)))valdata=Seq(Row("Alice",25),Row("Bob",30))valrdd=spark.sparkContext.parallelize(data)valdf=spark.createDataFrame(rdd,schema)df.show()5.DSL&SQL在Spark中,可以使用两种方式对DataFrame进行查询:「DSL(Domain-SpecificLanguage)」和「SQL」。
DSL是一种特定领域语言,它提供了一组用于操作DataFrame的方法。例如,下面是一个使用DSL进行查询的例子:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DSLandSQL").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.select("name","age").filter($"age">25).show()SQL是一种结构化查询语言,它用于管理关系数据库系统。在Spark中,可以使用SQL对DataFrame进行查询。例如,下面是一个使用SQL进行查询的例子:
SparkSQL支持多种数据源,包括Parquet、JSON、CSV、JDBC、Hive等。
下面是示例代码:
下面是从Parquet文件中读取数据并创建DataFrame的示例代码:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("LoadandSaveExample").getOrCreate()valdf=spark.read.load("path/to/parquet/file")df.show()下面是将DataFrame保存到Parquet文件的示例代码:
此外,SparkSQL还支持「自定义函数(User-DefinedFunction,UDF)」,可以让用户编写自己的函数并在查询中使用。
下面是一个使用SQL语法编写自定义函数的示例代码:
importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.functions.udfvalspark=SparkSession.builder.appName("UDFExample").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.createOrReplaceTempView("people")valsquare=udf((x:Int)=>x*x)spark.udf.register("square",square)spark.sql("SELECTname,square(age)FROMpeople").show()在这个示例中,我们首先定义了一个名为square的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用createOrReplaceTempView方法创建一个临时视图,并使用udf.register方法注册自定义函数。
最后,我们使用spark.sql方法执行SQL查询,并在查询中调用自定义函数。
DataSet是Spark1.6版本中引入的一种新的数据结构,它提供了RDD的强类型和DataFrame的查询优化能力。
在Scala中,可以通过以下几种方式创建DataSet:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valrdd=spark.sparkContext.parallelize(Seq(Person("Alice",25),Person("Bob",30)))valds=rdd.toDS()ds.show()从外部数据源读取。例如,从JSON文件中读取数据并创建DataSet:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Long)valds=spark.read.json("path/to/json/file").as[Person]ds.show()通过编程方式创建。例如,使用createDataset方法:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valdata=Seq(Person("Alice",25),Person("Bob",30))valds=spark.createDataset(data)ds.show()11.DataSetVSDataFrameDataSet和DataFrame都是Spark中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。
它们之间的主要区别在于类型安全性。DataFrame是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。
而DataSet是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。
此外,DataSet还提供了一些额外的操作,例如map、flatMap、reduce等。
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。
DataFrame/Dataset转RDD:
valrdd1=testDF.rddvalrdd2=testDS.rddRDD转DataSet:
importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=rdd.map{line=>Coltest(line._1,line._2)}.toDS可以注意到,定义每一行的类型(caseclass)时,已经给出了字段名和类型,后面只要往caseclass里面添加值即可。
Dataset转DataFrame:
importspark.implicits._valtestDF=testDS.toDFDataFrame转Dataset:
importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=testDF.as[Coltest]这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。
注意:在使用一些特殊的操作时,一定要加上importspark.implicits._不然toDF、toDS无法使用。
SparkStreaming的工作原理是将实时数据流拆分为小批量数据,并使用Spark引擎对这些小批量数据进行处理。这种微批处理(Micro-BatchProcessing)的方式使得SparkStreaming能够以近乎实时的延迟处理大规模的数据流。
下面是一个简单的SparkStreaming示例代码:
importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("SparkStreamingExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()我们首先创建了一个StreamingContext对象,并指定了批处理间隔为1秒。然后,我们使用socketTextStream方法从套接字源创建了一个DStream。接下来,我们对DStream进行了一系列操作,包括flatMap、map和reduceByKey。最后,我们使用print方法打印出单词计数的结果。
SparkStreaming作为一种实时流处理框架,具有以下优点:
但是,SparkStreaming也有一些缺点:
在SparkStreaming中,可以通过以下几种方式创建DStream:
(1)从输入源创建。例如,从套接字源创建DStream:
importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)lines.print()ssc.start()ssc.awaitTermination()(2)通过转换操作创建。例如,对现有的DStream进行map操作:
importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))words.print()ssc.start()ssc.awaitTermination()(3)通过连接操作创建。例如,对两个DStream进行union操作:
importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines1=ssc.socketTextStream("localhost",9999)vallines2=ssc.socketTextStream("localhost",9998)vallines=lines1.union(lines2)lines.print()ssc.start()ssc.awaitTermination()总结:简单来说DStream就是对RDD的封装,你对DStream进行操作,就是对RDD进行操作。对于DataFrame/DataSet/DStream来说本质上都可以理解成RDD。
SparkStreaming提供了多种窗口函数,包括:
下面是一个使用窗口函数的示例代码:
importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("WindowExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(30),Seconds(10))wordCounts.print()ssc.start()ssc.awaitTermination()在这个示例中,我们首先创建了一个DStream,并对其进行了一系列转换操作。然后,我们使用reduceByKeyAndWindow函数对DStream进行窗口化处理,指定了窗口大小为30秒,滑动间隔为10秒。最后,我们使用print方法打印出单词计数的结果。
SparkStreaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。SparkStreaming支持以下输出操作:
与SparkStreaming相比,StructuredStreaming具有以下优点:
下面是一个简单的StructuredStreaming示例代码:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()在这个示例中,我们首先创建了一个SparkSession对象。然后,我们使用readStream方法从套接字源创建了一个DataFrame。接下来,我们对DataFrame进行了一系列操作,包括flatMap、groupBy和count。最后,我们使用writeStream方法将结果输出到控制台。
StructuredStreaming同样支持DSL和SQL语法。
DSL语法:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()SQL语法:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//订阅一个主题valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("subscribe","topic1").load()df.selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)").as[(String,String)]2.OutputStructuredStreaming支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到Parquet文件中的例子:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//从socket中读取数据vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()//将数据写入到Parquet文件中lines.writeStream.format("parquet").option("path","path/to/output/dir").option("checkpointLocation","path/to/checkpoint/dir").start()3.OutputMode每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
Outputmode指定了数据写入输出接收器的方式。StructuredStreaming支持以下三种outputmode:
OutputMode
Append
只将流DataFrame/Dataset中的新行写入接收器。
Complete
每当有更新时,将流DataFrame/Dataset中的所有行写入接收器。
Update
每当有更新时,只将流DataFrame/Dataset中更新的行写入接收器。
Outputsink指定了数据写入的位置。StructuredStreaming支持多种输出接收器,包括文件接收器、Kafka接收器、Foreach接收器、控制台接收器和内存接收器等。下面是一些使用Scala语言将数据写入到不同输出接收器中的例子:
假设我们在本地启动了一个socket服务器,并向其发送以下数据: