如何用.NET构建支持长尾关键词的异步延迟队列?

2026-03-30 10:461阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2120个文字,预计阅读时间需要9分钟。

如何用.NET构建支持长尾关键词的异步延迟队列?

目录介绍使用场景案例Redis过期事件配置控制台订阅WebApi订阅RabbitMq延迟队列生产消息消费消息其他方案介绍具有队列特性的队列,支持延迟消费消息的功能。

特性具有队列的常规特性,如:- 队列长度限制- 消费者自动负载均衡- 消息持久化- 支持多种消息格式

延迟消费队列支持延迟消费,可以将消息延迟一段时间后消费。

功能- 延迟消费消息- 支持多种延迟策略- 支持消息过期后自动删除

目录
  • 介绍
  • 使用场景
  • 方案
    • Redis过期事件
      • 配置
      • 控制台订阅
      • WebApi中订阅
    • RabbitMq延迟队列
      • 生产消息
      • 消费消息
    • 其他方案

    介绍

    具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

    如何用.NET构建支持长尾关键词的异步延迟队列?

    使用场景

    延时队列在项目中的应用还是比较多的,尤其像电商类平台:

    • 订单成功后,在30分钟内没有支付,自动取消订单
    • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
    • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
    • 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

    该介绍来自其他文章

    方案

    下面的例子没有进行封装,所以代码仅供参考

    Redis过期事件

    注意:

    不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

    不保证一定送达

    发送即忘策略,不包含持久化

    但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

    redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

    配置

    需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

    -- 取消注释 notify-keyspace-events Ex -- 注释 #notify-keyspace-events ""

    然后重新启动服务器,比如windows

    .\redis-server.exe .\redis.windows.conf

    或者linux中使用docker-compose重新部署redis

    redis: container_name: redis image: redis hostname: redis restart: always ports: - "6379:6379" volumes: - $PWD/redis/redis.conf:/etc/redis.conf - /root/common-docker-compose/redis/data:/data command: /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

    然后使用客户端订阅事件

    -- windows .\redis-cli -- linux docker exec -it 容器标识 redis-cli psubscribe __keyevent@0__:expired

    控制台订阅

    使用StackExchange.Redis组件订阅过期事件

    var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection); var db = connectionMultiplexer.GetDatabase(0); db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10)); Console.WriteLine("开始订阅"); var subscriber = connectionMultiplexer.GetSubscriber(); //订阅库0的过期通知事件 subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key过期 channel:{channel} key:{key}"); }); Console.ReadLine();

    输出结果:

    key过期 channel:keyevent@0:expired key:orderno:123456

    如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

    WebApi中订阅

    创建RedisListenService继承自:BackgroundService

    public class RedisListenService : BackgroundService { private readonly ISubscriber _subscriber; public RedisListenService(IServiceScopeFactory serviceScopeFactory) { using var scope = serviceScopeFactory.CreateScope(); var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>(); var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]); var db = connectionMultiplexer.GetDatabase(0); _subscriber = connectionMultiplexer.GetSubscriber(); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { //订阅库0的过期通知事件 _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key过期 channel:{channel} key:{key}"); }); return Task.CompletedTask; } }

    注册该后台服务

    services.AddHostedService<RedisListenService>();

    启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

    RabbitMq延迟队列

    版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

    要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

    插件介绍:blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

    下载地址:github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

    docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine

    进入容器

    docker exec -it 容器名称/标识 bash

    启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    查看是否启用

    rabbitmq-plugins list

    [E]和[e]表示启用,然后重启服务

    rabbitmq-server restart

    然后在管理界面添加交换机可以看到

    生产消息

    发送的消息类型是:x-delayed-message

    [HttpGet("send/delay")] public string SendDelayedMessage() { var factory = new ConnectionFactory() { HostName = "localhost",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "123456",//用户密码 VirtualHost = "customer" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); var exchangeName = "delay-exchange"; var routingkey = "delay.delay"; var queueName = "delay_queueName"; //设置Exchange队列类型 var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; //设置当前消息为延时队列 channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); channel.QueueDeclare(queueName, true, false, false, argMaps); channel.QueueBind(queueName, exchangeName, routingkey); var time = 1000 * 5; var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}"; var body = Encoding.UTF8.GetBytes(message); var props = channel.CreateBasicProperties(); //设置消息的过期时间 props.Headers = new Dictionary<string, object>() { { "x-delay", time } }; channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body); Console.WriteLine("成功发送消息:" + message); return "success"; }

    消费消息

    消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

    public class RabbitmqDelayedHostService : BackgroundService { private readonly IModel _channel; private readonly IConnection _connection; public RabbitmqDelayedHostService() { var connFactory = new ConnectionFactory//创建连接工厂对象 { HostName = "localhost",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "123456",//用户密码 VirtualHost = "customer" }; _connection = connFactory.CreateConnection(); _channel = _connection.CreateModel(); //交换机名称 var exchangeName = "exchangeDelayed"; var queueName = "delay_queueName"; var routingkey = "delay.delay"; var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); _channel.QueueDeclare(queueName, true, false, false, argMaps); _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey); //声明为手动确认 _channel.BasicQos(0, 1, false); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { var queueName = "delay_queueName"; var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} "); //手动确认 _channel.BasicAck(ea.DeliveryTag, true); }; _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); return Task.CompletedTask; } public override void Dispose() { _connection.Dispose(); _channel.Dispose(); base.Dispose(); } }

    注册该后台任务

    services.AddHostedService<RabbitmqDelayedHostService>();

    输出结果

    成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

    其他方案

    • Hangfire延迟队列

    BackgroundJob.Schedule( () => Console.WriteLine("Delayed!"), TimeSpan.FromDays(7));

    • 时间轮
    • Redisson DelayQueue
    • 计时管理器

    到此这篇关于.Net实现延迟队列的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持易盾网络。

    本文共计2120个文字,预计阅读时间需要9分钟。

    如何用.NET构建支持长尾关键词的异步延迟队列?

    目录介绍使用场景案例Redis过期事件配置控制台订阅WebApi订阅RabbitMq延迟队列生产消息消费消息其他方案介绍具有队列特性的队列,支持延迟消费消息的功能。

    特性具有队列的常规特性,如:- 队列长度限制- 消费者自动负载均衡- 消息持久化- 支持多种消息格式

    延迟消费队列支持延迟消费,可以将消息延迟一段时间后消费。

    功能- 延迟消费消息- 支持多种延迟策略- 支持消息过期后自动删除

    目录
    • 介绍
    • 使用场景
    • 方案
      • Redis过期事件
        • 配置
        • 控制台订阅
        • WebApi中订阅
      • RabbitMq延迟队列
        • 生产消息
        • 消费消息
      • 其他方案

      介绍

      具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

      如何用.NET构建支持长尾关键词的异步延迟队列?

      使用场景

      延时队列在项目中的应用还是比较多的,尤其像电商类平台:

      • 订单成功后,在30分钟内没有支付,自动取消订单
      • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
      • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
      • 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

      该介绍来自其他文章

      方案

      下面的例子没有进行封装,所以代码仅供参考

      Redis过期事件

      注意:

      不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

      不保证一定送达

      发送即忘策略,不包含持久化

      但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

      redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

      配置

      需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

      -- 取消注释 notify-keyspace-events Ex -- 注释 #notify-keyspace-events ""

      然后重新启动服务器,比如windows

      .\redis-server.exe .\redis.windows.conf

      或者linux中使用docker-compose重新部署redis

      redis: container_name: redis image: redis hostname: redis restart: always ports: - "6379:6379" volumes: - $PWD/redis/redis.conf:/etc/redis.conf - /root/common-docker-compose/redis/data:/data command: /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

      然后使用客户端订阅事件

      -- windows .\redis-cli -- linux docker exec -it 容器标识 redis-cli psubscribe __keyevent@0__:expired

      控制台订阅

      使用StackExchange.Redis组件订阅过期事件

      var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection); var db = connectionMultiplexer.GetDatabase(0); db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10)); Console.WriteLine("开始订阅"); var subscriber = connectionMultiplexer.GetSubscriber(); //订阅库0的过期通知事件 subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key过期 channel:{channel} key:{key}"); }); Console.ReadLine();

      输出结果:

      key过期 channel:keyevent@0:expired key:orderno:123456

      如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

      WebApi中订阅

      创建RedisListenService继承自:BackgroundService

      public class RedisListenService : BackgroundService { private readonly ISubscriber _subscriber; public RedisListenService(IServiceScopeFactory serviceScopeFactory) { using var scope = serviceScopeFactory.CreateScope(); var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>(); var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]); var db = connectionMultiplexer.GetDatabase(0); _subscriber = connectionMultiplexer.GetSubscriber(); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { //订阅库0的过期通知事件 _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key过期 channel:{channel} key:{key}"); }); return Task.CompletedTask; } }

      注册该后台服务

      services.AddHostedService<RedisListenService>();

      启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

      RabbitMq延迟队列

      版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

      要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

      插件介绍:blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

      下载地址:github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

      将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

      docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine

      进入容器

      docker exec -it 容器名称/标识 bash

      启用插件

      rabbitmq-plugins enable rabbitmq_delayed_message_exchange

      查看是否启用

      rabbitmq-plugins list

      [E]和[e]表示启用,然后重启服务

      rabbitmq-server restart

      然后在管理界面添加交换机可以看到

      生产消息

      发送的消息类型是:x-delayed-message

      [HttpGet("send/delay")] public string SendDelayedMessage() { var factory = new ConnectionFactory() { HostName = "localhost",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "123456",//用户密码 VirtualHost = "customer" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); var exchangeName = "delay-exchange"; var routingkey = "delay.delay"; var queueName = "delay_queueName"; //设置Exchange队列类型 var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; //设置当前消息为延时队列 channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); channel.QueueDeclare(queueName, true, false, false, argMaps); channel.QueueBind(queueName, exchangeName, routingkey); var time = 1000 * 5; var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}"; var body = Encoding.UTF8.GetBytes(message); var props = channel.CreateBasicProperties(); //设置消息的过期时间 props.Headers = new Dictionary<string, object>() { { "x-delay", time } }; channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body); Console.WriteLine("成功发送消息:" + message); return "success"; }

      消费消息

      消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

      public class RabbitmqDelayedHostService : BackgroundService { private readonly IModel _channel; private readonly IConnection _connection; public RabbitmqDelayedHostService() { var connFactory = new ConnectionFactory//创建连接工厂对象 { HostName = "localhost",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "123456",//用户密码 VirtualHost = "customer" }; _connection = connFactory.CreateConnection(); _channel = _connection.CreateModel(); //交换机名称 var exchangeName = "exchangeDelayed"; var queueName = "delay_queueName"; var routingkey = "delay.delay"; var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); _channel.QueueDeclare(queueName, true, false, false, argMaps); _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey); //声明为手动确认 _channel.BasicQos(0, 1, false); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { var queueName = "delay_queueName"; var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} "); //手动确认 _channel.BasicAck(ea.DeliveryTag, true); }; _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); return Task.CompletedTask; } public override void Dispose() { _connection.Dispose(); _channel.Dispose(); base.Dispose(); } }

      注册该后台任务

      services.AddHostedService<RabbitmqDelayedHostService>();

      输出结果

      成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

      其他方案

      • Hangfire延迟队列

      BackgroundJob.Schedule( () => Console.WriteLine("Delayed!"), TimeSpan.FromDays(7));

      • 时间轮
      • Redisson DelayQueue
      • 计时管理器

      到此这篇关于.Net实现延迟队列的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持易盾网络。