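Kafka is a widely used messaging service. In many cases I leave authentication at its default configuration, meaning no username/password and no certificates. That is acceptable on an internal network or within a single project team, but once several departments are involved, security considerations usually call for authentication, so that Kafka is not misused and flooded with junk messages that interfere with normal business operations.
Kafka provides several authentication options, such as SASL. This article focuses on how to supply the JAAS configuration in a Kafka client when SASL_PLAINTEXT is enabled, so that the client can connect to the Kafka server correctly.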
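First, what the official documentation says.
There are three ways to load a JAAS file:
1. Set the system property java.security.auth.login.config.
2. Set the login.config.url.n property in the security file of the JRE that runs the program (located at lib/security/java.security under the JRE directory).
3. Call javax.security.auth.login.Configuration.setConfiguration(Configuration) programmatically to supply the configuration.
There is one more option: set the sasl.jaas.config property directly on the Producer or Consumer.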
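To make the third option more concrete, here is a minimal sketch of installing a JAAS configuration in code instead of loading it from a file. The class name ProgrammaticJaasConfig and the method kafkaClientConfiguration are hypothetical names chosen for illustration. The sketch assumes the client looks up the JAAS entry named KafkaClient (the name Kafka clients use when sasl.jaas.config is not set) and reuses the example credentials from this article.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
public class ProgrammaticJaasConfig {

    // Builds an in-memory JAAS configuration that contains a single "KafkaClient" entry.
    static Configuration kafkaClientConfiguration(String username, String password) {
        Map<String, String> options = new HashMap<>();
        options.put("username", username);
        options.put("password", password);

        // The login module is referenced by name only; it is loaded later, at login time.
        final AppConfigurationEntry entry = new AppConfigurationEntry(
                "org.apache.kafka.common.security.plain.PlainLoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);

        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                // Assumption: only the "KafkaClient" context is needed by this application.
                return "KafkaClient".equals(name) ? new AppConfigurationEntry[] {entry} : null;
            }
        };
    }

    public static void main(String[] args) {
        // Install the configuration before the first Kafka client is created.
        Configuration.setConfiguration(kafkaClientConfiguration("producer", "prod-sec"));

        AppConfigurationEntry[] entries =
                Configuration.getConfiguration().getAppConfigurationEntry("KafkaClient");
        System.out.println(entries[0].getLoginModuleName());
    }
}
```

This replaces the JVM-wide JAAS configuration, so it suits applications where Kafka is the only JAAS user; otherwise the per-client sasl.jaas.config property is usually the less intrusive choice.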
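Below we go straight to this last approach, which is the simplest. The contents of the JAAS file are not repeated here because other articles already cover them; this article simply uses them as-is.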
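Producer code.
Note: make sure your Kafka client version matches your Kafka server. Here the server version is kafka_2.11-1.1.0 and the client version is 1.1.0.
The dependency I use:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>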
package com.yq;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;

/**
 * Sends test messages to Kafka over SASL_PLAINTEXT.
 * className: SendMessageMain
 */
public class SendMessageMain {

    public static void main(String... args) throws Exception {
        try {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // SASL settings: the JAAS configuration is passed inline via sasl.jaas.config.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");
            // Alternative: point the JVM at a JAAS file (path to the config file).
            //System.setProperty("java.security.auth.login.config", "D:\\workspaceGithub\\kafka_client_jaas.conf");

            System.out.println("create KafkaProducer");
            Producer<String, String> producer = new KafkaProducer<>(props);
            String data = "aaa";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic01", data);

            for (int i = 0; i < 100; i++) {
                System.out.println(i);
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // metadata carries no usable offset when the send failed
                            System.out.println("onCompletion exception");
                            e.printStackTrace();
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata);
                        }
                    }
                });
                // Flushing after every send keeps the demo output in order.
                System.out.println("flush producer");
                producer.flush();
            }

            System.out.println("close producer");
            producer.close();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        }
    }
}
The core is the snippet below. The value of sasl.jaas.config follows the official documentation and generally has this format:
org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");
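Because the sasl.jaas.config value is a quoted string inside a Java string literal, it is easy to misplace a quote or drop the trailing semicolon. The following is a small sketch of building that value from a username and password. JaasConfigStrings, escape and plainJaasConfig are hypothetical names, not part of the Kafka client API, and the escaping of backslashes and double quotes is an assumption about how quoted JAAS values are parsed.

```java
// Hypothetical helper, for illustration only; not part of kafka-clients.
public class JaasConfigStrings {

    // Assumption: inside a quoted JAAS value, backslash and double quote need a backslash escape.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the sasl.jaas.config value for the PLAIN mechanism.
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + escape(username) + "\""
                + " password=\"" + escape(password) + "\";";
    }

    public static void main(String[] args) {
        System.out.println(plainJaasConfig("alice", "alice-secret"));
    }
}
```

The result can then be passed to the client with props.put("sasl.jaas.config", JaasConfigStrings.plainJaasConfig("producer", "prod-sec")), which also keeps the credentials out of a hard-coded literal if they are read from elsewhere.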
Below is the consumer code.
package com.yq;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Arrays;
import java.util.Properties;

/**
 * Receives messages from Kafka over SASL_PLAINTEXT.
 * className: ReceiveMessageMain
 */
public class ReceiveMessageMain {

    public static void main(String... args) throws Exception {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.119.121:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // SASL settings: same inline JAAS configuration as in the producer.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");

            System.out.println("create KafkaConsumer");
            System.out.println("receive data");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic01"));

            while (true) {
                // poll(long) is the signature available in kafka-clients 1.1.0
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("receive data01");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key= %s , value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        }
    }
}
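As the code above shows, I also used System.setProperty("java.security.auth.login.config", "D:\\workspaceGithub\\kafka_client_jaas.conf"); // path to the config file (it is commented out in the producer code). This is the "set a system property" approach from the official documentation.
We can also set java.security.auth.login.config in the program's startup arguments, that is, add -Djava.security.auth.login.config=D:/workspaceGithub/kafka_client_jaas.conf to the launch parameters.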
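The two system-property variants can be combined so that a -D startup argument takes precedence over a path set in code. The sketch below shows one way to do that. JaasFileProperty and useJaasFile are hypothetical names, and it assumes the property is set before the first Kafka client is created, since the JAAS configuration is normally read once and then cached.

```java
// Hypothetical helper, for illustration only.
public class JaasFileProperty {

    static final String KEY = "java.security.auth.login.config";

    // Sets the JAAS file path only if no -Djava.security.auth.login.config was given.
    // Returns the path that is in effect afterwards.
    static String useJaasFile(String defaultPath) {
        String existing = System.getProperty(KEY);
        if (existing == null || existing.isEmpty()) {
            System.setProperty(KEY, defaultPath);
            return defaultPath;
        }
        return existing;
    }

    public static void main(String[] args) {
        // Call this before constructing any KafkaProducer or KafkaConsumer.
        String effective = useJaasFile("D:/workspaceGithub/kafka_client_jaas.conf");
        System.out.println("JAAS file in effect: " + effective);
    }
}
```

With this in place the same build can run with the default path during development and with a different file supplied through the startup arguments elsewhere.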