频道栏目
首页 > 网络 > 云计算 > 正文

通过SparkStreaming的window操作实战模拟热点搜索词案例实战

2016-08-25 09:11:31         来源:吾心光明  
收藏   我要投稿

一:在线热点搜索词实现解析

背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中,大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。 Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现

我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设置为5秒,那如果需要对1分钟内的数据做统计,该如何实现呢?SparkStreaming中提供了window的概念。我们看下图:

这里写图片描述

官网给的例子每个2秒钟更新过去3秒钟的内容,3秒钟算一下,5秒钟算一下,3秒钟是一个窗口。window可以包含多个batchInterval(例如5秒),但是必须为batchInterval的整数倍例如1分钟。另外window可以移动,称之为滑动时间间隔,它也是batchInterval的整数倍,例如10秒。一般情况滑动时间间隔小于window的时间长度,否则会丢失数据。<喎"http://www.2cto.com/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwPlNwYXJrU3RyZWFtaW5nzOG5qcHLyOfPwtPrd2luZG93z+C52LXEt723qKO6PGJyIC8+DQo8aW1nIGFsdD0="这里写图片描述" src="http://www.2cto.com/uploadfile/Collfiles/20160825/20160825091703114.png" title="\" />

二、SparkStreaming 实现在线热点搜索词实战

1、经过分析我们采用reduceByKeyAndWindow的方法,reduceByKeyAndWindow方法分析如下:

从代码上面来看, 入口为:

reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 

一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值

val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]  
      .mapValues(mergeValues)  

先计算oldRDD 和newRDD

//currentWindow 就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值

然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值

// 0秒                  10秒     15秒                25秒  
//  _____________________________  
// |  previous window   _________|___________________  
// |___________________|       current window        |  --------------> Time  
//                     |_____________________________|  
//  
// |________ _________|          |________ _________|  
//          |                             |  
//          V                             V  
//       old RDDs                     new RDDs  
//  

reduceByWindow(reduceFunc, windowDuration, slideDuration) 代码:

可以看到他做了两次reduce, 第一次对整个self做一次reduce, 然后截取时间区间, 对结果再做一次reduce。

第一点: 对整个self做reduce会比较慢, 因为self都是相对比较大的集合。
第二点:进行了两次reduce ,源码如下:

def reduceByWindow(  
    reduceFunc: (T, T) => T,  
    windowDuration: Duration,  
    slideDuration: Duration  
  ): DStream[T] = ssc.withScope {  
  this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)  
}  

如果我们看:
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

实际上他是调用了效率非常高的reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc) 方法 ==》 详细计算过程参考之前的博文

这样的话其实他只对newRDDs和oldRDDs做reduce, 由于这两个RDDs都非常小, 可以想象效率是非常高的

def reduceByWindow(  
    reduceFunc: (T, T) => T,  
    invReduceFunc: (T, T) => T,  
    windowDuration: Duration,  
    slideDuration: Duration  
  ): DStream[T] = ssc.withScope {  
    this.map(x => (1, x))  
        .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)  
        .map(_._2)  
}  

如果看reduceByKeyAndWindow的话, 情况也是一样, 一个是执行:

self.reduceByKey(reduceFunc, partitioner)  
        .window(windowDuration, slideDuration)  
        .reduceByKey(reduceFunc, partitioner)

而另外一个确是在已有的window值基础上做了简单的加加减减

宗上, 从效率上面考虑, 我们应该尽量使用包含invReduceFunc的方法, 同样情况下摒弃只有reduceFunc的方法

2、我们案例代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by zpf on 2016/8/23.
  */
/**
  * 使用Scala并发集群运行的Spark来实现在线热搜词
  *
  * 背景描述:在社交网络(例如微博),电子商务(例如京东),热搜词(例如百度)等人们核心关注的内容之一就是我所关注的内容中
  * 大家正在最关注什么或者说当前的热点是什么,这在市级企业级应用中是非常有价值,例如我们关心过去30分钟大家正在热搜什么,并且
  * 每5分钟更新一次,这就使得热点内容是动态更新的,当然更有价值。
  * Yahoo(是Hadoop的最大用户)被收购,因为没做到实时在线处理
  * 实现技术:Spark Streaming(在线批处理) 提供了滑动窗口的奇数来支撑实现上述业务背景,我外面您可以使用reduceByKeyAndWindow操作来做具体实现
  *
  */
object OnlineHottestItems {
  def main(args: Array[String]){
    /**
      * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
      * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
      * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
      * 只有1G的内存)的初学者       *
      */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineHottestItems") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

/*
* 此处设置 Batch Interval 实在spark Streaming 中生成基本Job的单位,窗口和滑动时间间隔
* 一定是该batch Interval的整数倍*/
    val ssc = new StreamingContext(conf, Seconds(5))

    val hottestStream = ssc.socketTextStream("Master", 9999)

    /*
    * 用户搜索的格式简化为 name item,在这里我们由于要计算热点内容,所以只需要提取item即可
    * 提取出的item通过map转化为(item,1)形式
    * 每隔20秒更新过去60秒的内容窗口60秒,滑动20秒
    * */

    val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item , 1))
    val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int,v2:Int) => v1 + v2, Seconds(60) ,Seconds(20))

    hottestDStream.transform(hottestItemRDD => {
    val top3 =  hottestItemRDD.map(pair => (pair._2,pair._1) ).sortByKey(false).
      map(pair => (pair._2,pair._1)).take(3)

      for(item <- top3){
        println(item)
      }
      hottestItemRDD
    }).print()

    ssc.start()
    ssc.awaitTermination()

  }
}

3、将程序打包运行到集群上观察结果:

4、接下来我们使用reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) 这个函数,来实现增量的计算。

使用这个函数,必须进行Checkpoint。代码如下

ssc.checkpoint("/user/checkpoints/")
val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))
相关TAG标签 实战 案例 热点
上一篇:Sqoop-1.4.6Merge源码分析与改造使其支持多个merge-key
下一篇:phoenixlocalindex本地索引分裂源码分析
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站