频道栏目
首页 > 系统 > 其他 > 正文

Kafka启用SASL_PLAINTEXt并动态配置JAAS文件的方法介绍

2018-07-18 14:22:00         来源:russle的专栏  
收藏   我要投稿

Kafka是广泛使用消息服务,很多情况下关于认证部分我都是默认的配置,也就是不需要用户名/密码,也不配置证书。在内网或者在项目组内部可以,但是设计的跨部门时一般处于安全考虑都需要加上认证,防止kafka被误用,产生大量垃圾信息,干扰了正常业务的运行。

Kafka提供的多种认证方式,比如SASL, 本文主要介绍启用了SASL_PLAINTEXT时,如何在kafka client配置jaas文件,以却把kafka client客户端能正确连接到kafka server上。

先上官方文档:

加载jaas文件的方式有三种

1, 设置系统属性,java.security.auth.login.config

2, 配置运行程序时用到的jre的安全文件 login.config.url.n属性(路径为加热的 lib/security/java.security ),

3, 编程是调用 javax.security.auth.login.Configuration.setConfiguration(Configuration)设置相关信息

还有一种,直接设置Producer或者Consumer的sasl.jaas.config属性

下面我们直接上最简单的这种方式。(关于jaas文件的内容,本文不再赘述,其他文章已经有提及),本文直接引用。

producer 代码.

注意,确保你的kafka client版本和你的kafka server是匹配的,server版本是kafka_2.11-1.1.0 , client版本是1.1.0

我使用的

        
            org.apache.kafka
            kafka-clients
            1.1.0
        
package com.yq;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import static org.slf4j.Logger.*;

/**
 * Simple to Introduction
 * className: SendMessageMain
 *
 */
public class SendMessageMain {
    private static final Logger logger = LoggerFactory.getLogger(SendMessageMain.class);
    public static void main(String... args) throws Exception {
        try {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");


            //System.setProperty("java.security.auth.login.config", "D:\workspaceGithub\\kafka_client_jaas.conf"); //配置文件路径

            System.out.println("create KafkaProducer");
            Producer producer = new KafkaProducer(props);
            String data = "aaa";
            ProducerRecord producerRecord = new ProducerRecord("topic01", data);

            for (int i = 0; i < 100; i++) {
                System.out.println(i);
                producer.send(producerRecord,
                        new org.apache.kafka.clients.producer.Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if(e != null) {
                                    System.out.println("onCompletion exception");
                                    e.printStackTrace();
                                }
                                System.out.println("The offset of the record we just sent is: " + metadata);
                            }
                        });
                System.out.println("flush producer");
                producer.flush();
            }

            System.out.println("close producer");
            producer.close();
        }
        catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        }
    }

}

核心就在于下面这段, 其中的sasl.jaas.config内容,参考官方文档,一般为如下格式
org.apache.kafka.common.security.plain.PlainLoginModule required username=”alice” password=”alice-secret”;

props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");

下面是consumer代码

package com.yq;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.login.AppConfigurationEntry;
import java.util.Arrays;
import java.util.Properties;

/**
 * Simple to Introduction
 * className: SendMessageMain
 */
public class ReceiveMessageMain {
    private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageMain.class);
    public static void main(String... args) throws Exception {
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.119.121:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");

            System.out.println("create KafkaConsumer");
            System.out.println("receive data");
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic01"));
            while (true) {
                ConsumerRecords records = consumer.poll(100);
                System.out.println("receive data01");
                for (ConsumerRecord record: records) {
                    System.out.printf("offset = %d, key= %s , value = %s\n", record.offset(), record.key(), record.value());
                }
            }

        }
        catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        }
    }

}

从以上代码,我们可以看到,我也是用 System.setProperty(“java.security.auth.login.config”, “D:\workspaceGithub、\kafka_client_jaas.conf”); //配置文件路径, 也就是官方文档中的额设置系统属性。

我们还可以在程序启动参数中设置java.security.auth.login.config, 也就是启动参数加上-Djava.security.auth.login.config=D:/workspaceGithub/kafka_client_jaas.conf。

上一篇:集群、负载均衡、分布式三者的区别与联系详解
下一篇:安装和卸载 OpenCV 的实例教程
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站