Getting started with Spark: fundamentals and basic data operations (all examples in Scala)
2016-09-18

Having come this far, some readers may be wondering how to take the finished program and run it on a live Spark cluster, and also where the Spark framework sits in the big data architecture and how it works at a basic level.

Today's post covers exactly these two topics:

1. First, package the program from the previous post into a jar, which we name sparkApp.jar. Upload the jar to a test directory on a cluster machine (make sure the machine can reach the file), then run it with the spark-submit command. The full command is shown below; it specifies the main class, yarn-client mode, the executor memory and number of executors, and the input and output paths.
 

 spark-submit --class com.besttone.UserOnlineAnalysis --master yarn-client --executor-memory 2g --num-executors 3 file:///home/hadoop/test/sparkApp.jar test/apponoff.bz2 test/out22
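For reference, here is a minimal sketch of what an application submitted this way might look like. The real com.besttone.UserOnlineAnalysis is not shown in this post, so the object below is only a hypothetical skeleton with a placeholder transformation:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical skeleton only; the real UserOnlineAnalysis logic is not shown in this article.
object UserOnlineAnalysis {
  def main(args: Array[String]): Unit = {
    val Array(inputPath, outputPath) = args            // e.g. test/apponoff.bz2 and test/out22
    val sc = new SparkContext(new SparkConf().setAppName("UserOnlineAnalysis"))

    sc.textFile(inputPath)                             // load the (possibly compressed) input
      .map(_.trim)                                     // placeholder transformation
      .saveAsTextFile(outputPath)                      // write results to the output path

    sc.stop()
  }
}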

 

2. For the second question, let's start with a quick look at what Spark actually is.

Apache Spark is an open source cluster computing system that aims to make data analytics fast, both fast to run and fast to write.

In plain terms, it is an open-source cluster computing system whose goal is to make data analytics faster, both the speed at which jobs run and the speed at which they can be written.

[Figure: Spark's position in the big data architecture]

The figure above shows where the Spark computing system sits in the big data architecture. You can see that it takes quite a different path: its computing architecture is completely unlike Hadoop MapReduce, Hive, or Storm.

[Figure: MapReduce data processing flow]

This is the MapReduce data processing flow. You can see that during processing the data goes in and out of disk several times and is repeatedly moved around, which is what makes MapReduce so slow: extra copying, serialization, and disk I/O overhead.

[Figure: data flow during Spark execution]

Now look at how data flows while Spark runs: it largely stays in memory and does not need to be written back to disk at every step. But in-memory computation is not the only reason Spark is fast; another important one is its DAG optimization. When Spark executes a computation, it takes the chain of RDDs formed at each step and optimizes the whole thing into a directed acyclic graph (DAG) for the scheduler. Running according to the optimized DAG avoids a lot of redundant computation and data movement.

At this point you should have a basic high-level picture of Spark.

Next, let's talk about the RDD, the core data abstraction that Spark runs on.

RDD definition: Resilient Distributed Dataset.

[Figure: the two kinds of RDD operations, transformations and actions]

Running a Spark job basically consists of these two kinds of operations: transformations and actions.

[Figure: table of built-in transformation and action operators]

The figure above lists the transformation and action operators that Spark already provides as functions (or interfaces, if you prefer); a short sketch of how they combine is shown below.
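As a minimal sketch (run in spark-shell, so sc already exists), note that transformations only describe the computation and nothing runs until the action at the end triggers the whole DAG:

// Transformations are lazy; only the action at the end triggers execution.
val lines   = sc.textFile("/home/scipio/README.md")   // transformation: nothing is read yet
val lengths = lines.map(_.length)                      // transformation: still nothing computed
val total   = lengths.reduce(_ + _)                    // action: the whole DAG runs here
println("total characters: " + total)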

Now let's go through how these functions are actually used in code. (The examples below are borrowed for illustration; note that they were all run on a cluster with Spark installed, inside a spark-shell session.)

1. Loading a file

scala> val inFile = sc.textFile("/home/scipio/spam.data") // this loads the data into memory as an RDD

 

Output:

14/06/28 12:15:34 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=65736, maxMem=311387750
14/06/28 12:15:34 INFO MemoryStore: Block broadcast_2 stored as values to memory (estimated size 32.1 KB, free 296.9 MB)
inFile: org.apache.spark.rdd.RDD[String] = MappedRDD[7] at textFile at <console>:12

2. Showing the first line

scala> inFile.first()

Output:

14/06/28 12:15:39 INFO FileInputFormat: Total input paths to process: 1
14/06/28 12:15:39 INFO SparkContext: Starting job: first at <console>:15
14/06/28 12:15:39 INFO DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal=true)
14/06/28 12:15:39 INFO DAGScheduler: Final stage: Stage 0 (first at <console>:15)
14/06/28 12:15:39 INFO DAGScheduler: Parents of final stage: List()
14/06/28 12:15:39 INFO DAGScheduler: Missing parents: List()
14/06/28 12:15:39 INFO DAGScheduler: Computing the requested partition locally
14/06/28 12:15:39 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
14/06/28 12:15:39 INFO SparkContext: Job finished: first at <console>:15, took 0.532360118 s
res2: String = 0 0.64 0.64 0 0.32 0 0 0 0 0 0 0.64 0 0 0 0.32 0 1.29 1.93 0 0.96 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.778 0 0 3.756 61 278 1

3. Using the operators

(1) map: essentially a mapping whose logic we define ourselves. In the example below, each line of the RDD is first split on spaces, and each resulting element is then converted to a Double. (You can see the result change from the original String into an Array of Doubles.)

scala> val nums = inFile.map(x => x.split(' ').map(_.toDouble))
nums: org.apache.spark.rdd.RDD[Array[Double]] = MappedRDD[8] at map at <console>:14

scala> nums.first()
14/06/28 12:19:07 INFO SparkContext: Starting job: first at <console>:17
14/06/28 12:19:07 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/06/28 12:19:07 INFO DAGScheduler: Final stage: Stage 1 (first at <console>:17)
14/06/28 12:19:07 INFO DAGScheduler: Parents of final stage: List()
14/06/28 12:19:07 INFO DAGScheduler: Missing parents: List()
14/06/28 12:19:07 INFO DAGScheduler: Computing the requested partition locally
14/06/28 12:19:07 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
14/06/28 12:19:07 INFO SparkContext: Job finished: first at <console>:17, took 0.011412903 s
res3:Array[Double]=Array(0.0,0.64,0.64,0.0,0.32,0.0,0.0,0.0,0.0,0.0,0.0,0.64,0.0,0.0,0.0,0.32,0.0,1.29,1.93,0.0,0.96,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.778,0.0,0.0,3.756,61.0,278.0,1.0)

(2) collect is an action; think of it as gathering the results of processing the RDD into an Array.

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:12

scala> val mapRdd = rdd.map(2 * _)
mapRdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[10] at map at <console>:14

scala> mapRdd.collect
14/06/28 12:24:45 INFO SparkContext: Job finished: collect at <console>:17, took 1.789249751 s
res4: Array[Int] = Array(2, 4, 6, 8, 10)

(3) filter: a filtering operator whose rule you define yourself, e.g. keep elements containing something with _.contains(...), or exclude them with !_.contains(...). The example below filters out values <= 5.

scala> val filterRdd = sc.parallelize(List(1, 2, 3, 4, 5)).map(_ * 2).filter(_ > 5)
filterRdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[13] at filter at <console>:12

scala> filterRdd.collect
14/06/28 12:27:45 INFO SparkContext: Job finished: collect at <console>:15, took 0.056086178 s
res5: Array[Int] = Array(6, 8, 10)

(4) flatMap performs a flattening step: in plain terms, it turns a collection of nested arrays into one flat array, so each element of every sub-array becomes an independent element, which makes later processing easier (Array[Array[String]] -> Array[String]). Calling collect after flatMap therefore returns an Array[String]; conceptually it is a map followed by a flatten: the map produces Array[Array[String]], which is then flattened into Array[String]. What ends up saved in the file is each individual word after splitting. (In the transcript below, rdd.cache marks the RDD to be kept in memory, so the second count can reuse the cached data.)

scala> val rdd = sc.textFile("/home/scipio/README.md")
14/06/28 12:31:55 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=98616, maxMem=311387750
14/06/28 12:31:55 INFO MemoryStore: Block broadcast_3 stored as values to memory (estimated size 32.1 KB, free 296.8 MB)
rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[15] at textFile at <console>:12

scala> rdd.count
14/06/28 12:32:50 INFO SparkContext: Job finished: count at <console>:15, took 0.341167662 s
res6: Long = 127

scala> rdd.cache
res7: rdd.type = MappedRDD[15] at textFile at <console>:12

scala> rdd.count
14/06/28 12:33:00 INFO SparkContext: Job finished: count at <console>:15, took 0.32015745 s
res8: Long = 127

scala> val wordCount = rdd.flatMap(_.split(' ')).map(x => (x, 1)).reduceByKey(_ + _)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at reduceByKey at <console>:14

scala> wordCount.collect

res9:Array[(String,Int)]=Array((means,1),(under,2),(this,4),(Because,1),(Python,2),(agree,1),(cluster.,1),(its,1),(YARN,,3),(have,2),(pre-built,1),(MRv1,,1),(locally.,1),(locally,2),(changed,1),(several,1),(only,1),(sc.parallelize(1,1),(This,2),(basic,1),(first,1),(requests,1),(documentation,1),(Configuration,1),(MapReduce,2),(without,1),(setting,1),("yarn-client",1),([params]`.,1),(any,2),(application,1),(prefer,1),(SparkPi,2),(,1),(version,3),(file,1),(documentation,,1),(test,1),(MASTER,1),(entry,1),(example,3),(are,2),(systems.,1),(params,1),(scala>,1),(hadoop-client,1),(refer,1),(configure,1),(Interactive,2),(artifact,1),(can,7),(file's,1),(build,3),(when,2),(2.0.X,,1),(Apac...

scala> wordCount.saveAsTextFile("/home/scipio/wordCountResult.txt")

(5) union: takes the union of two datasets.

scala> val rdd = sc.parallelize(List(('a', 1), ('a', 2)))
rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12

scala> val rdd2 = sc.parallelize(List(('b', 1), ('b', 2)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12

scala> rdd union rdd2
res3: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[12] at union at <console>:17

scala> res3.collect

res4: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))

(6) join: joins two pair RDDs on the key of each element.

scala> val rdd1 = sc.parallelize(List(('a', 1), ('a', 2), ('b', 3), ('b', 4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12

scala> val rdd2 = sc.parallelize(List(('a', 5), ('a', 6), ('b', 7), ('b', 8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12

scala> rdd1 join rdd2
res1: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = FlatMappedValuesRDD[14] at join at <console>:17

scala> res1.collect

res2: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))

(7) lookup: looks up a key and returns the collection of values associated with it.

val rdd1 = sc.parallelize(List(('a', 1), ('a', 2), ('b', 3), ('b', 4)))
rdd1.lookup('a')
res3: Seq[Int] = WrappedArray(1, 2)

(8) groupByKey: groups by key. Unlike reduceByKey, all the values for the same key are collected into an ArrayBuffer without being summed, whereas reduceByKey reduces the values for each key down to a single summed value. (Try both yourself; a small comparison sketch follows the output below.)

val wc = sc.textFile("/home/scipio/README.md").flatMap(_.split(' ')).map((_, 1)).groupByKey
wc.collect

14/06/28 12:56:14 INFO SparkContext: Job finished: collect at <console>:15, took 2.933392093 s
res0:Array[(String,Iterable[Int])]=Array((means,ArrayBuffer(1)),(under,ArrayBuffer(1,1)),(this,ArrayBuffer(1,1,1,1)),(Because,ArrayBuffer(1)),(Python,ArrayBuffer(1,1)),(agree,ArrayBuffer(1)),(cluster.,ArrayBuffer(1)),(its,ArrayBuffer(1)),(YARN,,ArrayBuffer(1,1,1)),(have,ArrayBuffer(1,1)),(pre-built,ArrayBuffer(1)),(MRv1,,ArrayBuffer(1)),(locally.,ArrayBuffer(1)),(locally,ArrayBuffer(1,1)),(changed,ArrayBuffer(1)),(sc.parallelize(1,ArrayBuffer(1)),(only,ArrayBuffer(1)),(several,ArrayBuffer(1)),(This,ArrayBuffer(1,1)),(basic,ArrayBuffer(1)),(first,ArrayBuffer(1)),(documentation,ArrayBuffer(1)),(Configuration,ArrayBuffer(1)),(MapReduce,ArrayBuffer(1,1)),(requests,ArrayBuffer(1)),(without,ArrayBuffer(1)),("yarn-client",ArrayBuffer(1)),([params]`.,Ar...
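To make the difference concrete, here is a small comparison sketch you can paste into spark-shell (the sample data is made up for illustration):

// Hypothetical sample data; same pairs fed to both operators.
val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))

// groupByKey keeps every value for a key, e.g. (a, [1, 2]) and (b, [3])
pairs.groupByKey.collect

// reduceByKey folds the values for each key into one, e.g. (a, 3) and (b, 3)
pairs.reduceByKey(_ + _).collect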

(9) sortByKey: this example first uses reduceByKey to count the words (each word is mapped to the value 1), then swaps each (word, count) pair to (count, word), sorts by key in descending order, and swaps back, so the words come out from most to least frequent.

val rdd = sc.textFile("/home/scipio/README.md")
val wordcount = rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _)
val wcsort = wordcount.map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1))
wcsort.saveAsTextFile("/home/scipio/sort.txt")

For ascending order, use sortByKey(true).
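As a side note, if you only want to look at the most frequent words rather than write them to a file, one possible variant (reusing the wordcount RDD defined above) is:

// Take the 10 most frequent words as (count, word) pairs, without writing to disk.
wordcount.map(x => (x._2, x._1)).top(10).foreach(println)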

That's it for today. I hope everyone learned something!