如何为Kafka配置安全验证机制?

2026-05-26 00:351阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计723个文字,预计阅读时间需要3分钟。

如何为Kafka配置安全验证机制?

目录+服务端配置+1. 修改config目录,添加kafka_server_jaas.conf配置文件+2. 修改kafka-run-class.sh脚本,添加3. 修改config/server.properties文件,客户端接入配置+完整配置示例+综合考量性能、管理、安全等层级要求

目录
  • 服务端配置
    • 1. config 目录添加kafka_server_jaas.conf 配置文件
    • 2. kafka-run-class.sh 添加
    • 3. config/server.properties
  • 客户端接入改造
    • 完整配置示例
  • 综合考虑性能影响、管理成本、安全等级要求,接入便利程度。
  • 鉴权采用SASL+PLAINTEXT 方式。
  • 每个集群会分配统一的访问账号及密码用于客户端访问。

如何为Kafka配置安全验证机制?

服务端配置

1. config 目录添加kafka_server_jaas.conf 配置文件

内容:

KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="7f8d9dsf789ds7ffsdfdsfu9" user_admin="7f8d9dsf789ds7ffsdfdsfu9" user_alice="xjfkddjfdssifds"; };

2. kafka-run-class.sh 添加

KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/home/finance/App/kafka_2.12-2.5.1/config/kafka_server_jaas.conf"

3. config/server.properties

添加

security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN listener, advertised.listeners 添加对应SASL_PLAINTEXT 监听器 listeners=SASL_PLAINTEXT://10.193.196.112:9092 advertised.listeners=SASL_PLAINTEXT://10.193.196.112:9092

客户端接入改造

就是在java程序中将安全参数配置进来

生产者、消费者属性配置加入一下配置

props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN");

springboot 高版本估计有支持在properties文件中直接配置,此处没有验证。

完整配置示例

此方法,可以用KafkaProperties 获取properties配置文件中的参数后,再往里面添加新的参数

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.Map; /** * kafka配置 */ @Configuration @EnableKafka public class KafkaProducerConfig { @Autowired private KafkaProperties kafkaProperties; /** * 消费者配置 */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); Map<String, Object> props = kafkaProperties.buildConsumerProperties(); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); factory.setConcurrency(2); factory.getContainerProperties().setPollTimeout(1500); return factory; } /** * 生产者配置 */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> props = kafkaProperties.buildProducerProperties(); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } }

以上为个人经验,希望能给大家一个参考,也希望大家多多支持自由互联。

本文共计723个文字,预计阅读时间需要3分钟。

如何为Kafka配置安全验证机制?

目录+服务端配置+1. 修改config目录,添加kafka_server_jaas.conf配置文件+2. 修改kafka-run-class.sh脚本,添加3. 修改config/server.properties文件,客户端接入配置+完整配置示例+综合考量性能、管理、安全等层级要求

目录
  • 服务端配置
    • 1. config 目录添加kafka_server_jaas.conf 配置文件
    • 2. kafka-run-class.sh 添加
    • 3. config/server.properties
  • 客户端接入改造
    • 完整配置示例
  • 综合考虑性能影响、管理成本、安全等级要求,接入便利程度。
  • 鉴权采用SASL+PLAINTEXT 方式。
  • 每个集群会分配统一的访问账号及密码用于客户端访问。

如何为Kafka配置安全验证机制?

服务端配置

1. config 目录添加kafka_server_jaas.conf 配置文件

内容:

KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="7f8d9dsf789ds7ffsdfdsfu9" user_admin="7f8d9dsf789ds7ffsdfdsfu9" user_alice="xjfkddjfdssifds"; };

2. kafka-run-class.sh 添加

KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/home/finance/App/kafka_2.12-2.5.1/config/kafka_server_jaas.conf"

3. config/server.properties

添加

security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN listener, advertised.listeners 添加对应SASL_PLAINTEXT 监听器 listeners=SASL_PLAINTEXT://10.193.196.112:9092 advertised.listeners=SASL_PLAINTEXT://10.193.196.112:9092

客户端接入改造

就是在java程序中将安全参数配置进来

生产者、消费者属性配置加入一下配置

props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN");

springboot 高版本估计有支持在properties文件中直接配置,此处没有验证。

完整配置示例

此方法,可以用KafkaProperties 获取properties配置文件中的参数后,再往里面添加新的参数

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.Map; /** * kafka配置 */ @Configuration @EnableKafka public class KafkaProducerConfig { @Autowired private KafkaProperties kafkaProperties; /** * 消费者配置 */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); Map<String, Object> props = kafkaProperties.buildConsumerProperties(); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); factory.setConcurrency(2); factory.getContainerProperties().setPollTimeout(1500); return factory; } /** * 生产者配置 */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> props = kafkaProperties.buildProducerProperties(); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } }

以上为个人经验,希望能给大家一个参考,也希望大家多多支持自由互联。