如何通过long pull技术构建简易的消息队列MQ系统?

2026-05-22 14:511阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计816个文字,预计阅读时间需要4分钟。

如何通过long pull技术构建简易的消息队列MQ系统?

我们都使用过消息中间件,其作用无需过多描述。但对消费者而言,确实有一些权限问题。例如,是使用push模式还是pull模式,各有优缺点。这并非文本想讨论的问题。我们更关注如何设计更高效的消息处理策略。

  我们都用过消息中间件,它的作用自不必多说。但对于消费者却一直有一些权衡,就是使用push,还是pull模式的问题,这当然是各有优劣。当然,这并不是本文想讨论的问题。我们想在不使用长连接的情意下,如何实现实时的消息消费,而不至于让server端压力过大。大体上来说,这是一种主动拉取pull的方式。具体情况如何,且看且听。

1. 架构示意图

  既然是一个消息中间的作用,我们必须得模拟一个生产消费者模型,如下:

  生产者集群->消息中心集群->消费者集群

  只是这里的生产和消息中心也许我们可以合二为一,为简单起见,可能我们消费者只是想知道数据发生了变化。

  以上是一个通用模型,接下来再说说我如何以long pull消息消费,其流程图如下:

如何通过long pull技术构建简易的消息队列MQ系统?

  消费者一直请求连接->消息中心->有数据到来或者超时->消费者处理数据->发送ack确认->继续请求连接

  如此一来,我们基本上就实现了一个消费模型了。但是有个问题,我们一直在不停地请求server,这会不会让server疲于奔命?是的,如果按照正常的localhost:8081/simpleMessageCenter/consumeData?topicName=q&offset=19&maxWait=50000 // 2. 再访问生产者 localhost:8081/simpleMessageCenter/sendMsg?topicName=q&extraId=d3&data=aaaaaaaaaaa

  在生产者没有数据进来前,消费者会一直在等待,而生产者产生数据后,消费者就立即展示结果了。我们要实现的,不就是这个效果吗?

5.4. 消费者一直请求样例

  在浏览器上我们看到的只是一次请求,但如果真正想实现,一直消费数据,则必须有一种订阅的感觉。其实就是不停的请求,处理,再请求的过程。

public class SimpleMessageCenterTest { @Test public void testConsumerSubscribe() { long offset = 0; String urlPrefix = "localhost:8081/simpleMessageCenter/consumeData?topicName=q&maxWait=50000&offset="; while (!Thread.interrupted()) { String dataListStr = HttpUtils.doGet(urlPrefix + offset); System.out.println("offsetStart: " + offset + ", got data:" + dataListStr); List<Object> dataListParsed = JSONObject.parseArray(dataListStr); // 不解析最终的offset了,大概就是根据最后一次offset再发起请求即可 offset += dataListParsed.size(); } } }

  以上,就是本次分享的小轮子了。我们抛却了消息系统中的一个重要且复杂的环节:存储。供参考。

不要害怕今日的苦,你要相信明天,更苦!

本文共计816个文字,预计阅读时间需要4分钟。

如何通过long pull技术构建简易的消息队列MQ系统?

我们都使用过消息中间件,其作用无需过多描述。但对消费者而言,确实有一些权限问题。例如,是使用push模式还是pull模式,各有优缺点。这并非文本想讨论的问题。我们更关注如何设计更高效的消息处理策略。

  我们都用过消息中间件,它的作用自不必多说。但对于消费者却一直有一些权衡,就是使用push,还是pull模式的问题,这当然是各有优劣。当然,这并不是本文想讨论的问题。我们想在不使用长连接的情意下,如何实现实时的消息消费,而不至于让server端压力过大。大体上来说,这是一种主动拉取pull的方式。具体情况如何,且看且听。

1. 架构示意图

  既然是一个消息中间的作用,我们必须得模拟一个生产消费者模型,如下:

  生产者集群->消息中心集群->消费者集群

  只是这里的生产和消息中心也许我们可以合二为一,为简单起见,可能我们消费者只是想知道数据发生了变化。

  以上是一个通用模型,接下来再说说我如何以long pull消息消费,其流程图如下:

如何通过long pull技术构建简易的消息队列MQ系统?

  消费者一直请求连接->消息中心->有数据到来或者超时->消费者处理数据->发送ack确认->继续请求连接

  如此一来,我们基本上就实现了一个消费模型了。但是有个问题,我们一直在不停地请求server,这会不会让server疲于奔命?是的,如果按照正常的localhost:8081/simpleMessageCenter/consumeData?topicName=q&offset=19&maxWait=50000 // 2. 再访问生产者 localhost:8081/simpleMessageCenter/sendMsg?topicName=q&extraId=d3&data=aaaaaaaaaaa

  在生产者没有数据进来前,消费者会一直在等待,而生产者产生数据后,消费者就立即展示结果了。我们要实现的,不就是这个效果吗?

5.4. 消费者一直请求样例

  在浏览器上我们看到的只是一次请求,但如果真正想实现,一直消费数据,则必须有一种订阅的感觉。其实就是不停的请求,处理,再请求的过程。

public class SimpleMessageCenterTest { @Test public void testConsumerSubscribe() { long offset = 0; String urlPrefix = "localhost:8081/simpleMessageCenter/consumeData?topicName=q&maxWait=50000&offset="; while (!Thread.interrupted()) { String dataListStr = HttpUtils.doGet(urlPrefix + offset); System.out.println("offsetStart: " + offset + ", got data:" + dataListStr); List<Object> dataListParsed = JSONObject.parseArray(dataListStr); // 不解析最终的offset了,大概就是根据最后一次offset再发起请求即可 offset += dataListParsed.size(); } } }

  以上,就是本次分享的小轮子了。我们抛却了消息系统中的一个重要且复杂的环节:存储。供参考。

不要害怕今日的苦,你要相信明天,更苦!