频道栏目
首页 > 资讯 > 其他 > 正文

架构设计:系统间通信(30)——Kafka及场景应用(中3)

16-05-09        来源:[db:作者]  
收藏   我要投稿
4-5、Kafka原理:消费者

作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能——只要消息能被快速消费掉不在Broker端形成拥堵,整个Apache Kafka就不会出现性能瓶颈问题。

4-5-1、基本使用

我们首先使用Kafka Client For JAVA API为各位读者演示一下最简单的Kafka消费者端的使用。以下示例代码可以和上文中所给出的生产者代码相对应,形成一个完整的消息创建——接收——发送过程:

package kafkaTQ;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

/**

* 这是Kafka的topic消费者

* @author yinwenjie

*/

public class KafkaConsumer_GroupOne {

public static void main(String[] args) throws RuntimeException {

// ==============首先各种连接属性

// Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs

// 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)

// 这里我们设置几个关键属性

Properties props = new Properties();

// zookeeper相关的,如果有多个zk节点,这里以“,”进行分割

props.put("zookeeper.connect", "192.168.61.140:2181");

props.put("zookeeper.connection.timeout.ms", "10000");

// 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。

// 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样

String groupname = "group2";

props.put("group.id", groupname);

//==============

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// 我们只创建一个消费者

HashMap map = new HashMap();

map.put("my_topic2", 1);

Map>> topicMessageStreams = consumerConnector.createMessageStreams(map);

// 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition

// 但是一个partition只能分配到一个消费线程去

KafkaStream stream = topicMessageStreams.get("my_topic2").get(0);

new Thread(new ConsumerThread(stream)).start();

// 接着锁住主线程,让其不退出

synchronized (KafkaConsumer_GroupTwo.class) {

try {

KafkaConsumer_GroupTwo.class.wait();

} catch (InterruptedException e) {

e.printStackTrace(System.out);

}

}

}

/**

* @author yinwenjie

*/

private static class ConsumerThread implements Runnable {

private KafkaStream stream;

/**

* @param stream

*/

public ConsumerThread(KafkaStream stream) {

this.stream = stream;

}

@Override

public void run() {

ConsumerIterator iterator = this.stream.iterator();

//============这个消费者获取的数据在这里

while(iterator.hasNext()){

MessageAndMetadata message = iterator.next();

int partition = message.partition();

String topic = message.topic();

String messageT = new String(message.message());

System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]");

}

}

}

}

以上代码片段有几个关键点需要进行一下说明:

“map.put(“my_topic2”, 1);” 这句代码表示将会为名叫“my_topic2”的队列创建数量为1的消费者。在一个进程的连接中,您可以指定创建多个topic的消费者数量。例如:

......

# 为my_topic2的队列创建数量为1的消费者

# 并且为my_topic3的队列创建数量为4的消费者

map.put("my_topic2", 1);

map.put("my_topic3", 4);

......

每一个消费者都需要一个独立的线程进行工作。您可以将工作任务放入已经创建好的线程池(推荐这样做),也可以像以上代码示例中那样创建一个线程并运行任务。

......

# 使用线程池

# 这里的参数就是消费者的总数量

ExecutorService threadPool = Executors.newFixedThreadPool(1);

threadPool.execute(new ConsumerThread(stream));

......

在开发过程中,消费者端无需知道任何一个Broker的位置。但是必须至少知道一个zookeeper服务节点的位置。通过这个位置,消费者端首先会去zookeeper服务上查找指定的topic的分区情况和已有的消费者情况。

4-5-2、分区与消费者负载

Apache Kafka集群中的消费者以线程为单位,如在上一小节代码段所示:我们在一个进程中,为Topic为“my_topic2”的队列创建了一个线程,这个线程就是一个消费者——属于名为“group2”的用户组。这时,Topic中所有分区的消息都会交给这个消费线程进行消费。如下图所示:

 

这里写图片描述

 

虽然一个消费者可以同时消费Topic中多个分区(Partition)的消息,但在生产环境下为了获得更优的消费性能并不建议这样做。由于单个消费者线程的处理能力是有限的,一旦出现数据洪峰,消息就会堆积在Broker端无法被处理(如果消费者端使用了线程池,则可能堆积在消费者端,这要看您怎么编写代码)。所以上一个小节那样的消费者编码方式,最多就是用来做做“Hello World”那样的示例,没有更多的使用价值了。

4-5-3、优化 一:

第一种改进方法,就是让一个消费者只消费一个分区(Partition)中的消息,且整个系统中的消费者大于等于Topic中的分区数量。设计方案如下:

 

这里写图片描述

 

如上图所示,这个Topic下一共有四个分区(Partition),对应的消费者数量也有四个,但是这四个消费者同属于一个进程,存在于同一个物理节点上。我们根据这个设计方案,更改之前消费者端的代码,如下(为了节约篇幅,只给出主要的更改位置):

......

// 后续创建的所有消费者线程,都是属于group2的消费组

String groupname = "group2";

props.put("group.id", groupname);

......

// 在这个进程中,为topic名为my_topic2的队列创建了四个消费者

HashMap map = new HashMap();

map.put("my_topic2", 4);

Map>> topicMessageStreams = consumerConnector.createMessageStreams(map);

......

// 为这四个消费者分配四个不同的线程

// 消费者线程1

KafkaStream stream = topicMessageStreams.get("my_topic2").get(0);

new Thread(new ConsumerThread(stream)).start();

// 消费者线程2

stream = topicMessageStreams.get("my_topic2").get(1);

new Thread(new ConsumerThread(stream)).start();

// 消费者线程3

stream = topicMessageStreams.get("my_topic2").get(2);

new Thread(new ConsumerThread(stream)).start();

// 消费者线程4

stream = topicMessageStreams.get("my_topic2").get(3);

new Thread(new ConsumerThread(stream)).start();

......

// 接着锁住主线程,让其不退出

synchronized (KafkaConsumer_GroupTwo.class) {

try {

KafkaConsumer_GroupTwo.class.wait();

} catch (InterruptedException e) {

e.printStackTrace(System.out);

}

}

......

4-5-4、优化 二:

显然“优化方案一”中的做法虽然实现了4消费者分别对应4个分区的负载均衡方案,但是受限于单个物理节点的处理性能,所以这种方案的处理性能还有进一步优化的可能。我们可以在多个节点物理节点上均匀散步这些消费者,对Topic分区中的消息进行一一对应的消费,如下图所示:

 

这里写图片描述

 

上图所示的设计思路中,我们使用了2个物理节点完成消息的消费任务,每个服务节点上开启的消费进程上有两个消费者线程。这样Topic中4个分区的消息就会被均匀分布到2个物理节点中,且每一个物理节点处理两个分区中的消息。

注意:可能您在分别启动这些消费进程的时候,由于时间上存在差异,某一台服务节点上的消费进程将暂时被分配多个分区进行消息接收。但没有关系,当这个消费者性能到达瓶颈,分区中的消息出现拥堵的时候,这个分区就会被新的消费者所代替,直到10个消费者线程分别和10个分区建立一一对应关系为止

相关TAG标签
上一篇:Richardson成熟度模型(Richardson Maturity Model) - 通往真正REST的步骤
下一篇:IBM Watson物联网平台的两个MQTT工具
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站