频道栏目
首页 > 网络 > 云计算 > 正文

SparkStreaming两个小Demo、Spark Streaming处理文件系统数据

2017-11-14 11:08:24      个评论    来源:Wing_93的博客  
收藏   我要投稿

通过对Spark Streaming的基本使用介绍后,我这次写了两个小DEMO,加深下对其的使用。

一、Spark Streaming处理文件系统数据

流程图为:

\

通过SparkStreaming来监听一个固定socket上的数据,获取socket上的数据,然后存储到内存中,再对数据做其他操作。由于socket已经占用了一个资源,所以local[N],N线程数要大于1才能有资源腾出给其他操作。

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming处理Socket数据
  *
  * 测试: nc
  */
object NetworkWordCount {


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
      * 创建StreamingContext需要两个参数:SparkConf和batch interval
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
首先创建SparkConf文件,由于监听socket端口,sparkstreaming中需要占用一个receiver,所以此处需要设置为local[2],设置了两个线程,腾出一个资源来进行Transformation和outp operation操作,然后再创建SparkStreamingContext入口对象,此处设置的5s一批次,通过这个streaming对象调用socketTextStream,此时的lines就相当于是个DStream,然后进行flatMap操作,实现字母求和,相当于是进行Transformation操作,最后将结果打印出来,相当于是进行output operations操作。最后用nc -lk 6789进行测试。

注意添加下面两个依赖:


            com.fasterxml.jackson.module
            jackson-module-scala_2.11
            2.6.5
        

        
            net.jpountz.lz4
            lz4
            1.3.0
        

二、Spark Streaming处理文件系统数据

代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用Spark Streaming处理文件系统(local/hdfs)的数据
  */
object FileWordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.textFileStream("file:///Users/rocky/data/imooc/ss/")

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()

    ssc.start()
    ssc.awaitTermination()


  }

}
由于文件系统是本地的,所以可以直接设置为local,采用textFileStream的方法,监控ss文件夹里面文件的变化,并将这些变化进行transformation处理,注意此处要采用的move或put的方式移动文件到该文件夹,不支持递归嵌套的文件夹,文件夹里文件形式要一样,一旦将某文件移动到该文件夹,该文件不能再添加新的数据,即使添加新的数据也不处理,除非再移动进来一个新的文件。
上一篇:cdh的agent无法启动问题解决办法
下一篇:变态青蛙跳代码实现教程
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站