# Spark实现排序

2018-09-28 10:19:35

question：用spark对数据进行排序，首先按照颜值的从高到低进行排序，如果颜值相等，再根据年龄的升序排序

1.User类继承Ordered，并且实现序列化（Serializable）

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

* 自定义排序

* Created by zhangjingcun on 2018/9/27 17:13.

*/

object CustomSort1 {

  /**
   * Sorts users by facial score descending, then by age ascending.
   * Ordering is supplied by User1's Ordered implementation.
   */
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Raw records in "id,name,facialScore,age" CSV form
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    // Parallelize the local array into an RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    // Parse each CSV line into a User1 instance
    val userRdd: RDD[User1] = userLines.map { line =>
      val Array(idStr, name, fvStr, ageStr) = line.split(",")
      new User1(idStr.toLong, name, fvStr.toInt, ageStr.toInt)
    }

    // sortBy on the element itself uses User1's natural ordering
    val sorted: RDD[User1] = userRdd.sortBy(u => u)

    // Bring the sorted data back to the driver and print it
    val result: Array[User1] = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }

}

// Extends Ordered so sortBy(u => u) can use the natural ordering;
// Serializable because Spark ships instances between executors.
class User1 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User1] with Serializable {

  /**
   * Descending by facial score (fv); ties broken by ascending age.
   * Uses Integer.compare instead of subtraction, which can overflow
   * and return a wrong sign for extreme Int values.
   */
  override def compare(that: User1): Int = {
    if (that.fv == this.fv) {
      // Equal facial score: younger first (ascending age)
      Integer.compare(this.age, that.age)
    } else {
      // Higher facial score first (descending fv)
      Integer.compare(that.fv, this.fv)
    }
  }

  override def toString: String = {
    // Fixed: the original "\$id" escaped the interpolator and would not
    // compile; plain $ performs the intended string interpolation.
    s"User:($id, $name, $fv, $age)"
  }

}

2.User2用case class实现并继承Ordered：case class自动可序列化，创建实例也不需要new关键字

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

/**

* Created by zhangjingcun on 2018/9/27 17:32.

*/

object CustomSort2 {

  /**
   * Same sort as CustomSort1 (facial score descending, age ascending),
   * but the element type is a case class: no `new`, no explicit Serializable.
   */
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Input rows in "id,name,facialScore,age" CSV form
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    // Distribute the local array as an RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    // Turn each CSV line into a User2 (case class apply — no `new`)
    val userRdd: RDD[User2] = userLines.map { line =>
      val Array(idStr, name, fvStr, ageStr) = line.split(",")
      User2(idStr.toLong, name, fvStr.toInt, ageStr.toInt)
    }

    // Natural ordering comes from User2's Ordered implementation
    val sorted: RDD[User2] = userRdd.sortBy(u => u)

    // Collect to the driver and print
    val result: Array[User2] = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }

}

//case 可以不使用new关键字

//不需要实现序列化

case class User2 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User2] {

override def compare(that: User2): Int = {

//颜值相等的时候

if (that.fv == this.fv) {

this.age - that.age

} else {

-(this.fv - that.fv)

}

}

override def toString: String = {

s"User:(\$id, \$name, \$fv, \$age)"

}

}

3.数据用元组保存，排序时在sortBy中临时构造User3对象作为排序依据，RDD中不存放自定义类型

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

/**

* Created by zhangjingcun on 2018/9/27 17:37.

*/

object CustomSort3 {

  /**
   * Keeps the data as plain tuples and only builds a User3 inside
   * sortBy as the sort key; the RDD itself never stores User3.
   */
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Input rows in "id,name,facialScore,age" CSV form
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    // Distribute the local array as an RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    // Parse each line into a (id, name, fv, age) tuple
    val userRdd: RDD[(Long, String, Int, Int)] = userLines.map { line =>
      val Array(idStr, name, fvStr, ageStr) = line.split(",")
      (idStr.toLong, name, fvStr.toInt, ageStr.toInt)
    }

    // The key function wraps each tuple in a User3, whose Ordered
    // implementation defines the sort (fv desc, age asc)
    val sorted: RDD[(Long, String, Int, Int)] = userRdd.sortBy(tp => User3(tp._1, tp._2, tp._3, tp._4))

    // Collect to the driver and print
    val result: Array[(Long, String, Int, Int)] = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }

}

//case 可以不使用new关键字

//不需要实现序列化

case class User3 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User3] {

override def compare(that: User3): Int = {

//颜值相等的时候

if (that.fv == this.fv) {

this.age - that.age

} else {

-(this.fv - that.fv)

}

}

override def toString: String = {

s"User:(\$id, \$name, \$fv, \$age)"

}

}

4.不定义比较类，利用元组逐元素比较的特点，通过隐式Ordering规则对元组排序

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.rdd.RDD

/**

* Created by zhangjingcun on 2018/9/27 17:41.

*/

object CustomSort4 {

  /**
   * No comparator class at all: an implicit tuple Ordering drives the sort.
   * Tuples compare element by element, so the key (-fv, age) yields
   * facial score descending with age ascending as the tie-breaker.
   */
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Input rows in "id,name,facialScore,age" CSV form
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    // Distribute the local array as an RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    // Parse each line into a (id, name, fv, age) tuple
    val tpRdd: RDD[(Long, String, Int, Int)] = userLines.map { line =>
      val Array(idStr, name, fvStr, ageStr) = line.split(",")
      (idStr.toLong, name, fvStr.toInt, ageStr.toInt)
    }

    // Implicit rule picked up by sortBy: compare (-fv, age), i.e.
    // fv descending first, then age ascending on ties
    implicit val rules = Ordering[(Int, Int)].on[(Long, String, Int, Int)](t => (-t._3, t._4))

    val sorted = tpRdd.sortBy(t => t)

    // Collect to the driver and print
    val result: Array[(Long, String, Int, Int)] = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  }

}