RocketMQ如何实现高吞吐量消息队列?

2026-05-24 03:191阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2467个文字,预计阅读时间需要10分钟。

RocketMQ如何实现高吞吐量消息队列?

目录+带着问题向下看+(namesrv)nameserver+启动的逻辑+nameserver+功能+nameserver+问题解答+我们在写组件的时候+如何管理version+遍历+Field[K]+KVConfigManager+有什么作用+KVConfigManager+持久化+broker

目录
  • 带着问题 往下看 (namesrv)
  • nameserver 启动的逻辑
    • nameserver 功能
  • nameserver 问题解答
    • 我们在写组件的时候 怎么管理version
  • 遍历 Field[]
    • KVConfigManager 有什么作用
      • KVConfigManager 持久化
        • broker 是不是master判断
          • @ImportantField
            • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?
              • 总结

                带着问题 往下看 (namesrv)

                • 我们在写组件的时候 怎么管理version
                • 如果现在让你 维护一个 各个jar包公用的属性
                • System.exit(-1); 0 -1 -2 各种数都是干什么的,什么时候 用哪个
                • 环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?
                • 我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改
                • 大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?
                • 如果我们想 自己设置使用的log 组件,怎么办
                • 遍历 Field[] 的时候 想跳过 static的属性 怎么写代码?
                • 多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写
                • KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?
                • KVConfigManager 怎么保证的 持久化?
                • 怎么在 并发操作的时候 保证数据的安全性?
                • 方法的参数 使用final 有什么用?
                • 怎么判断的broker 是不是master
                • netty 怎么让nameserver 通知broker 信息的。
                • nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改
                • Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?
                • @ImportantField 干什么的? 什么时候 使用
                • 在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?
                • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

                请思考下 写写你的答案 再往下看

                nameserver 启动的逻辑

                nameserver 功能

                • 管理broker 集群
                • 属于注册中心 业务端 和nameserver 进行连接,获取broker地址
                • 负责维护broker 连接/心跳/监控

                nameserver 问题解答

                我们在写组件的时候 怎么管理version

                一方面是 在父类的 pom.xml 通过 进行 控制版本,然后 业务端通过

                <dependencyManagement> <dependencies> <dependency> <groupId>com.xxx</groupId> <artifactId>xxx</artifactId> <version>4.0.0-SNAPSHOT</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>

                这是第一个 ,第二个 是 rocketmq 这种 在common 包下面 新建一个 MQVersion 管理版本

                这里会有一个问题,那这个版本管理 我用在哪里啊,不用不行么?

                • 为了方便测试,测试的时候 可能因为版本有差异 导致的问题。指定version 就没有这个问题了 2 broker 操作也是,(其实一句话 为了之后的版本兼容)比如

                if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version)); log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return result; } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { }

                如果现在让你 维护一个 各个jar包公用的属性

                1 在common包搞一个 公共的实体类 随时用随时取呗,大不了就一个map 然后就put get

                2 System.setProperty 底层就是全局 map 进行put get

                extends Hashtable<Object,Object>

                环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?

                设置 rocketmq.home.dir=xxx

                我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改

                nettyServerConfig.setListenPort(9876);

                代码指定 的netty 监听端口,一般情况不改

                大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?

                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

                这是先把commandLine 转变为 Properties 对象,然后调用 namesrvConfig 反射方法 赋值

                如果我们想 自己设置使用的log 组件,怎么办

                LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

                遍历 Field[]

                遍历 Field[]的时候 想跳过 static的属性 怎么写代码?

                (field.getModifiers() & 0x00000008) != 0 如果为true 就是 static false为 非static

                多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写

                for (Entry<Object, Object> next : from.entrySet()) { Object fromObj = next.getValue(), toObj = to.get(next.getKey()); if (toObj != null && !toObj.equals(fromObj)) { log.info("Replace, key: {}, value: {} -> {}", next.getKey(), toObj, fromObj); } to.put(next.getKey(), fromObj); }

                因为 可能同时操作这个对象 导致 数据不一致 ,所以要加上 读写锁的 写锁

                KVConfigManager 有什么作用

                KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?

                是 kv 配置的管理器,主要是

                HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable

                以后写map 也要像这种方式 写注释。

                //读取的是 ./namesrv/kvConfig.json kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

                行吧 ,现在还不知道 这些kv的作用,先看看怎么存储的,到用的时候 我们接上,先知道 kv 存储在KVConfigManager类 configTable 属性中

                putKVConfig 使用的 ReentrantReadWriteLock 的写锁 保证数据一致性,如果map的key 存在了,不会进行覆盖,而是 跳过。

                KVConfigManager 持久化

                KVConfigManager 怎么保证的 持久化?

                RocketMQ如何实现高吞吐量消息队列?

                执行过 上面的 那些方法,执行 persist ,加读锁,如下图

                怎么在 并发操作的时候 保证数据的安全性?

                一方面 是 不可变类,其中返回属性的时候 要进行copy 简单来说 就是我通过get 方法出去的 对象 是 copy的对象,而不是 原来的对象,防止 外面通过引用 修改 属性值,把我们的对象 属性 进行修改。

                方法的参数 使用final 有什么用?

                • 确保,不会也不能对于参数进行修改,保证了调用发起方数据的安全;
                • 避免在方法体中修改参数,引起不必要的错误
                • 程序员工作不是一个人的工作,你设置为final,别人将来维护的时候一看就知道这个变量不能修改,而不需要去记忆这个是不能变化的值,是常量。这个是代码规范。

                broker 是不是master判断

                怎么判断的broker 是不是master

                //0 == brokerId MixAll.MASTER_ID == brokerId

                这个其实可以 抽出来一个公共的方法, 方便之后的修改

                netty 怎么让nameserver 通知broker 信息的。

                netty 保存的 channel 到时候用了 直接从map 获取 然后发送消息

                nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改

                BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2

                static final 写死的,如果 最后一次心跳时间 + 2分钟 都小于System.currentTimeMillis() 执行删除操作

                • 关闭 netty channel
                • brokerLiveTable 删除对应的实例

                这是一个定时任务 从项目 启动5s之后 ,每10s执行一次,说明 对broker的感知 会有些许 延迟。(最大也就 20s,一般10s以内感知)

                Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?

                当程序正常退出,系统调用 System.exit方法或虚拟机被关闭时才会执行添加的shutdownHook线程。其中shutdownHook是一个已初始化但并不有启动的线程,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以可通过这些钩子在jvm关闭的时候进行内存清理、资源回收等工作。

                @ImportantField

                @ImportantField 干什么的? 什么时候 使用

                最后的true 代表 是否只打印关键属性,写@ImportantField的 就一定会打,没有这个注解的就不打印了

                MixAll.printObjectProperties(console, brokerConfig, true);

                在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?

                isolateLogEnable 改为 true

                if (brokerConfig.isIsolateLogEnable()) { System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()); } if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) { System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId()); }

                broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

                无论是 -p 还是 -m 都是print 输出,本来就是希望打印日志,然后进程停止。

                opt = new Option("p", "printConfigItem", false, "Print all config item"); opt = new Option("m", "printImportantConfig", false, "Print important config item");

                总结

                这些 只是 namestr 的NamesrvController 初始化,更多关于RocketMQ NameServer 解析的资料请关注自由互联其它相关文章!

                本文共计2467个文字,预计阅读时间需要10分钟。

                RocketMQ如何实现高吞吐量消息队列?

                目录+带着问题向下看+(namesrv)nameserver+启动的逻辑+nameserver+功能+nameserver+问题解答+我们在写组件的时候+如何管理version+遍历+Field[K]+KVConfigManager+有什么作用+KVConfigManager+持久化+broker

                目录
                • 带着问题 往下看 (namesrv)
                • nameserver 启动的逻辑
                  • nameserver 功能
                • nameserver 问题解答
                  • 我们在写组件的时候 怎么管理version
                • 遍历 Field[]
                  • KVConfigManager 有什么作用
                    • KVConfigManager 持久化
                      • broker 是不是master判断
                        • @ImportantField
                          • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?
                            • 总结

                              带着问题 往下看 (namesrv)

                              • 我们在写组件的时候 怎么管理version
                              • 如果现在让你 维护一个 各个jar包公用的属性
                              • System.exit(-1); 0 -1 -2 各种数都是干什么的,什么时候 用哪个
                              • 环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?
                              • 我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改
                              • 大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?
                              • 如果我们想 自己设置使用的log 组件,怎么办
                              • 遍历 Field[] 的时候 想跳过 static的属性 怎么写代码?
                              • 多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写
                              • KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?
                              • KVConfigManager 怎么保证的 持久化?
                              • 怎么在 并发操作的时候 保证数据的安全性?
                              • 方法的参数 使用final 有什么用?
                              • 怎么判断的broker 是不是master
                              • netty 怎么让nameserver 通知broker 信息的。
                              • nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改
                              • Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?
                              • @ImportantField 干什么的? 什么时候 使用
                              • 在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?
                              • broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

                              请思考下 写写你的答案 再往下看

                              nameserver 启动的逻辑

                              nameserver 功能

                              • 管理broker 集群
                              • 属于注册中心 业务端 和nameserver 进行连接,获取broker地址
                              • 负责维护broker 连接/心跳/监控

                              nameserver 问题解答

                              我们在写组件的时候 怎么管理version

                              一方面是 在父类的 pom.xml 通过 进行 控制版本,然后 业务端通过

                              <dependencyManagement> <dependencies> <dependency> <groupId>com.xxx</groupId> <artifactId>xxx</artifactId> <version>4.0.0-SNAPSHOT</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>

                              这是第一个 ,第二个 是 rocketmq 这种 在common 包下面 新建一个 MQVersion 管理版本

                              这里会有一个问题,那这个版本管理 我用在哪里啊,不用不行么?

                              • 为了方便测试,测试的时候 可能因为版本有差异 导致的问题。指定version 就没有这个问题了 2 broker 操作也是,(其实一句话 为了之后的版本兼容)比如

                              if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version)); log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return result; } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { }

                              如果现在让你 维护一个 各个jar包公用的属性

                              1 在common包搞一个 公共的实体类 随时用随时取呗,大不了就一个map 然后就put get

                              2 System.setProperty 底层就是全局 map 进行put get

                              extends Hashtable<Object,Object>

                              环境变量如果不想使用 ROCKETMQ_HOME, 想变为 xxx 这怎么做,能做么?

                              设置 rocketmq.home.dir=xxx

                              我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改

                              nettyServerConfig.setListenPort(9876);

                              代码指定 的netty 监听端口,一般情况不改

                              大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?

                              MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

                              这是先把commandLine 转变为 Properties 对象,然后调用 namesrvConfig 反射方法 赋值

                              如果我们想 自己设置使用的log 组件,怎么办

                              LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

                              遍历 Field[]

                              遍历 Field[]的时候 想跳过 static的属性 怎么写代码?

                              (field.getModifiers() & 0x00000008) != 0 如果为true 就是 static false为 非static

                              多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写

                              for (Entry<Object, Object> next : from.entrySet()) { Object fromObj = next.getValue(), toObj = to.get(next.getKey()); if (toObj != null && !toObj.equals(fromObj)) { log.info("Replace, key: {}, value: {} -> {}", next.getKey(), toObj, fromObj); } to.put(next.getKey(), fromObj); }

                              因为 可能同时操作这个对象 导致 数据不一致 ,所以要加上 读写锁的 写锁

                              KVConfigManager 有什么作用

                              KVConfigManager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?

                              是 kv 配置的管理器,主要是

                              HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable

                              以后写map 也要像这种方式 写注释。

                              //读取的是 ./namesrv/kvConfig.json kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

                              行吧 ,现在还不知道 这些kv的作用,先看看怎么存储的,到用的时候 我们接上,先知道 kv 存储在KVConfigManager类 configTable 属性中

                              putKVConfig 使用的 ReentrantReadWriteLock 的写锁 保证数据一致性,如果map的key 存在了,不会进行覆盖,而是 跳过。

                              KVConfigManager 持久化

                              KVConfigManager 怎么保证的 持久化?

                              RocketMQ如何实现高吞吐量消息队列?

                              执行过 上面的 那些方法,执行 persist ,加读锁,如下图

                              怎么在 并发操作的时候 保证数据的安全性?

                              一方面 是 不可变类,其中返回属性的时候 要进行copy 简单来说 就是我通过get 方法出去的 对象 是 copy的对象,而不是 原来的对象,防止 外面通过引用 修改 属性值,把我们的对象 属性 进行修改。

                              方法的参数 使用final 有什么用?

                              • 确保,不会也不能对于参数进行修改,保证了调用发起方数据的安全;
                              • 避免在方法体中修改参数,引起不必要的错误
                              • 程序员工作不是一个人的工作,你设置为final,别人将来维护的时候一看就知道这个变量不能修改,而不需要去记忆这个是不能变化的值,是常量。这个是代码规范。

                              broker 是不是master判断

                              怎么判断的broker 是不是master

                              //0 == brokerId MixAll.MASTER_ID == brokerId

                              这个其实可以 抽出来一个公共的方法, 方便之后的修改

                              netty 怎么让nameserver 通知broker 信息的。

                              netty 保存的 channel 到时候用了 直接从map 获取 然后发送消息

                              nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改

                              BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2

                              static final 写死的,如果 最后一次心跳时间 + 2分钟 都小于System.currentTimeMillis() 执行删除操作

                              • 关闭 netty channel
                              • brokerLiveTable 删除对应的实例

                              这是一个定时任务 从项目 启动5s之后 ,每10s执行一次,说明 对broker的感知 会有些许 延迟。(最大也就 20s,一般10s以内感知)

                              Runtime.getRuntime().addShutdownHook 有什么用,没有不行么?

                              当程序正常退出,系统调用 System.exit方法或虚拟机被关闭时才会执行添加的shutdownHook线程。其中shutdownHook是一个已初始化但并不有启动的线程,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以可通过这些钩子在jvm关闭的时候进行内存清理、资源回收等工作。

                              @ImportantField

                              @ImportantField 干什么的? 什么时候 使用

                              最后的true 代表 是否只打印关键属性,写@ImportantField的 就一定会打,没有这个注解的就不打印了

                              MixAll.printObjectProperties(console, brokerConfig, true);

                              在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?

                              isolateLogEnable 改为 true

                              if (brokerConfig.isIsolateLogEnable()) { System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()); } if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) { System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId()); }

                              broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?

                              无论是 -p 还是 -m 都是print 输出,本来就是希望打印日志,然后进程停止。

                              opt = new Option("p", "printConfigItem", false, "Print all config item"); opt = new Option("m", "printImportantConfig", false, "Print important config item");

                              总结

                              这些 只是 namestr 的NamesrvController 初始化,更多关于RocketMQ NameServer 解析的资料请关注自由互联其它相关文章!