消息队列中,推拉模式的核心差异体现在哪方面?

2026-05-22 15:111阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1575个文字,预计阅读时间需要7分钟。

消息队列中,推拉模式的核心差异体现在哪方面?

家好,我是【架构摆渡人】,一名十年的程序猿。这是消息队列系列的第五篇文章,本系列将分享很多在实际工作中实用的经验。如有收获,请分享给更多朋友。在学以致用方面,有哪些经验可以分享?

大家好,我是,一只十年的程序猿。这是消息队列的第五篇文章,这个系列会给大家分享很多在实际工作中有用的经验,如果有收获,还请分享给更多的朋友。

在学习消息队列的时候,大家都有一个共同的问题,那就是消息到底是服务端推送给客户端还是客户端主动去服务端拉取然后进行消费。今天这篇文章就来解答大家的这个的疑问。

推模式

首先我们来解决下什么是推模式,顾名思义,推模式就是我推给你。在MQ中也就是Broker收到消息后主动推送给Consumer的操作,叫做推模式。

推模式的实现是客户端会与服务端(Broker)建立长连接,当有消息时服务端会通过长连接通道将消息推送给客户端,这样客户端就能实时消费到最新的消息。

优点:

  • 实时性强,有消息立马推送给客户端。

  • 客户端实现简单,只需要监听服务端的推送即可。
    缺点:

  • 容易导致客户端发生消息堆积的情况,因为每个客户端的消费能力是不同的,如果简单粗暴的有消息就推送,就会会出现堆积情况。

  • 服务端逻辑复杂,因为简单的推送会导致客户端出现堆积问题,所以服务端需要进行优化。记录给每个客户端的推送数据,然后根据每个客户端的消费能力去平衡数据推送的速度。

拉模式

拉模式,顾名思义,就是我主动去拉取消息。在MQ中也就是Consumer主动向Broker询问:有没有消息啊,有的话给我一部分呗,我先拉1000条进行处理,处理完成之后再拉1000条。

拉模式肯定不能用传统的定时拉取,定时长及时性无法保证,定时短,在没有消息的情况下对服务端会一直请求。所以很多拉模式都是基于长轮询来实现。

长轮询就是客户端向服务端发起请求,如果此时有数据就直接返回,如果没有数据就保持连接,等到有数据时就直接返回。如果一直没有数据,超时后客户端再次发起请求,保持连接,这就是长轮询的实现原理。很多的开源框架都是用的这种方式,比如配置中心Apollo的推送。

优点:

  • 不会造成客户端消息积压,消费完了再去拉取,主动权在自己手中。

  • 长轮询实现的拉模式实时性也能够保证。
    缺点:

    消息队列中,推拉模式的核心差异体现在哪方面?

  • 客户端的逻辑实现相对复杂点,简化了服务端的逻辑。
    推和拉都有各自的优势和劣势,不过目前主流的消息队列大部分都用的拉模式,比如RocketMQ,Kafka。

拉模式代码实现

Java中可以使用Spring DeferredResult来实现异步请求,如果有消息就直接返回,没有消息则将此请求存储起来,等到有消息是再通知该请求进行返回。如果一直没有消息那么就等到超时,客户端收到超时消息重新进行消息的查询。

首先我们定义一个消息查询的接口,定义如下:

@GetMapping("/queryMessage") public DeferredResult<String> queryMessage(String client) { DeferredResult<String> deferredResult = new DeferredResult<>(10000L); String msg = messageQueue.poll(); if (Objects.nonNull(msg)) { deferredResult.setResult(msg); } else { deferredResult.onTimeout(() -> { deferredResultMap.remove(client); }); deferredResultMap.put(client, deferredResult); } return deferredResult; }

指定DeferredResult的超时时间为10秒,然后从messageQueue中获取消息,此处的逻辑就是获取没有被消息的消息,这里只是模拟。

如果有消息直接设置DeferredResult的result,立马返回。如果当前没有消息则注册一个超时的回调,进行DeferredResult的移除动作。同时将DeferredResult对象缓存起来。

然后我们在写一个添加消息的接口,定义如下:

@GetMapping("/addMessage") public String addMessage(String client) { messageQueue.add("test"); DeferredResult deferredResult = deferredResultMap.get(client); if (Objects.nonNull(deferredResult)) { deferredResult.setResult("test"); } return "success"; }

当有消息添加的时候,根据对应的client获取缓存的DeferredResult,如果有的话就直接设置结果,立马返回,这样客户端就能立马收到新的消息,实时性也有保证。

接下来模拟一个客户端去查询消息,定义如下:

ublic class MqClient { public static void main(String[] args) { queryMessage(); } private static void queryMessage() { String result = request("localhost:8080/queryMessage?client=xxx"); if (result != null) { // 本地进行消费 // ...... } // 继续拉取消息 queryMessage(); } private static String request(String url) { HttpURLConnection connection = null; BufferedReader reader = null; try { URL getUrl = new URL(url); connection = (HttpURLConnection) getUrl.openConnection(); connection.setReadTimeout(20000); connection.setConnectTimeout(3000); connection.setRequestMethod("GET"); connection.setRequestProperty("Accept-Charset", "utf-8"); connection.setRequestProperty("Content-Type", "application/json"); connection.setRequestProperty("Charset", "UTF-8"); System.out.println(connection.getResponseCode()); if (200 == connection.getResponseCode()) { reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line = null; while ((line = reader.readLine()) != null) { result.append(line); } System.out.println("结果 " + result); return result.toString(); } } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { connection.disconnect(); } } return null; } }

这里需要注意的是,客户端的请求超时时间要大于服务端定义的超时时间,主流程就是有消息进行本地消费,然后继续拉取。

如果没有消息,请求会一直等待,知道服务端超时,此时客户端这边会拿到github.com/yinjihuan/fox-mock

本文共计1575个文字,预计阅读时间需要7分钟。

消息队列中,推拉模式的核心差异体现在哪方面?

家好,我是【架构摆渡人】,一名十年的程序猿。这是消息队列系列的第五篇文章,本系列将分享很多在实际工作中实用的经验。如有收获,请分享给更多朋友。在学以致用方面,有哪些经验可以分享?

大家好,我是,一只十年的程序猿。这是消息队列的第五篇文章,这个系列会给大家分享很多在实际工作中有用的经验,如果有收获,还请分享给更多的朋友。

在学习消息队列的时候,大家都有一个共同的问题,那就是消息到底是服务端推送给客户端还是客户端主动去服务端拉取然后进行消费。今天这篇文章就来解答大家的这个的疑问。

推模式

首先我们来解决下什么是推模式,顾名思义,推模式就是我推给你。在MQ中也就是Broker收到消息后主动推送给Consumer的操作,叫做推模式。

推模式的实现是客户端会与服务端(Broker)建立长连接,当有消息时服务端会通过长连接通道将消息推送给客户端,这样客户端就能实时消费到最新的消息。

优点:

  • 实时性强,有消息立马推送给客户端。

  • 客户端实现简单,只需要监听服务端的推送即可。
    缺点:

  • 容易导致客户端发生消息堆积的情况,因为每个客户端的消费能力是不同的,如果简单粗暴的有消息就推送,就会会出现堆积情况。

  • 服务端逻辑复杂,因为简单的推送会导致客户端出现堆积问题,所以服务端需要进行优化。记录给每个客户端的推送数据,然后根据每个客户端的消费能力去平衡数据推送的速度。

拉模式

拉模式,顾名思义,就是我主动去拉取消息。在MQ中也就是Consumer主动向Broker询问:有没有消息啊,有的话给我一部分呗,我先拉1000条进行处理,处理完成之后再拉1000条。

拉模式肯定不能用传统的定时拉取,定时长及时性无法保证,定时短,在没有消息的情况下对服务端会一直请求。所以很多拉模式都是基于长轮询来实现。

长轮询就是客户端向服务端发起请求,如果此时有数据就直接返回,如果没有数据就保持连接,等到有数据时就直接返回。如果一直没有数据,超时后客户端再次发起请求,保持连接,这就是长轮询的实现原理。很多的开源框架都是用的这种方式,比如配置中心Apollo的推送。

优点:

  • 不会造成客户端消息积压,消费完了再去拉取,主动权在自己手中。

  • 长轮询实现的拉模式实时性也能够保证。
    缺点:

    消息队列中,推拉模式的核心差异体现在哪方面?

  • 客户端的逻辑实现相对复杂点,简化了服务端的逻辑。
    推和拉都有各自的优势和劣势,不过目前主流的消息队列大部分都用的拉模式,比如RocketMQ,Kafka。

拉模式代码实现

Java中可以使用Spring DeferredResult来实现异步请求,如果有消息就直接返回,没有消息则将此请求存储起来,等到有消息是再通知该请求进行返回。如果一直没有消息那么就等到超时,客户端收到超时消息重新进行消息的查询。

首先我们定义一个消息查询的接口,定义如下:

@GetMapping("/queryMessage") public DeferredResult<String> queryMessage(String client) { DeferredResult<String> deferredResult = new DeferredResult<>(10000L); String msg = messageQueue.poll(); if (Objects.nonNull(msg)) { deferredResult.setResult(msg); } else { deferredResult.onTimeout(() -> { deferredResultMap.remove(client); }); deferredResultMap.put(client, deferredResult); } return deferredResult; }

指定DeferredResult的超时时间为10秒,然后从messageQueue中获取消息,此处的逻辑就是获取没有被消息的消息,这里只是模拟。

如果有消息直接设置DeferredResult的result,立马返回。如果当前没有消息则注册一个超时的回调,进行DeferredResult的移除动作。同时将DeferredResult对象缓存起来。

然后我们在写一个添加消息的接口,定义如下:

@GetMapping("/addMessage") public String addMessage(String client) { messageQueue.add("test"); DeferredResult deferredResult = deferredResultMap.get(client); if (Objects.nonNull(deferredResult)) { deferredResult.setResult("test"); } return "success"; }

当有消息添加的时候,根据对应的client获取缓存的DeferredResult,如果有的话就直接设置结果,立马返回,这样客户端就能立马收到新的消息,实时性也有保证。

接下来模拟一个客户端去查询消息,定义如下:

ublic class MqClient { public static void main(String[] args) { queryMessage(); } private static void queryMessage() { String result = request("localhost:8080/queryMessage?client=xxx"); if (result != null) { // 本地进行消费 // ...... } // 继续拉取消息 queryMessage(); } private static String request(String url) { HttpURLConnection connection = null; BufferedReader reader = null; try { URL getUrl = new URL(url); connection = (HttpURLConnection) getUrl.openConnection(); connection.setReadTimeout(20000); connection.setConnectTimeout(3000); connection.setRequestMethod("GET"); connection.setRequestProperty("Accept-Charset", "utf-8"); connection.setRequestProperty("Content-Type", "application/json"); connection.setRequestProperty("Charset", "UTF-8"); System.out.println(connection.getResponseCode()); if (200 == connection.getResponseCode()) { reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line = null; while ((line = reader.readLine()) != null) { result.append(line); } System.out.println("结果 " + result); return result.toString(); } } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { connection.disconnect(); } } return null; } }

这里需要注意的是,客户端的请求超时时间要大于服务端定义的超时时间,主流程就是有消息进行本地消费,然后继续拉取。

如果没有消息,请求会一直等待,知道服务端超时,此时客户端这边会拿到github.com/yinjihuan/fox-mock