如何将RabbitMQ与Spring整合,实现RPC调用?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1556个文字,预计阅读时间需要7分钟。
请求端的配置文件为rabbitmq-context-client.xml;此外需要自行重写Spring默认的监听容器(SimpleMessageListenerContainer)。重写后的类位于包net.nxmax.atp.remoting中,用到了org.slf4j.Logger、org.slf4j.LoggerFactory、org.springframework.amqp.core.Queue等类,代码如下:
package net.nxmax.atp.remoting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/**
 * Listener container that points itself at two fixed queues during
 * initialization whenever its message listener is a {@link RabbitTemplate}.
 */
public class DynamicReplyMessageListenerContainer extends SimpleMessageListenerContainer {

    protected static final Logger logger =
            LoggerFactory.getLogger(DynamicReplyMessageListenerContainer.class);

    /**
     * Runs the default initialization and then, only when the configured
     * listener is a {@link RabbitTemplate}, replaces the container's queues
     * with the two reply queues.
     *
     * @throws Exception propagated from the parent initialization
     */
    @Override
    protected void doInitialize() throws Exception {
        logger.info("执行DynamicReplyMessageListenerContainer.doInitialize()");
        super.doInitialize();

        // Any other kind of listener keeps whatever queues were configured.
        if (!(getMessageListener() instanceof RabbitTemplate)) {
            return;
        }

        // These are the queues the server side answers on.
        setQueues(new Queue("JC_TO_RMPS_QUEUE_1"), new Queue("JC_TO_RMPS_QUEUE_2"));
    }
}
客户端请求实现类
package com.hjh.rabbitmq.cross; import java.util.concurrent.Semaphore; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 交叉检测业务服务类,通过mq的方式请求 * @author jianhua.huang * */ //@Service("crossDetectionByMq") public class CrossDetectionByMqService { private RabbitTemplate amqpTemplate; //信号量,用来控制并发线程的最大数量 private Semaphore sempahore = new Semaphore(6); public RabbitTemplate getAmqpTemplate() { return amqpTemplate; } public void setAmqpTemplate(RabbitTemplate amqpTemplate) { this.amqpTemplate = amqpTemplate; } /** * 交叉检测方法入口 * @author jianhua.huang * */ public void doCrossDetection(String msgExt, String routingKey) { //通过mq远程请求交叉检测数据 MessageProperties msgProp = new MessageProperties(); msgProp.setContentType("text"); String str = "凹凸曼,凹凸曼 | " + msgExt + "," + routingKey; Message msg = new Message(str.getBytes(),msgProp); amqpTemplate.setReplyTimeout(11000); Object response = null; long start = System.currentTimeMillis(); try { sempahore.acquire(); System.out.println("发送出去的消息为 : {" + str + "}"); amqpTemplate.setReplyAddress(routingKey); response = amqpTemplate.convertSendAndReceive("rmpsToJcKey", msg); System.out.println("类型 : " + response.getClass().getSimpleName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(msgExt + "释放一个许可"); sempahore.release(); } long end = System.currentTimeMillis(); System.out.println("response 为: {" + response + "},耗时 = " + (end - start)); } } 客户端main启动实现,主要是测试并发
package net.nxmax.atp.startup; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.hjh.rabbitmq.cross.CrossDetectionByMqService; /** * 非客户端启动类 * @author jianhua.huang * */ public class SimpleClientStartup implements Runnable { private String routeKey; private CountDownLatch countDown; private CrossDetectionByMqService cross; public SimpleClientStartup(String routeKey, CountDownLatch countDown, CrossDetectionByMqService cross) { this.routeKey = routeKey; this.countDown = countDown; this.cross = cross; } public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/rabbitmq-context-client.xml"); CrossDetectionByMqService cross = ctx.getBean("crossDetection",CrossDetectionByMqService.class); cross.doCrossDetection(Thread.currentThread().getName(), "JC_TO_RMPS_KEY_1"); // ExecutorService service = Executors.newCachedThreadPool(); // final CountDownLatch countDown = new CountDownLatch(10); // // String[] keys = {"JC_TO_RMPS_KEY_1","JC_TO_RMPS_KEY_2"}; // // for(int i=0;i<10;i++) // { // int idx = i % 2; // String routeKey = keys[idx]; // service.execute(new SimpleClientStartup(routeKey, countDown, cross)); // countDown.countDown(); // } // service.shutdown(); } @Override public void run() { try { countDown.await(); cross.doCrossDetection(Thread.currentThread().getName(), routeKey); } catch (Exception e) { e.printStackTrace(); } } public void testCountDown() { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int i=0;i<3;i++) { Runnable runnable = new Runnable() { public void run() { try { System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令"); cdOrder.await(); 
System.out.println("线程" + Thread.currentThread().getName() + "已接受命令"); // Thread.sleep((long)(Math.random()*10000)); cdAnswer.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果"); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令"); cdOrder.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果"); cdAnswer.await(); System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } } 服务端配置文件applicationContext.xml
package com.hjh.rabbitmq.cross;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Server-side listener that answers every request with a fixed text after a
 * five second pause, sending the answer to the request's {@code replyTo}.
 */
@Service
public class CrossDetectionConsumer implements MessageListener {

    /** Explicit charset so the reply bytes do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private RabbitTemplate amqpTemplate;

    /**
     * Builds the canned reply, reusing the request's message properties, and
     * sends it with the request's {@code replyTo} value as routing key.
     * Requests without a {@code replyTo} are logged and not answered.
     *
     * @param message the incoming request
     */
    @Override
    public void onMessage(Message message) {
        String json = "模拟交叉检测平台的响应信息, 响应线程名称为rpc 请求!";
        MessageProperties msgProp = message.getMessageProperties();
        String replyTo = msgProp.getReplyTo();
        System.out.println("replyTo = " + replyTo);
        System.out.println("当前处理线程名称 : " + Thread.currentThread().getName());

        if (replyTo == null) {
            // Nothing to route the answer with; skip instead of sending with a null key.
            System.out.println("请求未携带replyTo,跳过应答");
            return;
        }

        // The request's properties are reused for the reply. //"jcToRmpsKey"
        Message responseMsg = new Message(json.getBytes(UTF_8), msgProp);

        // Simulated processing time. An interrupt shortens the wait but the reply is
        // still sent, as before; the flag is restored afterwards instead of being lost.
        boolean interrupted = false;
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            amqpTemplate.send(replyTo, responseMsg);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public RabbitTemplate getAmqpTemplate() {
        return amqpTemplate;
    }

    public void setAmqpTemplate(RabbitTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
}
服务端main启动类
package net.nxmax.atp.startup; import java.io.BufferedReader; import java.io.InputStreamReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ServerStartup { /** Logger */ private static final Logger log = LoggerFactory .getLogger(ServerStartup.class); public static void main(String[] args) throws Exception { new ServerStartup().go(); } private void go() throws Exception { log.info("Create ApplicationContext"); ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/applicationContext.xml"); AmqpTemplate amqpTemplate = ctx.getBean("amqpTemplateInternetProxy",AmqpTemplate.class); // amqpTemplate.convertAndSend("", ""); log.info("Create ApplicationContext Finish."); BufferedReader r = new BufferedReader(new InputStreamReader(System.in)); while (!r.readLine().equalsIgnoreCase("exit")) { } log.info("Exit now"); ctx.destroy(); } } 注意: 由于用spring整合rabbitmq来实现rpc调用技术,在客户端(即request端)访问远程服务器的时候,其本质就是用jdk的 阻塞队列做的线程同步,因此并不限于服务器到底是使用异步的rabbitmq还是同步的rpc机制,这个总结是基于作者阅读 源码发现的。
本文共计1556个文字,预计阅读时间需要7分钟。
请求端的配置文件为rabbitmq-context-client.xml;此外需要自行重写Spring默认的监听容器(SimpleMessageListenerContainer)。重写后的类位于包net.nxmax.atp.remoting中,用到了org.slf4j.Logger、org.slf4j.LoggerFactory、org.springframework.amqp.core.Queue等类,代码如下:
package net.nxmax.atp.remoting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

/**
 * Listener container that points itself at two fixed queues during
 * initialization whenever its message listener is a {@link RabbitTemplate}.
 */
public class DynamicReplyMessageListenerContainer extends SimpleMessageListenerContainer {

    protected static final Logger logger =
            LoggerFactory.getLogger(DynamicReplyMessageListenerContainer.class);

    /**
     * Runs the default initialization and then, only when the configured
     * listener is a {@link RabbitTemplate}, replaces the container's queues
     * with the two reply queues.
     *
     * @throws Exception propagated from the parent initialization
     */
    @Override
    protected void doInitialize() throws Exception {
        logger.info("执行DynamicReplyMessageListenerContainer.doInitialize()");
        super.doInitialize();

        // Any other kind of listener keeps whatever queues were configured.
        if (!(getMessageListener() instanceof RabbitTemplate)) {
            return;
        }

        // These are the queues the server side answers on.
        setQueues(new Queue("JC_TO_RMPS_QUEUE_1"), new Queue("JC_TO_RMPS_QUEUE_2"));
    }
}
客户端请求实现类
package com.hjh.rabbitmq.cross; import java.util.concurrent.Semaphore; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 交叉检测业务服务类,通过mq的方式请求 * @author jianhua.huang * */ //@Service("crossDetectionByMq") public class CrossDetectionByMqService { private RabbitTemplate amqpTemplate; //信号量,用来控制并发线程的最大数量 private Semaphore sempahore = new Semaphore(6); public RabbitTemplate getAmqpTemplate() { return amqpTemplate; } public void setAmqpTemplate(RabbitTemplate amqpTemplate) { this.amqpTemplate = amqpTemplate; } /** * 交叉检测方法入口 * @author jianhua.huang * */ public void doCrossDetection(String msgExt, String routingKey) { //通过mq远程请求交叉检测数据 MessageProperties msgProp = new MessageProperties(); msgProp.setContentType("text"); String str = "凹凸曼,凹凸曼 | " + msgExt + "," + routingKey; Message msg = new Message(str.getBytes(),msgProp); amqpTemplate.setReplyTimeout(11000); Object response = null; long start = System.currentTimeMillis(); try { sempahore.acquire(); System.out.println("发送出去的消息为 : {" + str + "}"); amqpTemplate.setReplyAddress(routingKey); response = amqpTemplate.convertSendAndReceive("rmpsToJcKey", msg); System.out.println("类型 : " + response.getClass().getSimpleName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(msgExt + "释放一个许可"); sempahore.release(); } long end = System.currentTimeMillis(); System.out.println("response 为: {" + response + "},耗时 = " + (end - start)); } } 客户端main启动实现,主要是测试并发
package net.nxmax.atp.startup; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.hjh.rabbitmq.cross.CrossDetectionByMqService; /** * 非客户端启动类 * @author jianhua.huang * */ public class SimpleClientStartup implements Runnable { private String routeKey; private CountDownLatch countDown; private CrossDetectionByMqService cross; public SimpleClientStartup(String routeKey, CountDownLatch countDown, CrossDetectionByMqService cross) { this.routeKey = routeKey; this.countDown = countDown; this.cross = cross; } public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/rabbitmq-context-client.xml"); CrossDetectionByMqService cross = ctx.getBean("crossDetection",CrossDetectionByMqService.class); cross.doCrossDetection(Thread.currentThread().getName(), "JC_TO_RMPS_KEY_1"); // ExecutorService service = Executors.newCachedThreadPool(); // final CountDownLatch countDown = new CountDownLatch(10); // // String[] keys = {"JC_TO_RMPS_KEY_1","JC_TO_RMPS_KEY_2"}; // // for(int i=0;i<10;i++) // { // int idx = i % 2; // String routeKey = keys[idx]; // service.execute(new SimpleClientStartup(routeKey, countDown, cross)); // countDown.countDown(); // } // service.shutdown(); } @Override public void run() { try { countDown.await(); cross.doCrossDetection(Thread.currentThread().getName(), routeKey); } catch (Exception e) { e.printStackTrace(); } } public void testCountDown() { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int i=0;i<3;i++) { Runnable runnable = new Runnable() { public void run() { try { System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令"); cdOrder.await(); 
System.out.println("线程" + Thread.currentThread().getName() + "已接受命令"); // Thread.sleep((long)(Math.random()*10000)); cdAnswer.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果"); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令"); cdOrder.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果"); cdAnswer.await(); System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } } 服务端配置文件applicationContext.xml
package com.hjh.rabbitmq.cross;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Server-side listener that answers every request with a fixed text after a
 * five second pause, sending the answer to the request's {@code replyTo}.
 */
@Service
public class CrossDetectionConsumer implements MessageListener {

    /** Explicit charset so the reply bytes do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private RabbitTemplate amqpTemplate;

    /**
     * Builds the canned reply, reusing the request's message properties, and
     * sends it with the request's {@code replyTo} value as routing key.
     * Requests without a {@code replyTo} are logged and not answered.
     *
     * @param message the incoming request
     */
    @Override
    public void onMessage(Message message) {
        String json = "模拟交叉检测平台的响应信息, 响应线程名称为rpc 请求!";
        MessageProperties msgProp = message.getMessageProperties();
        String replyTo = msgProp.getReplyTo();
        System.out.println("replyTo = " + replyTo);
        System.out.println("当前处理线程名称 : " + Thread.currentThread().getName());

        if (replyTo == null) {
            // Nothing to route the answer with; skip instead of sending with a null key.
            System.out.println("请求未携带replyTo,跳过应答");
            return;
        }

        // The request's properties are reused for the reply. //"jcToRmpsKey"
        Message responseMsg = new Message(json.getBytes(UTF_8), msgProp);

        // Simulated processing time. An interrupt shortens the wait but the reply is
        // still sent, as before; the flag is restored afterwards instead of being lost.
        boolean interrupted = false;
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            amqpTemplate.send(replyTo, responseMsg);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public RabbitTemplate getAmqpTemplate() {
        return amqpTemplate;
    }

    public void setAmqpTemplate(RabbitTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
}
服务端main启动类
package net.nxmax.atp.startup; import java.io.BufferedReader; import java.io.InputStreamReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ServerStartup { /** Logger */ private static final Logger log = LoggerFactory .getLogger(ServerStartup.class); public static void main(String[] args) throws Exception { new ServerStartup().go(); } private void go() throws Exception { log.info("Create ApplicationContext"); ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("rpc/fixq/applicationContext.xml"); AmqpTemplate amqpTemplate = ctx.getBean("amqpTemplateInternetProxy",AmqpTemplate.class); // amqpTemplate.convertAndSend("", ""); log.info("Create ApplicationContext Finish."); BufferedReader r = new BufferedReader(new InputStreamReader(System.in)); while (!r.readLine().equalsIgnoreCase("exit")) { } log.info("Exit now"); ctx.destroy(); } } 注意: 由于用spring整合rabbitmq来实现rpc调用技术,在客户端(即request端)访问远程服务器的时候,其本质就是用jdk的 阻塞队列做的线程同步,因此并不限于服务器到底是使用异步的rabbitmq还是同步的rpc机制,这个总结是基于作者阅读 源码发现的。

