如何在.NET Framework中实战使用阿里云版RocketMQ?
- 内容介绍
- 文章标签
- 相关推荐
本文共计6566个文字,预计阅读时间需要27分钟。
章节+第一篇:https://blog.51cto.com/kimiliucn/7263756第二篇:作者:西瓜程序猿主页传送门:https://blog.51cto.com/kimiliucn开发背景+在开发某一需求时,领导要求使用RocketMQ(阿里云版)
章节
第一章:blog.51cto.com/kimiliucn/7263756
第二章:
作者:西瓜程序猿
主页传送门:blog.51cto.com/kimiliucn
开发背景
在开发某一个需求的时候,领导要求使用RocketMQ(阿里云版) 作为消息队列。使用的版本是5.x,目前也已经没有4.x购买的入口了,所以只能买5.x系列。公司项目还是用的比较老的技术.NET Framework 4.8,生产者主要有WebAPI/MVC/JOB(控制台应用程序),然后消费者采用的是Windows服务进行长链接消费信息。这期间因为各种原因踩过很多坑,然后咨询了客服说RocketMQ(阿里云版)5.0不支持.NET Framework,但最终操作下来竟然能使用(只支持集群模式,不支持订阅模式),那今天[西瓜程序猿]来记录一下如何使用RocketMQ(阿里云版),给各位小伙伴作为参考防止踩坑。
环境版本
阿里云RocketMQ版本:5.0系列
.NET版本:.NET Framework 4.8
.NET版本:生产端(WebAPI/MVC/JOB)、消费端(Windows服务)
如果不知道怎么选,或者不知道怎么买云消息队列RocketMQ(阿里云版)?可以联系我[西瓜程序猿],如果需要特价购买可以通过下面地址访问:
活动地址:官网地址
一、RocketMQ基本介绍
官网地址:RocketMQ · 官方网站 | RocketMQ
RocketMQ阿里云-官方文档:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
1.1-RocketMQ简介
RocketMQ(Apache RocketMQ)是一个开源的分布式消息中间件系统,由阿里巴巴集团旗下的阿里云计算平台团队开发和维护。它最初是为满足阿里巴巴内部大规模分布式消息传递需求而设计的,后来成为 Apache 基金会的顶级开源项目之一。
1.2-RocketMQ优势
在众多应用场景中广泛应用,如电子商务、物流配送、金融支付、大数据处理等。它被许多企业用于构建高性能和可靠的消息队列系统,实现异步通信和解耦应用程序组件。RocketMQ 提供了可靠、可扩展和高性能的消息传递解决方案,具备以下特点:
- 异步通信:RocketMQ 支持发布-订阅和点对点两种消息通信模式,以满足不同场景下的需求。
- 高可靠性:提供多种存储选项,包括本地文件存储和远程共享存储,以确保消息的可靠传输和持久化。
- 高吞吐量:支持水平扩展,可以轻松应对大规模消息传递和高并发的需求。
- 严格有序性:支持消息按照发送顺序和消费顺序进行有序处理,保证消息的顺序性。
- 分布式架构:采用分布式架构,具备良好的横向扩展能力和高可用性。
1.3-RocketMQ基本概念
此段内容根据阿里云官方文档整理:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
主题(Topic):云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
消息类型(MessageType):云消息队列 RocketMQ 版中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。云消息队列 RocketMQ 版支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
消息队列(MessageQueue):队列是云消息队列 RocketMQ 版中消息存储和传输的实际容器,也是消息的最小存储单元。云消息队列 RocketMQ 版的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
消息(Message):消息是云消息队列 RocketMQ 版中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到云消息队列 RocketMQ 版服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息视图(MessageView):消息视图是云消息队列 RocketMQ 版面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag):消息标签是云消息队列 RocketMQ 版提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。
消息位点(MessageQueueOffset):消息是按到达云消息队列 RocketMQ 版服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset):一条消息被某个消费者消费完成后不会立即从队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey):消息索引是云消息队列 RocketMQ 版提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer):生产者是云消息队列 RocketMQ 版系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成云消息队列 RocketMQ 版的消息并发送至服务端。
事务检查器(TransactionChecker):云消息队列 RocketMQ 版中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
事务状态(TransactionResolution):云消息队列 RocketMQ 版中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者分组(ConsumerGroup):消费者分组是云消息队列 RocketMQ 版系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在云消息队列 RocketMQ 版中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费者(Consumer):消费者是云消息队列 RocketMQ 版中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从云消息队列 RocketMQ 版服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费结果(ConsumeResult):云消息队列 RocketMQ 版中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription):订阅关系是云消息队列 RocketMQ 版系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消息过滤:消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在云消息队列 RocketMQ 版的服务端完成。
重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到云消息队列 RocketMQ 版服务端的消息。
消息轨迹:在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由云消息队列 RocketMQ 版服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积:生产者已经将消息发送到云消息队列 RocketMQ 版的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在云消息队列 RocketMQ 版的服务端保存着未被消费的消息,该状态即消息堆积。
事务消息:事务消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息:定时/延时消息是云消息队列 RocketMQ 版提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息:顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
二、RocketMQ前期准备
首先需要下载相关.NET相关的SDK,然后在阿里云后台找到等信息,最后还需要创建和用于给我们调用。
2.1-下载资源包及SDK
[西瓜程序猿]给正在看这篇文章的小伙伴提供了资源包,文件夹里面包含使用RocketMQ阿里云版本要依赖的DLL文件,文件夹包含了.NET Framework使用RocketMQ阿里云版本要用到的SDK文件,文件夹包含了Visual C++ 2015运行时环境安装包,因为C++ DLL文件需要依赖这个,这个需要进行安装。还包含其他辅助的工具及代码。
可以访问下载(如果失效了,请联系我)。
下载地址(编码:stalua6n):yongteng.lanzoub.com/ice5a16p978h
密码:1q81
文件截图:
2.2-查询基本配置信息
(1)首先点击下面链接进入消息队列RocketMQ工作台,如果没有登录首先要进行登录。然后在里面找到要操作的地域列表,点击。
消息队列RocketMQ(阿里云版):阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台
(2)然后可以看到实例列表,找到要操作的实例,点击。
(3)然后在中找到和,注意不是实例ID/实例名称。
(5)然后还在当前页面,往下翻到中找到接入点和网络信息。如果大家需要在外网访问自行开通公网访问,好像需要另外付费。[西瓜程序猿]这边只能通过访问,也就是只能在内网访问。所以我以VPC专有网络来介绍。
那我们就把必要的信息都集齐全了,分别是。
2.3-配置Topic和Group
那什么是Topic呢?云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。可以理解为不同的系统、不同的发布环境配置不同的Topic。然后来说一下如何配Topic和GroupID。
(1)在左侧导航栏找到,然后点击。名称和描述都是必填的,消息类型根据自己业务场景选择。[西瓜程序猿]这边要求消息按照顺序发送和消费,所以选择。
(2)然后再来创建GroupID。一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
那我们就把必要的资源都创建好了,分别是。
Topic名称:
GroupID:
二、RockeetMQ核心部分封装
官方开发文档:RocketMQONS系列.NETSDK的版本信息_云消息队列 MQ-阿里云帮助中心
2.1-创建类库项目
(1)点击,然后选择。
目录:
(2)然后新建一个文件夹,将下载好的资源包里面文件夹的文件,复制到项目中文件夹里面。
资源包:
项目:
(3)然后就安装相关的包,分别是用来记录日志,用来做JSON序列化和反序列化。(如果自己项目中有日志系统和反序列化工具,也可以不安装,根据自己项目依赖公共辅助层去使用)
<package id="log4net" versinotallow="2.0.15" targetFramework="net48" />
<package id="Newtonsoft.Json" versinotallow="12.0.1" targetFramework="net48" />
(4)创建了一个文件夹写一个JSON反序列化的帮助类(根据自己业务需要创建)。
目录:
代码:
public class JsonUtility
{
/// <summary>
/// 将实体类序列化为JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data"></param>
/// <returns></returns>
static public string SerializeJSON<T>(T data)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(data);
}
/// <summary>
/// 反序列化JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="json"></param>
/// <returns></returns>
static public T DeserializeJSON<T>(string json)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// 将IEnumerable<T,V>序列化为JSON
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
static public string SerializeDictionary(IEnumerable<KeyValuePair<string, string>> value)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(value.Select(I => new { label = I.Key, value = I.Value }));
}
}
(5)然后在创建一个文件夹。在里面新建两个Attribute特性,一个用来区分Tag标签,另一个用来区分事件类型。
目录:
代码:
/// <summary>
/// Tag标签
/// </summary>
public class ConsumerTagAttribute : Attribute
{
public string Tag { get; set; }
public ConsumerTagAttribute(string tag)
{
Tag = tag;
}
}
/// <summary>
/// 事件类型
/// </summary>
public class EventTypeAttribute : Attribute
{
public string EventType { get; set; }
public EventTypeAttribute(string eventType)
{
EventType = eventType;
}
}
2.2-封装传输实体模型
然后我们需要设计生产者和消费者直接需要传输共同的消息时哪些。
目前想到的(如果有好的建议可以在评论区讨论哈):
MessageId:消息id
Tag:对应RocketMQ中Tag
SendTime:发送时间
Source:消息来源
EventType:事件类型
Body:消息体
目录:
(1)创建一个文件夹,用来存相关的实体。然后创建生产者/消费者公共模型接口,然后创建文件实现IQueueOnsCommonModel接口。
IQueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型接口
/// </summary>
public interface IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
string Body { get; set; }
}
QueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型实现
/// </summary>
public class QueueOnsCommonModel : IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
public string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
public DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
public string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
public string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
public string Body { get; set; }
}
(2)创建一个文件,用来做配置文件的实体。
/// <summary>
/// RocketMQ配置属性
/// </summary>
public class ONSPropertyConfigModel
{
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
/// </summary>
public string AccessKey { get; set; }
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
/// </summary>
public string SecretKey { get; set; }
/// <summary>
/// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
/// </summary>
public string GroupId { get; set; }
/// <summary>
/// 您在云消息队列 RocketMQ 版控制台创建的Topic。
/// </summary>
public string Topics { get; set; }
/// <summary>
/// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”
/// </summary>
public string NAMESRV_ADDR { get; set; }
/// <summary>
/// 消费者/生产者目标来源
/// </summary>
public string OnsClientCode { get; set; }
}
(3)然后创建一个文件,用来订单消息队列Tag常量,和一个文件,用来定义事件类型。
目录:
QueueTagConsts:
/// <summary>
/// 消息队列Tag常量定义
/// 命名规范:项目名_自定义业务名_Tag
/// </summary>
public class QueueTagConsts
{
/// <summary>
/// 测试Sample
/// </summary>
public const string XG_Blog_Sample_Tag = "XG_Blog_Sample_Tag";
}
QueueOnsEventType:
/// <summary>
/// 消息队列-事件类型
/// </summary>
public class QueueOnsEventType
{
/// <summary>
/// RocketMQ测试
/// </summary>
public const string RocketMQ_TEST = "RocketMQ_TEST";
}
2.3-封装连接RocketMQ
创建一个文件夹,然后创建一个消费接口,和一个文件用来封装RocketMQ生产者连接。
目录:
IConsumerMsg:
/// <summary>
/// 消费接口
/// </summary>
public interface IConsumerMsg
{
void Consume(QueueOnsCommonModel model);
}
QueueOnsProducer:
/// <summary>
/// 消息队列-RocketMQ生产者
/// </summary>
public class QueueOnsProducer
{
private static Producer _producer;
private static PushConsumer _consumer;
private readonly static ILog logger = LogManager.GetLogger(typeof(QueueOnsProducer));
private static string Ons_Topic = "";
private static string Ons_AccessKey = "";
private static string Ons_SecretKey = "";
private static string Ons_GroupId = "";
private static string Ons_NameSrv = "";
private static int Ons_ConsumptionPattern = 1;
private static string Ons_Client_Code = "Test_RocketMQ_Producer";
private const string Ons_LogPath = "C://rocket_mq_logs";
public static string getOnsTopic
{
get
{
return Ons_Topic;
}
}
public static string getOnsClientCode
{
get
{
return Ons_Client_Code;
}
}
private static ONSFactoryProperty getFactoryPropertyProducer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
return factoryInfo;
}
private static ONSFactoryProperty getFactoryPropertyConsumer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
//消费模式(1:集群消费、2:广播消费)
if (Ons_ConsumptionPattern == 1)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
}
else if (Ons_ConsumptionPattern == 2)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
}
return factoryInfo;
}
public static void CreateProducer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_producer = ONSFactory.getInstance().createProducer(getFactoryPropertyProducer());
}
public static void StartProducer()
{
if (_producer != null)
{
_producer.start();
string msg = $"-:[{DateTime.Now}]生产者 启动 成功!";
logger.Info(msg);
}
else
{
throw new ArgumentNullException("_producer is null,请先执行[CreateProducer]创建生产者后启动");
}
}
public static void ShutdownProducer()
{
if (_producer != null)
{
_producer.shutdown();
string msg = $"-:[{DateTime.Now}]生产者 已关闭连接!";
logger.Info(msg);
}
}
public static string SendMessage(QueueOnsCommonModel model, string tag = "RegisterLog")
{
if (model == null) { throw new ArgumentNullException("model is null"); }
model.SendTime = DateTime.Now;
model.Source = Ons_Client_Code;
var send_str = JsonUtility.SerializeJSON(model);
byte[] bytes = Encoding.UTF8.GetBytes(send_str);
string str_new_msg = Encoding.Default.GetString(bytes);
logger.Info("消息内容:" + str_new_msg);
string msg_key = model.MessageId;
string msg_id = string.Empty;
Message msg = new Message(Ons_Topic, tag, str_new_msg);
msg.setKey(msg_key);
try
{
SendResultONS sendResult = _producer.send(msg);
msg_id = sendResult.getMessageId();
logger.Info("消息ID:" + msg_id);
}
catch (Exception ex)
{
logger.Error($"发生异常了:{ex.Message}", ex);
throw ex;
}
return msg_id;
}
public static void CreatePushConsumer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
// 集群消费。
Ons_ConsumptionPattern = 1;
// 广播消费。
//Ons_ConsumptionPattern = 2;
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryPropertyConsumer());
}
public static void SetPushConsumer(MessageListener listener, string subExpression = "*")
{
_consumer.subscribe(Ons_Topic, subExpression, listener);
}
public static void StartPushConsumer()
{
_consumer.start();
string msg = $"-:[{DateTime.Now}]消费者 启动 成功!";
logger.Info(msg);
}
public static void ShutdownPushConsumer()
{
if (_consumer != null)
{
_consumer.shutdown();
string msg = $"-:[{DateTime.Now}]消费者 已关闭连接!";
logger.Info(msg);
}
}
}
三、生产端实现
3.1-创建生产者
3.1.1-创建MVC项目
(1)然后创建一个生产者,可以创建WebAPI/MVC/JOB(控制台应用程序)等等,那[西瓜程序猿]以MVC项目作为例子来介绍一下,创建一个名为项目。
运行测试一下:
3.1.2-项目依赖配置
阿里云提供的.NET版本是基于云消息队列 RocketMQ 版的CPP版本的托管封装,这样能保证.NET完全不依赖于Windows.NET公共库。内部采用C++多线程并发处理,保证.NET版本的高效稳定。
(1)底层的C++ DLL相关文件,以及Visual C++ 2015运行时环境安装包。如果没有安装Visual Studio 2015运行时环境,需要在资源包找到文件进行安装。
(2)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击项目,然后点击。
(3)点击左侧选项的,然后将目标平台改为。
(3)将资源包文件夹里面所有的文件,复制到目录下。
资源包:
项目:
3.1.3-使用log4net
(1)使用lo4net输出日志,大家也可以用别的日志框架,记得在用到写入日志的地方自行进行修改。那[西瓜程序猿]使用log4net来介绍。我们在项目的根目录下创建一个文件为。
(2)内容如下。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
</configSections>
<system.web>
<compilation debug="true" targetFramework="4.5.2" />
<blog.51cto.com/kimiliucn/7263756
本文共计6566个文字,预计阅读时间需要27分钟。
章节+第一篇:https://blog.51cto.com/kimiliucn/7263756第二篇:作者:西瓜程序猿主页传送门:https://blog.51cto.com/kimiliucn开发背景+在开发某一需求时,领导要求使用RocketMQ(阿里云版)
章节
第一章:blog.51cto.com/kimiliucn/7263756
第二章:
作者:西瓜程序猿
主页传送门:blog.51cto.com/kimiliucn
开发背景
在开发某一个需求的时候,领导要求使用RocketMQ(阿里云版) 作为消息队列。使用的版本是5.x,目前也已经没有4.x购买的入口了,所以只能买5.x系列。公司项目还是用的比较老的技术.NET Framework 4.8,生产者主要有WebAPI/MVC/JOB(控制台应用程序),然后消费者采用的是Windows服务进行长链接消费信息。这期间因为各种原因踩过很多坑,然后咨询了客服说RocketMQ(阿里云版)5.0不支持.NET Framework,但最终操作下来竟然能使用(只支持集群模式,不支持订阅模式),那今天[西瓜程序猿]来记录一下如何使用RocketMQ(阿里云版),给各位小伙伴作为参考防止踩坑。
环境版本
阿里云RocketMQ版本:5.0系列
.NET版本:.NET Framework 4.8
.NET版本:生产端(WebAPI/MVC/JOB)、消费端(Windows服务)
如果不知道怎么选,或者不知道怎么买云消息队列RocketMQ(阿里云版)?可以联系我[西瓜程序猿],如果需要特价购买可以通过下面地址访问:
活动地址:官网地址
一、RocketMQ基本介绍
官网地址:RocketMQ · 官方网站 | RocketMQ
RocketMQ阿里云-官方文档:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
1.1-RocketMQ简介
RocketMQ(Apache RocketMQ)是一个开源的分布式消息中间件系统,由阿里巴巴集团旗下的阿里云计算平台团队开发和维护。它最初是为满足阿里巴巴内部大规模分布式消息传递需求而设计的,后来成为 Apache 基金会的顶级开源项目之一。
1.2-RocketMQ优势
在众多应用场景中广泛应用,如电子商务、物流配送、金融支付、大数据处理等。它被许多企业用于构建高性能和可靠的消息队列系统,实现异步通信和解耦应用程序组件。RocketMQ 提供了可靠、可扩展和高性能的消息传递解决方案,具备以下特点:
- 异步通信:RocketMQ 支持发布-订阅和点对点两种消息通信模式,以满足不同场景下的需求。
- 高可靠性:提供多种存储选项,包括本地文件存储和远程共享存储,以确保消息的可靠传输和持久化。
- 高吞吐量:支持水平扩展,可以轻松应对大规模消息传递和高并发的需求。
- 严格有序性:支持消息按照发送顺序和消费顺序进行有序处理,保证消息的顺序性。
- 分布式架构:采用分布式架构,具备良好的横向扩展能力和高可用性。
1.3-RocketMQ基本概念
此段内容根据阿里云官方文档整理:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
主题(Topic):云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
消息类型(MessageType):云消息队列 RocketMQ 版中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。云消息队列 RocketMQ 版支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
消息队列(MessageQueue):队列是云消息队列 RocketMQ 版中消息存储和传输的实际容器,也是消息的最小存储单元。云消息队列 RocketMQ 版的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
消息(Message):消息是云消息队列 RocketMQ 版中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到云消息队列 RocketMQ 版服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息视图(MessageView):消息视图是云消息队列 RocketMQ 版面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag):消息标签是云消息队列 RocketMQ 版提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。
消息位点(MessageQueueOffset):消息是按到达云消息队列 RocketMQ 版服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset):一条消息被某个消费者消费完成后不会立即从队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey):消息索引是云消息队列 RocketMQ 版提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer):生产者是云消息队列 RocketMQ 版系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成云消息队列 RocketMQ 版的消息并发送至服务端。
事务检查器(TransactionChecker):云消息队列 RocketMQ 版中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
事务状态(TransactionResolution):云消息队列 RocketMQ 版中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者分组(ConsumerGroup):消费者分组是云消息队列 RocketMQ 版系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在云消息队列 RocketMQ 版中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费者(Consumer):消费者是云消息队列 RocketMQ 版中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从云消息队列 RocketMQ 版服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费结果(ConsumeResult):云消息队列 RocketMQ 版中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription):订阅关系是云消息队列 RocketMQ 版系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消息过滤:消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在云消息队列 RocketMQ 版的服务端完成。
重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到云消息队列 RocketMQ 版服务端的消息。
消息轨迹:在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由云消息队列 RocketMQ 版服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积:生产者已经将消息发送到云消息队列 RocketMQ 版的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在云消息队列 RocketMQ 版的服务端保存着未被消费的消息,该状态即消息堆积。
事务消息:事务消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息:定时/延时消息是云消息队列 RocketMQ 版提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息:顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
二、RocketMQ前期准备
首先需要下载相关.NET相关的SDK,然后在阿里云后台找到等信息,最后还需要创建和用于给我们调用。
2.1-下载资源包及SDK
[西瓜程序猿]给正在看这篇文章的小伙伴提供了资源包,文件夹里面包含使用RocketMQ阿里云版本要依赖的DLL文件,文件夹包含了.NET Framework使用RocketMQ阿里云版本要用到的SDK文件,文件夹包含了Visual C++ 2015运行时环境安装包,因为C++ DLL文件需要依赖这个,这个需要进行安装。还包含其他辅助的工具及代码。
可以访问下载(如果失效了,请联系我)。
下载地址(编码:stalua6n):yongteng.lanzoub.com/ice5a16p978h
密码:1q81
文件截图:
2.2-查询基本配置信息
(1)首先点击下面链接进入消息队列RocketMQ工作台,如果没有登录首先要进行登录。然后在里面找到要操作的地域列表,点击。
消息队列RocketMQ(阿里云版):阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台
(2)然后可以看到实例列表,找到要操作的实例,点击。
(3)然后在中找到和,注意不是实例ID/实例名称。
(5)然后还在当前页面,往下翻到中找到接入点和网络信息。如果大家需要在外网访问自行开通公网访问,好像需要另外付费。[西瓜程序猿]这边只能通过访问,也就是只能在内网访问。所以我以VPC专有网络来介绍。
那我们就把必要的信息都集齐全了,分别是。
2.3-配置Topic和Group
那什么是Topic呢?云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。可以理解为不同的系统、不同的发布环境配置不同的Topic。然后来说一下如何配Topic和GroupID。
(1)在左侧导航栏找到,然后点击。名称和描述都是必填的,消息类型根据自己业务场景选择。[西瓜程序猿]这边要求消息按照顺序发送和消费,所以选择。
(2)然后再来创建GroupID。一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
那我们就把必要的资源都创建好了,分别是。
Topic名称:
GroupID:
二、RockeetMQ核心部分封装
官方开发文档:RocketMQONS系列.NETSDK的版本信息_云消息队列 MQ-阿里云帮助中心
2.1-创建类库项目
(1)点击,然后选择。
目录:
(2)然后新建一个文件夹,将下载好的资源包里面文件夹的文件,复制到项目中文件夹里面。
资源包:
项目:
(3)然后就安装相关的包,分别是用来记录日志,用来做JSON序列化和反序列化。(如果自己项目中有日志系统和反序列化工具,也可以不安装,根据自己项目依赖公共辅助层去使用)
<package id="log4net" versinotallow="2.0.15" targetFramework="net48" />
<package id="Newtonsoft.Json" versinotallow="12.0.1" targetFramework="net48" />
(4)创建了一个文件夹写一个JSON反序列化的帮助类(根据自己业务需要创建)。
目录:
代码:
public class JsonUtility
{
/// <summary>
/// 将实体类序列化为JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data"></param>
/// <returns></returns>
static public string SerializeJSON<T>(T data)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(data);
}
/// <summary>
/// 反序列化JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="json"></param>
/// <returns></returns>
static public T DeserializeJSON<T>(string json)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// 将IEnumerable<T,V>序列化为JSON
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
static public string SerializeDictionary(IEnumerable<KeyValuePair<string, string>> value)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(value.Select(I => new { label = I.Key, value = I.Value }));
}
}
(5)然后在创建一个文件夹。在里面新建两个Attribute特性,一个用来区分Tag标签,另一个用来区分事件类型。
目录:
代码:
/// <summary>
/// Tag标签
/// </summary>
public class ConsumerTagAttribute : Attribute
{
public string Tag { get; set; }
public ConsumerTagAttribute(string tag)
{
Tag = tag;
}
}
/// <summary>
/// 事件类型
/// </summary>
public class EventTypeAttribute : Attribute
{
public string EventType { get; set; }
public EventTypeAttribute(string eventType)
{
EventType = eventType;
}
}
2.2-封装传输实体模型
然后我们需要设计生产者和消费者直接需要传输共同的消息时哪些。
目前想到的(如果有好的建议可以在评论区讨论哈):
MessageId:消息id
Tag:对应RocketMQ中Tag
SendTime:发送时间
Source:消息来源
EventType:事件类型
Body:消息体
目录:
(1)创建一个文件夹,用来存相关的实体。然后创建生产者/消费者公共模型接口,然后创建文件实现IQueueOnsCommonModel接口。
IQueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型接口
/// </summary>
public interface IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
string Body { get; set; }
}
QueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型实现
/// </summary>
public class QueueOnsCommonModel : IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
public string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
public DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
public string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
public string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
public string Body { get; set; }
}
(2)创建一个文件,用来做配置文件的实体。
/// <summary>
/// RocketMQ配置属性
/// </summary>
public class ONSPropertyConfigModel
{
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
/// </summary>
public string AccessKey { get; set; }
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
/// </summary>
public string SecretKey { get; set; }
/// <summary>
/// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
/// </summary>
public string GroupId { get; set; }
/// <summary>
/// 您在云消息队列 RocketMQ 版控制台创建的Topic。
/// </summary>
public string Topics { get; set; }
/// <summary>
/// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”
/// </summary>
public string NAMESRV_ADDR { get; set; }
/// <summary>
/// 消费者/生产者目标来源
/// </summary>
public string OnsClientCode { get; set; }
}
(3)然后创建一个文件,用来订单消息队列Tag常量,和一个文件,用来定义事件类型。
目录:
QueueTagConsts:
/// <summary>
/// 消息队列Tag常量定义
/// 命名规范:项目名_自定义业务名_Tag
/// </summary>
public class QueueTagConsts
{
/// <summary>
/// 测试Sample
/// </summary>
public const string XG_Blog_Sample_Tag = "XG_Blog_Sample_Tag";
}
QueueOnsEventType:
/// <summary>
/// 消息队列-事件类型
/// </summary>
public class QueueOnsEventType
{
/// <summary>
/// RocketMQ测试
/// </summary>
public const string RocketMQ_TEST = "RocketMQ_TEST";
}
2.3-封装连接RocketMQ
创建一个文件夹,然后创建一个消费接口,和一个文件用来封装RocketMQ生产者连接。
目录:
IConsumerMsg:
/// <summary>
/// 消费接口
/// </summary>
public interface IConsumerMsg
{
void Consume(QueueOnsCommonModel model);
}
QueueOnsProducer:
/// <summary>
/// 消息队列-RocketMQ生产者
/// </summary>
public class QueueOnsProducer
{
private static Producer _producer;
private static PushConsumer _consumer;
private readonly static ILog logger = LogManager.GetLogger(typeof(QueueOnsProducer));
private static string Ons_Topic = "";
private static string Ons_AccessKey = "";
private static string Ons_SecretKey = "";
private static string Ons_GroupId = "";
private static string Ons_NameSrv = "";
private static int Ons_ConsumptionPattern = 1;
private static string Ons_Client_Code = "Test_RocketMQ_Producer";
private const string Ons_LogPath = "C://rocket_mq_logs";
public static string getOnsTopic
{
get
{
return Ons_Topic;
}
}
public static string getOnsClientCode
{
get
{
return Ons_Client_Code;
}
}
private static ONSFactoryProperty getFactoryPropertyProducer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
return factoryInfo;
}
private static ONSFactoryProperty getFactoryPropertyConsumer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
//消费模式(1:集群消费、2:广播消费)
if (Ons_ConsumptionPattern == 1)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
}
else if (Ons_ConsumptionPattern == 2)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
}
return factoryInfo;
}
public static void CreateProducer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_producer = ONSFactory.getInstance().createProducer(getFactoryPropertyProducer());
}
public static void StartProducer()
{
if (_producer != null)
{
_producer.start();
string msg = $"-:[{DateTime.Now}]生产者 启动 成功!";
logger.Info(msg);
}
else
{
throw new ArgumentNullException("_producer is null,请先执行[CreateProducer]创建生产者后启动");
}
}
public static void ShutdownProducer()
{
if (_producer != null)
{
_producer.shutdown();
string msg = $"-:[{DateTime.Now}]生产者 已关闭连接!";
logger.Info(msg);
}
}
public static string SendMessage(QueueOnsCommonModel model, string tag = "RegisterLog")
{
if (model == null) { throw new ArgumentNullException("model is null"); }
model.SendTime = DateTime.Now;
model.Source = Ons_Client_Code;
var send_str = JsonUtility.SerializeJSON(model);
byte[] bytes = Encoding.UTF8.GetBytes(send_str);
string str_new_msg = Encoding.Default.GetString(bytes);
logger.Info("消息内容:" + str_new_msg);
string msg_key = model.MessageId;
string msg_id = string.Empty;
Message msg = new Message(Ons_Topic, tag, str_new_msg);
msg.setKey(msg_key);
try
{
SendResultONS sendResult = _producer.send(msg);
msg_id = sendResult.getMessageId();
logger.Info("消息ID:" + msg_id);
}
catch (Exception ex)
{
logger.Error($"发生异常了:{ex.Message}", ex);
throw ex;
}
return msg_id;
}
public static void CreatePushConsumer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
// 集群消费。
Ons_ConsumptionPattern = 1;
// 广播消费。
//Ons_ConsumptionPattern = 2;
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryPropertyConsumer());
}
public static void SetPushConsumer(MessageListener listener, string subExpression = "*")
{
_consumer.subscribe(Ons_Topic, subExpression, listener);
}
public static void StartPushConsumer()
{
_consumer.start();
string msg = $"-:[{DateTime.Now}]消费者 启动 成功!";
logger.Info(msg);
}
public static void ShutdownPushConsumer()
{
if (_consumer != null)
{
_consumer.shutdown();
string msg = $"-:[{DateTime.Now}]消费者 已关闭连接!";
logger.Info(msg);
}
}
}
三、生产端实现
3.1-创建生产者
3.1.1-创建MVC项目
(1)然后创建一个生产者,可以创建WebAPI/MVC/JOB(控制台应用程序)等等,那[西瓜程序猿]以MVC项目作为例子来介绍一下,创建一个名为项目。
运行测试一下:
3.1.2-项目依赖配置
阿里云提供的.NET版本是基于云消息队列 RocketMQ 版的CPP版本的托管封装,这样能保证.NET完全不依赖于Windows.NET公共库。内部采用C++多线程并发处理,保证.NET版本的高效稳定。
(1)底层的C++ DLL相关文件,以及Visual C++ 2015运行时环境安装包。如果没有安装Visual Studio 2015运行时环境,需要在资源包找到文件进行安装。
(2)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击项目,然后点击。
(3)点击左侧选项的,然后将目标平台改为。
(3)将资源包文件夹里面所有的文件,复制到目录下。
资源包:
项目:
3.1.3-使用log4net
(1)使用lo4net输出日志,大家也可以用别的日志框架,记得在用到写入日志的地方自行进行修改。那[西瓜程序猿]使用log4net来介绍。我们在项目的根目录下创建一个文件为。
(2)内容如下。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
</configSections>
<system.web>
<compilation debug="true" targetFramework="4.5.2" />
<blog.51cto.com/kimiliucn/7263756

