Spring Boot如何实现与KAFKA消息队列的集成示例?

2026-05-16 05:562阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计546个文字,预计阅读时间需要3分钟。

Spring Boot如何实现与KAFKA消息队列的集成示例?

这里使用Spring Kafka依赖和KafkaTemplate对象操作Kafka服务。一、添加依赖和配置项:

1.在Pom文件中添加依赖:

xml org.springframework.kafka spring-kafka

2.添加配置项:

propertiesspring.kafka.bootstrap-servers=kafka-broker1:9092,kafka-broker2:9092

这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务。

一、添加依赖和添加配置项

1.1、在 Pom 文件中添加依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

1.2、添加配置项

spring: kafka: bootstrap-servers: 12.168.3.62:9092 # 指定kafka 代理地址,可以多个 producer: retries: 2 # 写入失败时,重试次数。当retris为0时,produce不会重复。 batch-size: 1000 #每次批量发送消息的数量,produce积累到一定数据,一次发送 buffer-memory: 33554432 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 acks: 0 #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,如果设置为零,则生产者将不会等待来自服务器的任何确认。 key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定消息key和消息体的编解码方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer

二、代码编写

Spring Boot如何实现与KAFKA消息队列的集成示例?

2.1、添加一个消息类

package com.jsh.mgt.kafkaTemplate.kafka; import java.util.Date; import lombok.Data; /** * @since 2020/5/21 14:13 */ @Data public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 }

2.2、设置消息生产者

package com.jsh.mgt.kafkaTemplate.Controllers; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.jsh.mgt.kafkaTemplate.kafka.Message; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; /** * @since 2020/5/21 11:19 */ @RestController public class KafkaController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; private Gson gson = new GsonBuilder().create(); @GetMapping("/kafka/{msg}") public Object test(@PathVariable("msg") String msg) { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()+ "-"+msg); message.setSendTime(new Date()); kafkaTemplate.send("topic-create",gson.toJson(message)); return "ok"; } }

以上就是Spring boot 整合 KAFKA 消息队列的示例的详细内容,更多关于Spring boot 整合消息队列的资料请关注易盾网络其它相关文章!

标签:

本文共计546个文字,预计阅读时间需要3分钟。

Spring Boot如何实现与KAFKA消息队列的集成示例?

这里使用Spring Kafka依赖和KafkaTemplate对象操作Kafka服务。一、添加依赖和配置项:

1.在Pom文件中添加依赖:

xml org.springframework.kafka spring-kafka

2.添加配置项:

propertiesspring.kafka.bootstrap-servers=kafka-broker1:9092,kafka-broker2:9092

这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务。

一、添加依赖和添加配置项

1.1、在 Pom 文件中添加依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

1.2、添加配置项

spring: kafka: bootstrap-servers: 12.168.3.62:9092 # 指定kafka 代理地址,可以多个 producer: retries: 2 # 写入失败时,重试次数。当retris为0时,produce不会重复。 batch-size: 1000 #每次批量发送消息的数量,produce积累到一定数据,一次发送 buffer-memory: 33554432 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 acks: 0 #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,如果设置为零,则生产者将不会等待来自服务器的任何确认。 key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定消息key和消息体的编解码方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer

二、代码编写

Spring Boot如何实现与KAFKA消息队列的集成示例?

2.1、添加一个消息类

package com.jsh.mgt.kafkaTemplate.kafka; import java.util.Date; import lombok.Data; /** * @since 2020/5/21 14:13 */ @Data public class Message { private Long id; //id private String msg; //消息 private Date sendTime; //时间戳 }

2.2、设置消息生产者

package com.jsh.mgt.kafkaTemplate.Controllers; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.jsh.mgt.kafkaTemplate.kafka.Message; import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; /** * @since 2020/5/21 11:19 */ @RestController public class KafkaController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; private Gson gson = new GsonBuilder().create(); @GetMapping("/kafka/{msg}") public Object test(@PathVariable("msg") String msg) { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()+ "-"+msg); message.setSendTime(new Date()); kafkaTemplate.send("topic-create",gson.toJson(message)); return "ok"; } }

以上就是Spring boot 整合 KAFKA 消息队列的示例的详细内容,更多关于Spring boot 整合消息队列的资料请关注易盾网络其它相关文章!

标签: