How Can a Microservice Efficiently Connect to Multiple Kafka Instances at the Same Time?

2026-05-23 23:12
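Preparation

Set up your own Kafka instance: download Kafka from the official site, kafka.apache.org/downloads, and pick a release that works with your Spring Boot version. Fortunately Kafka is compatible across a fairly wide range of versions. At the time of writing the latest release is 3.2.1, distributed as kafka_2.12-3.2.1. In that name, 2.12 is the Scala version the build targets and 3.2.1 is the Kafka version, so it does not describe a range of supported versions. This release works with Spring Boot 2.0.x through 3.0.x.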
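Because the download name is easy to misread, here is a minimal sketch that splits it into its two parts. The class name KafkaArchiveName and the regular expression are illustrative assumptions, not part of Kafka or Spring; the sketch only assumes the kafka_<scala>-<kafka> naming used on the downloads page.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: splits a Kafka download name into Scala and Kafka versions. */
final class KafkaArchiveName {
    // Assumed layout: kafka_<scalaVersion>-<kafkaVersion>, optionally followed by .tgz
    private static final Pattern NAME =
            Pattern.compile("kafka_(\\d+\\.\\d+)-(\\d+(?:\\.\\d+)+)(?:\\.tgz)?");

    /** Returns {scalaVersion, kafkaVersion}; throws if the name does not match. */
    static String[] parse(String archiveName) {
        Matcher m = NAME.matcher(archiveName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a Kafka archive name: " + archiveName);
        }
        return new String[] {m.group(1), m.group(2)};
    }

    public static void main(String[] args) {
        String[] v = parse("kafka_2.12-3.2.1.tgz");
        System.out.println("Scala " + v[0] + ", Kafka " + v[1]);
    }
}
```

The version that matters when matching against spring-kafka is the second one, the Kafka version.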

Extract and install

Go to the bin directory and start the services in the order shown below.

Linux

# Adjust the config file path to match your own directory layout
./zookeeper-server-start.sh ../config/zookeeper.properties

Windows

windows\zookeeper-server-start.bat ..\config\zookeeper.properties

Open another terminal and start the Kafka server.

Linux

./kafka-server-start.sh ../config/server.properties

Windows

windows\kafka-server-start.bat ..\config\server.properties

Minimal Kafka configuration

The following is the smallest configuration needed to use Kafka.

pom.xml dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=single-kafka-server
# Kafka server address
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group; it is created automatically once configured
spring.kafka.consumer.group-id=default_group

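KafkaProducer (producer)

import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@EnableScheduling
public class KafkaProducer {

    // The template auto-configured by Spring Boot
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private void sendTest() {
        // The topic is created automatically (broker default auto.create.topics.enable=true)
        kafkaTemplate.send("topic1", "hello kafka");
    }

    // Send one test message every 10 seconds
    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}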

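KafkaConsumer (consumer)

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // Uses the default listener container factory and the configured group id
    @KafkaListener(topics = {"topic1"})
    public void processMessage(String spuId) {
        log.warn("process spuId ={}", spuId);
    }
}

Result: when the application runs, the producer logs "send message..." every 10 seconds and the consumer logs each message it receives from topic1.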

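Multi-Kafka configuration

This configuration is slightly more involved. The essence is to create the beans by hand instead of relying on auto-configuration. The dependency is the same as before.

pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=kafka-server
# kafka1
# Server address
spring.kafka.one.bootstrap-servers=localhost:9092
spring.kafka.one.consumer.group-id=default_group
# kafka2
spring.kafka.two.bootstrap-servers=localhost:9092
spring.kafka.two.consumer.group-id=default_group2

Both entries point at localhost:9092 because this demo runs against a single local broker. In a real setup the second entry would point at a different Kafka instance.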
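The spring.kafka.one.* and spring.kafka.two.* keys are custom prefixes, so Spring Boot does not bind them to anything by itself. To illustrate how they split into one group of settings per cluster, here is a minimal sketch using only the JDK. The class ClusterProperties is hypothetical, the address localhost:9093 is made up to show two distinct clusters, and the grouping logic is an illustration rather than Spring's own property binding. It assumes every key under spring.kafka. has the form spring.kafka.<cluster>.<key>.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: groups spring.kafka.<cluster>.<key> entries by cluster name. */
final class ClusterProperties {
    private static final String PREFIX = "spring.kafka.";

    static Map<String, Map<String, String>> byCluster(Properties props) {
        Map<String, Map<String, String>> clusters = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (!name.startsWith(PREFIX)) {
                continue; // e.g. server.port
            }
            String rest = name.substring(PREFIX.length()); // e.g. one.consumer.group-id
            int dot = rest.indexOf('.');
            if (dot < 0) {
                continue; // no cluster segment
            }
            String cluster = rest.substring(0, dot);
            String key = rest.substring(dot + 1);
            clusters.computeIfAbsent(cluster, c -> new TreeMap<>())
                    .put(key, props.getProperty(name));
        }
        return clusters;
    }

    /** Parses properties text; wraps the checked exception for convenience. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(String.join("\n",
                "server.port=8090",
                "spring.kafka.one.bootstrap-servers=localhost:9092",
                "spring.kafka.one.consumer.group-id=default_group",
                "spring.kafka.two.bootstrap-servers=localhost:9093", // made-up second broker
                "spring.kafka.two.consumer.group-id=default_group2"));
        byCluster(props).forEach((cluster, settings) ->
                System.out.println(cluster + " -> " + settings));
    }
}
```

Each group corresponds to one configuration class in the steps that follow, which read the same keys with @Value.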

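Configuration for the first Kafka instance. Each bean needs a distinct name.

KafkaOneConfig

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Keys are deserialized as String, so the key type is String rather than Integer
    @Bean(name = "kafkaOneContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}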

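kafkaOneTemplate: the high-level template for the first Kafka instance, used to send messages
kafkaOneContainerFactory: the listener container factory for consuming, referenced from @KafkaListener
producerFactory: the producer factory
consumerFactory: the consumer factory
producerConfigs: the producer settings
consumerConfigs: the consumer settings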

Create the second Kafka configuration the same way. Each element has the same meaning as in the first one.

KafkaTwoConfig

// Imports are the same as in KafkaOneConfig
@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Keys are deserialized as String, so the key type is String rather than Integer
    @Bean(name = "kafkaTwoContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
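The two configuration classes differ only in the property prefix they read, so the map-building part could be shared. Below is a minimal sketch of that idea using only the JDK. KafkaClientProps is a hypothetical helper, not something from the article or from Spring. It uses the literal key names behind the ProducerConfig and ConsumerConfig constants (for example "bootstrap.servers" and "group.id") and passes serializer classes by name, which Kafka clients accept.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the client settings shared by both configuration classes. */
final class KafkaClientProps {
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Producer settings for one cluster, with String keys and values. */
    static Map<String, Object> producer(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    /** Consumer settings for one cluster and consumer group. */
    static Map<String, Object> consumer(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producer("localhost:9092"));
        System.out.println(consumer("localhost:9092", "default_group2"));
    }
}
```

With such a helper, producerConfigs() and consumerConfigs() in each class would shrink to a single call, and adding a third instance would only need new property keys and bean names.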

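Create a test consumer. Note that each listener is given a different listener container factory through containerFactory.

KafkaConsumer

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // Consumes from the first Kafka instance
    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
    public void oneProcessItemcenterSpuMessage(String spuId) {
        log.warn("one process spuId ={}", spuId);
    }

    // Consumes from the second Kafka instance
    @KafkaListener(topics = {"topic2"}, containerFactory = "kafkaTwoContainerFactory")
    public void twoProcessItemcenterSpuMessage(String spuId) {
        log.warn("two process spuId ={}", spuId);
    }
}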

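Create a test producer that periodically sends a message to each of the two topics.

KafkaProducer

import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@EnableScheduling // required for @Scheduled to fire, as in the minimal example
public class KafkaProducer {

    // @Resource resolves by field name, so each field gets the matching bean
    @Resource
    private KafkaTemplate<String, String> kafkaOneTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTwoTemplate;

    private void sendTest() {
        kafkaOneTemplate.send("topic1", "hello kafka one");
        kafkaTwoTemplate.send("topic2", "hello kafka two");
    }

    // Send one test message to each instance every 10 seconds
    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}

Final result: when the application runs, the producer logs "send message..." every 10 seconds and each listener logs the message from its own topic.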
