Springboot如何实现与Active消息队列的完美整合?

2026-04-30 09:252阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2091个文字,预计阅读时间需要9分钟。

Springboot如何实现与Active消息队列的完美整合?

简单理解:ActiveMQ是Apache开源项目中的一款消息总线,是一个兼容Java Message Service(JMS)面向消息的中间件。它提供灵活的应用程序架构,主要用于服务间进行异步通信。

简单理解:

Active是Apache公司旗下的一个消息总线,ActiveMQ是一个开源兼容Java Message Service(JMS) 面向消息的中件间. 是一个提供松耦合的应用程序架构.

主要用来在服务与服务之间进行异步通信的。

一、搭建步骤
1、相应jar包

<!-- 整合消息队列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果配置线程池则加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>

2、application.properties文件

#整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://47.96.44.110:61616 spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 #集群配置(后续需要在配上) #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) #消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) # spring.jms.pub-sub-domain=true

3、Springboot主类

<!-- 主类需要多加一个@EnableJms注解,不过貌似我没有加的时候,也能运行,为安全起见姑且加上 --> @SpringBootApplication @EnableJms

4.5.......根据不同消息模式来写了。

二、点对点案例
我在这里案例中创建了两个点对点队列,所以他会有两个queue对象,同样对应每个queue对象,都会有单一对应的消费者。

1、Springboot主类

Springboot如何实现与Active消息队列的完美整合?

@SpringBootApplication @EnableJms public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } //新建一个的Queue对象,交给sringboot管理,这个queue的名称叫"first.queue". @Bean public Queue queue(){ return new ActiveMQQueue("first.queue"); } }

2.1、first.queue对应消费者

@Component public class FirstConsumer { //名为"first.queue"消息队列的消费者,通过JmsListener进行监听有没有消息,有消息会立刻读取过来 @JmsListener(destination="first.queue") public void receiveQueue(String text){ System.out.println("FirstConsumer收到的报文为:"+text); } }

2.2、two.queue对应消费者(后面会创建)

@Component public class TwoConsumer { //名为"two.queue"消息队列的消费者 @JmsListener(destination="two.queue") public void receiveQueue(String text){ System.out.println("TwoConsumer收到的报文为:"+text); } }

3、Service类

/** * 功能描述:消息生产 */ public interface ProducerService { // 功能描述:指定消息队列,还有消息 public void sendMessage(Destination destination, final String message); // 功能描述:使用默认消息队列, 发送消息 public void sendMessage( final String message); }

4、ServiceImpl实现类

/** * 功能描述:消息生产者实现类 */ @Service public class ProducerServiceImpl implements ProducerService{ //这个队列就是Springboot主类中bean的对象 @Autowired private Queue queue; //用来发送消息到broker的对象,可以理解连接数据库的JDBC @Autowired private JmsMessagingTemplate jmsTemplate; //发送消息,destination是发送到的队列,message是待发送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } //发送消息,queue是发送到的队列,message是待发送的消息 @Override public void sendMessage(final String message) { jmsTemplate.convertAndSend(this.queue, message); } }

5.QueueController类

/** * 功能描述:点对点消息队列控制层 */ @RestController @RequestMapping("/api/v1") public class QueueController { @Autowired private ProducerService producerService; // 这里后面调用的是Springboot主类的quene队列 @GetMapping("first") public Object common(String msg){ producerService.sendMessage(msg); return "Success"; } // 这个队列是新建的一个名为two.queue的点对点消息队列 @GetMapping("two") public Object order(String msg){ Destination destination = new ActiveMQQueue("two.queue"); producerService.sendMessage(destination, msg); return "Success"; } }

6、案例演示:

从演示效果可以得出以下结论:

1:当springboot启动时候,就生成了这两个队列,而且他们都会有一个消费者

2:当我通过页面访问的时候,就相当于生产者把消息放到队列中,一旦放进去就会被消费者监听到,就可以获取生产者放进去的值并在后台打印出

顺便对页面中四个单词进行解释:

Number Of Pending Messages :待处理消息的数量。我们每次都会被监听处理掉,所以不存在待处理,如果存在就说这里面哪里出故障了,需要排查

Number Of Consumers :消费者数量

Messages Enqueued: 消息排列,这个只增不见,代表已经处理多少消息

Messages Dequeued: 消息出队。

三、发布/订阅者模式

在上面点对点代码的基础上,添加发布/订阅相关代码

1.appliaction.properties文件

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) spring.jms.pub-sub-domain=true

2.Springboot主类添加

//新建一个topic队列 @Bean public Topic topic(){ return new ActiveMQTopic("video.topic"); }

3.添加多个消费者类

//这里定义了三个消费者 @Component public class TopicSub { @JmsListener(destination="video.topic") public void receive1(String text){ System.out.println("video.topic 消费者:receive1="+text); } @JmsListener(destination="video.topic") public void receive2(String text){ System.out.println("video.topic 消费者:receive2="+text); } @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消费者:receive3="+text); } }

4.Service类

//功能描述:消息发布者 public void publish(String msg);

5.ServiceImpl实现类

//=======发布订阅相关代码========= @Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); }

6.Controller类

// 这个队列是新建的一个名为two.queue的点对点消息队列 @GetMapping("topic") public Object topic(String msg){ producerService.publish(msg); return "Success"; }

7.演示效果:

从演示效果总结如下:

1:Springboot启动的时候,在Topics目录下,一共出现了5个消费者。first.queue一个消费者、two.queue一个消费者、video.topic三个消费者

2:当我在控制台输入信息后,video.topic的三个消费者都会监听video.topic发布的消息,并在控制台打印。

四、如何让点对点和发布订阅同时有效

为什么这么说呢,因为当我向上面一样同时开启,会发现点对点模式已经失效了。

效果演示

从演示效果,可以得出如下结论:

1:我们发现我们在页面输入..../two?msg=555消息后,后台并没有成功打印消息。再看Active界面发现,这个queue对象,确实有一条待处理的消息,但是我们发现,它对应的消费者数量是为0.

2:然而我们在打开topic页面发现,这里却存在一个消费者。

所以我个人理解是,当同时启动的时候,所产生的消费者默认都是Topic消费者,没有Queue消费者,所以它监听不到queue所待处理的消息。

当配置文件不加:spring.jms.pub-sub-domain=true 那么系统会默认支持quene(点对点模式),但一旦加上这段配置,系统又变成只支持发布订阅模式。

那如何同时都可以成功呢?

思路如下:

第一步:还是需要去掉配置文件中的:

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) #spring.jms.pub-sub-domain=true

第二步:在发布订阅者的中消费者中指定独立的containerFactory

因为你去掉上面的配置,那么系统就默认是queue,所以@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息

@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("video.topic 消费者:receive1="+text); } @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("video.topic 消费者:receive2="+text); } //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否会打印出 @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消费者:receive3="+text); }

第三步:定义独立的topic定义独立的JmsListenerContainer

在springboot主类中添加:

@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }

效果:

得出结论:

1:点对点,和发布订阅都有用

2:receive3没有指定独立的containerFactory一样没有打印出来。

源码
github地址:github.com/yudiandemingzi/springbootAcitveMQ

本文共计2091个文字,预计阅读时间需要9分钟。

Springboot如何实现与Active消息队列的完美整合?

简单理解:ActiveMQ是Apache开源项目中的一款消息总线,是一个兼容Java Message Service(JMS)面向消息的中间件。它提供灵活的应用程序架构,主要用于服务间进行异步通信。

简单理解:

Active是Apache公司旗下的一个消息总线,ActiveMQ是一个开源兼容Java Message Service(JMS) 面向消息的中件间. 是一个提供松耦合的应用程序架构.

主要用来在服务与服务之间进行异步通信的。

一、搭建步骤
1、相应jar包

<!-- 整合消息队列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果配置线程池则加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>

2、application.properties文件

#整合jms测试,安装在别的机器,防火墙和端口号记得开放 spring.activemq.broker-url=tcp://47.96.44.110:61616 spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依赖 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 #集群配置(后续需要在配上) #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) #消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) # spring.jms.pub-sub-domain=true

3、Springboot主类

<!-- 主类需要多加一个@EnableJms注解,不过貌似我没有加的时候,也能运行,为安全起见姑且加上 --> @SpringBootApplication @EnableJms

4.5.......根据不同消息模式来写了。

二、点对点案例
我在这里案例中创建了两个点对点队列,所以他会有两个queue对象,同样对应每个queue对象,都会有单一对应的消费者。

1、Springboot主类

Springboot如何实现与Active消息队列的完美整合?

@SpringBootApplication @EnableJms public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } //新建一个的Queue对象,交给sringboot管理,这个queue的名称叫"first.queue". @Bean public Queue queue(){ return new ActiveMQQueue("first.queue"); } }

2.1、first.queue对应消费者

@Component public class FirstConsumer { //名为"first.queue"消息队列的消费者,通过JmsListener进行监听有没有消息,有消息会立刻读取过来 @JmsListener(destination="first.queue") public void receiveQueue(String text){ System.out.println("FirstConsumer收到的报文为:"+text); } }

2.2、two.queue对应消费者(后面会创建)

@Component public class TwoConsumer { //名为"two.queue"消息队列的消费者 @JmsListener(destination="two.queue") public void receiveQueue(String text){ System.out.println("TwoConsumer收到的报文为:"+text); } }

3、Service类

/** * 功能描述:消息生产 */ public interface ProducerService { // 功能描述:指定消息队列,还有消息 public void sendMessage(Destination destination, final String message); // 功能描述:使用默认消息队列, 发送消息 public void sendMessage( final String message); }

4、ServiceImpl实现类

/** * 功能描述:消息生产者实现类 */ @Service public class ProducerServiceImpl implements ProducerService{ //这个队列就是Springboot主类中bean的对象 @Autowired private Queue queue; //用来发送消息到broker的对象,可以理解连接数据库的JDBC @Autowired private JmsMessagingTemplate jmsTemplate; //发送消息,destination是发送到的队列,message是待发送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } //发送消息,queue是发送到的队列,message是待发送的消息 @Override public void sendMessage(final String message) { jmsTemplate.convertAndSend(this.queue, message); } }

5.QueueController类

/** * 功能描述:点对点消息队列控制层 */ @RestController @RequestMapping("/api/v1") public class QueueController { @Autowired private ProducerService producerService; // 这里后面调用的是Springboot主类的quene队列 @GetMapping("first") public Object common(String msg){ producerService.sendMessage(msg); return "Success"; } // 这个队列是新建的一个名为two.queue的点对点消息队列 @GetMapping("two") public Object order(String msg){ Destination destination = new ActiveMQQueue("two.queue"); producerService.sendMessage(destination, msg); return "Success"; } }

6、案例演示:

从演示效果可以得出以下结论:

1:当springboot启动时候,就生成了这两个队列,而且他们都会有一个消费者

2:当我通过页面访问的时候,就相当于生产者把消息放到队列中,一旦放进去就会被消费者监听到,就可以获取生产者放进去的值并在后台打印出

顺便对页面中四个单词进行解释:

Number Of Pending Messages :待处理消息的数量。我们每次都会被监听处理掉,所以不存在待处理,如果存在就说这里面哪里出故障了,需要排查

Number Of Consumers :消费者数量

Messages Enqueued: 消息排列,这个只增不见,代表已经处理多少消息

Messages Dequeued: 消息出队。

三、发布/订阅者模式

在上面点对点代码的基础上,添加发布/订阅相关代码

1.appliaction.properties文件

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) spring.jms.pub-sub-domain=true

2.Springboot主类添加

//新建一个topic队列 @Bean public Topic topic(){ return new ActiveMQTopic("video.topic"); }

3.添加多个消费者类

//这里定义了三个消费者 @Component public class TopicSub { @JmsListener(destination="video.topic") public void receive1(String text){ System.out.println("video.topic 消费者:receive1="+text); } @JmsListener(destination="video.topic") public void receive2(String text){ System.out.println("video.topic 消费者:receive2="+text); } @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消费者:receive3="+text); } }

4.Service类

//功能描述:消息发布者 public void publish(String msg);

5.ServiceImpl实现类

//=======发布订阅相关代码========= @Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); }

6.Controller类

// 这个队列是新建的一个名为two.queue的点对点消息队列 @GetMapping("topic") public Object topic(String msg){ producerService.publish(msg); return "Success"; }

7.演示效果:

从演示效果总结如下:

1:Springboot启动的时候,在Topics目录下,一共出现了5个消费者。first.queue一个消费者、two.queue一个消费者、video.topic三个消费者

2:当我在控制台输入信息后,video.topic的三个消费者都会监听video.topic发布的消息,并在控制台打印。

四、如何让点对点和发布订阅同时有效

为什么这么说呢,因为当我向上面一样同时开启,会发现点对点模式已经失效了。

效果演示

从演示效果,可以得出如下结论:

1:我们发现我们在页面输入..../two?msg=555消息后,后台并没有成功打印消息。再看Active界面发现,这个queue对象,确实有一条待处理的消息,但是我们发现,它对应的消费者数量是为0.

2:然而我们在打开topic页面发现,这里却存在一个消费者。

所以我个人理解是,当同时启动的时候,所产生的消费者默认都是Topic消费者,没有Queue消费者,所以它监听不到queue所待处理的消息。

当配置文件不加:spring.jms.pub-sub-domain=true 那么系统会默认支持quene(点对点模式),但一旦加上这段配置,系统又变成只支持发布订阅模式。

那如何同时都可以成功呢?

思路如下:

第一步:还是需要去掉配置文件中的:

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉) #spring.jms.pub-sub-domain=true

第二步:在发布订阅者的中消费者中指定独立的containerFactory

因为你去掉上面的配置,那么系统就默认是queue,所以@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息

@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("video.topic 消费者:receive1="+text); } @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("video.topic 消费者:receive2="+text); } //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否会打印出 @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消费者:receive3="+text); }

第三步:定义独立的topic定义独立的JmsListenerContainer

在springboot主类中添加:

@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }

效果:

得出结论:

1:点对点,和发布订阅都有用

2:receive3没有指定独立的containerFactory一样没有打印出来。

源码
github地址:github.com/yudiandemingzi/springbootAcitveMQ