频道栏目
首页 > 资讯 > 云计算 > 正文

spark与kafka连接测试

16-12-21        来源:[db:作者]  
收藏   我要投稿

测试环境:scala版本为2.11.8,jdk版本为java1.7.79,搭建的工程为maven工程,所需要的依赖有:

            org.apache.spark
            spark-streaming_2.11
            2.0.1
        

        
            org.apache.spark
            spark-streaming-kafka-0-8_2.11
            2.0.1
        

scala代码有:

package sparkStreaming__

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Created by xiaopengpeng on 2016/12/12.
  *
  */
/*class Spark_kafka_123 {

}*/
object Spark_kafka_123{
  def main(args: Array[String]): Unit = {
    if (args.length!=4){
      System.err.println("Usage:Spark_kafka")
      System.exit(1)
    }
    val Array(zkQuorum,group,topic,numThreads) = args
    val conf = new SparkConf().setAppName("Spark_kafka").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("checkpoint")
    val topicMap = topic.split(",").map((_,numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap).map(_._2)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x=>(x,1L)).reduceByKeyAndWindow(_+_,_-_,Minutes(10),Seconds(20),5)
    wordCounts.print
    ssc.start()
    ssc.awaitTermination()
  }
}
相关TAG标签
上一篇:Apache Kafka简介
下一篇:Windwos10安装Docker
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站