# aggregate and treeAggregate

2017-08-10 09:52:01    Source: andyliuzhii's blog column

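1. The per-partition reduce function (seqOp) and the combine function (combOp) must be commutative and associative. Otherwise the result can depend on how the data is partitioned and on the order in which partition results are merged.
2. From the definition of aggregate, combOp's output type must be the same as its input type: its signature is (U, U) => U, whereas seqOp's is (U, T) => U.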
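To make point 2 concrete, here is a minimal plain-Scala sketch that needs no Spark. It reproduces the shape of aggregate's two functions with element type T = Int and accumulator type U = (sum, count), which is the usual way to compute an average. The object name `AverageSketch` and the two-partition layout are hypothetical and serve only as illustration. seqOp takes a (U, T) and returns a U, while combOp must take two U values and return a U.

```scala
// Plain-Scala sketch (no Spark): models aggregate's two function types with
// T = Int (element type) and U = (Int, Int) (accumulator: sum, count).
object AverageSketch {
  type Acc = (Int, Int) // (running sum, element count)

  // seqOp: (U, T) => U -- folds one element into a partition's accumulator
  val seqOp: (Acc, Int) => Acc = (acc, x) => (acc._1 + x, acc._2 + 1)

  // combOp: (U, U) => U -- merges two accumulators; input and output are both U
  val combOp: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Hypothetical layout standing in for an RDD with 2 partitions
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

  def average(): Double = {
    val zero: Acc = (0, 0) // neutral element for combOp
    val perPartition = partitions.map(_.foldLeft(zero)(seqOp))
    val (sum, count) = perPartition.foldLeft(zero)(combOp)
    sum.toDouble / count
  }
}
```

With Spark, you could pass the same two function values to an `RDD[Int]` as `rdd.aggregate((0, 0))(seqOp, combOp)`.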

```scala
scala> def seqOP(a:Int, b:Int) : Int = {
| println("seqOp: " + a + "\t" + b)
| math.min(a,b)
| }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int): Int = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
scala> z.aggregate(3)(seqOP, combOp)
seqOp: 3 1
seqOp: 3 4
seqOp: 1 2
seqOp: 3 5
seqOp: 1 3
seqOp: 3 6
combOp: 3 1
combOp: 4 3

res20: Int = 7
```
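You can check this result by hand. The seqOp lines show that the two partitions hold (1, 2, 3) and (4, 5, 6). Each partition folds min starting from the zero value 3. The driver then folds the two partition results with +, starting from 3 again:

$$
\begin{aligned}
r_1 &= \min(\min(\min(3, 1), 2), 3) = 1 \\
r_2 &= \min(\min(\min(3, 4), 5), 6) = 3 \\
\text{result} &= (3 + r_1) + r_2 = (3 + 1) + 3 = 7
\end{aligned}
$$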

```scala
scala> def seqOp(a:String, b:String) : String = {
| println("seqOp: " + a + "\t" + b)
| math.min(a.length, b.length).toString
| }
seqOp: (a: String, b: String)String

scala> def combOp(a:String, b:String) : String = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: String, b: String)String

scala> val z = sc.parallelize(List("12", "23", "345", "4567"), 2)
scala> z.aggregate("")(seqOp, combOp)
seqOp: 345
seqOp: 12
seqOp: 0 4567
seqOp: 0 23
combOp: 1
combOp: 1 1

res25: String = 11
```
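Why "11"? seqOp returns the smaller of the two string lengths, converted to a string. So after the first element, every accumulator is a one-character string. The trace shows the partitions as ("12", "23") and ("345", "4567"). Each starts from the zero value "" (length 0):

$$
\begin{aligned}
\text{partition } (\text{"12"}, \text{"23"}):&\quad \min(0, 2) = 0 \to \text{"0"},\quad \min(1, 2) = 1 \to \text{"1"} \\
\text{partition } (\text{"345"}, \text{"4567"}):&\quad \min(0, 3) = 0 \to \text{"0"},\quad \min(1, 4) = 1 \to \text{"1"} \\
\text{driver}:&\quad \text{""} + \text{"1"} + \text{"1"} = \text{"11"}
\end{aligned}
$$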

```scala
scala> val z = sc.parallelize(List("12", "23", "345", ""), 2)
scala> z.aggregate("")(seqOp, combOp)
seqOp: 12
seqOp: 345
seqOp: 0 23
seqOp: 0
combOp: 0
combOp: 0 1

res26: String = 01
```
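This last example also shows why point 1 matters. String concatenation is not commutative. Here the two partitions produce different results ("1" and "0"), so the driver-side merge gives "01" or "10" depending on which task finishes first. The following plain-Scala sketch, with no Spark involved, replays that merge in both orders. The partition layout is read off the seqOp trace above, and the object name `MergeOrderSketch` is hypothetical.

```scala
// Plain-Scala sketch (no Spark) of the driver-side merge in the last example.
object MergeOrderSketch {
  def seqOp(a: String, b: String): String = math.min(a.length, b.length).toString
  def combOp(a: String, b: String): String = a + b

  // Assumed layout, read off the seqOp trace above: ("12", "23") and ("345", "")
  val partitions: Seq[Seq[String]] = Seq(Seq("12", "23"), Seq("345", ""))

  // Per-partition results, each folded from zeroValue "": "1" and "0"
  val partitionResults: Seq[String] = partitions.map(p => p.foldLeft("")(seqOp))

  // Driver-side merge from zeroValue "", visiting partitions in the given order
  // (in Spark, this is the order in which the tasks happen to finish)
  def mergeInOrder(order: Seq[Int]): String =
    order.map(i => partitionResults(i)).foldLeft("")(combOp)
}
```

`MergeOrderSketch.mergeInOrder(Seq(1, 0))` reproduces the "01" above, and `mergeInOrder(Seq(0, 1))` gives "10". The earlier "11" example hides this effect only because both partition results happen to be "1".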

========================================================================================

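aggregate takes three arguments: the initial value zeroValue and the functions seqOp and combOp. Its return type U is the same as the type of zeroValue.

1. On each partition of the RDD, seqOp is applied starting from zeroValue, producing one result of type U per partition.
2. The partition results are sent back to the driver and reduced there. That is, combOp is applied over the partition results, again starting from zeroValue, which yields the final result.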
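The two steps above can be modelled in a few lines of plain Scala. This is a sketch, not Spark's source. Partitions are represented as a `Seq[Seq[T]]`, and the driver merges them in partition order, whereas Spark merges them in task-completion order. `AggregateModel` and its members are hypothetical names.

```scala
// Plain-Scala model of the two steps of RDD.aggregate (no Spark required).
// Partitions are modelled as Seq[Seq[T]]; the merge follows partition order,
// whereas real Spark merges in task-completion order.
object AggregateModel {
  def aggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Step 1: on each partition, fold the elements into zeroValue with seqOp
    val partitionResults = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // Step 2: on the driver, fold the partition results into zeroValue with combOp
    partitionResults.foldLeft(zeroValue)(combOp)
  }

  // The seq/comb pair used in the examples: min per partition, + on the driver
  val seq: (Int, Int) => Int = (a, b) => math.min(a, b)
  val comb: (Int, Int) => Int = (a, b) => a + b
}
```

With the layouts from the transcripts, `AggregateModel.aggregate(Seq(Seq(1, 2, 3), Seq(4, 5, 6)), 3)(AggregateModel.seq, AggregateModel.comb)` gives 7. With `Seq(Seq(1, 2), Seq(4, 5), Seq(8, 9))` it gives 10. Because zeroValue appears in both steps, it enters the driver-side sum once more.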

【aggregate】
```scala
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int

scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1, 2, 4, 5, 8, 9), 3)
scala> z.aggregate(3)(seq,comb)
seq:3:4
seq:3:1
seq:1:2
seq:3:8
seq:3:5
seq:3:9
comb:3:1
comb:4:3
comb:7:3
res0: Int = 10
```
【treeAggregate】
```scala
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int

scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1, 2, 4, 5, 8, 9), 3)
scala> z.treeAggregate(3)(seq,comb)
seq:3:4 // -> 3, partition 2
seq:3:1 // -> 1, partition 1
seq:1:2 // -> 1, partition 1
seq:3:8 // -> 3, partition 3
seq:3:5 // -> 3, partition 2
seq:3:9 // -> 3, partition 3
comb:1:3
comb:4:3
res1: Int = 7
```
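The trace can be reproduced with a plain-Scala model of treeAggregate. The model assumes the Spark 2.x logic that this 2017 article would have run against; newer Spark releases may differ. Partitions are first folded with seqOp exactly as in aggregate. Extra levels, in which results are grouped by index modulo a shrinking partition count and merged with combOp (standing in for reduceByKey), are added only while the loop condition says they save time. The final step is a reduce with combOp that does not use zeroValue. `TreeAggregateModel` is a hypothetical name, and `depth` defaults to 2 as in Spark.

```scala
// Plain-Scala model of treeAggregate following the Spark 2.x logic
// (assumption: newer Spark versions may differ). No Spark required.
object TreeAggregateModel {
  def treeAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U,
      depth: Int = 2): U = {
    require(depth >= 1, s"depth must be >= 1, got $depth")
    // Step 1: same per-partition fold as aggregate
    var partial: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    var numPartitions = partial.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // Step 2: add a level only while it is expected to save wall-clock time.
    // Each level groups results by (index % numPartitions) and merges every
    // group with combOp, standing in for reduceByKey on the executors.
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val n = numPartitions
      partial = partial.zipWithIndex
        .groupBy { case (_, i) => i % n }
        .toSeq
        .sortBy(_._1)
        .map { case (_, group) => group.map(_._1).reduce(combOp) }
    }
    // Step 3: final reduce on the driver, WITHOUT zeroValue
    partial.reduce(combOp)
  }
}
```

For the three partitions above, scale is 2, and the loop condition 3 > 2 + 2 fails, so no extra level is created. The driver reduces (1, 3, 3) directly, which gives the two comb calls and the result 7 seen in the trace.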

3. Differences

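(1) In the final result, aggregate applies combOp to the initial value one more time than treeAggregate does. In the examples above this is the difference between 10 and 7. However, the parameter name zeroValue already signals that the initial value is meant to be a neutral element for combOp, such as 0 for addition. With such a value, the extra combOp does not change the result.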

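(2) aggregate sends the partition results straight to the driver for the reduce. treeAggregate first merges the partition results on the executors with reduceByKey, in one or more levels, and only then sends the smaller remaining set of results to the driver for the final reduce. In the Spark 2.x implementation, no extra level is created when there are only a few partitions, as with the 3 here. In that case the partition results also go straight to the driver.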
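Point (1) can be written out explicitly. Let $r_1, \dots, r_n$ be the per-partition results, which are the same for both methods because both start seqOp from $z$. Write $\oplus$ for combOp, and use associativity and commutativity to ignore bracketing and order. The treeAggregate line assumes the Spark 2.x behavior seen in the trace, where the final reduce does not use $z$:

$$
\begin{aligned}
\texttt{aggregate} &= z \oplus r_1 \oplus r_2 \oplus \cdots \oplus r_n \\
\texttt{treeAggregate} &= r_1 \oplus r_2 \oplus \cdots \oplus r_n
\end{aligned}
$$

In the six-element example, $z = 3$, $\oplus = +$ and $(r_1, r_2, r_3) = (1, 3, 3)$. This gives $3 + 1 + 3 + 3 = 10$ versus $1 + 3 + 3 = 7$. If $z$ is a neutral element for $\oplus$, meaning $z \oplus x = x$ for every $x$, the two expressions are equal.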