
Spark Tutorial: Spark Performance Tuning

A common cause of slow Spark jobs is data skew: a few hot keys account for most of the records, so the handful of tasks that process them run far longer than the rest of the stage. One mitigation is salting (two-stage aggregation): prefix each key with a small random number, aggregate the salted keys so the hot key is spread across several reducers, then strip the prefix and aggregate again for the final result. The examples below show this pattern with reduceByKey and with groupByKey, followed by a Java top-N example and a secondary-sort key class.

import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}

/**
  * One way to mitigate data skew: salting (two-stage aggregation with reduceByKey).
  */
object AggWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)

    // "jump" is the skewed (hot) key in this sample data.
    val array = Array("you,jump") ++ Array.fill(16)("jump,jump")
    val rdd = sc.parallelize(array)

    rdd.flatMap(_.split(","))
      // Stage 1: salt each word with a random prefix in [0, 3),
      // spreading the hot key across up to 3 distinct keys.
      .map(word => (s"${Random.nextInt(3)}_$word", 1))
      .reduceByKey(_ + _)
      // Stage 2: strip the salt and aggregate the partial counts.
      .map { case (saltedWord, count) => (saltedWord.split("_")(1), count) }
      .reduceByKey(_ + _)
      .foreach { case (word, count) => println(word + "  " + count) }
    sc.stop()
  }
}
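The two-stage pattern above can be factored into a reusable helper. The sketch below is illustrative, not from the original code (the saltedReduceByKey name and the saltBuckets parameter are my own); it also splits on the first underscore only, so keys that themselves contain underscores survive the round trip.

import scala.util.Random

import org.apache.spark.rdd.RDD

object SaltedAggregation {
  // Two-stage reduceByKey: salt keys, pre-aggregate, strip the salt, aggregate again.
  def saltedReduceByKey(rdd: RDD[(String, Int)], saltBuckets: Int): RDD[(String, Int)] =
    rdd
      .map { case (key, value) => (s"${Random.nextInt(saltBuckets)}_$key", value) }
      .reduceByKey(_ + _)
      .map { case (saltedKey, value) => (saltedKey.split("_", 2)(1), value) } // first "_" only
      .reduceByKey(_ + _)
}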
import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * The same salting technique for data skew, implemented with groupByKey.
  */
object GroupByKeyTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)

    // "jump" is the skewed (hot) key in this sample data.
    val array = Array("you,jump") ++ Array.fill(16)("jump,jump")
    val rdd = sc.parallelize(array)

    // Stage 1: salt each word, then group the salted keys.
    val groupedRDD: RDD[(String, Iterable[Int])] = rdd.flatMap(_.split(","))
      .map(word => (s"${Random.nextInt(3)}_$word", 1))
      .groupByKey()

    groupedRDD
      .map { case (saltedWord, ones) => (saltedWord, ones.sum) } // partial counts
      // Stage 2: strip the salt and aggregate the partial counts.
      .map { case (saltedWord, count) => (saltedWord.split("_")(1), count) }
      .groupByKey()
      .map { case (word, counts) => (word, counts.sum) }
      .foreach { case (word, count) => println(word + "  " + count) }

    sc.stop()
  }
}
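A general point worth noting (not specific to this article): reduceByKey combines values on the map side before the shuffle, while groupByKey ships every individual (word, 1) record across the network and only then sums them, so for plain aggregation the reduceByKey variant is usually the better choice. A minimal sketch contrasting the two:

import org.apache.spark.{SparkConf, SparkContext}

object GroupVsReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    val pairs = sc.parallelize(Seq(("jump", 1), ("jump", 1), ("you", 1)))

    // groupByKey: all (key, 1) records cross the shuffle, then are summed.
    pairs.groupByKey().mapValues(_.sum).collect().foreach(println)

    // reduceByKey: values are combined map-side first, shrinking the shuffle.
    pairs.reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }
}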
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by Administrator on 2018/4/24.
 */
public class ActionOperator {
    public static SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
    public static JavaSparkContext sc = new JavaSparkContext(conf);
    /***
     * max, min    => .max  .min
     * TopN
     * secondary sort
     */
    /**
     * Take the two words that occur most often.
     */
    public static void topN() {
        final JavaRDD<String> rdd = sc.parallelize(
                Arrays.asList("you,jump", "you,jump", "i,you"));
        final List<Tuple2<Integer, String>> result = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(",")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            // Swap to (count, word) so sortByKey can order by count.
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        }).sortByKey(false).take(2);

        for (Tuple2<Integer, String> tuple : result) {
            System.out.println(tuple._2 + "  " + tuple._1);
        }
    }

    public static void main(String[] args) {
        topN();
    }
}
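For comparison, the same top-N is considerably shorter in the Scala API. This is a minimal sketch, not code from the original article; RDD.top with a count-based Ordering avoids the swap-and-sortByKey dance:

import org.apache.spark.{SparkConf, SparkContext}

object TopNScala {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    sc.parallelize(Seq("you,jump", "you,jump", "i,you"))
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      // top(2) by count; no need to swap the tuple and sortByKey.
      .top(2)(Ordering.by[(String, Int), Int](_._2))
      .foreach { case (word, count) => println(word + "  " + count) }
    sc.stop()
  }
}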
/**
  * Created by Administrator on 2018/4/24.
  *
  * Key class for secondary sort: order by `first`, break ties with `second`.
  */
class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {
  override def compare(that: SecondarySortKey): Int = {
    // compareTo avoids the Int overflow that `this.first - that.first` can hit.
    if (this.first != that.first) {
      this.first.compareTo(that.first)
    } else {
      this.second.compareTo(that.second)
    }
  }
}
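The class above only defines the ordering; it becomes useful when paired with sortByKey. A minimal usage sketch (the sample data and the SecondarySortDemo object are illustrative, not from the original article):

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    sc.parallelize(Seq("3 5", "1 9", "3 2", "1 4"))
      .map { line =>
        val Array(first, second) = line.split(" ").map(_.toInt)
        (new SecondarySortKey(first, second), line)
      }
      .sortByKey() // orders by first, then second, via SecondarySortKey.compare
      .map(_._2)
      .collect()
      .foreach(println)
    sc.stop()
  }
}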