如何深入理解activeMQ的源码实现?

2026-05-05 23:042阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1699个文字,预计阅读时间需要7分钟。

如何深入理解activeMQ的源码实现?

一、hello word,public class Pro_demo_1 { /* 第一步:建立ConnectionFactory工厂对象,需要填写用户名、密码以及要连接的地址,常用默认即可,端口为61616。 第二步:通过ConnectionFactory创建Connection连接对象。 */

一、hello word

public class Pro_demo_1 { /** * 第一步:建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认即可,默认端口为"tcp://localhost:61616"。 * 第二步:通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。 * 第三步:通过Connection对象创建Session会话(上下文环境对象),用于接受消息,参数配置1为是否开启是事务,参数配置2位签收模式,一般我们设置自动签收。 * 第四部:通过Session创建Destination对象,指的是一个客户端用来指定生产消费者目标和消费消息来源的对象。在PTP模式中,Destination被称作Queue即队列; * 在Pub/Sub模式,Destination被称作主题。在程序中可以使用多个Queue和Topic。 * 第五步:我们需要通过Session对象创建消息的发送和接受对象(生产者和消费者)MessageProducer/MessageConsumer。 * 第六步:我们可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)。 * 第七补:最后我们使用JMS规范TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行 * 接受数据。最后不要忘记关闭Connection连接。 */ public static void main(String[] args) throws Exception{ // 创建一个链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://localhost:61616"); // 从工厂中创建一个链接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建一个事务(这里通过参数可以设置事务的级别) Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); // 创建一个消息队列 Queue queue = session.createQueue("queue1"); // 创建生产者 MessageProducer messageProducer = session.createProducer(queue); // 设置持久化特性和非持久化特性 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // 创建数据 TextMessage textMessage = session.createTextMessage("生产者生产"); messageProducer.send(textMessage); session.commit(); if(connection != null) { connection.close(); } session.close(); connection.close(); } }

过程分析

1创建ActiveMQConnectionFactory

创建ActiveMQConnectionFactory入参是url,指定schema以及要连接的ip和端口号,

2 创建ActiveMQConnection

创建ActiveMQConnection,tcp协议交互肯定是要使用Socket类,所以说明下ActiveMQConnection->Transport->Socket的关系,Transport是对Socket的封装,而ActiveMQConnection则是对Transport的封装,如下图所示:

代码如下:

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { // 创建Transport类 Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); configureConnection(connection); // 创建连接 transport.start(); if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } catch (JMSException e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw e; } catch (Exception e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); } } 2.1 创建Transport


创建Transport ,因为上图已经说明ActiveMQConnection和Transport是组合关系,所以创建ActiveMQConnection时首先要创建Transport,因为ActiveMQ的交互方式分为Tcp、Udp以及HTTP协议,ActiveMQ使用了非常经典的简单工厂设计模式,使用这个模式的好处是工厂可以根据uri的schema头动态创建相应的TransportFactory工厂,例如用户输入tcp://localhost:61616,ObjectFactory则可以获取到schema是tcp然后来实例化TcpTransportFactory,然后在调用TcpTransportFactory工厂来生产TcpTransport对象,简单工厂模式如下图,我是把2个工厂画到了一起:

代码如下:

protected Transport createTransport() throws JMSException { try { return TransportFactory.connect(brokerURL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } } public static Transport connect(URI location) throws Exception { // 根据uri的schema头动态创建相应的TransportFactory工厂 TransportFactory tf = findTransportFactory(location); // 创建Transport,封装来实现相应的业务处理 return tf.doConnect(location); }

获取对应的TransportFacotry工厂

private static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }

创建Transport,封装来实现相应的业务处理

public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); // 创建WireFormat WireFormat wf = createWireFormat(options); // 创建TcpTransport Transport transport = createTransport(location, wf); // 封装来实现相应的业务处理 Transport rc = configure(transport, wf, options); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }

创建Transport,这里只是创建了一个空的Sokcet,没有建立连接,

protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { URI localLocation = null; String path = location.getPath(); // see if the path is a local URI location if (path != null && path.length() > 0) { int localPortIndex = path.indexOf(':'); try { Integer.parseInt(path.substring(localPortIndex + 1, path.length())); String localString = location.getScheme() + ":/" + path; localLocation = new URI(localString); } catch (Exception e) { LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage()); if(LOG.isDebugEnabled()) { LOG.debug("Failure detail", e); } } } SocketFactory socketFactory = createSocketFactory(); return createTcpTransport(wf, socketFactory, location, localLocation); } public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { this.wireFormat = wireFormat; this.socketFactory = socketFactory; try { // 创建空的Socket,没有建立连接 this.socket = socketFactory.createSocket(); } catch (SocketException e) { this.socket = null; } this.remoteLocation = remoteLocation; this.localLocation = localLocation; setDaemon(false); }

封装来实现相应的业务处理,此时ResponseCorrelator持有MutexTransportFilter,MutexTransportFilter持有WireFormatNegotiator,WireFormatNegotiator持有InactivityMonitor,InactivityMonitor持有TcpTransport,当建立连接或获取参数时一次调用

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { // 创建InactivityMonitor和WireFormatNegotiator transport = compositeConfigure(transport, wf, options); // 创建MutexTransportFilter transport = new MutexTransport(transport); // 创建ResponseCorrelator transport = new ResponseCorrelator(transport); return transport; }

MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。
WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。
InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。
ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。

3 建立连接

从上面我们知道在创建TcpTransport会创建一个没有连接的Socket,并且创建了TcpTransport的包装类, 当transport.start();,会依次建立连接,代码如下

public void start() throws Exception { if (started.compareAndSet(false, true)) { boolean success = false; stopped.set(false); try { preStart(); doStart(); success = true; } finally { started.set(success); } for(ServiceListener l:this.serviceListeners) { l.started(this); } } }

@Override protected void doStart() throws Exception { connect(); stoppedLatch.set(new CountDownLatch(1)); super.doStart(); }

protected void connect() throws Exception { if (socket == null && socketFactory == null) { throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); } InetSocketAddress localAddress = null; InetSocketAddress remoteAddress = null; if (localLocation != null) { localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); } if (remoteLocation != null) { String host = resolveHostName(remoteLocation.getHost()); remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); } // Set the traffic class before the socket is connected when possible so // that the connection packets are given the correct traffic class. this.trafficClassSet = setTrafficClass(socket); if (socket != null) { if (localAddress != null) { socket.bind(localAddress); } // If it's a server accepted socket.. we don't need to connect it // to a remote address. if (remoteAddress != null) { if (connectionTimeout >= 0) { socket.connect(remoteAddress, connectionTimeout); } else { socket.connect(remoteAddress); } } } else { // For SSL sockets.. you can't create an unconnected socket :( // This means the timout option are not supported either. if (localAddress != null) { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort()); } else { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); } } initialiseSocket(socket); initializeStreams(); }

TcpTransport实现了Runnable接口,当start()时候也就启动了线程

public void run() { LOG.trace("TCP consumer thread for " + this + " starting"); this.runnerThread=Thread.currentThread(); try { while (!isStopped()) { doRun(); } } catch (IOException e) { stoppedLatch.get().countDown(); onException(e); } catch (Throwable e){ stoppedLatch.get().countDown(); IOException ioe=new IOException("Unexpected error occured: " + e); ioe.initCause(e); onException(ioe); }finally { stoppedLatch.get().countDown(); } } protected void doRun() throws IOException { try { // 获取数据,消费 Object command = readCommand(); doConsume(command); } catch (SocketTimeoutException e) { } catch (InterruptedIOException e) { } } 总结:

如何深入理解activeMQ的源码实现?

本文共计1699个文字,预计阅读时间需要7分钟。

如何深入理解activeMQ的源码实现?

一、hello word,public class Pro_demo_1 { /* 第一步:建立ConnectionFactory工厂对象,需要填写用户名、密码以及要连接的地址,常用默认即可,端口为61616。 第二步:通过ConnectionFactory创建Connection连接对象。 */

一、hello word

public class Pro_demo_1 { /** * 第一步:建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认即可,默认端口为"tcp://localhost:61616"。 * 第二步:通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。 * 第三步:通过Connection对象创建Session会话(上下文环境对象),用于接受消息,参数配置1为是否开启是事务,参数配置2位签收模式,一般我们设置自动签收。 * 第四部:通过Session创建Destination对象,指的是一个客户端用来指定生产消费者目标和消费消息来源的对象。在PTP模式中,Destination被称作Queue即队列; * 在Pub/Sub模式,Destination被称作主题。在程序中可以使用多个Queue和Topic。 * 第五步:我们需要通过Session对象创建消息的发送和接受对象(生产者和消费者)MessageProducer/MessageConsumer。 * 第六步:我们可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)。 * 第七补:最后我们使用JMS规范TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行 * 接受数据。最后不要忘记关闭Connection连接。 */ public static void main(String[] args) throws Exception{ // 创建一个链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://localhost:61616"); // 从工厂中创建一个链接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建一个事务(这里通过参数可以设置事务的级别) Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); // 创建一个消息队列 Queue queue = session.createQueue("queue1"); // 创建生产者 MessageProducer messageProducer = session.createProducer(queue); // 设置持久化特性和非持久化特性 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // 创建数据 TextMessage textMessage = session.createTextMessage("生产者生产"); messageProducer.send(textMessage); session.commit(); if(connection != null) { connection.close(); } session.close(); connection.close(); } }

过程分析

1创建ActiveMQConnectionFactory

创建ActiveMQConnectionFactory入参是url,指定schema以及要连接的ip和端口号,

2 创建ActiveMQConnection

创建ActiveMQConnection,tcp协议交互肯定是要使用Socket类,所以说明下ActiveMQConnection->Transport->Socket的关系,Transport是对Socket的封装,而ActiveMQConnection则是对Transport的封装,如下图所示:

代码如下:

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { // 创建Transport类 Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); configureConnection(connection); // 创建连接 transport.start(); if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } catch (JMSException e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw e; } catch (Exception e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); } } 2.1 创建Transport


创建Transport ,因为上图已经说明ActiveMQConnection和Transport是组合关系,所以创建ActiveMQConnection时首先要创建Transport,因为ActiveMQ的交互方式分为Tcp、Udp以及HTTP协议,ActiveMQ使用了非常经典的简单工厂设计模式,使用这个模式的好处是工厂可以根据uri的schema头动态创建相应的TransportFactory工厂,例如用户输入tcp://localhost:61616,ObjectFactory则可以获取到schema是tcp然后来实例化TcpTransportFactory,然后在调用TcpTransportFactory工厂来生产TcpTransport对象,简单工厂模式如下图,我是把2个工厂画到了一起:

代码如下:

protected Transport createTransport() throws JMSException { try { return TransportFactory.connect(brokerURL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } } public static Transport connect(URI location) throws Exception { // 根据uri的schema头动态创建相应的TransportFactory工厂 TransportFactory tf = findTransportFactory(location); // 创建Transport,封装来实现相应的业务处理 return tf.doConnect(location); }

获取对应的TransportFacotry工厂

private static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { // Try to load if from a META-INF property. try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }

创建Transport,封装来实现相应的业务处理

public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); // 创建WireFormat WireFormat wf = createWireFormat(options); // 创建TcpTransport Transport transport = createTransport(location, wf); // 封装来实现相应的业务处理 Transport rc = configure(transport, wf, options); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }

创建Transport,这里只是创建了一个空的Sokcet,没有建立连接,

protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { URI localLocation = null; String path = location.getPath(); // see if the path is a local URI location if (path != null && path.length() > 0) { int localPortIndex = path.indexOf(':'); try { Integer.parseInt(path.substring(localPortIndex + 1, path.length())); String localString = location.getScheme() + ":/" + path; localLocation = new URI(localString); } catch (Exception e) { LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage()); if(LOG.isDebugEnabled()) { LOG.debug("Failure detail", e); } } } SocketFactory socketFactory = createSocketFactory(); return createTcpTransport(wf, socketFactory, location, localLocation); } public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { this.wireFormat = wireFormat; this.socketFactory = socketFactory; try { // 创建空的Socket,没有建立连接 this.socket = socketFactory.createSocket(); } catch (SocketException e) { this.socket = null; } this.remoteLocation = remoteLocation; this.localLocation = localLocation; setDaemon(false); }

封装来实现相应的业务处理,此时ResponseCorrelator持有MutexTransportFilter,MutexTransportFilter持有WireFormatNegotiator,WireFormatNegotiator持有InactivityMonitor,InactivityMonitor持有TcpTransport,当建立连接或获取参数时一次调用

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { // 创建InactivityMonitor和WireFormatNegotiator transport = compositeConfigure(transport, wf, options); // 创建MutexTransportFilter transport = new MutexTransport(transport); // 创建ResponseCorrelator transport = new ResponseCorrelator(transport); return transport; }

MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。
WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。
InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。
ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。

3 建立连接

从上面我们知道在创建TcpTransport会创建一个没有连接的Socket,并且创建了TcpTransport的包装类, 当transport.start();,会依次建立连接,代码如下

public void start() throws Exception { if (started.compareAndSet(false, true)) { boolean success = false; stopped.set(false); try { preStart(); doStart(); success = true; } finally { started.set(success); } for(ServiceListener l:this.serviceListeners) { l.started(this); } } }

@Override protected void doStart() throws Exception { connect(); stoppedLatch.set(new CountDownLatch(1)); super.doStart(); }

protected void connect() throws Exception { if (socket == null && socketFactory == null) { throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); } InetSocketAddress localAddress = null; InetSocketAddress remoteAddress = null; if (localLocation != null) { localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); } if (remoteLocation != null) { String host = resolveHostName(remoteLocation.getHost()); remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); } // Set the traffic class before the socket is connected when possible so // that the connection packets are given the correct traffic class. this.trafficClassSet = setTrafficClass(socket); if (socket != null) { if (localAddress != null) { socket.bind(localAddress); } // If it's a server accepted socket.. we don't need to connect it // to a remote address. if (remoteAddress != null) { if (connectionTimeout >= 0) { socket.connect(remoteAddress, connectionTimeout); } else { socket.connect(remoteAddress); } } } else { // For SSL sockets.. you can't create an unconnected socket :( // This means the timout option are not supported either. if (localAddress != null) { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort()); } else { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); } } initialiseSocket(socket); initializeStreams(); }

TcpTransport实现了Runnable接口,当start()时候也就启动了线程

public void run() { LOG.trace("TCP consumer thread for " + this + " starting"); this.runnerThread=Thread.currentThread(); try { while (!isStopped()) { doRun(); } } catch (IOException e) { stoppedLatch.get().countDown(); onException(e); } catch (Throwable e){ stoppedLatch.get().countDown(); IOException ioe=new IOException("Unexpected error occured: " + e); ioe.initCause(e); onException(ioe); }finally { stoppedLatch.get().countDown(); } } protected void doRun() throws IOException { try { // 获取数据,消费 Object command = readCommand(); doConsume(command); } catch (SocketTimeoutException e) { } catch (InterruptedIOException e) { } } 总结:

如何深入理解activeMQ的源码实现?