测试环境:Scala 版本为 2.11.8,JDK 版本为 1.7.0_79,工程为 Maven 工程,所需依赖如下:
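<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>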
Scala 代码如下:
package sparkStreaming__

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.util.Try

/**
 * Sliding-window word count over messages consumed from Kafka through the
 * receiver-based `KafkaUtils.createStream` API.
 *
 * Created by xiaopengpeng on 2016/12/12.
 *
 * Command-line arguments (all four are required, in this order):
 *  - `zkQuorum`   connection string handed to `createStream`
 *  - `group`      consumer group id handed to `createStream`
 *  - `topics`     comma-separated list of topic names
 *  - `numThreads` positive integer, used as the per-topic value of the topic map
 */
object Spark_kafka_123 {

  private val Usage = "Usage:Spark_kafka <zkQuorum> <group> <topics> <numThreads>"

  /** Prints `message` followed by the usage line to stderr and exits with status 1. */
  private def abort(message: String): Nothing = {
    System.err.println(message)
    System.err.println(Usage)
    sys.exit(1)
  }

  /**
   * Entry point: validates the arguments, builds the streaming job and blocks
   * until the streaming context terminates.
   *
   * @param args `<zkQuorum> <group> <topics> <numThreads>`
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(Usage)
      System.exit(1)
    }
    val Array(zkQuorum, group, topic, numThreads) = args

    // Reject non-numeric or non-positive thread counts with a readable message
    // instead of letting NumberFormatException escape from main.
    val threadsPerTopic = Try(numThreads.trim.toInt).toOption
      .filter(_ > 0)
      .getOrElse(abort(s"numThreads must be a positive integer, got: '$numThreads'"))

    // Tolerate "a, b" and "a,,b": trim each name and drop empty entries.
    val topicMap = topic
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_ -> threadsPerTopic)
      .toMap
    if (topicMap.isEmpty) abort(s"No topic names found in: '$topic'")

    // Use local[*] only as a fallback so that a master supplied externally
    // (e.g. spark-submit --master) is not overridden by the program.
    val conf = new SparkConf()
      .setAppName("Spark_kafka")
      .setIfMissing("spark.master", "local[*]")

    val ssc = new StreamingContext(conf, Seconds(5))
    // A checkpoint directory is needed by the inverse-reduce windowed aggregation below.
    ssc.checkpoint("checkpoint")

    // createStream yields (key, message) pairs; only the message text is used.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))

    // 10-minute window sliding every 20 seconds, 5 partitions. Counts are updated
    // incrementally: add batches entering the window (_ + _) and subtract the
    // ones leaving it (_ - _).
    val wordCounts = words
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(20), 5)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}