频道栏目
首页 > 资讯 > 云计算 > 正文

阿里云部署kafka_2.11-1.1.0方法

18-05-04        来源:[db:作者]  
收藏   我要投稿

1. 安装JDK

想要跑Java程序,就必须安装JDK。JDK版本,本人用的是JDK1.7。?
基本操作如下:

从JDK官网获取JDK的tar.gz包;
将tar包上传到服务器上的opt/JDK下面;
解压tar包;
更改etc/profile文件,将下列信息写在后面;(ps mac环境需要sudo su 以root权限进行操作)
 cd /
 cd etc
 vim profile
 然后进行修改 添加如下部分:
 export JAVA_HOME=/opt/JDK/jdk1.7.0_79
 export PATH=$JAVA_HOME/bin:$PATH
 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

改好后的profile文件信息如下:

# /etc/profile: system-wide .profile file for the Bourne shell (sh(1))
# and Bourne compatible shells (bash(1), ksh(1), ash(1), ...).

if [ "$PS1" ]; then
  if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then
    # The file bash.bashrc already sets the default PS1.
    # PS1='\h:\w\$ '
    if [ -f /etc/bash.bashrc ]; then
      . /etc/bash.bashrc
    fi
  else
    if [ "`id -u`" -eq 0 ]; then
      PS1='# '
    else
      PS1='$ '
    fi
  fi
fi

# The default umask is now handled by pam_umask.
# See pam_umask(8) and /etc/login.defs.

if [ -d /etc/profile.d ]; then
  for i in /etc/profile.d/*.sh; do
    if [ -r $i ]; then
      . $i
    fi
  done
  unset i
fi

export JAVA_HOME=/opt/JDK/jdk1.7.0_79
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
按下ESC键后,输入“wq!”,按回车保存信息;
输入 source /etc/profile使环境变量的配置立刻生效

2.?安装Kafka

kafka具体的步骤如下:

下载kafka安装包,我下的包是kafka_2.11-1.1.0.tgz,这个官网可找到这;
将kafka包上传到阿里云服务器上的opt/kafka目录下;
将kafka包解压;
进入config目录下,修改server.properties文件;?
主要修改内容为:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
port=9092
host.name=阿里云内网地址advertised.listeners=PLAINTEXT://阿里云外网映射地址:9092? ?

接下来修改kafka安装目录下 bin/kafka-server-start.sh的参数,把jvm的内存设置小一点(原来默认1G),如下:

export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"

(因为我的阿里云虚机内存只有1G,不改小的话,jvm会报内存错误)

3. 配置阿里云的安全组规则

入方向允许 TCP 9092端口

4. 启动Kafka

1. 打开一个新的Shell,切换到kafka安装目录,执行如下命令启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

2. 打开一个新的Shell,切换到kafka安装目录,执行如下命令启动broker

bin/kafka-server-start.sh config/server.properties

3.?打开一个新的Shell,切换到kafka安装目录,执行如下命令启动consumer

bin/kafka-console-consumer.sh --bootstrap-server? 阿里云公网IP:9092? --topic test? --from-beginning

4.?打开一个新的Shell,切换到kafka安装目录,执行如下命令启动producer

bin/kafka-console-producer.sh --broker-list 阿里云公网IP:9092? --topic test

之后,在第4个shell里输入消息,便可以在第3个Shell里接收到消息了。

5. Java 客户端向阿里云Kafka发送消息示例

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0modelVersion>
  <groupId>testgroupId>
  <artifactId>kafkaartifactId>
  <version>0.0.1-SNAPSHOTversion>
  <dependencies>
      <dependency>
         <groupId>org.apache.kafkagroupId>
         <artifactId>kafka_2.11artifactId>
         <version>1.1.0version>
      dependency>
  dependencies>
project>

Java code

package kafka;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProducerTest {
   public static void main(String[] args){
       Properties props = new Properties();
       props.put("bootstrap.servers", "阿里云公网IP:9092");
       props.put("acks", "all");
       props.put("retries", 0);
       props.put("batch.size", 16384);
       props.put("linger.ms", 1);
       props.put("buffer.memory", 33554432);
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       Producer producer = new KafkaProducer(props);
       for (int i = 0; i < 100; i++) {
         Future result =  producer.send(new ProducerRecord("test", Integer.toString(i),Integer.toString(i)));
            producer.flush();
        }
       producer.close();

   }

}
相关TAG标签
上一篇:java堆内存与栈内存解析
下一篇:网络协议Http知识总结
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站