由于Scala才刚刚开始学习,还是对python更为熟悉,因此在这记录一下自己的学习过程,主要内容来自于spark的官方帮助文档,这一节的地址为:
环境:Ubuntu16.04LTS,Spark2.0.1,Hadoop2.7.3,Python3.5.2,
利用sparkshell进行交互式分析
1.基础
首先打开spark与python交互的API
$cd/usr/local/spark$./bin/pysparkSpark最重要的一个概念就是RDD(ResilientDistributedDataset),弹性分布式数据集。RDD可以利用Hadoop的InputFormats创建,或者从其他RDD转换。
这里,作为入门,我们利用spark安装后文件夹中自带的README.md(此文件位置为/usr/local/spark/README.md)文件作为例子,学习如何创建一个新的RDD。
创建新的RDD:
>>>textFile=sc.textFile(“README.md”)
RDD支持两种类型的操作,actions和transformations:
actions:在数据集上运行计算后返回值
transformations:转换,从现有数据集创建一个新的数据集
RDD可以有执行一系列的动作(actions),这些动作可以返回值(values),转换(transformations),或者指向新的RDD的指针。下边学习RDD的一些简单的动作:
>>>textFile.count()#计数,返回RDD中items的个数,这里就是README.md的总行#数99>>>textFile.first()#RDD中的第一个item,这里就是文件README.md的第一行u'#ApacheSpark'注意:如果之前是从/usr/local/spark启动pyspark,然后读取README.md文件的,如果执行count语句,会出现以下错误:
py4j.protocol.Py4JJavaError:Anerroroccurredwhilecallingz:org.apache.spark.api.python.PythonRDD.collectAndServe.
:org.apache.hadoop.mapred.InvalidInputException:Inputpathdoesnotexist:hdfs://localhost:9000/user/spark/README.md
这是因为在使用相对路径时,系统默认是从hdfs://localhost:9000/目录下读取README.md文件的,但是README.md文件并不在这一目录下,所以sc.textFile()必须使用绝对路径,此时代码修改为:
>>>textFile=sc.textFile(“file:///usr/local/spark/README.md”)99下边尝试使用一个转换(transformation)。例如,使用filter这一转换返回一个新的RDD,这些RDD中的items都含有“Spark”字符串。
>>>linesWithSpark=textFile.filter(lambdaline:“Spark”inline)我们还可以将actions和transformation链接起来:
>>>textFile.filter(lambdaline:“Spark”inline).count()#有多好行含有“Spark”这一字符串19
2.更多的RDD操作
利用RDD的动作和转换能够完成很多复杂的计算。例如,我们希望找到含有最后单词的一句话:
>>>textFile.map(lambdaline:len(line.split())).reduce(lambdaa,b:aif(a>b)elseb)22这个语句中,map函数将len(line.split())这一语句在所有line上执行,返回每个line所含有的单词个数,也就是将line都map到一个整数值,然后创建一个新的RDD。然后调用reduce,找到最大值。map和reduce函数里的参数是python中的匿名函数(lambda),事实上,我们这里也可以传递python中更顶层的函数。比如,我们先定义一个比较大小的函数,这样我们的代码会更容易理解:
>>>defmax(a,b):...ifa>b:...returna...else:...returnb...>>>textFile.map(lambdaline:len(line.split())).reduce(max)22Hadoop掀起了MapReduce的热潮。在spark中,能够更加容易的实现MapReduce
>>>wordCounts=textFile.flatMap(lambdaline:line.split()).map(lambdaword:(word,1)).reduceByKey(lambdaa,b:a+b)上述语句中,利用flatMap,map和reduceByKey三个转换,计算文件README.md中每个单词出现的个数,并返回一个新的RDD,每个item的格式为(string,int),即单词和对应的出现次数。其中,
flatMap(func):与map相似,但是每个输入的item能够被map到0个或者更多的输出items上,也就是说func的返回值应当是一个Seq,而不是一个单独的item,上述语句中,匿名函数返回的就是一句话中所含的每个单词
reduceByKey(func):可以作用于使用“键-值”(K,V)形式存储的数据集上并返回一组新的数据集(K,V),其中,每个键的值为聚合使用func操作的结果,这里相当于python中字典的含义。上述语句中,相当于当某个单词出现一次时,就在这个单词的出现次数上加1,每个单词就是一个Key,reducByKey中的匿名函数计算单词的出现次数。
要收集上述语句的计算结果,可以使用collect这一动作:
>>>wordCounts.collect()[(u'when',1),(u'R,',1),(u'including',3),(u'computation',1),...]
3.缓存Caching
Spark也支持将数据集存入集群范围的内存缓存中。这对于需要进行重复访问的数据非常有用,比如我们需要在一个小的数据集中执行查询操作,或者需要执行一个迭代算法(例如PageRank)。下面,利用之前命令中得到的linesWithSpark数据集,演示缓存这一操作过程:
>>>linesWithSpark.cache()PythonRDD[26]atRDDatPythonRDD.scala:48>>>linesWithSpark.count()19>>>linesWithSpark.count()19利用Spark去缓存一个100行的文件可能并没什么意义。但是有趣的是,这一系列的操作可以用于非常大的数据集上,甚至含有成千上万的节点的数据集。
4.自含式应用程序(self-containedapplications)
假设我们希望利用SparkAPI写一个自含式应用程序,我们可以利用Scala,Java或者Python完成。
下边,简单介绍一下怎样利用PythonAPI(PySpark)写一个应用程序,命名为SimpleApp.py.
在spark所在目录下输入:
./bin/spark-submit--masterlocal[4]SimpleApp.py输出为:
Lineswitha:61,Lineswithb:27
此外,Spark自带很多例子,可以在spark目录下输入下列指令查看:
#ForScalaandJava,userun-example:./bin/run-exampleSparkPi#ForPythonexamples,usespark-submitdirectly:./bin/spark-submitexamples/src/main/python/pi.py#ForRexamples,usespark-submitdirectly:./bin/spark-submitexamples/src/main/r/dataframe.R