如何将SpringBoot与Redis结合,改写为高效长尾词消息队列解决方案?

2026-04-19 20:212阅读0评论SEO问题
  • 内容介绍
  • 相关推荐

本文共计1146个文字,预计阅读时间需要5分钟。

如何将SpringBoot与Redis结合,改写为高效长尾词消息队列解决方案?

Redis的list数据结构是一种按照插入顺序排序的字符串链表。它可以通过lpush和rpop,或者rpush和lpop来实现消息队列的功能。

例如,使用lpush和rpop可以实现先进先出(FIFO)的消息队列,而使用rpush和lpop可以实现后进先出(LIFO)的消息队列。

1. lpush和rpop:实现FIFO消息队列bashlpush queue message1lpush queue message2rpop queue输出:message1

2. rpush和lpop:实现LIFO消息队列bashrpush queue message1rpush queue message2lpop queue输出:message2

list 原理说明

Redis 的 list 是按照插入顺序排序的字符串链表。

如图所示,可以通过 lpush和 rpop 或者 rpush 和 lpop 实现消息队列。

1 lpush 和 rpop

2 rpush 和 lpop

消息队列功能实现

引入 Redis 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>

applicat.yml添加Redis配置

spring: redis: host: 127.0.0.1 database: 0 port: 6379 jedis: pool: max-active: 256 max-idle: 8 min-idle: 1

Redis配置类

package com.sb.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }

MQ发送和接收接口

package com.sb.service; public interface MQService { void produce(String string); void consume(); }

MQ发送和接收实现类

package com.sb.service.impl; import com.sb.service.MQService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; @Service public class MQServiceImpl implements MQService { private static Logger log = LoggerFactory.getLogger(MQServiceImpl.class); private static final String MESSAGE_KEY = "message:queue"; @Resource private RedisTemplate redisTemplate; @Override public void produce(String string) { redisTemplate.opsForList().leftPush(MESSAGE_KEY, string); } @Override public void consume() { String string = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info("consume : {}", string); } }

MQ发送和接收API接口

package com.sb.controller; import com.sb.service.MQService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping(value="/api") public class MQController { @Resource private MQService mQService; @RequestMapping(value = "/produce", method=RequestMethod.GET) public void produce(@RequestParam(name = "key") String key) { mQService.produce(key); } @RequestMapping(value="/consume", method=RequestMethod.GET) public void consume() { while (true) { mQService.consume(); } } }

消息队列功能测试

调用 localhost:8080/api/produce 接口往队列里面添加 a、b、c、d元素。

调用 localhost:8080/api/consume 消费队列里面的元素。

从截图我们可以看到,即使当队列为空,消费者依然在不停的 pop 数据,这就是浪费生命的空轮询。

那如何解决这个空轮询的问题呢?

你也许会想使用 Thread.sleep() 让消费者线程隔一段时间再消费。

使用 Thread.sleep() 会有什么问题么?

A 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

B 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

有没有更优雅和更合适的方式呢?

brpop 和 blpop 实现阻塞读取,下面以 blpop 为例来说明问题。

blpop 理论说明

blpop 命令

blpop key1...keyN timeout

blpop 说明

如何将SpringBoot与Redis结合,改写为高效长尾词消息队列解决方案?

blpop 是阻塞式列表的弹出原语。 当给定列表内没有任何元素可供弹出的时候, 连接将被 blpop 命令阻塞。直到有另一个客户端对给定的这些 key 的任意一个执行 lpush或 rpush 命令为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

key1...keyN:表示不同的队列名。

timeout:阻塞队列超时时间。

blpop 代码实现

public void blockingConsume() { List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { //队列没有元素会阻塞操作,直到队列获取新的元素或超时 return connection.bLPop(TIME_OUT, MESSAGE_KEY.getBytes()); } },new StringRedisSerializer()); for (Object str: obj) { log.info("blockingConsume : {}", str); } }

阻塞线程每隔10s超时执行一次。该方法解决了 CPU 空转的问题。

到此这篇关于SpringBoot集成Redis实现消息队列的方法的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索易盾网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持易盾网络!

本文共计1146个文字,预计阅读时间需要5分钟。

如何将SpringBoot与Redis结合,改写为高效长尾词消息队列解决方案?

Redis的list数据结构是一种按照插入顺序排序的字符串链表。它可以通过lpush和rpop,或者rpush和lpop来实现消息队列的功能。

例如,使用lpush和rpop可以实现先进先出(FIFO)的消息队列,而使用rpush和lpop可以实现后进先出(LIFO)的消息队列。

1. lpush和rpop:实现FIFO消息队列bashlpush queue message1lpush queue message2rpop queue输出:message1

2. rpush和lpop:实现LIFO消息队列bashrpush queue message1rpush queue message2lpop queue输出:message2

list 原理说明

Redis 的 list 是按照插入顺序排序的字符串链表。

如图所示,可以通过 lpush和 rpop 或者 rpush 和 lpop 实现消息队列。

1 lpush 和 rpop

2 rpush 和 lpop

消息队列功能实现

引入 Redis 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>

applicat.yml添加Redis配置

spring: redis: host: 127.0.0.1 database: 0 port: 6379 jedis: pool: max-active: 256 max-idle: 8 min-idle: 1

Redis配置类

package com.sb.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }

MQ发送和接收接口

package com.sb.service; public interface MQService { void produce(String string); void consume(); }

MQ发送和接收实现类

package com.sb.service.impl; import com.sb.service.MQService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; @Service public class MQServiceImpl implements MQService { private static Logger log = LoggerFactory.getLogger(MQServiceImpl.class); private static final String MESSAGE_KEY = "message:queue"; @Resource private RedisTemplate redisTemplate; @Override public void produce(String string) { redisTemplate.opsForList().leftPush(MESSAGE_KEY, string); } @Override public void consume() { String string = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info("consume : {}", string); } }

MQ发送和接收API接口

package com.sb.controller; import com.sb.service.MQService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping(value="/api") public class MQController { @Resource private MQService mQService; @RequestMapping(value = "/produce", method=RequestMethod.GET) public void produce(@RequestParam(name = "key") String key) { mQService.produce(key); } @RequestMapping(value="/consume", method=RequestMethod.GET) public void consume() { while (true) { mQService.consume(); } } }

消息队列功能测试

调用 localhost:8080/api/produce 接口往队列里面添加 a、b、c、d元素。

调用 localhost:8080/api/consume 消费队列里面的元素。

从截图我们可以看到,即使当队列为空,消费者依然在不停的 pop 数据,这就是浪费生命的空轮询。

那如何解决这个空轮询的问题呢?

你也许会想使用 Thread.sleep() 让消费者线程隔一段时间再消费。

使用 Thread.sleep() 会有什么问题么?

A 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

B 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

有没有更优雅和更合适的方式呢?

brpop 和 blpop 实现阻塞读取,下面以 blpop 为例来说明问题。

blpop 理论说明

blpop 命令

blpop key1...keyN timeout

blpop 说明

如何将SpringBoot与Redis结合,改写为高效长尾词消息队列解决方案?

blpop 是阻塞式列表的弹出原语。 当给定列表内没有任何元素可供弹出的时候, 连接将被 blpop 命令阻塞。直到有另一个客户端对给定的这些 key 的任意一个执行 lpush或 rpush 命令为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

key1...keyN:表示不同的队列名。

timeout:阻塞队列超时时间。

blpop 代码实现

public void blockingConsume() { List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { //队列没有元素会阻塞操作,直到队列获取新的元素或超时 return connection.bLPop(TIME_OUT, MESSAGE_KEY.getBytes()); } },new StringRedisSerializer()); for (Object str: obj) { log.info("blockingConsume : {}", str); } }

阻塞线程每隔10s超时执行一次。该方法解决了 CPU 空转的问题。

到此这篇关于SpringBoot集成Redis实现消息队列的方法的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索易盾网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持易盾网络!