Spring Cloud Stream如何与Kafka完美融合?

2026-05-21 08:521阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计374个文字,预计阅读时间需要2分钟。

Spring Cloud Stream如何与Kafka完美融合?

1. 在 `pom.xml` 文件中引入 Kafka 依赖:xml org.springframework.cloud spring-cloud-stream-binder-kafka

2. 在 `application.yml` 文件中配置 Kafka 连接信息:yamlspring: cloud: stream: kafka: binder: brokers: xxx.xxx.xx

1.pom文件导入依赖

<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>

2.application.yml文件配置

spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址 bindings: xxx_output: // 通道名称 destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination: xxx // 消息发往的目的地,对应topic group: xxx // 对应kafka的group

3.创建消息发送者

@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 发送一条kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 发布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel }

4.创建消息监听者

@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 监听接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }

接下来就可以愉快的发送监听消息了

Spring Cloud Stream如何与Kafka完美融合?

到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索易盾网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持易盾网络!

本文共计374个文字,预计阅读时间需要2分钟。

Spring Cloud Stream如何与Kafka完美融合?

1. 在 `pom.xml` 文件中引入 Kafka 依赖:xml org.springframework.cloud spring-cloud-stream-binder-kafka

2. 在 `application.yml` 文件中配置 Kafka 连接信息:yamlspring: cloud: stream: kafka: binder: brokers: xxx.xxx.xx

1.pom文件导入依赖

<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>

2.application.yml文件配置

spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址 bindings: xxx_output: // 通道名称 destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination: xxx // 消息发往的目的地,对应topic group: xxx // 对应kafka的group

3.创建消息发送者

@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 发送一条kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 发布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel }

4.创建消息监听者

@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 监听接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }

接下来就可以愉快的发送监听消息了

Spring Cloud Stream如何与Kafka完美融合?

到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索易盾网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持易盾网络!