想要跑Java程序,就必须安装JDK。JDK版本,本人用的是JDK1.7。?
基本操作如下:
cd / cd etc vim profile 然后进行修改 添加如下部分: export JAVA_HOME=/opt/JDK/jdk1.7.0_79 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
改好后的profile文件信息如下:
# /etc/profile: system-wide .profile file for the Bourne shell (sh(1)) # and Bourne compatible shells (bash(1), ksh(1), ash(1), ...). if [ "$PS1" ]; then if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then # The file bash.bashrc already sets the default PS1. # PS1='\h:\w\$ ' if [ -f /etc/bash.bashrc ]; then . /etc/bash.bashrc fi else if [ "`id -u`" -eq 0 ]; then PS1='# ' else PS1='$ ' fi fi fi # The default umask is now handled by pam_umask. # See pam_umask(8) and /etc/login.defs. if [ -d /etc/profile.d ]; then for i in /etc/profile.d/*.sh; do if [ -r $i ]; then . $i fi done unset i fi export JAVA_HOME=/opt/JDK/jdk1.7.0_79 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar按下ESC键后,输入“wq!”,按回车保存信息;
kafka具体的步骤如下:
下载kafka安装包,我下的包是kafka_2.11-1.1.0.tgz,这个官网可找到这;# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 port=9092 host.name=阿里云内网地址advertised.listeners=PLAINTEXT://阿里云外网映射地址:9092? ?
接下来修改kafka安装目录下 bin/kafka-server-start.sh的参数,把jvm的内存设置小一点(原来默认1G),如下:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"
(因为我的阿里云虚机内存只有1G,不改小的话,jvm会报内存错误)
入方向允许 TCP 9092端口
1. 打开一个新的Shell,切换到kafka安装目录,执行如下命令启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2. 打开一个新的Shell,切换到kafka安装目录,执行如下命令启动broker
bin/kafka-server-start.sh config/server.properties
3.?打开一个新的Shell,切换到kafka安装目录,执行如下命令启动consumer
bin/kafka-console-consumer.sh --bootstrap-server? 阿里云公网IP:9092? --topic test? --from-beginning
4.?打开一个新的Shell,切换到kafka安装目录,执行如下命令启动producer
bin/kafka-console-producer.sh --broker-list 阿里云公网IP:9092? --topic test
之后,在第4个shell里输入消息,便可以在第3个Shell里接收到消息了。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0modelVersion> <groupId>testgroupId> <artifactId>kafkaartifactId> <version>0.0.1-SNAPSHOTversion> <dependencies> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka_2.11artifactId> <version>1.1.0version> dependency> dependencies> project>
Java code
package kafka; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.*; import org.apache.kafka.clients.producer.KafkaProducer; public class KafkaProducerTest { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "阿里云公网IP:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer (props); for (int i = 0; i < 100; i++) { Future result = producer.send(new ProducerRecord ("test", Integer.toString(i),Integer.toString(i))); producer.flush(); } producer.close(); } }