1. Download
wget http://mirrors.hust.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
2. Extract
tar -zxvf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1
3. Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
4. Start the Kafka server
bin/kafka-server-start.sh config/server.properties &
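Because both services are started in the background, the next step can fail if the broker is not listening yet. A minimal sketch of a wait loop, assuming bash (the /dev/tcp redirection is a bash feature) and the default ports 2181 and 9092 used in these notes; wait_for_port is a hypothetical helper name, not part of Kafka.

```shell
# Hypothetical helper (requires bash): poll a TCP port until it accepts
# connections or the timeout expires.
# Usage: wait_for_port HOST PORT [TIMEOUT_SECONDS]
wait_for_port() {
    local host="$1" port="$2" timeout="${3:-30}" waited=0
    # bash opens a TCP connection when redirecting to /dev/tcp/HOST/PORT;
    # the subshell closes the descriptor again as soon as it exits.
    until (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
        if [ "$waited" -ge "$timeout" ]; then
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}
```

For example: wait_for_port localhost 2181 60 && wait_for_port localhost 9092 60, run before creating the topic. An open port only shows that the process is listening, not that the broker has finished registering in ZooKeeper.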
5. Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6. List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
7. Check the topic's partition and replica status
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
8. Send messages with the console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
...
This is a message
This is another message
^C
9. Start a consumer to read the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
...
This is a message
This is another message
^C
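Steps 8 and 9 can also be run without typing messages or pressing ^C. A sketch under these assumptions: KAFKA_HOME points at the unpacked kafka_2.11-1.0.1 directory, the broker listens on localhost:9092, and roundtrip is a hypothetical helper name. The producer reads its messages from standard input, and the consumer's --max-messages option makes it exit after reading two messages.

```shell
# Hypothetical smoke test: send two lines to a topic, then read them back.
# KAFKA_HOME is assumed to point at the unpacked kafka_2.11-1.0.1 directory.
# Usage: roundtrip TOPIC [BROKER]
roundtrip() {
    local topic="$1" broker="${2:-localhost:9092}"
    # The console producer sends one message per input line and exits at end of input.
    printf '%s\n' 'This is a message' 'This is another message' |
        "$KAFKA_HOME/bin/kafka-console-producer.sh" --broker-list "$broker" --topic "$topic"
    # --max-messages 2 stops the consumer after two messages instead of waiting for ^C.
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$broker" \
        --topic "$topic" --from-beginning --max-messages 2
}
```

For example: KAFKA_HOME=$PWD roundtrip test. If the topic already holds older messages, --from-beginning returns the two oldest ones rather than the two just sent.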
Building a cluster on top of the setup above:
1. Copy the configuration file
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
2. Edit the copies
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
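The per-broker edits can also be scripted. A minimal sketch, assuming the stock config/server.properties, where listeners is commented out and the log directory is set through log.dirs; make_broker_config is a hypothetical helper name. It removes the old values of the three settings and appends the per-broker ones.

```shell
# Hypothetical helper: derive a broker config from a base server.properties.
# Usage: make_broker_config BASE_FILE BROKER_ID PORT > config/server-N.properties
make_broker_config() {
    local base="$1" id="$2" port="$3"
    # Drop existing values of the three settings (including a commented-out
    # listeners line), keep everything else unchanged.
    grep -v -E '^#?(broker\.id|listeners|log\.dirs?)=' "$base"
    # Append the per-broker values.
    printf 'broker.id=%s\n' "$id"
    printf 'listeners=PLAINTEXT://:%s\n' "$port"
    printf 'log.dirs=/tmp/kafka-logs-%s\n' "$id"
}
```

For example: make_broker_config config/server.properties 1 9093 > config/server-1.properties, and the same with 2 9094 for the second copy.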
3. Start the new brokers
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
4. Create a topic to test the cluster
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
5. Check the topic's partition and replica status
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
6. Produce messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
7. Consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
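8. After killing the first node (broker 0; find its PID with the ps command below and kill it), the leader is re-elected as 1 and broker 0 drops out of the in-sync replicas (Isr)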
ps aux | grep kafkaServer
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 0,1,2 Isr: 1,2
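To check the failover from a script rather than by eye, individual fields of the partition line can be extracted with awk. A sketch that assumes the --describe output format shown above (Kafka 1.0.1; other versions may print it differently); partition_field is a hypothetical helper name.

```shell
# Hypothetical helper: print the value that follows a label such as "Leader:"
# or "Isr:" in the partition line of `kafka-topics.sh --describe` output.
# Reads the describe output on standard input.
partition_field() {
    local label="$1"
    awk -v label="$label" '{
        # Scan the whitespace-separated fields for the label and print the next field.
        for (i = 1; i < NF; i++) {
            if ($i == label) { print $(i + 1); exit }
        }
    }'
}
```

For example: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic | partition_field Isr: prints 1,2 for the output above.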
Problem 1:
bin/zookeeper-server-start.sh config/zookeeper.properties
/root/src/kafka_2.11-1.0.1/bin/kafka-run-class.sh: line 270: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64/bin/java: No such file or directory
/root/src/kafka_2.11-1.0.1/bin/kafka-run-class.sh: line 270: exec: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64/bin/java: cannot execute: No such file or directory
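Fix:
The path /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64 exists, but it contains no bin directory. The system apparently has only a JRE runtime environment installed, not a JDK.
Installing the JDK resolves the problem.
Problem 2: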
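The error comes from bin/kafka-run-class.sh, which runs $JAVA_HOME/bin/java when JAVA_HOME is set and plain java from PATH otherwise. A small pre-flight check along the same lines, as a sketch; check_java is a hypothetical helper name.

```shell
# Hypothetical pre-flight check mirroring how kafka-run-class.sh picks the
# java binary: $JAVA_HOME/bin/java if JAVA_HOME is set, otherwise java on PATH.
check_java() {
    if [ -n "${JAVA_HOME:-}" ]; then
        if [ -x "$JAVA_HOME/bin/java" ]; then
            echo "ok: $JAVA_HOME/bin/java"
        else
            echo "missing: $JAVA_HOME/bin/java" >&2
            return 1
        fi
    elif command -v java >/dev/null 2>&1; then
        echo "ok: $(command -v java)"
    else
        echo "missing: no java on PATH" >&2
        return 1
    fi
}
```

Running check_java before bin/zookeeper-server-start.sh reports a JAVA_HOME that points at a directory without bin/java, which is the situation in the log above.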
[2018-03-27 15:24:07,059] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-03-27 15:24:07,138] FATAL [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.net.UnknownHostException: vm: vm: Name or service not known
        at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
        at kafka.server.KafkaHealthcheck$$anonfun$1.apply(KafkaHealthcheck.scala:63)
        at kafka.server.KafkaHealthcheck$$anonfun$1.apply(KafkaHealthcheck.scala:61)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:61)
        at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:53)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:287)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
        at kafka.Kafka$.main(Kafka.scala:92)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.UnknownHostException: vm: Name or service not known
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
        at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
        ... 14 more
The hostname apparently cannot be resolved.
The hostname command reports the hostname as vm,
but ping vm fails.
So add the following line to /etc/hosts:
127.0.0.1 vm
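Whether the entry is needed can be checked before starting the broker. A minimal sketch that only inspects a hosts file (names resolved through DNS are not covered); hosts_has_name is a hypothetical helper name.

```shell
# Hypothetical helper: succeed if NAME appears as a hostname or alias on a
# non-comment line of the given hosts file (default: /etc/hosts).
# Usage: hosts_has_name NAME [HOSTS_FILE]
hosts_has_name() {
    local name="$1" file="${2:-/etc/hosts}"
    awk -v name="$name" '
        { sub(/#.*/, "") }    # strip comments; awk re-splits the fields afterwards
        { for (i = 2; i <= NF; i++) if ($i == name) found = 1 }
        END { exit !found }   # exit status 0 only if the name was seen
    ' "$file"
}
```

For example: hosts_has_name "$(hostname)" || echo "hostname is missing from /etc/hosts". On Linux systems with glibc, getent hosts "$(hostname)" is a broader check because it goes through the full resolver configuration.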