什么是消息队列?在Node.js中如何高效利用消息队列实现异步处理?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1930个文字,预计阅读时间需要8分钟。
消息队列是什么?消息队列是一种用于异步通信的系统,它允许消息在生产者和消费者之间传递,而无需它们直接交互。消息在队列中以顺序存储,生产者将消息放入队列,消费者从队列中取出并处理这些消息。
下面简要介绍了消息队列的基本概念,并说明了在Node.js中如何使用消息队列。
1. 消息队列消息队列(Message Queue)是一种用于消息传递的系统,它允许消息的生产者将消息发送到一个队列中,然后由消费者从队列中获取并处理这些消息。这种方式可以解耦消息的发送者和接收者,提高系统的可靠性和可伸缩性。
消息队列的基本工作原理如下:- 生产者将消息放入队列。- 消费者从队列中取出消息进行处理。
这种方式的好处包括:- 异步处理:允许生产者和消费者在不同的时间处理消息,提高了系统的响应性和可伸缩性。- 解耦:生产者和消费者无需直接交互,降低了系统的耦合度。- 可靠性:即使消费者失败,消息也不会丢失,因为它们会一直存储在队列中。
2. Node.js中的消息队列在Node.js中,可以使用各种消息队列实现,如RabbitMQ、Kafka、ActiveMQ等。以下是在Node.js中使用消息队列的简单示例:
javascript// 使用RabbitMQ的例子const amqp=require('amqplib/callback_api');
amqp.connect('amqp://localhost', (err, conn)=> { conn.createChannel((err, ch)=> { const q='hello';
ch.assertQueue(q, { durable: true }); console.log(' [*] Waiting for messages in %s. To exit press CTRL+C', q);
ch.consume(q, (msg)=> { console.log(' [x] Received %s', msg.content.toString()); }, { noAck: true }); });
setTimeout(()=> { conn.close(); process.exit(0); }, 500);});
这段代码使用RabbitMQ创建了一个队列,并启动了一个消费者来处理队列中的消息。这是一个基本的Node.js消息队列使用示例,希望对大家有所帮助。
什么是消息队列?下面本篇文章带大家了解一下消息队列的基本概念,介绍一下node中如何使用消息队列,希望对大家有所帮助!1.消息队列
什么是消息队列
消息队列就是消息的传输过程中保存消息的容器,本质是一个队列(先进先出)
消息指的是需要传输的数据,可以是一些文本,字符串,或者是对象等信息。
消息队列则是两个应用间的通信服务,消息的产生者将数据存放到消息队列中就可以立即返回,不需要等待消息的接收者应答。即:生产者保证数据插入队列,谁来取这条消息不需要管。消息的接收者则只专注于接受消息并处理。
消息队列能做什么
解耦 上面介绍了,消息队列将消息的生产者和消息的接收者分开,彼此都不受影响。
异步 异步就是为了减少请求的响应时间,消息的生产者只需要处理简单的逻辑,并将数据放到消息队列中即可返回,复杂的逻辑,比如:数据库操作,IO操作由消息的接收者处理。
削峰 消息队列应用在服务时,能将瞬时大量涌入的请求信息保存到消息队列中,并立即返回。再由消息的接收者根据数据处理请求。
应用场景 游戏活动,秒杀活动,下单等会造成瞬时流量暴增的应用。
2.消息队列的概念
介绍完消息队列的基本信息,在开发消息队列之前先介绍一下消息队列的一些基本概念~
消息的生产者(producer)与消费者(customer)
上文提到的生产者与消费者,提供的是
链接,通道与队列
链接(connection):表示服务程序与消息队列之间的一条链接。一个服务程序可以创建多条链接。
通道(channel):消息队列链接之间的一个通,一个链接可以有多个通道。
队列(queue):消息队列中存放数据的队列,一个消息队列服务可以有多个队列。
总结一下,链接,通道队列之间的关系是这样的
交换机(exchange)
消息队列发送消息时必须要有一个交换机,如果没有指定则用的是默认的交换机。交换机的作用就是将消息才推到对应的队列中。消息队列中一共有4种交换机
Direct: 指定队列模式,消息来了,只发给指定的Queue,其他Queue都收不到。
fanout: 广播模式,消息来了,就会发送给所有的队列。
topic: 模糊匹配模式,通过模糊匹配的方式进行相应转发。
header: 与Direct模式类似。
3.node使用rabbitMQ
安装rabbitMQ
- 安装rabbitMQ可以通过官网上进行下载安装,传送门
- MAC可以直接用brew命令安装
brew install rabbitmq
- 安装完成后启动rabbitmq服务
然后再本地中访问 localhost:15672/ 就可以看到rabbitmq服务的后台。初始的账号密码均为 guest
node项目安装amqplib
amqplib是node中使用消息队列的一套工具,可以让我们快速地使用消息队列
地址:www.npmjs.com/package/amqplib
创建生产者
/** product.js 消费者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { const connection = await amqplib.connect(connectUrl); const channel = await connection.createChannel(); const exchangeName = 'testExchange'; const key = 'testQueue'; const sendMsg = 'hello rabbitmq'; // 知道交换机类型 await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一个队列 await channel.assertQueue(key); for (let i = 0; i < 100; i++) { channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } await channel.close(); await connection.close(); })();
运行后在后台可以看到新增了一个有100条消息的队列
创建消费者
/** customer.js 消费者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { let connection = await amqplib.connect(connectUrl); const exchangeName = 'testExchange'; const key = 'testQueue'; // 创建两个通道 const channel1 = await connection.createChannel(); const channel2 = await connection.createChannel(); // 指定一个交换机 await channel1.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一个队列 await channel1.assertQueue(key); await channel1.bindQueue(key, exchangeName, key); channel1.consume(key, (msg) => { console.log('channel 1', msg.content.toString()); }); await channel2.assertExchange(exchangeName, 'fanout', { durable: true, }); await channel2.assertQueue(key); await channel2.bindQueue(key, exchangeName, key); channel2.consume(key, (msg) => { console.log('channel 2', msg.content.toString()); }); })();
执行后可以看到,两个通道可以同时工作接收消息
更多node相关知识,请访问:nodejs 教程!
以上就是什么是消息队列?node中如何使用消息队列?的详细内容,更多请关注自由互联其它相关文章!
本文共计1930个文字,预计阅读时间需要8分钟。
消息队列是什么?消息队列是一种用于异步通信的系统,它允许消息在生产者和消费者之间传递,而无需它们直接交互。消息在队列中以顺序存储,生产者将消息放入队列,消费者从队列中取出并处理这些消息。
下面简要介绍了消息队列的基本概念,并说明了在Node.js中如何使用消息队列。
1. 消息队列消息队列(Message Queue)是一种用于消息传递的系统,它允许消息的生产者将消息发送到一个队列中,然后由消费者从队列中获取并处理这些消息。这种方式可以解耦消息的发送者和接收者,提高系统的可靠性和可伸缩性。
消息队列的基本工作原理如下:- 生产者将消息放入队列。- 消费者从队列中取出消息进行处理。
这种方式的好处包括:- 异步处理:允许生产者和消费者在不同的时间处理消息,提高了系统的响应性和可伸缩性。- 解耦:生产者和消费者无需直接交互,降低了系统的耦合度。- 可靠性:即使消费者失败,消息也不会丢失,因为它们会一直存储在队列中。
2. Node.js中的消息队列在Node.js中,可以使用各种消息队列实现,如RabbitMQ、Kafka、ActiveMQ等。以下是在Node.js中使用消息队列的简单示例:
javascript// 使用RabbitMQ的例子const amqp=require('amqplib/callback_api');
amqp.connect('amqp://localhost', (err, conn)=> { conn.createChannel((err, ch)=> { const q='hello';
ch.assertQueue(q, { durable: true }); console.log(' [*] Waiting for messages in %s. To exit press CTRL+C', q);
ch.consume(q, (msg)=> { console.log(' [x] Received %s', msg.content.toString()); }, { noAck: true }); });
setTimeout(()=> { conn.close(); process.exit(0); }, 500);});
这段代码使用RabbitMQ创建了一个队列,并启动了一个消费者来处理队列中的消息。这是一个基本的Node.js消息队列使用示例,希望对大家有所帮助。
什么是消息队列?下面本篇文章带大家了解一下消息队列的基本概念,介绍一下node中如何使用消息队列,希望对大家有所帮助!1.消息队列
什么是消息队列
消息队列就是消息的传输过程中保存消息的容器,本质是一个队列(先进先出)
消息指的是需要传输的数据,可以是一些文本,字符串,或者是对象等信息。
消息队列则是两个应用间的通信服务,消息的产生者将数据存放到消息队列中就可以立即返回,不需要等待消息的接收者应答。即:生产者保证数据插入队列,谁来取这条消息不需要管。消息的接收者则只专注于接受消息并处理。
消息队列能做什么
解耦 上面介绍了,消息队列将消息的生产者和消息的接收者分开,彼此都不受影响。
异步 异步就是为了减少请求的响应时间,消息的生产者只需要处理简单的逻辑,并将数据放到消息队列中即可返回,复杂的逻辑,比如:数据库操作,IO操作由消息的接收者处理。
削峰 消息队列应用在服务时,能将瞬时大量涌入的请求信息保存到消息队列中,并立即返回。再由消息的接收者根据数据处理请求。
应用场景 游戏活动,秒杀活动,下单等会造成瞬时流量暴增的应用。
2.消息队列的概念
介绍完消息队列的基本信息,在开发消息队列之前先介绍一下消息队列的一些基本概念~
消息的生产者(producer)与消费者(customer)
上文提到的生产者与消费者,提供的是
链接,通道与队列
链接(connection):表示服务程序与消息队列之间的一条链接。一个服务程序可以创建多条链接。
通道(channel):消息队列链接之间的一个通,一个链接可以有多个通道。
队列(queue):消息队列中存放数据的队列,一个消息队列服务可以有多个队列。
总结一下,链接,通道队列之间的关系是这样的
交换机(exchange)
消息队列发送消息时必须要有一个交换机,如果没有指定则用的是默认的交换机。交换机的作用就是将消息才推到对应的队列中。消息队列中一共有4种交换机
Direct: 指定队列模式,消息来了,只发给指定的Queue,其他Queue都收不到。
fanout: 广播模式,消息来了,就会发送给所有的队列。
topic: 模糊匹配模式,通过模糊匹配的方式进行相应转发。
header: 与Direct模式类似。
3.node使用rabbitMQ
安装rabbitMQ
- 安装rabbitMQ可以通过官网上进行下载安装,传送门
- MAC可以直接用brew命令安装
brew install rabbitmq
- 安装完成后启动rabbitmq服务
然后再本地中访问 localhost:15672/ 就可以看到rabbitmq服务的后台。初始的账号密码均为 guest
node项目安装amqplib
amqplib是node中使用消息队列的一套工具,可以让我们快速地使用消息队列
地址:www.npmjs.com/package/amqplib
创建生产者
/** product.js 消费者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { const connection = await amqplib.connect(connectUrl); const channel = await connection.createChannel(); const exchangeName = 'testExchange'; const key = 'testQueue'; const sendMsg = 'hello rabbitmq'; // 知道交换机类型 await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一个队列 await channel.assertQueue(key); for (let i = 0; i < 100; i++) { channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } await channel.close(); await connection.close(); })();
运行后在后台可以看到新增了一个有100条消息的队列
创建消费者
/** customer.js 消费者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { let connection = await amqplib.connect(connectUrl); const exchangeName = 'testExchange'; const key = 'testQueue'; // 创建两个通道 const channel1 = await connection.createChannel(); const channel2 = await connection.createChannel(); // 指定一个交换机 await channel1.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一个队列 await channel1.assertQueue(key); await channel1.bindQueue(key, exchangeName, key); channel1.consume(key, (msg) => { console.log('channel 1', msg.content.toString()); }); await channel2.assertExchange(exchangeName, 'fanout', { durable: true, }); await channel2.assertQueue(key); await channel2.bindQueue(key, exchangeName, key); channel2.consume(key, (msg) => { console.log('channel 2', msg.content.toString()); }); })();
执行后可以看到,两个通道可以同时工作接收消息
更多node相关知识,请访问:nodejs 教程!
以上就是什么是消息队列?node中如何使用消息队列?的详细内容,更多请关注自由互联其它相关文章!

