频道栏目
首页 > 资讯 > 云计算 > 正文

spark-streaming入门(二)

16-05-28        来源:[db:作者]  
收藏   我要投稿

Input DStreams and Receivers

Input DStreams are DStreams representing the stream of input data received from streaming sources. In thequick example,lineswas an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with aReceiver(Scala doc,Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

【译】input DStreams指的是从输入源获取的输入流数据。在之前的例子中,lines 就是input DStream 因为他表示从netcat服务器获取的数据流。每个input DStream(除了文件系统以外,在后面的章节中介绍)与一个Receiver对象相关联,该对象从数据源接收数据并且把他保存在spark的内存中用于处理。

Spark Streaming provides two categories of built-in streaming sources.

·Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, and Akka actors.

·Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in thelinkingsection.

【译】spark streaming提供两种类型的源数据流。

1.基本源:源在Streaming Context的api中直接可以获得,比如:文件系统、socket连接、Akka actors【个人理解】:个人感觉指的就是源能直接在程序中获得,读入到内存中并且参与到运算。

2.高级源:类似于Kafka、Flume、kinesis、twitter这样的源是可以通过一些额外通用类来使用的。这些额外需要的依赖连接已经在linking章节讨论过了。【个人理解】就是说使用这些高级的源的话,需要一些额外的依赖包。

We are going to discuss some of the sources present in each category later in this section.

Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in thePerformance Tuningsection). This will create multiple receivers which will simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).

【译】我们将会在本节的稍后位置讨论之前出现过的一些源。

注意,如果你想在你的streaming应用中平行的获取多个数据流,你可以创建多输入Dstreams(在之后的Performance Tuning章节讨论)。这将会创建多个receivers,这些receivers将会同时接收多个数据流。但是请注意,spark的 worker/executor是一个长时间运行的任务,因此他会占用spark streaming应用分配的内核之一。因此,谨记spark streaming 应用需要被分配足够的内核(或者线程、如果本地运行至少两个)去处理接收到的数据和运行receviers。

Points to remember

When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, wheren> number of receivers to run (seeSpark Propertiesfor information on how to set the master).

Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.

【译】谨记:

1.当在本地运行spark streaming的程序的时候,不要使用“local”或者“local[1]”作为master的URL。他们中的每个都意味着我们只分配一个线程去本地运行任务。如果你使用一个基于receiver的DStream输入,那么被分配的这唯一一个线程将会被使用运行receiver(【个人理解】也就是说这一个线程只用来接受数据),却没有线程来处理接收到的数据。因此,当本地运行的时候,经常使用"local[n]"作为master的URL,并且这里的n要大于需要运行的receivers的数量。

2.把逻辑扩展到在集群中运行时,被分配给spark streaming 应用的内核的数量必须多于receviers的数量。否则系统只会接受数据,不会去处理运行他。

Basic Sources

We have already taken a look at thessc.socketTextStream(...)in thequick examplewhich creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources.

【译】基本源:

我们已经在之前的例子中看过了ssc.socketTextStream(...)符号,该例子接收来自于TCP端口的文本数据。除了端口以外,StreamingContext的api提供多种方法用于创建以文件和akka actors作为输入源的DStreams。

File Streams:For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:

【译】文件流:在任何兼容HDFS API的文件系统中读取数据,DStream的创建方式如下:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming will monitor the directorydataDirectoryand process any files created in that directory (files written in nested directories not supported). Note that

·The files must have the same data format.

·The files must be created in thedataDirectoryby atomicallymovingorrenamingthem into the data directory.

·Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is an easier methodstreamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence does not require allocating cores.

【译】spark streaming将会监测dataDIrectory这个目录,并且处理任何在此目录下被创建的文件(写入到内嵌的目录中的文件并不支持),注意:

1.文件必须有着同样的数据结构

2.文件必须以自动移动或者重命名到数据目录下这种方式被创建【个人理解】:这句话我还真不明白说滴是什么

3.一旦移动,files文件不能够被更改,如果文件一直持续不断的被增加,新增加的内容不会被读取。

比如文本文件,有一个简单的方法就是我们熟知的:streamingContext.textFileStream(dataDirectory)。并且文件流不需要运行receiver,因此不需要分配内核。

Advanced Sources

This category of sources require interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can belinkedto explicitly when necessary. For example, if you want to create a DStream using data from Twitter’s stream of tweets, you have to do the following:

1.Linking: Add the artifactspark-streaming-twitter_2.10to the SBT/Maven project dependencies.

2.Programming: Import theTwitterUtilsclass and create a DStream withTwitterUtils.createStreamas shown below.

3.Deploying: Generate an uber JAR with all the dependencies (including the dependencyspark-streaming-twitter_2.10and its transitive dependencies) and then deploy the application. This is further explained in theDeploying section.

【译】高级源:

这种源需要与额外的非spark包进行对接,有些包有着非常复杂的依赖(比如kafka和flume)。因此,想要尽量减少关于版本冲突和依赖的问题,必要的时候,从这些源创建DStream的功能被移动到一个独立的并且明确指定的包中。比如,如果你想使用来自于twitter的流中创造一个DStream,你应该做如下操作:

1.Lingking:加入包spark-streaming-twitter_2.10到你的SBT/Maven依赖中。

2.Programming:导入TwitterUtils类并且使用TwitterUtils.createStream去创建DStream(如下所示)。

3.Deploying:产生一个优化的JAR包,该包包含这所有的依赖(包括spark-streaming-twitter_2.10的依赖和他的多重依赖)并且发布应用。这将会在Deploying章节解释。

importorg.apache.spark.streaming.twitter._TwitterUtils.createStream(ssc,None)

Note that these advanced sources are not available in the Spark shell, hence applications based on these advanced sources cannot be tested in the shell. If you really want to use them in the Spark shell you will have to download the corresponding Maven artifact’s JAR along with its dependencies and add it to the classpath.

Some of these advanced sources are as follows.

Kafka:Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1. See theKafka Integration Guidefor more details.

Flume:Spark Streaming 1.6.1 is compatible with Flume 1.6.0. See theFlume Integration Guidefor more details.

Kinesis:Spark Streaming 1.6.1 is compatible with Kinesis Client Library 1.2.1. See theKinesis Integration Guidefor more details.

Twitter:Spark Streaming’s TwitterUtils uses Twitter4j to get the public stream of tweets usingTwitter’s Streaming API. Authentication information can be provided by any of themethodssupported by Twitter4J library. You can either get the public stream, or get the filtered stream based on a keywords. See the API documentation (Scala,Java) and examples (TwitterPopularTagsandTwitterAlgebirdCMS).

【译】注意,这些高级源在spark shell中并不可用,因此基于这些高级源的应用在shell中并不能被测试。 如果你真的想在spark shell中使用他们,你不得不下载相应的maven的JAR包和他的依赖包,并把它们加入到你的classpath中。

一些高级源如下:

1.Kafka:spark streaming 1.6.1与Kafka 0.8.2.1相兼容,详细请看xxx。

2.Flume:spark streaming 1.6.1与Flume 1.6.0相兼容,详细请看xxx。

【后两种用的不多,我就不介绍,下面我们进入Kafka的页面进行查看学习,以下内容并不包含在本页面中,为了方便查看,放在下面进行】

Spark Streaming + Kafka Integration Guide

Apache Kafkais publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka’s high-level API, and a new experimental approach (introduced in Spark 1.3) without using Receivers. They have different programming models, performance characteristics, and semantics guarantees, so read on for more details.

【译】Apache Kafka 是一个发布-订阅的 分布式的、分区的、多副本的消息提交的日志服务。这里我们讲述如何配置spark streaming来从Kafka获取数据。目前有两种方式:一种比较旧的办法就是使用Receivers和Kafka的高级APi,一种比较新的办法(在spark 1.3以后被提供)是不使用Receiver。他们有着不通的编程模型、表现特征、语义担保(【个人理解】这个真不知道啥意思,大牛们请指导),往下阅读获取更多详细信息。
[page]

Approach 1: Receiver-based Approach

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.

However, under default configuration, this approach can lose data under failures (seereceiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. SeeDeploying sectionin the streaming programming guide for more details on Write Ahead Logs.

Next, we discuss how to use this approach in your streaming application.

【译】方法1:基于Receiver的方法

这种方法使用Receiver去获取数据。Receiver是使用Kafka的高级消费者API来实现的。对于所有的receivers来说,从Kafka接收到的数据被保存在Spark的executors上,接着被spark streaming提交的job开始处理数据。然而,默认配置的情况下,这种方式在失败的情况下会丢失数据(详情可查看xxxxx,想要确保零数据丢失,你需要在Spark Streaming中额外使用write Ahead logs)。这种方式同步保存所有从Kafka接收到的数据,并把其写入到在分布式系统中的ahead logs中,因此所有的数据都可以被恢复当发生失败的时候 。查看xxxx在streaming 编程指南以获取更多的关于Write Ahead Logs的信息。

Next, we discuss how to use this approach in your streaming application.

1.

Linking:For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (seeLinking sectionin the main programming guide for further information).

【译】下面,我们讨论一下如何使用这种方法到你的streaming应用中。

1.Linking(链接):对于使用SBT或者Maven定义的scala/Java应用来说,链接你的streaming应用到如下artifact

groupId = org.apache.spark

artifactId = spark-streaming-kafka_2.10

version = 1.6.1

For Python applications, you will have to add this above library and its dependencies when deploying your application. See theDeployingsubsection below.

【译】对于python的应用来说,你将不得不首先增加这个包和他的依赖当你发布你的应用的时候。如下发布部分所示:

2.Programming:In the streaming application code, importKafkaUtilsand create an input DStream as follows.

【译】Programming(编程):在streaming的应用代码当中,导入KafkaUtils然后创建input DStream如下:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,

[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

You can also specify the key and value classes and their corresponding decoder classes using variations ofcreateStream. See theAPI docsand theexample.

【译】你同样可以指定key和value的类和相应的解码后的类通过使用createStream变量。查看API文档和例子

Points to remember:

·

Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in theKafkaUtils.createStream()only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.

·

Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.

·

If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, the storage level in storage level for the input stream toStorageLevel.MEMORY_AND_DISK_SER(that is, useKafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

【译】谨记:

1.在Kafka中的Topic分区并不能关联产生在spark streaming中的RDDs分区。所以增加在KafkaUtils.createStream()中的topic-specific的分区数量,仅仅增加了单个receiver消费的topics的线程数量。他不会增加处理数据中的平行的spark的数量。查看主要的文档以获取更多的信息。

2.多个Kafka输入的DStreams可以使用不同的groups、topics创建,使用多个receivers平行的接受数据。

3.如果你应用了write ahead logs 伴随着多个副本文件系统就像是HDFS,被接受的数据已经在log中被赋值。【后面这句真不知道该怎么翻译】

3.Deploying:As with any Spark applications,spark-submitis used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.

For Scala and Java applications, if you are using SBT or Maven for project management, then packagespark-streaming-kafka_2.10and its dependencies into the application JAR. Make surespark-core_2.10andspark-streaming_2.10are marked asprovideddependencies as those are already present in a Spark installation. Then usespark-submitto launch your application (seeDeploying sectionin the main programming guide).

【译】Deploying(发布):对于任何的Spark 应用,spark-submit是用来提交你的应用的。然而,对于不通语言scala、java、python的应用其中的细节还是有略微不同的。对于scala和java的应用,如果你选择使用SBT或者Maven作为你项目的管理者,那么导入spark-streaming-kafka_2.10和他的依赖到应用的JAR中。确保spark-core_2.10和spark-streaming_2.10被标记为provided依赖因为这些已经在spark的安装文件中所包含。那么使用spark-submit来提交你的应用。

Approach 2: Direct Approach (No Receivers)

This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

【译】方法二,直连的方式(没有Receivers)

为了确保更加健壮和端对端之间的保证,新的不带receiver的直连方法在spark1.3之后被推出。舍弃了传统的使用receiver去接收数据,新的方法定期的查询Kafka来获取在每个topic+partition上的最新的offsets(位移),并且根据定义位移区间来处理每一个批次。当处理数据的任务被提交,Kafka的简单消费者API会去读取定义在Kafka上的的位移区间(类似于从本地文件系统中读取文件)。注意这是在spark1.3之后为scala和java引入的特性,对于python的api来讲是在1.4之后。

This approach has the following advantages over the receiver-based approach (i.e. Approach 1).·

Simplified Parallelism:No need to create multiple input Kafka streams and union them. WithdirectStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.

Efficiency:Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.

Exactly-once semantics:The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (seeSemantics of output operationsin the main programming guide for further information).

【译】相比较基于recevers的方法,本方法有以下几个有点:

1.简易的平行度:不需要创建多个Kafka streams 并且聚合他们。使用directStream,spark streaming会创建与Kafka分区数一样多的RDD分区来消费,这也就一位这从Kafka读取数据是平行化的。所以Kafka和RDD的分区是一比一的,这便于理解。

2.高效的:第一种方式中实现零数据丢失需要把数据存储在Write Ahead Log中,后者继而复制数据。这实际上是不高效的,因为数据被复制了两次,一次被Kafka复制,一次被Write Ahead Log复制。第二种方式消除了这个问题因为根本没有使用receiver,因此没有必要使用WriteAhead Logs。只要你拥有充足的空间,消息可以从Kafka恢复。

3.Exactly-once semantics【不知道咋翻译】:第一种方式使用Kafka的高级API来储存消费的offsets(位移)到zookeeper中。这是从Kafka消费数据的传统方式。然而这种方式(结合write ahead logs)能够确灵数据丢失。所以仍然有一定几率因为一些失败而导致数据被消费两次的情况发生。这是由于从spark streaming接收到的数据与从zookeeper中追踪到的位移不一致导致的。所以,在第二中方式中,我们使用简易的Kafka API 而不是用zookeeper。Offset(位移量)被spark streaming的checkpoint所记录。这消除了spark streaming和zookeeper/Kafka之间的不一致,因此每条记录都被spark streaming高效的接收不管失败与否。为了达到exactly-once semantics,你保存数据到外部的操作必须要么idempotent,或者原子性的transaction 用于保存结果和偏移量。

Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).

【译】:注意,关于此方法的一个弊端是他并不会更新zookeeper的offsets,因此,基于zookeeper的kafka监测工具不会显示过程。然而,你可以自己获取在每个批次的偏移量然后人为的更新zookeeper。如下:

Next, we discuss how to use this approach in your streaming application.

1.Linking:This approach is supported only in Scala/Java application. Link your SBT/Maven project with the following artifact (seeLinking sectionin the main programming guide for further information).

【译】接下来,我们讨论如何使用这个方法在你的streaming应用中

1.linking(链接):这个方法只在scala和java应用中被支持。链接你的SBT、Maven项目到如下的aritifact。

groupId = org.apache.spark

artifactId = spark-streaming-kafka_2.10

version = 1.6.1

2.Programming:In the streaming application code, importKafkaUtilsand create an input DStream as follows.

【译】programming(编程):在streaming 应用的代码中,导入KafkaUtils然后如下创造输入DStream。

import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[

[key class], [value class], [key decoder class], [value decoder class] ](

streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass amessageHandlertocreateDirectStreamto accessMessageAndMetadatathat contains metadata about the current message and transform it to any desired type. See theAPI docsand theexample.

In the Kafka parameters, you must specify eithermetadata.broker.listorbootstrap.servers. By default, it will start consuming from the latest offset of each Kafka partition. If you set configurationauto.offset.resetin Kafka parameters tosmallest, then it will start consuming from the smallest offset.

You can also start consuming from any arbitrary offset using other variations ofKafkaUtils.createDirectStream. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.

【译】你可以传递一个messageHandler到createDirectStream中来获取MessageAndMetadata,后者包含关于目前消息的元数据信息并且把他穿华为任何需要的leixing.

在Kafka的参数中,你必须指明要么metadata.broker.list 或者bootstrap.server。默认,他会从每个Kafka的分区中的最新的offset开始消费。如果你在你的kafka参数中设置了auto.offset.reset并且值为smallest,那么他将会从最小的偏移量开始消费。

你同样可以从任意的偏移量开始消费只要你使用其他的变量如kafkaUitls.createDirectStream。此外,如果你想要获得每个批次中Kafka被消费的位移量,你可以使用如下方法:

// Hold a reference to the current offset ranges, so it can be used downstream

var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>

offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

rdd

}.map {

...

}.foreachRDD { rdd =>

for (o <- offsetRanges) {

println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

}

...

}

You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Another thing to note is that since this approach does not use Receivers, the standard receiver-related (that is,configurationsof the formspark.streaming.receiver.*) will not apply to the input DStreams created by this approach (will apply to other input DStreams though). Instead, use theconfigurationsspark.streaming.kafka.*. An important one isspark.streaming.kafka.maxRatePerPartitionwhich is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.

【译】你可以使用如上方法手动更新zookeeper如果你想在你的streaming应用中使用基于zookeeper的Kafka监测工具去显示进程的话。

注意,转换类型到HasOffsetRanges仅仅当在directKafkaStream中调用第一个方法的时候才会成功。在以后的其他方法中不会被成功。你可以使用transform()命令来替代foreachRDD()作为你的第一个被调用的方法用来获取offset,然后调用进一步的spark方法。然而,注意关于RDD分区与Kafka分区的一对一的映射关系在任何的shuffle或者repartition操作后就会失效。比如reduceByKey()或者 window()

另外一件需要注意的就是自从这个方法不再使用receivers之后,标准的receiver有关的不会被应用到被方法创建的input DStream中。取而代之的是,使用spark.streaming.kafka.*配置,。另外一个重要的就是spark.streaming.kafka.maxRatePerpartition这个参数将会指定每个将要被直接api读取的Kafka分区上的最大比率。
[page]

【下面是个人根据官方文档尝试的基于Reciver的方式】:

首先介绍一下我的环境,我有四台虚拟机:sun、moon、jupiter、neptune。每台都安装了kafka,brokerid依次类推为:1,2,3,4。并且后面三个安装了zookeeper,至于具体的安装和配置这里我不做过多介绍。先启动你的所有的zookeeper,因为按照之前所讲,zookeeper上要保存着消费的offsets(偏移量)。以下是我的程序代码:

import org.apache.spark.{SparkContext, HashPartitioner, SparkConf}

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

* Created by Administrator on 2016/5/25.

*/

object KafkaWordCount {

val func = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {

iter.flatMap {

case (x, y, z) => Some(

y.sum + z.getOrElse(0)

).map(i => (x, i))

}

}

def main(args: Array[String]) {

val Array(zkQuorum, group, topics, numThreads) = args

val conf = new SparkConf().setAppName("kafkaWordCount").setMaster("local[2]")

val s = new SparkContext(conf)

s.setLogLevel("FATAL")

val sc = new StreamingContext(s, Seconds(5))

sc.checkpoint("D://ck1")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

val data = KafkaUtils.createStream(sc, zkQuorum, group, topicMap)

val words = data.map(_._2).flatMap(_.split(" "))

val wordCounts = words.map((_, 1)).updateStateByKey(func, new HashPartitioner(sc.sparkContext.defaultParallelism), true)

wordCounts.print()

sc.start()

sc.awaitTermination()

}

}

然后这里有几点注意的地方,setMaster的URL的时候,记得一定要至少分配两个线程,一个去接受,一个去计算(如果你有一个receiver的时候),否则你可能只会接收数据却没有处理数据。s.setLogLevel是为了设置日志级别,我这里设置FATAL只会在发生重大错误的时候打印(此处是为了不打印没必须要的信息从而影响结果的显示)。我们使用updateStateByKey是为了计算包含之前批次和当前批次的和。详情不做过多介绍,然后设置run configuration如下:

 

 

这里的参数解释一下:前面的是我所有的zookeeper的主机名和端口,g1代表组名,也可以分为分为很多组。kafkaspark表示topic的名字,最后的2表示用几个线程运行。然后我们运行,接着在kafka的一台机器上我们开启一个producer然后生产数据,产生的数据将会被我们的程序所计数。如图:

 

 

这里我输入了一个tom,我们查看统计结果(之前有个hello):

 

 

可以看到,hello的数量为1,tom的数量为1,继续输入从而继续统计。

【注】再次过程中可能会出现在producer端中生产数据,但是客户端却始终手机不多数据的情况,个人感觉大多数情况是由于权限导致的,查看你的kafka和日志的相应权限是否为root还是当前用户的,如果不是当前用户的话,修改权限。如果实在不知道的话,可以查看日志。
[page]

【下面给大家介绍一个关于直连的方式】:

首先还是环境我不过多介绍,与上边相同,该启动的启动,先给大家看一下我的代码,一个简单的wordcount(单词计数)的例子:

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

* mhc

* Created by Administrator on 2016/5/25.

*/

object KafkaDirect {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("directKafka").setMaster("local[2]")

val sc = new SparkContext(conf)

sc.setLogLevel("FATAL")

val ssc = new StreamingContext(sc, Seconds(5))

ssc.checkpoint("D://ch3")

val Array(brokers, topics) = args

val topicSet = topics.split(",").toSet

val kafkaParams = Map[String,String]("metadata.broker.list"->brokers)

val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

lines.foreachRDD { rdd =>

val result = rdd.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()

println(result.toBuffer)

}

ssc.start()

ssc.awaitTermination()

}

}

这里可看到,我们使用createDirectStream这种方式,它里面的参数由我们控制台输入。主要是kafka的broker的地址和topic的地址。注意这里我们并没有给zookeeper的地址,因为在直连这种方式是不需要zookeeper来保管偏移量的。下面是启动的配置文件:

 

 

启动idea(或者你的eclipse),然后在linux端启动一个生产者,往kafkaspark这个topic进行生产数据如下:

 

 

然后在idea中我们可以查看到如下:

 

 

这说明了我们的直连方式已经成功,这里只是个简易的批次统计,如果要全部统计个人感觉可以使用hashmap进行人工计数,那么到这里,直连方式成功。

感谢开源,让技术走近你我。

相关TAG标签
上一篇:第10课:SparkStreaming源码解读之流数据不断接收全生命周期彻底研究和思考
下一篇:Openstackliberty源码分析之云主机的启动过程2
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站