Spark入门指南:从基础概念到实践应用全解析框架程序开发

学习一个东西之前先要知道这个东西是什么。

Spark是一个开源的大数据处理引擎,它提供了一整套开发API,包括流计算和机器学习。它支持批处理和流处理。

Spark提供了6大核心组件:

(1)SparkCore

SparkCore是Spark的基础,它提供了内存计算的能力,是分布式处理大数据集的基础。它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的上层组件提供API。所有Spark的上层组件都建立在SparkCore的基础之上。

(2)SparkSQL

SparkSQL是一个用于处理结构化数据的Spark组件。它允许使用SQL语句查询数据。Spark支持多种数据源,包括Hive表、Parquet和JSON等。

(3)SparkStreaming

(4)SparkMLlib

SparkMLlib是Spark的机器学习库。它提供了常用的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维等。MLlib还提供了一些底层优化原语和高层流水线API,可以帮助开发人员更快地创建和调试机器学习流水线。

(5)SparkGraphX

SparkGraphX是Spark的图形计算库。它提供了一种分布式图形处理框架,可以帮助开发人员更快地构建和分析大型图形。

Spark有许多优势,其中一些主要优势包括:

上手写一个简单的代码例子,下面是一个WordCount的Spark程序:

接下来,程序创建了一个包含两个字符串的列表,并使用parallelize方法将其转换为一个RDD。然后,它使用flatMap方法将每一行文本拆分成单词,并使用map方法将每个单词映射为一个键值对(key-valuepair),其中键是单词,值是1。

最后,程序使用reduceByKey方法将具有相同键的键值对进行合并,并对它们的值进行求和。最终结果是一个包含每个单词及其出现次数的RDD。程序使用collect方法将结果收集到驱动程序,并使用foreach方法打印出来。

Spark的理论较多,为了更有效地学习Spark,首先来理解下其基本概念。

Application指的就是用户编写的Spark应用程序。

如下,"WordCount"就是该应用程序的名字。

importorg.apache.spark.sql.SparkSessionobjectWordCount{defmain(args:Array[String]){//创建SparkSession对象,它是SparkApplication的入口valspark=SparkSession.builder.appName("WordCount").getOrCreate()//读取文本文件并创建DatasetvaltextFile=spark.read.textFile("hdfs://...")//使用flatMap转换将文本分割为单词,并使用reduceByKey转换计算每个单词的数量valcounts=textFile.flatMap(line=>line.split("")).groupByKey(identity).count()//将结果保存到文本文件中counts.write.text("hdfs://...")//停止SparkSessionspark.stop()}}2.DriverDriver是运行SparkApplication的进程,它负责创建SparkSession和SparkContext对象,并将代码转换和操作。

它还负责创建逻辑和物理计划,并与集群管理器协调调度任务。

简而言之,SparkApplication是使用SparkAPI编写的程序,而SparkDriver是负责运行该程序并与集群管理器协调的进程。

可以将Driver理解为运行SparkApplicationmain方法的进程。

driver的内存大小可以进行设置,配置如下:

#设置driver内存大小driver-memory1024m3.Master&Worker在Spark中,Master是独立集群的控制者,而Worker是工作者。

一个Spark独立集群需要启动一个Master和多个Worker。Worker就是物理节点,Worker上面可以启动Executor进程。

在每个Worker上为某应用启动的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。

每个任务都有各自独立的Executor。Executor是一个执行Task的容器。实际上它是一组计算资源(cpu核心、memory)的集合。

一个Worker节点可以有多个Executor。一个Executor可以运行多个Task。

Executor创建成功后,在日志文件会显示如下信息:

INFOExecutor:StartingexecutorID[executorId]onhost[executorHostname]5.RDDRDD(ResilientDistributedDataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD的Partition是指数据集的分区。它是数据集中元素的集合,这些元素被分区到集群的节点上,可以并行操作。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPUCore的数目。

一个函数会被作用在每一个分区。Spark中RDD的计算是以分片为单位的,compute函数会被作用到每个分区上。

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

一个Job包含多个RDD及作用于相应RDD上的各种操作,每个Action的触发就会生成一个job。用户提交的Job会提交给DAGScheduler,Job会被分解成Stage,Stage会被细化成Task。

被发送到Executor上的工作单元。每个Task负责计算一个分区的数据。

在Spark中,一个作业(Job)会被划分为多个阶段(Stage)。同一个Stage可以有多个Task并行执行(Task数=分区数)。

阶段之间的划分是根据数据的依赖关系来确定的。当一个RDD的分区依赖于另一个RDD的分区时,这两个RDD就属于同一个阶段。当一个RDD的分区依赖于多个RDD的分区时,这些RDD就属于不同的阶段。

上图中,Stage表示一个可以顺滑完成的阶段。曲线表示Shuffle过程。

如果Stage能够复用前面的Stage的话,那么会显示灰色。

在Spark中,Shuffle是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如reduceByKey或groupByKey等操作。

在Shuffle过程中,Spark会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。

Stage的划分,简单来说是以宽依赖来划分的。

对于窄依赖,Partition的转换处理在Stage中完成计算,不划分(将窄依赖尽量放在在同一个Stage中,可以实现流水线计算)。

对于宽依赖,由于有Shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要划分Stage。

Spark会根据Shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的Stage阶段中。

至于什么是窄依赖和宽依赖,下文马上就会提及。

(1)窄依赖

父RDD的一个分区只会被子RDD的一个分区依赖。比如:map,filter和union,这种依赖称之为「窄依赖」。

窄依赖的多个分区可以并行计算,并且窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

(2)宽依赖

指子RDD的分区依赖于父RDD的所有分区,称之为「宽依赖」。

对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

有向无环图,其实说白了就是RDD之间的依赖关系图。

Spark的执行流程大致如下:

RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。

RDD是“ResilientDistributedDataset”的缩写,从全称就可以了解到RDD的一些典型特性:

RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。

RDD支持两种操作:

(1)转换操作(Transformation)

转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()、Filter()这些都属于转换操作。

转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:「窄依赖」和「宽依赖」。

下面是一些常见的转换操作:

转换操作

描述

map

将函数应用于RDD中的每个元素,并返回一个新的RDD

filter

返回一个新的RDD,其中包含满足给定谓词的元素

flatMap

将函数应用于RDD中的每个元素,并将返回的迭代器展平为一个新的RDD

union

返回一个新的RDD,其中包含两个RDD的元素

distinct

返回一个新的RDD,其中包含原始RDD中不同的元素

groupByKey

将键值对RDD中具有相同键的元素分组到一起,并返回一个新的RDD

reduceByKey

将键值对RDD中具有相同键的元素聚合到一起,并返回一个新的RDD

sortByKey

返回一个新的键值对RDD,其中元素按照键排序

(2)行动操作(Action)

Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。

Action操作

reduce

通过函数聚合RDD中的所有元素

collect

将RDD中的所有元素返回到驱动程序

count

返回RDD中的元素个数

first

返回RDD中的第一个元素

take

返回RDD中的前n个元素

takeOrdered

返回RDD中的前n个元素,按照自然顺序或指定的顺序排序

saveAsTextFile

将RDD中的元素保存到文本文件中

foreach

将函数应用于RDD中的每个元素

创建RDD有3种不同方式:

(1)从外部存储系统

由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等:

valrdd1=sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")(2)从其他RDD

通过已有的RDD经过算子转换生成新的RDD:

valrdd2=rdd1.flatMap(_.split(""))(3)由一个已经存在的Scala集合创建

valrdd3=sc.parallelize(Array(1,2,3,4,5,6,7,8))或者valrdd4=sc.makeRDD(List(1,2,3,4,5,6,7,8))其实makeRDD方法底层调用了parallelize方法:

RDD缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

valrdd1=sc.textFile("hdfs://node01:8020/words.txt")valrdd2=rdd1.flatMap(x=>x.split("")).map((_,1)).reduceByKey(_+_)rdd2.cache//缓存/持久化rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了需要注意的是,在触发action的时候,才会去执行持久化。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中去除缓存,那么可以使用unpersist()方法。

rdd.persist(StorageLevel.MEMORY_ONLY)rdd.unpersist()5.存储级别RDD存储级别主要有以下几种。

级别

使用空间

是否在内存中

是否在磁盘上

备注

MEMORY_ONLY

使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。

MEMORY_ONLY_2

数据存2份

MEMORY_ONLY_SER

基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存

MEMORY_ONLY_SER_2

数据序列化,数据存2份

MEMORY_AND_DISK

中等

部分

如果数据在内存中放不下,则溢写到磁盘

MEMORY_AND_DISK_2

MEMORY_AND_DISK_SER

基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化

MEMORY_AND_DISK_SER_2

DISK_ONLY

使用未序列化的Java对象格式,将数据全部写入磁盘文件中。

DISK_ONLY_2

OFF_HEAP

这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。

这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

血缘关系是指RDD之间的依赖关系。当你对一个RDD执行转换操作时,Spark会生成一个新的RDD,并记录这两个RDD之间的依赖关系。这种依赖关系就是血缘关系。

血缘关系可以帮助Spark在发生故障时恢复数据。当一个分区丢失时,Spark可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个RDD。

血缘关系还可以帮助Spark优化计算过程。Spark可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。

我们可以执行toDebugString打印RDD的依赖关系。

下面是一个简单的例子:

valconf=newSparkConf().setAppName("LineageExample").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(List(1,2,3,4,5))valmappedData=data.map(x=>x+1)valfilteredData=mappedData.filter(x=>x%2==0)println(filteredData.toDebugString)在这个例子中,我们首先创建了一个包含5个元素的RDD,并对它执行了两个转换操作:map和filter。然后,我们使用toDebugString方法打印了最终RDD的血缘关系。

运行这段代码后,你会看到类似下面的输出:

(2)MapPartitionsRDD[2]atfilterat:26[]|MapPartitionsRDD[1]atmapat:24[]|ParallelCollectionRDD[0]atparallelizeat:22[]这个输出表示最终的RDD是通过两个转换操作(map和filter)从原始的ParallelCollectionRDD转换而来的。

CheckPoint可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等),即它可以将数据和元数据保存到检查指向目录中。因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。

CheckPoint分为两类:

开发人员可以使用RDD.checkpoint()方法来设置检查点。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory:String)方法设置检查点目录。

importorg.apache.spark.{SparkConf,SparkContext}objectCheckpointExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CheckpointExample").setMaster("local")valsc=newSparkContext(conf)//设置checkpoint目录sc.setCheckpointDir("/tmp/checkpoint")valdata=sc.parallelize(List(1,2,3,4,5))valmappedData=data.map(x=>x+1)valfilteredData=mappedData.filter(x=>x%2==0)//对RDD进行checkpointfilteredData.checkpoint()//触发checkpointfilteredData.count()}}RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复。通过对RDD启动检查点机制可以实现容错和高可用。

参数名

参数说明

—master

master的地址,提交任务到哪里执行,例如spark://host:port,yarn,local。具体指可参考下面关于Master_URL的列表

—deploy-mode

在本地(client)启动driver或在cluster上启动,默认是client

—class

应用程序的主类,仅针对java或scala应用

—name

应用程序的名称

—jars

用逗号分隔的本地jar包,设置后,这些jar将包含在driver和executor的classpath下

—packages

包含在driver和executor的classpath中的jar的maven坐标

—exclude-packages

为了避免冲突而指定不包含的package

—repositories

远程repository

—confPROP=VALUE

指定spark配置属性的值,例如-confspark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m”

—properties-file

加载的配置文件,默认为conf/spark-defaults.conf

—driver-memory

Driver内存,默认1G

—driver-java-options

传给driver的额外的Java选项

—driver-library-path

传给driver的额外的库路径

—driver-class-path

传给driver的额外的类路径

—driver-cores

Driver的核数,默认是1。在yarn或者standalone下使用

—executor-memory

每个executor的内存,默认是1G

—total-executor-cores

所有executor总共的核数。仅仅在mesos或者standalone下使用

—num-executors

启动的executor数量。默认为2。在yarn下使用

—executor-core

每个executor的核数。在yarn或者standalone下使用

MasterURL

含义

local

使用1个worker线程在本地运行Spark应用程序

local[K]

使用K个worker线程在本地运行Spark应用程序

local[*]

使用所有剩余worker线程在本地运行Spark应用程序

spark://HOST:PORT

连接到SparkStandalone集群,以便在该集群上运行Spark应用程序

mesos://HOST:PORT

连接到Mesos集群,以便在该集群上运行Spark应用程序

yarn-client

以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。

yarn-cluster

以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。

一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。

这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcastvariable)」和「累加器(accumulator)」。

如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:

importorg.apache.spark.{SparkConf,SparkContext}objectBroadcastExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("BroadcastExample").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(List(1,2,3,4,5))//创建一个广播变量valfactor=sc.broadcast(2)//使用广播变量valresult=data.map(x=>x*factor.value)result.collect().foreach(println)}}广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。

累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。

一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。

示例代码如下:

importorg.apache.spark.{SparkConf,SparkContext}objectAccumulatorExample{defmain(args:Array[String]){valconf=newSparkConf().setAppName("AccumulatorExample")valsc=newSparkContext(conf)valaccum=sc.longAccumulator("MyAccumulator")sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))println(accum.value)//输出10}}这个示例中,我们创建了一个名为MyAccumulator的累加器,并使用sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))来对其进行累加。最后,我们使用println(accum.value)来输出累加器的值,结果为10。

我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0值”(zerovalue),addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:

objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{defzero(initialValue:Vector):Vector={Vector.zeros(initialValue.size)}defaddInPlace(v1:Vector,v2:Vector):Vector={v1+=v2}}//Then,createanAccumulatorofthistype:valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam)九、SparkSQLSpark为结构化数据处理引入了一个称为SparkSQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。

数字类型包括:

字符串类型包括:

二进制类型包括:

布尔类型包括:

区间类型包括:

复合类型包括:

DataFrame是Spark中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。

DataFrame支持多种数据源,包括结构化数据文件、Hive表、外部数据库和现有的RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。

DataFrame的优点在于它提供了一种高级的抽象,使得用户可以使用类似于SQL的语言进行数据处理,而无需关心底层的实现细节。此外,Spark会自动对DataFrame进行优化,以提高查询性能。

下面是一个使用DataFrame的代码例子:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DataFrameExample").getOrCreate()importspark.implicits._valdata=Seq(("Alice",25),("Bob",30),("Charlie",35))valdf=data.toDF("name","age")df.show()在这个示例中,我们首先创建了一个SparkSession对象,然后使用toDF方法将一个序列转换为DataFrame。最后,我们使用show方法来显示DataFrame的内容。

在Scala中,可以通过以下几种方式创建DataFrame:

从现有的RDD转换而来。例如:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valrdd=spark.sparkContext.parallelize(Seq(Person("Alice",25),Person("Bob",30)))valdf=rdd.toDF()df.show()从外部数据源读取。例如,从JSON文件中读取数据并创建DataFrame:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()valdf=spark.read.json("path/to/json/file")df.show()通过编程方式创建。例如,使用createDataFrame方法:

importorg.apache.spark.sql.{Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}valspark=SparkSession.builder.appName("CreateDataFrame").getOrCreate()valschema=StructType(List(StructField("name",StringType,nullable=true),StructField("age",IntegerType,nullable=true)))valdata=Seq(Row("Alice",25),Row("Bob",30))valrdd=spark.sparkContext.parallelize(data)valdf=spark.createDataFrame(rdd,schema)df.show()5.DSL&SQL在Spark中,可以使用两种方式对DataFrame进行查询:「DSL(Domain-SpecificLanguage)」和「SQL」。

DSL是一种特定领域语言,它提供了一组用于操作DataFrame的方法。例如,下面是一个使用DSL进行查询的例子:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DSLandSQL").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.select("name","age").filter($"age">25).show()SQL是一种结构化查询语言,它用于管理关系数据库系统。在Spark中,可以使用SQL对DataFrame进行查询。例如,下面是一个使用SQL进行查询的例子:

SparkSQL支持多种数据源,包括Parquet、JSON、CSV、JDBC、Hive等。

下面是示例代码:

下面是从Parquet文件中读取数据并创建DataFrame的示例代码:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("LoadandSaveExample").getOrCreate()valdf=spark.read.load("path/to/parquet/file")df.show()下面是将DataFrame保存到Parquet文件的示例代码:

此外,SparkSQL还支持「自定义函数(User-DefinedFunction,UDF)」,可以让用户编写自己的函数并在查询中使用。

下面是一个使用SQL语法编写自定义函数的示例代码:

importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.functions.udfvalspark=SparkSession.builder.appName("UDFExample").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.createOrReplaceTempView("people")valsquare=udf((x:Int)=>x*x)spark.udf.register("square",square)spark.sql("SELECTname,square(age)FROMpeople").show()在这个示例中,我们首先定义了一个名为square的自定义函数,它接受一个整数参数并返回它的平方。然后,我们使用createOrReplaceTempView方法创建一个临时视图,并使用udf.register方法注册自定义函数。

最后,我们使用spark.sql方法执行SQL查询,并在查询中调用自定义函数。

DataSet是Spark1.6版本中引入的一种新的数据结构,它提供了RDD的强类型和DataFrame的查询优化能力。

在Scala中,可以通过以下几种方式创建DataSet:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valrdd=spark.sparkContext.parallelize(Seq(Person("Alice",25),Person("Bob",30)))valds=rdd.toDS()ds.show()从外部数据源读取。例如,从JSON文件中读取数据并创建DataSet:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Long)valds=spark.read.json("path/to/json/file").as[Person]ds.show()通过编程方式创建。例如,使用createDataset方法:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("CreateDataSet").getOrCreate()importspark.implicits._caseclassPerson(name:String,age:Int)valdata=Seq(Person("Alice",25),Person("Bob",30))valds=spark.createDataset(data)ds.show()11.DataSetVSDataFrameDataSet和DataFrame都是Spark中用于处理结构化数据的数据结构。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等。

它们之间的主要区别在于类型安全性。DataFrame是一种弱类型的数据结构,它的列只有在运行时才能确定类型。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常。

而DataSet是一种强类型的数据结构,它的类型在编译时就已经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错。

此外,DataSet还提供了一些额外的操作,例如map、flatMap、reduce等。

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换。

DataFrame/Dataset转RDD:

valrdd1=testDF.rddvalrdd2=testDS.rddRDD转DataSet:

importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=rdd.map{line=>Coltest(line._1,line._2)}.toDS可以注意到,定义每一行的类型(caseclass)时,已经给出了字段名和类型,后面只要往caseclass里面添加值即可。

Dataset转DataFrame:

importspark.implicits._valtestDF=testDS.toDFDataFrame转Dataset:

importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=testDF.as[Coltest]这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型在DataFrame需要针对各个字段处理时极为方便。

注意:在使用一些特殊的操作时,一定要加上importspark.implicits._不然toDF、toDS无法使用。

SparkStreaming的工作原理是将实时数据流拆分为小批量数据,并使用Spark引擎对这些小批量数据进行处理。这种微批处理(Micro-BatchProcessing)的方式使得SparkStreaming能够以近乎实时的延迟处理大规模的数据流。

下面是一个简单的SparkStreaming示例代码:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("SparkStreamingExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()我们首先创建了一个StreamingContext对象,并指定了批处理间隔为1秒。然后,我们使用socketTextStream方法从套接字源创建了一个DStream。接下来,我们对DStream进行了一系列操作,包括flatMap、map和reduceByKey。最后,我们使用print方法打印出单词计数的结果。

SparkStreaming作为一种实时流处理框架,具有以下优点:

但是,SparkStreaming也有一些缺点:

在SparkStreaming中,可以通过以下几种方式创建DStream:

(1)从输入源创建。例如,从套接字源创建DStream:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)lines.print()ssc.start()ssc.awaitTermination()(2)通过转换操作创建。例如,对现有的DStream进行map操作:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))words.print()ssc.start()ssc.awaitTermination()(3)通过连接操作创建。例如,对两个DStream进行union操作:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines1=ssc.socketTextStream("localhost",9999)vallines2=ssc.socketTextStream("localhost",9998)vallines=lines1.union(lines2)lines.print()ssc.start()ssc.awaitTermination()总结:简单来说DStream就是对RDD的封装,你对DStream进行操作,就是对RDD进行操作。对于DataFrame/DataSet/DStream来说本质上都可以理解成RDD。

SparkStreaming提供了多种窗口函数,包括:

下面是一个使用窗口函数的示例代码:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("WindowExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(30),Seconds(10))wordCounts.print()ssc.start()ssc.awaitTermination()在这个示例中,我们首先创建了一个DStream,并对其进行了一系列转换操作。然后,我们使用reduceByKeyAndWindow函数对DStream进行窗口化处理,指定了窗口大小为30秒,滑动间隔为10秒。最后,我们使用print方法打印出单词计数的结果。

SparkStreaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。SparkStreaming支持以下输出操作:

与SparkStreaming相比,StructuredStreaming具有以下优点:

下面是一个简单的StructuredStreaming示例代码:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()在这个示例中,我们首先创建了一个SparkSession对象。然后,我们使用readStream方法从套接字源创建了一个DataFrame。接下来,我们对DataFrame进行了一系列操作,包括flatMap、groupBy和count。最后,我们使用writeStream方法将结果输出到控制台。

StructuredStreaming同样支持DSL和SQL语法。

DSL语法:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()SQL语法:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//订阅一个主题valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("subscribe","topic1").load()df.selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)").as[(String,String)]2.OutputStructuredStreaming支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等。下面是将数据写入到Parquet文件中的例子:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//从socket中读取数据vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()//将数据写入到Parquet文件中lines.writeStream.format("parquet").option("path","path/to/output/dir").option("checkpointLocation","path/to/checkpoint/dir").start()3.OutputMode每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

Outputmode指定了数据写入输出接收器的方式。StructuredStreaming支持以下三种outputmode:

OutputMode

Append

只将流DataFrame/Dataset中的新行写入接收器。

Complete

每当有更新时,将流DataFrame/Dataset中的所有行写入接收器。

Update

每当有更新时,只将流DataFrame/Dataset中更新的行写入接收器。

Outputsink指定了数据写入的位置。StructuredStreaming支持多种输出接收器,包括文件接收器、Kafka接收器、Foreach接收器、控制台接收器和内存接收器等。下面是一些使用Scala语言将数据写入到不同输出接收器中的例子:

假设我们在本地启动了一个socket服务器,并向其发送以下数据:

THE END
1.[Spark]一Spark基础入门[Spark]一、Spark基础入门 G:\Bigdata\4.spark\2024最新版Spark视频教程 一、Spark基础入门 1. Spark概述 1.1什么是Spark 回顾:Hadoop主要解决,海量数据的存储和海量数据的分析计算。 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 1.2 Hadoop与Spark历史https://blog.csdn.net/weixin_44428807/article/details/139758282
2.Spark基础知识点在spark 的基础上,spark还提供了包括spark sql, spark streaming, Mlib及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。 运行方式 spark 支持多种运行方式,包含在hadoop 和Mesos上,也支持Standalone的独立运行模式,同时也可以运行在云Kubernetes (Spark 2.3开始支持)上。 http://www.wjks.cn/news/56706.html
3.如何开始学习使用Spark?要开始学习使用Spark,你需要掌握其基础概念、组件和运行模式。Spark是一个高速、通用和可扩展的大数据处理框架,广泛应用于批处理、交互式查询、实时流处理、机器学习和图计算[^1^]。以下是学习使用Spark的详细指南: 了解基础 掌握编程语言知识:Spark支持多种编程语言,如Scala、Python和Java[^1^]。选择一种你熟悉的语https://developer.aliyun.com/article/1601324
4.Spark编程基础(Scala版)本章节将讲解 Spark 简介、安装、运行架构、RDD 的设计与运行原理、部署模式。 2-1 概述 2-2 Spark生态系统 2-3 Spark运行架构 2-4 Spark 的部署方式 第3章 Scala 语言基础 本章节将介绍 Scala 语言基础语法。 3-1 Scala 语言概述 3-2 Scala 基础知识 https://m.educoder.net/paths/tqb74pxe
5.Spark概述代码1.1 importorg.apache.spark.{SparkConf,SparkContext}//建立的WordCount对象,以及定义main函数objectWordCount{defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local").setAppName("wordcount")valsc=newSparkContext(conf)vallines=sc.textFile("文件路径")valwords=lines.flatMap(linehttps://aibigdata.csu.edu.cn/ch1_spark_gaishu.html
6.spark从入门到实战spark基础知识spark从入门到实战 spark基础知识 目录 基础 概述 分工 作业提交流程 Executor 共享变量 Broadcast Variable(广播变量) Accumulator(累加变量) 内存管理 相关配置 堆内内存 堆外内存 Execution 内存和 Storage 内存动态调整 Task之间内存分布 Spark Core spark的shufflehttps://blog.51cto.com/u_16213725/7762628
7.零基础入门Spark快速构建 Spark 核心知识体系 Spark 三大计算场景案例实操 逐句注释的保姆级代码讲解 在故事中搞懂 Spark 开发实战技巧 课程介绍 说到学习 Spark,如果你对“他会结合自己这些年学习、应用和实战 Spark 的丰富经验,为你梳理一套零基础入门 Spark 的“三步走”方法论:熟悉Spark 开发 API 与常用算子、吃透 Spark https://time.geekbang.org/column/intro/100090001
8.Hudi核心知识点详解——快速入门数据湖袋鼠社区大数据大厂之 Hudi 数据湖框架性能提升:高效处理大数据变更 技术共享?数栈君发表了文章 ? 0 个评论 ? 135 次浏览 ? 2024-10-09 15:26 在之前对大数据技术的探索中,我们已经了解到大数据领域不断发展,各种技术和框架应运而生,以满足日益增长的数据处理需求。从之前讨论过的相关大数据技术和框架,https://www.dtstack.com/bbs/topic/8500
9.Spark相关知识资源汇总winway'sblogSpark Streaming使用入门 Spark常用算子 Scala 教程 Spark scala使用案例 Spark面试题 其他零散知识点 很久没有使用Spark,忘得差不过了。最近重新复习了一下,查阅的相关资料整理如下。 Spark架构原理 一、Spark快速入门 30分钟理解Spark的基本原理 大数据基础:Spark 工作原理及基础概念 https://winway.github.io/2024/05/10/spark-resource-summary/
10.《Spark最佳实践》试读:6.1SparkStreaming基础知识Spark Streaming计算支持Scala、Java、Python语言,暂时不支持R。 6.1 Spark Streaming基础知识 本节通过一个简单示例介绍如何开发Spark流式计算程序。 6.1.1 入门简单示例 在介绍完整的Spark流式计算之前,我们先给出一个最简单的示例:Spark Streaming版本的WordCount。使用Linux下的终端工具netcat(简称nc)启动一个TCP服务https://book.douban.com/reading/37743121/
11.clickhouse入门基础知识了解ClickHouse经常会被拿来与其他的分析型数据库作对比,比如Vertica、SparkSQL、Hive和Elasticsearch等,它与这些数据库确实存在许多相似之处。例如,它们都可以支撑海量数据的查询场景,都拥有分布式架构,都支持列存、数据分片、计算下推等特性。这其实也侧面说明了ClickHouse在设计上确实吸取了各路奇技淫巧。与其他数据库相比,https://www.jianshu.com/p/1f2d517744c3
12.SparkSQL入门与实践指南pdfepubmobitxt电子书下载2024本书共分为四篇:入门篇、基础篇、实践篇、调优篇,所有代码均采用简洁而优雅的Scala语言编写,Spark框架也是使用Scala语言编写的。 第一部分 入门篇(第1、2章) 第1章简要介绍Spark的诞生、Spark SQL的发展历史以及Spark SQL的用处等内容,使读者快速了解Spark SQL背景知识,为以后的学习奠定基础。 第2章通过讲解https://windowsfront.com/books/12319813
13.基础知识·asD基础知识RDD SparkContext,Spark最重要的API,用户逻辑与Spark集群主要的交互接口,它会和Cluster Master交互,包括向它申请计算资源等。 Resilient Distributed Datasets,弹性分布式数据集,只读分区记录的集合,Spark对所处理数据的基本抽象。 参考:Spark基本概念快速入门 点赞 您需要登录后才可以点赞https://www.kancloud.cn/asdj07/asd123/1035960
14.史上快速入门教程敬伟PS教程(A基础篇)A01初识PS15:09 C4D-1.2.C4D初识及基础操 1496播放 08:37 Spark基础入门-第七章-7.3 567播放 08:27 【AI系统教程】AI常用小工具(一 1245播放 07:55 【AI基础入门】26.实时上色工具 1067播放 03:34 IBM2.2Jupyter入门 5564播放 03:40 Mac入门系列-第三集 了解Doc 10.9万播放 01:https://open.163.com/newview/movie/free?pid=LEV1GNL3O
15.Kafka入门到精通学习路线图技术文章滴石it网Kafka是一个分布式流式处理平台,被广泛应用于大规模数据处理和实时数据流分析的场景中。以下是一个从入门到精通的学习路线图,帮助你系统地学习和掌握Kafka的相关技术。 1. 学习Kafka的概念和基础知识: – 了解Kafka的起源和背景,掌握Kafka的基本概念和术语,如消息、主题、分区、生产者、消费者等。 https://www.soft1188.com/javabk/6418.html
16.Spark快速大数据分析20240516123910.pdf1.5Spark的版本发布 1.6Spark的存储层次 第2章Spark下载与入门 2.1下载Spark 2.2Spark中PythonScala的hell 2.3Spark核心概念简介 2.4独立应用 2.4.1初始化SparkContext 2.4.2构建独立应用 2.5总结 第3章RDD编程 3.1RDD基础 3.2创建RDD 3.3RDD操作 3.3.1转化操作 3.3.2行动操作 333惰性求值 3.4向Spark传递函数 3.4.https://m.book118.com/html/2024/0516/7025054055006110.shtm
17.大数据Hadoop简单入门指南是一个快速且通用的集群计算系统,具备内存数据处理能力。Spark提供统一的分布式数据处理API,支持批量处理、迭代算法、交互式查询和流式数据处理。通过在内存中缓存数据并以分布式和并行的方式执行计算,Spark在许多场景中优于MapReduce。 示例用途:Spark可用于机器学习任务、实时数据处理、交互式数据分析和图处理。 http://www.360doc.com/content/24/0112/07/50382475_1110784560.shtml
18.Scala开发快速入门epubpdfmobitxt电子书下载2024Spark、Kafka等是大数据技术框架中的明星,Scala语言已经通过了工业界的检验,学习Scala语言为学习这些框架的设计原理打下坚实的基础,为以后参与开源、学习工业界最先进的大数据技术架构的优秀思想打下了坚实的基础。 如何阅读本书 本书涵盖三大主要部分: (1)Scala语言基础篇,主要介绍Scala语言入门基础知识、变量的定义、程https://book.tinynews.org/books/12026904
19.Windows8应用开发快速入门系列课程MicrosoftLearnLearn 发现 产品文档 开发语言 主题 登录 下载PDF 使用英语阅读 保存 添加到集合 添加到计划 通过 Facebookx.com 共享LinkedIn电子邮件 打印 项目 2021/10/20 反馈https://msdn.microsoft.com/zh-cn/jj878422