import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
 * One way to mitigate data skew: salting.
 * Prefix each key with a random salt so a hot key is spread across several
 * reduce tasks, aggregate the partial results, then strip the salt and
 * aggregate again.
 */
object AggWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    // "jump" is deliberately skewed: it occurs far more often than "you".
    val array = Array("you,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump")
    val rdd = sc.parallelize(array)
    rdd.flatMap(_.split(","))
      .map { word =>
        // Stage 1: prepend a random salt in [0, 3), splitting the hot key
        // "jump" into up to three distinct keys.
        val prefix = Random.nextInt(3)
        (prefix + "_" + word, 1)
      }
      .reduceByKey(_ + _) // partial counts per salted key
      .map { tuple =>
        // Stage 2: strip the salt and combine the partial counts.
        val word = tuple._1.split("_")(1)
        (word, tuple._2)
      }
      .reduceByKey(_ + _)
      .foreach(tuple => println(tuple._1 + " " + tuple._2))
    sc.stop()
  }
}
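The salting above is hard-wired to word counting. As a minimal generic sketch of the same two-stage pattern, assuming string keys (saltedReduceByKey and numSalts are illustrative names, not from the original code):

import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag
import scala.util.Random

// Sketch: salt, partially aggregate, strip the salt, aggregate again.
def saltedReduceByKey[V: ClassTag](rdd: RDD[(String, V)], numSalts: Int)
                                  (combine: (V, V) => V): RDD[(String, V)] = {
  rdd
    .map { case (k, v) => (Random.nextInt(numSalts) + "_" + k, v) } // salt the key
    .reduceByKey(combine)                                           // partial aggregation
    .map { case (k, v) => (k.split("_", 2)(1), v) }                 // strip the salt
    .reduceByKey(combine)                                           // final aggregation
}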
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
 * Solving data skew with salting, this time via groupByKey.
 */
object GroupBykeyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    // The same skewed input as above: "jump" dominates.
    val array = Array("you,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump", "jump,jump")
    val rdd = sc.parallelize(array)
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = rdd.flatMap(_.split(","))
      .map { word =>
        // Stage 1: salt the key with a random prefix in [0, 3).
        val prefix = Random.nextInt(3)
        (prefix + "_" + word, 1)
      }
      .groupByKey()
    groupByKeyRDD.map { tuple =>
      // Sum the ones within each salted key.
      (tuple._1, tuple._2.sum)
    }.map { tuple =>
      // Stage 2: strip the salt.
      val word = tuple._1.split("_")(1)
      (word, tuple._2)
    }.groupByKey()
      .map { tuple =>
        // Combine the partial counts per word.
        (tuple._1, tuple._2.sum)
      }
      .foreach(tuple => println(tuple._1 + " " + tuple._2))
    sc.stop()
  }
}
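For comparison: groupByKey ships every salted (word, 1) pair across the shuffle, whereas reduceByKey combines map-side first, so the AggWordCount version is generally cheaper. If the grouped form is kept, the same result can be sketched with mapValues, which also preserves the grouped RDD's partitioner:

// Sketch: equivalent to the pipeline above, written with mapValues.
groupByKeyRDD
  .mapValues(_.sum)                               // sum within each salted key
  .map { case (k, v) => (k.split("_", 2)(1), v) } // strip the salt
  .reduceByKey(_ + _)                             // final per-word totals
  .foreach(tuple => println(tuple._1 + " " + tuple._2))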
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by Administrator on 2018/4/24.
 */
public class ActionOperator {
    public static SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
    public static JavaSparkContext sc = new JavaSparkContext(conf);

    /***
     * max/min => .max / .min
     * top-N
     * secondary sort
     */

    /**
     * Take the two words with the highest counts.
     */
    public static void topN() {
        final JavaRDD<String> rdd = sc.parallelize(
                Arrays.asList("you,jump", "you,jump", "i,you"));
        final List<Tuple2<Integer, String>> result = rdd
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String s) throws Exception {
                        return Arrays.asList(s.split(",")).iterator();
                    }
                })
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<>(word, 1);
                    }
                })
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                })
                // Swap (word, count) to (count, word) so we can sort by count.
                .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                        return new Tuple2<>(tuple._2, tuple._1);
                    }
                })
                .sortByKey(false) // descending by count
                .take(2);
        for (Tuple2<Integer, String> tuple : result) {
            System.out.println(tuple._2 + " " + tuple._1);
        }
    }

    public static void main(String[] args) {
        topN();
        sc.stop();
    }
}
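The Java version sorts the entire RDD just to keep two elements. A minimal Scala sketch of the same top-N using RDD.top, which only retains the n largest elements per partition before merging:

import org.apache.spark.{SparkConf, SparkContext}

object TopNScala {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    val topTwo = sc.parallelize(Seq("you,jump", "you,jump", "i,you"))
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .top(2)(Ordering.by(_._2)) // order (word, count) pairs by their count
    topTwo.foreach { case (word, count) => println(word + " " + count) }
    sc.stop()
  }
}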
/**
 * Created by Administrator on 2018/4/24.
 * Secondary sort key (e.g. for lines read via textFile): order by `first`,
 * and break ties with `second`.
 */
class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {
  override def compare(that: SecondarySortKey): Int = {
    if (this.first != that.first) {
      this.first.compareTo(that.first) // compareTo avoids the overflow risk of subtraction
    } else {
      this.second.compareTo(that.second)
    }
  }
}
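The key class by itself does nothing; a minimal usage sketch follows (the "first second" line format and the sample data are assumptions, not from the original):

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    // Assumed input: each line holds a "first second" integer pair.
    sc.parallelize(Seq("1 5", "2 3", "1 2"))
      .map { line =>
        val parts = line.split(" ")
        (new SecondarySortKey(parts(0).toInt, parts(1).toInt), line)
      }
      .sortByKey() // uses SecondarySortKey.compare: by first, then by second
      .map(_._2)
      .collect()
      .foreach(println) // prints: 1 2, 1 5, 2 3
    sc.stop()
  }
}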