频道栏目
首页 > 网络 > 云计算 > 正文

aggregate和treeaggregate

2017-08-10 09:52:01      个评论    来源:andyliuzhii的专栏  
收藏   我要投稿
1、reduce函数和combine函数必须满足交换律(commutative)和结合律(associative)
2、从aggregate 函数的定义可知,combine函数的输出类型必须和输入的类型一致

scala> def seqOP(a:Int, b:Int) : Int = {
| println("seqOp: " + a + "\t" + b)
| math.min(a,b)
| }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int): Int = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: Int, b: Int)Int

scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
scala> z. aggregate(3)(seqOP, combOp)
seqOp: 3 1
seqOp: 3 4
seqOp: 1 2
seqOp: 3 5
seqOp: 1 3
seqOp: 3 6
combOp: 3 1
combOp: 4 3

res20: Int = 7

scala> def seqOp(a:String, b:String) : String = {
| println("seqOp: " + a + "\t" + b)
| math.min(a.length , b.length ).toString
| }
seqOp: (a: String, b: String)String

scala> def combOp(a:String, b:String) : String = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: String, b: String)String

scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp: 345
seqOp: 12
seqOp: 0 4567
seqOp: 0 23
combOp: 1
combOp: 1 1

res25: String = 11

scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp: 12
seqOp: 345
seqOp: 0 23
seqOp: 0
combOp: 0
combOp: 0 1

res26: String = 01

========================================================================================

aggregate需要三个参数(初始值zeroValue,函数seqOp和函数combOp),返回值类型U同初始值zeroValue一样。
处理过程:
1.在rdd的每个分区上应用seqOp函数(应用初始值zeroValue)并返回分区的结果值(U类型)。
2.分区的结果值返回到driver端做reduce处理,也就是说在分区的结果集上应用函数combOp(应用初始值zeroValue),
并返回最终结果值(U类型)。

【aggregate】
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int


scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int


val z =sc.parallelize(List(1,2,4,5,8,9),3)
scala> z.aggregate(3)(seq,comb)
seq:3:4
seq:3:1
seq:1:2
seq:3:8
seq:3:5
seq:3:9
comb:3:1
comb:4:3
comb:7:3
res0: Int = 10
【treeAggregate】
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int


scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int


val z =sc.parallelize(List(1,2,4,5,8,9),3)
scala> z.treeAggregate(3)(seq,comb)
seq:3:4 //3 分区1
seq:3:1 //1 分区1
seq:1:2 //1 分区1
seq:3:8 //3 分区2
seq:3:5 //3 分区2
seq:3:9 //3 分区3
comb:1:3
comb:4:3
res1: Int = 7


由上可见,形式上两种用法一致,只是aggregate 比 treeAggregate在最后结果的reduce操作时,多使用了一次初始值。


3.区别


查看aggregate的代码和treeAggregate的代码实现会发现,确实如上现象所反映,整理结果如下:
(1)最终结果上,aggregate会比treeAggregate多做一次对于初始值的combOp操作。但从参数名字上就可以看到,
一般要传入类似0或者空的集合的zeroValue初始值。
(2)aggregate会把分区的结果直接拿到driver端做reduce操作。treeAggregate会先把分区结果做reduceByKey,
最后再把结果拿到driver端做reduce,算出最终结果。reduceByKey需要几层,由参数depth决定,也就是相当于
做了depth层的reduceByKey,这也是treeAggregate名字的由来。

上一篇:HadoopHDFS通过QJM实现高可用HA环境搭建
下一篇:Spring的AOP
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站