如何通过ABP SignalR优化重构消息服务流程?

2026-05-19 20:061阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3618个文字,预计阅读时间需要15分钟。

如何通过ABP SignalR优化重构消息服务流程?

使用ABP+SignalR重构消息服务(二)

上篇主要讲解了使用ABP+SignalR重构消息服务的(一),重点介绍了SignalR的基础知识和前端如何使用SignalR。本篇将深入讲解SignalR源码,帮助读者更好地理解其实现原理。

SignalR源码分析:

1. SignalR的架构

SignalR采用基于长轮询(Long Polling)、服务器发送事件(Server-Sent Events)、WebSockets等技术的混合模式来实现客户端与服务器之间的实时通信。

2. SignalR的核心组件

- IHubProtocol:定义了消息的序列化和反序列化规则,负责消息的传递。- IHub:表示一个SignalR通信的会话,包含了客户端和服务器之间的交互逻辑。- HubConnection:客户端与服务器之间建立连接的类,负责发送和接收消息。- HubContext:用于从服务器端访问Hub的上下文,如获取在线用户列表、发送消息等。

3. SignalR的工作流程

- 客户端通过HTTP请求连接到SignalR服务器。- SignalR服务器根据客户端的请求创建一个HubConnection。- 客户端与服务器端建立WebSocket连接(如果支持)或长轮询连接。- 客户端可以调用服务器端的Hub方法,发送消息。- 服务器端处理客户端的消息,并将结果返回给客户端。

4. SignalR源码分析

- IHubProtocol:在SignalR中,消息的序列化和反序列化是通过JsonProtocol实现的。JsonProtocol将消息转换为JSON格式,便于在客户端和服务器端传输。- IHub:SignalR提供了多个内置的Hub,如Hub、GroupHub、UserHub等。这些Hub实现了不同的通信场景,如一对一通信、群组通信、广播等。- HubConnection:HubConnection负责维护客户端与服务器之间的连接,包括发送和接收消息。在建立连接时,客户端需要提供一个连接ID,以便服务器端识别客户端。- HubContext:HubContext用于从服务器端访问Hub的上下文,如获取在线用户列表、发送消息等。通过HubContext,客户端可以调用服务器端的Hub方法。

通过以上分析,我们可以了解到SignalR源码的架构和核心组件。在实际项目中,我们可以根据需求选择合适的SignalR内置Hub,并利用其提供的功能实现实时通信。

如何通过ABP SignalR优化重构消息服务流程?

使用ABP SignalR重构消息服务(二)

上篇使用ABP SignalR重构消息服务(一)主要讲的是SignalR的基础知识和前端如何使用SignalR,这段时间也是落实方案设计。这篇我主要讲解SignalR源码(最近我手头工作比较忙@蟹老板)。

SignalR源码分析(原地址,原地址已经停止维护了合并到了AspNetCore)

使用SignalR我们主要是添加services.AddSignalR();,添加ChatHub类继承我们的Hub ,然后管道注入endpoints.MapHub<ChatHub>("/ChatHub");
通过services.AddSignalR()可以看到使用的类是SignalRDependencyInjectionExtensions
通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core
通过MapHub<ChatHub>可以看到使用的类是HubEndpointRouteBuilderExtensions

SignalR服务注册

我们先分析services.AddSignalR()注入做了什么准备

这里我们要讲一个东西Microsoft.AspNetCore.SignalR.Core类库有一个SignalRDependencyInjectionExtensions
Microsoft.AspNetCore.SignalR类库也存在一个SignalRDependencyInjectionExtensions

Microsoft.AspNetCore.SignalR类库中的SignalRDependencyInjectionExtensions解读

public static class SignalRDependencyInjectionExtensions { // 单独注入SignalR配置 public static ISignalRServerBuilder AddHubOptions<THub>(this ISignalRServerBuilder signalrBuilder, Action<HubOptions<THub>> configure) where THub : Hub { if (signalrBuilder == null) { throw new ArgumentNullException(nameof(signalrBuilder)); } signalrBuilder.Services.AddSingleton<IConfigureOptions<HubOptions<THub>>, HubOptionsSetup<THub>>(); signalrBuilder.Services.Configure(configure); return signalrBuilder; } // 添加SignalR服务 public static ISignalRServerBuilder AddSignalR(this IServiceCollection services) { if (services == null) { throw new ArgumentNullException(nameof(services)); } // ConnectionsDependencyInjectionExtensions拓展类 添加请求路由、添加身份验证、添加Http连接调度程序、添加Http连接管理器 services.AddConnections(); // 禁用WebSocket保持活动,因为SignalR有它自己的 services.Configure<WebSocketOptions>(o => o.KeepAliveInterval = TimeSpan.Zero); services.TryAddSingleton<SignalRMarkerService>(); services.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<HubOptions>, HubOptionsSetup>()); //调用 Microsoft.AspNetCore.SignalR.Core 类库中的 SignalRDependencyInjectionExtensions return services.AddSignalRCore(); } // 添加SignalR服务。注入SignalR配置信息 public static ISignalRServerBuilder AddSignalR(this IServiceCollection services, Action<HubOptions> configure) { if (services == null) { throw new ArgumentNullException(nameof(services)); } var signalrBuilder = services.AddSignalR(); services.Configure(configure); return signalrBuilder; } }

Microsoft.AspNetCore.SignalR.Core类库中的SignalRDependencyInjectionExtensions解读
这里面注入了SignalR中核心类,所以下面的代码我们一定要仔细研读了。

public static class SignalRDependencyInjectionExtensions { // 将最小的基本SignalR服务添加IServiceCollection 中 public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection services) { // 用于标记SignalR是否注入 services.TryAddSingleton<SignalRCoreMarkerService>(); // 注入默认集线器生命周期管理器 services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>)); // 注入默认集线器协议解析器 services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver)); // 注入集线器上下文 services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>)); services.TryAddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>)); // 注入集线器中心连接处理程序 services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>)); // 注入获取用户唯一标识方法 services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider)); // 注入默认中心调度员 services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>)); // 注入默认激活中心 services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>)); // 添加授权 services.AddAuthorization(); var builder = new SignalRServerBuilder(services); // 添加Protocol转json builder.AddJsonProtocol(); return builder; } } SignalR集线器设计

通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core

// Hub 是一个抽象类 public abstract class Hub : IDisposable { private bool _disposed; // 客户端链接 private IHubCallerClients _clients = default!; // 集线器呼叫中心上下文 private HubCallerContext _context = default!; // 集线器组管理 private IGroupManager _groups = default!; // 客户端链接(管理所有用户链接) public IHubCallerClients Clients { get { CheckDisposed(); return _clients; } set { CheckDisposed(); _clients = value; } } // 集线器上下文(保存当前用户链接信息) public HubCallerContext Context { get { CheckDisposed(); return _context; } set { CheckDisposed(); _context = value; } } // 组管理(对于组进行添加或者删除) public IGroupManager Groups { get { CheckDisposed(); return _groups; } set { CheckDisposed(); _groups = value; } } // 连接方法(用于兼容用户连接操作) public virtual Task OnConnectedAsync() { return Task.CompletedTask; } // 链接释放方法(用于监控用户下线操作) public virtual Task OnDisconnectedAsync(Exception? exception) { return Task.CompletedTask; } protected virtual void Dispose(bool disposing) { } public void Dispose() { if (_disposed) { return; } Dispose(true); _disposed = true; } private void CheckDisposed() { if (_disposed) { throw new ObjectDisposedException(GetType().Name); } } } SignalR中间件

通过MapHub可以看到使用的类是HubEndpointRouteBuilderExtensions

app.UseEndpoints(endpoints => { endpoints.MapHub<ChatHub>("/ChatHub"); });

HubEndpointRouteBuilderExtensions源代码

public static class HubEndpointRouteBuilderExtensions { ................................ // 注册集线器 public static HubEndpointConventionBuilder MapHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions>? configureOptions) where THub : Hub { // 这个就是我们上面注册SignalR保留来判断是否注入 var marker = endpoints.ServiceProvider.GetService<SignalRMarkerService>(); if (marker == null) { throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " + "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code."); } // SignalR配置信息 var options = new HttpConnectionDispatcherOptions(); configureOptions?.Invoke(options); // endpoints.MapConnections用来接收第一次连接请求,然后开启对于协议连接 var conventionBuilder = endpoints.MapConnections(pattern, options, b => { // SignalRConnectionBuilderExtensions拓展类(这里是一个重点,将我们的泛型集线器连接进行注入,就可以开始它的工作了) b.UseHub<THub>(); }); .................................... return new HubEndpointConventionBuilder(conventionBuilder); } }

SignalRConnectionBuilderExtensions源代码

public static class SignalRConnectionBuilderExtensions { public static IConnectionBuilder UseHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IConnectionBuilder connectionBuilder) where THub : Hub { var marker = connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService)); if (marker == null) { throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " + "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code."); } // 1.connectionBuilder.UseConnectionHandler拓展方法在 ConnectionBuilderExtensions中 // 2.HubConnectionHandler这个不就是我们注入服务的集线器中心连接处理程序吗? return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>(); } }

ConnectionBuilderExtensions源代码

public static class ConnectionBuilderExtensions { // 执行集线器的连接方法,到了这里就代表本次连接成功了 public static IConnectionBuilder UseConnectionHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler { var handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices); // 这是一个终端中间件,所以没有必要使用'next'参数 return connectionBuilder.Run(connection => handler.OnConnectedAsync(connection)); } } 小结

通过services.AddSignalR()进行SignalR基础服务进行注册。
通过Hub抽象工程,由不同的集线器继承,定义同一的连接、断开方法、客户端连接管理、群组管理、当前上下文信息。
通过MapHub<ChatHub>通过中间件路由规则进行流量划分。
当我们看完上面调用链路,脑中是不是已经有了一个清晰的方向了,它是怎么与前端进行连接的,并且对于注入的服务有一定的了解。

HubConnectionHandler连接处理

我们已经知道进入中间件之后就会进入HubConnectionHandler.OnConnectedAsync()方法

public override async Task OnConnectedAsync(ConnectionContext connection) { // 我们检查是否设置了HubOptions<THub>,因为它们优先于全局hub选项。 // 然后将keepAlive和handshakeTimeout值设置为HubOptionsSetup中的默认值,当它们显式地设置为null时。 var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols; if (supportedProtocols == null || supportedProtocols.Count == 0) { throw new InvalidOperationException("There are no supported protocols"); } // 默认握手超时15分钟 var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout; // 集线器连接配置 var contextOptions = new HubConnectionContextOptions() { KeepAliveInterval = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval, ClientTimeoutInterval = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval, StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity, MaximumReceiveMessageSize = _maximumMessageSize, SystemClock = SystemClock, MaximumParallelInvocations = _maxParallelInvokes, }; Log.ConnectedStarting(_logger); // 创建连接上下文,将用户信息添加到上下文中 var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory); var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList(); if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors)) { return; } // 已建立connectionContext try { // 默认集线器生命周期管理器(DefaultHubLifetimeManager)将当前用户添加到连接池中 await _lifetimeManager.OnConnectedAsync(connectionContext); // 获取我们对应的集线器,执行OnConnectedAsync()方法,这个时候就真正的开始执行我们写的代码了。 // 里面有一个消息分配方法DispatchMessagesAsync(),获取我们交互的信息进行处理 await RunHubAsync(connectionContext); } finally { connectionContext.Cleanup(); Log.ConnectedEnding(_logger); // 当处理消息方法跳出,之后代表当前用户已经断开连接了,所以我们需要触发断线方法 await _lifetimeManager.OnDisconnectedAsync(connectionContext); } } SignalR异步分派消息

// 异步分派消息 private async Task DispatchMessagesAsync(HubConnectionContext connection) { var input = connection.Input; var protocol = connection.Protocol; connection.BeginClientTimeout(); var binder = new HubConnectionBinder<THub>(_dispatcher, connection); while (true) { var result = await input.ReadAsync(); var buffer = result.Buffer; try { if (result.IsCanceled) { break; } // 存在消息 if (!buffer.IsEmpty) { bool messageReceived = false; // 没有消息限制,只是解析和分派 if (_maximumMessageSize == null) { while (protocol.TryParseMessage(ref buffer, binder, out var message)) { connection.StopClientTimeout(); // 我们接收到了消息,停止超时检查 messageReceived = true; // 将接收的消息,根据不同的类型进行分发处理 await _dispatcher.DispatchMessageAsync(connection, message); } if (messageReceived) { // 处理完接收消息之后,开启超时检查 connection.BeginClientTimeout(); } } else { // 我们给解析器一个默认消息大小的滑动窗口 var maxMessageSize = _maximumMessageSize.Value; while (!buffer.IsEmpty) { var segment = buffer; var overLength = false; // 切分消息,慢慢进行处理 if (segment.Length > maxMessageSize) { segment = segment.Slice(segment.Start, maxMessageSize); overLength = true; } if (protocol.TryParseMessage(ref segment, binder, out var message)) { connection.StopClientTimeout(); // 我们接收到了消息,停止超时检查 messageReceived = true; // 将接收的消息,根据不同的类型进行分发处理 await _dispatcher.DispatchMessageAsync(connection, message); } else if (overLength) { throw new InvalidDataException($"The maximum message size of {maxMessageSize}B was exceeded. The message size can be configured in AddHubOptions."); } else { // No need to update the buffer since we didn't parse anything break; } // Update the buffer to the remaining segment buffer = buffer.Slice(segment.Start); } if (messageReceived) { connection.BeginClientTimeout(); } } } if (result.IsCompleted) { if (!buffer.IsEmpty) { throw new InvalidDataException("Connection terminated while reading a message."); } break; } } finally { // 缓冲区被分割到它被消耗的地方,所以我们可以直接开始。 我们把检查标记为缓冲。 结束,如果我们没有收到完整的帧,我们将等待更多的数据 再读一遍之前。 input.AdvanceTo(buffer.Start, buffer.End); } } SignalR针对用户发送消息

针对于群发消息,我们知道有一个组的容器,我们只要将大家添加到一个组中就可以了,那么我们想根据用户发送消息1:1的模式,SignalR源码中是怎么处理的呢?

在注册SignalR服务中我们可以看到这个services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));

public class DefaultUserIdProvider : IUserIdProvider { // 获取当前用户标识 public virtual string? GetUserId(HubConnectionContext connection) { // 这个也就是为什么我们在不做任何处理之下想使用SignalR用户模式,需要在Jwt中添加一个ClaimTypes.NameIdentifier了 return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value; } }

我们只需要自己定义一个实现类,将默认实现替换掉就可以了。

// 用户模式发送源码 public override Task SendUserAsync(string userId, string methodName, object?[] args, CancellationToken cancellationToken = default) { // connection.UserIdentifier就是执行了GetUserId()方法获取的用户标识 return SendToAllConnections(methodName, args, (connection, state) => string.Equals(connection.UserIdentifier, (string)state!, StringComparison.Ordinal), userId, cancellationToken); } SignalR项目使用设计

上面我们已经讲完,SignalR从连接==>处理消息以及用户模式的源码设计,相信小伙伴已经脑海中已经有点东西了,那么就开始项目中实践方式

我主要负责提供基础SignalR库,给到不同的部门进行使用,所以我首先需要考虑到一个高内聚,低耦合的设计,这里我首先不能掺杂业务逻辑,但是又需要所有业务聚合到我这边,然后通过不同的业务进行不同的处理。
设计思路:

  • 定义两个接口IReceiveMessageISendMessage,接口中分别有MessageType属性,HandlerAsync(input)方法
  • 定义一个公用的集线器注入IEnumerable<IReceiveMessage>IEnumerable<ISendMessage>添加Receive(input)Send(input)方法通过不同的入参中的MessageType属性,从注入集合中获取对应的消息实现进行处理

集线器伪代码

public class SignalRHub : Hub { private readonly IEnumerable<IReceiveMessage> _receiveMessages; private readonly IEnumerable<ISendMessage> _sendMessages; public SignalRHub(IEnumerable<IReceiveMessage> receiveMessages, IEnumerable<ISendMessage> sendMessages) { _receiveMessages = receiveMessages; _sendMessages = sendMessages; } public async Task Receive(SignalRReceiveMessage input) { await _receiveMessages.FirstOrDefault(x => string.Compare(x.MessageType, input.MessageType, true) == 0).HandlerAsync(input); } public async Task Send(SignalRSendMessage outInput) { await _sendMessages.FirstOrDefault(x => string.Compare(x.MessageType, outInput.MessageType, true) == 0).HandlerAsync(outInput); } }

业务实现示例

public class NotificationSendMessage : ISendMessage, ISingletonDependency { public string MessageType { get => SignalRSendMessageEnum.Notification.ToString(); } public Task HandlerAsync(SignalRSendMessage message) { //.......业务逻辑...... } }

这样我就只需要接收消息,进行转发给对应实现就可以了,我给同事提供了SignalR服务,又不干涉他们的业务。

我曾七次鄙视自己的灵魂:
第一次,当它本可进取时,却故作谦卑;
第二次,当它空虚时,用爱欲来填充;
第三次,在困难和容易之间,它选择了容易;
第四次,它犯了错,却借由别人也会犯错来宽慰自己;
第五次,它自由软弱,却把它认为是生命的坚韧;
第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;
第七次,它侧身于生活的污泥中虽不甘心,却又畏首畏尾。

本文共计3618个文字,预计阅读时间需要15分钟。

如何通过ABP SignalR优化重构消息服务流程?

使用ABP+SignalR重构消息服务(二)

上篇主要讲解了使用ABP+SignalR重构消息服务的(一),重点介绍了SignalR的基础知识和前端如何使用SignalR。本篇将深入讲解SignalR源码,帮助读者更好地理解其实现原理。

SignalR源码分析:

1. SignalR的架构

SignalR采用基于长轮询(Long Polling)、服务器发送事件(Server-Sent Events)、WebSockets等技术的混合模式来实现客户端与服务器之间的实时通信。

2. SignalR的核心组件

- IHubProtocol:定义了消息的序列化和反序列化规则,负责消息的传递。- IHub:表示一个SignalR通信的会话,包含了客户端和服务器之间的交互逻辑。- HubConnection:客户端与服务器之间建立连接的类,负责发送和接收消息。- HubContext:用于从服务器端访问Hub的上下文,如获取在线用户列表、发送消息等。

3. SignalR的工作流程

- 客户端通过HTTP请求连接到SignalR服务器。- SignalR服务器根据客户端的请求创建一个HubConnection。- 客户端与服务器端建立WebSocket连接(如果支持)或长轮询连接。- 客户端可以调用服务器端的Hub方法,发送消息。- 服务器端处理客户端的消息,并将结果返回给客户端。

4. SignalR源码分析

- IHubProtocol:在SignalR中,消息的序列化和反序列化是通过JsonProtocol实现的。JsonProtocol将消息转换为JSON格式,便于在客户端和服务器端传输。- IHub:SignalR提供了多个内置的Hub,如Hub、GroupHub、UserHub等。这些Hub实现了不同的通信场景,如一对一通信、群组通信、广播等。- HubConnection:HubConnection负责维护客户端与服务器之间的连接,包括发送和接收消息。在建立连接时,客户端需要提供一个连接ID,以便服务器端识别客户端。- HubContext:HubContext用于从服务器端访问Hub的上下文,如获取在线用户列表、发送消息等。通过HubContext,客户端可以调用服务器端的Hub方法。

通过以上分析,我们可以了解到SignalR源码的架构和核心组件。在实际项目中,我们可以根据需求选择合适的SignalR内置Hub,并利用其提供的功能实现实时通信。

如何通过ABP SignalR优化重构消息服务流程?

使用ABP SignalR重构消息服务(二)

上篇使用ABP SignalR重构消息服务(一)主要讲的是SignalR的基础知识和前端如何使用SignalR,这段时间也是落实方案设计。这篇我主要讲解SignalR源码(最近我手头工作比较忙@蟹老板)。

SignalR源码分析(原地址,原地址已经停止维护了合并到了AspNetCore)

使用SignalR我们主要是添加services.AddSignalR();,添加ChatHub类继承我们的Hub ,然后管道注入endpoints.MapHub<ChatHub>("/ChatHub");
通过services.AddSignalR()可以看到使用的类是SignalRDependencyInjectionExtensions
通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core
通过MapHub<ChatHub>可以看到使用的类是HubEndpointRouteBuilderExtensions

SignalR服务注册

我们先分析services.AddSignalR()注入做了什么准备

这里我们要讲一个东西Microsoft.AspNetCore.SignalR.Core类库有一个SignalRDependencyInjectionExtensions
Microsoft.AspNetCore.SignalR类库也存在一个SignalRDependencyInjectionExtensions

Microsoft.AspNetCore.SignalR类库中的SignalRDependencyInjectionExtensions解读

public static class SignalRDependencyInjectionExtensions { // 单独注入SignalR配置 public static ISignalRServerBuilder AddHubOptions<THub>(this ISignalRServerBuilder signalrBuilder, Action<HubOptions<THub>> configure) where THub : Hub { if (signalrBuilder == null) { throw new ArgumentNullException(nameof(signalrBuilder)); } signalrBuilder.Services.AddSingleton<IConfigureOptions<HubOptions<THub>>, HubOptionsSetup<THub>>(); signalrBuilder.Services.Configure(configure); return signalrBuilder; } // 添加SignalR服务 public static ISignalRServerBuilder AddSignalR(this IServiceCollection services) { if (services == null) { throw new ArgumentNullException(nameof(services)); } // ConnectionsDependencyInjectionExtensions拓展类 添加请求路由、添加身份验证、添加Http连接调度程序、添加Http连接管理器 services.AddConnections(); // 禁用WebSocket保持活动,因为SignalR有它自己的 services.Configure<WebSocketOptions>(o => o.KeepAliveInterval = TimeSpan.Zero); services.TryAddSingleton<SignalRMarkerService>(); services.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<HubOptions>, HubOptionsSetup>()); //调用 Microsoft.AspNetCore.SignalR.Core 类库中的 SignalRDependencyInjectionExtensions return services.AddSignalRCore(); } // 添加SignalR服务。注入SignalR配置信息 public static ISignalRServerBuilder AddSignalR(this IServiceCollection services, Action<HubOptions> configure) { if (services == null) { throw new ArgumentNullException(nameof(services)); } var signalrBuilder = services.AddSignalR(); services.Configure(configure); return signalrBuilder; } }

Microsoft.AspNetCore.SignalR.Core类库中的SignalRDependencyInjectionExtensions解读
这里面注入了SignalR中核心类,所以下面的代码我们一定要仔细研读了。

public static class SignalRDependencyInjectionExtensions { // 将最小的基本SignalR服务添加IServiceCollection 中 public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection services) { // 用于标记SignalR是否注入 services.TryAddSingleton<SignalRCoreMarkerService>(); // 注入默认集线器生命周期管理器 services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>)); // 注入默认集线器协议解析器 services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver)); // 注入集线器上下文 services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>)); services.TryAddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>)); // 注入集线器中心连接处理程序 services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>)); // 注入获取用户唯一标识方法 services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider)); // 注入默认中心调度员 services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>)); // 注入默认激活中心 services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>)); // 添加授权 services.AddAuthorization(); var builder = new SignalRServerBuilder(services); // 添加Protocol转json builder.AddJsonProtocol(); return builder; } } SignalR集线器设计

通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core

// Hub 是一个抽象类 public abstract class Hub : IDisposable { private bool _disposed; // 客户端链接 private IHubCallerClients _clients = default!; // 集线器呼叫中心上下文 private HubCallerContext _context = default!; // 集线器组管理 private IGroupManager _groups = default!; // 客户端链接(管理所有用户链接) public IHubCallerClients Clients { get { CheckDisposed(); return _clients; } set { CheckDisposed(); _clients = value; } } // 集线器上下文(保存当前用户链接信息) public HubCallerContext Context { get { CheckDisposed(); return _context; } set { CheckDisposed(); _context = value; } } // 组管理(对于组进行添加或者删除) public IGroupManager Groups { get { CheckDisposed(); return _groups; } set { CheckDisposed(); _groups = value; } } // 连接方法(用于兼容用户连接操作) public virtual Task OnConnectedAsync() { return Task.CompletedTask; } // 链接释放方法(用于监控用户下线操作) public virtual Task OnDisconnectedAsync(Exception? exception) { return Task.CompletedTask; } protected virtual void Dispose(bool disposing) { } public void Dispose() { if (_disposed) { return; } Dispose(true); _disposed = true; } private void CheckDisposed() { if (_disposed) { throw new ObjectDisposedException(GetType().Name); } } } SignalR中间件

通过MapHub可以看到使用的类是HubEndpointRouteBuilderExtensions

app.UseEndpoints(endpoints => { endpoints.MapHub<ChatHub>("/ChatHub"); });

HubEndpointRouteBuilderExtensions源代码

public static class HubEndpointRouteBuilderExtensions { ................................ // 注册集线器 public static HubEndpointConventionBuilder MapHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions>? configureOptions) where THub : Hub { // 这个就是我们上面注册SignalR保留来判断是否注入 var marker = endpoints.ServiceProvider.GetService<SignalRMarkerService>(); if (marker == null) { throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " + "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code."); } // SignalR配置信息 var options = new HttpConnectionDispatcherOptions(); configureOptions?.Invoke(options); // endpoints.MapConnections用来接收第一次连接请求,然后开启对于协议连接 var conventionBuilder = endpoints.MapConnections(pattern, options, b => { // SignalRConnectionBuilderExtensions拓展类(这里是一个重点,将我们的泛型集线器连接进行注入,就可以开始它的工作了) b.UseHub<THub>(); }); .................................... return new HubEndpointConventionBuilder(conventionBuilder); } }

SignalRConnectionBuilderExtensions源代码

public static class SignalRConnectionBuilderExtensions { public static IConnectionBuilder UseHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IConnectionBuilder connectionBuilder) where THub : Hub { var marker = connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService)); if (marker == null) { throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " + "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code."); } // 1.connectionBuilder.UseConnectionHandler拓展方法在 ConnectionBuilderExtensions中 // 2.HubConnectionHandler这个不就是我们注入服务的集线器中心连接处理程序吗? return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>(); } }

ConnectionBuilderExtensions源代码

public static class ConnectionBuilderExtensions { // 执行集线器的连接方法,到了这里就代表本次连接成功了 public static IConnectionBuilder UseConnectionHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler { var handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices); // 这是一个终端中间件,所以没有必要使用'next'参数 return connectionBuilder.Run(connection => handler.OnConnectedAsync(connection)); } } 小结

通过services.AddSignalR()进行SignalR基础服务进行注册。
通过Hub抽象工程,由不同的集线器继承,定义同一的连接、断开方法、客户端连接管理、群组管理、当前上下文信息。
通过MapHub<ChatHub>通过中间件路由规则进行流量划分。
当我们看完上面调用链路,脑中是不是已经有了一个清晰的方向了,它是怎么与前端进行连接的,并且对于注入的服务有一定的了解。

HubConnectionHandler连接处理

我们已经知道进入中间件之后就会进入HubConnectionHandler.OnConnectedAsync()方法

public override async Task OnConnectedAsync(ConnectionContext connection) { // 我们检查是否设置了HubOptions<THub>,因为它们优先于全局hub选项。 // 然后将keepAlive和handshakeTimeout值设置为HubOptionsSetup中的默认值,当它们显式地设置为null时。 var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols; if (supportedProtocols == null || supportedProtocols.Count == 0) { throw new InvalidOperationException("There are no supported protocols"); } // 默认握手超时15分钟 var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout; // 集线器连接配置 var contextOptions = new HubConnectionContextOptions() { KeepAliveInterval = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval, ClientTimeoutInterval = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval, StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity, MaximumReceiveMessageSize = _maximumMessageSize, SystemClock = SystemClock, MaximumParallelInvocations = _maxParallelInvokes, }; Log.ConnectedStarting(_logger); // 创建连接上下文,将用户信息添加到上下文中 var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory); var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList(); if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors)) { return; } // 已建立connectionContext try { // 默认集线器生命周期管理器(DefaultHubLifetimeManager)将当前用户添加到连接池中 await _lifetimeManager.OnConnectedAsync(connectionContext); // 获取我们对应的集线器,执行OnConnectedAsync()方法,这个时候就真正的开始执行我们写的代码了。 // 里面有一个消息分配方法DispatchMessagesAsync(),获取我们交互的信息进行处理 await RunHubAsync(connectionContext); } finally { connectionContext.Cleanup(); Log.ConnectedEnding(_logger); // 当处理消息方法跳出,之后代表当前用户已经断开连接了,所以我们需要触发断线方法 await _lifetimeManager.OnDisconnectedAsync(connectionContext); } } SignalR异步分派消息

// 异步分派消息 private async Task DispatchMessagesAsync(HubConnectionContext connection) { var input = connection.Input; var protocol = connection.Protocol; connection.BeginClientTimeout(); var binder = new HubConnectionBinder<THub>(_dispatcher, connection); while (true) { var result = await input.ReadAsync(); var buffer = result.Buffer; try { if (result.IsCanceled) { break; } // 存在消息 if (!buffer.IsEmpty) { bool messageReceived = false; // 没有消息限制,只是解析和分派 if (_maximumMessageSize == null) { while (protocol.TryParseMessage(ref buffer, binder, out var message)) { connection.StopClientTimeout(); // 我们接收到了消息,停止超时检查 messageReceived = true; // 将接收的消息,根据不同的类型进行分发处理 await _dispatcher.DispatchMessageAsync(connection, message); } if (messageReceived) { // 处理完接收消息之后,开启超时检查 connection.BeginClientTimeout(); } } else { // 我们给解析器一个默认消息大小的滑动窗口 var maxMessageSize = _maximumMessageSize.Value; while (!buffer.IsEmpty) { var segment = buffer; var overLength = false; // 切分消息,慢慢进行处理 if (segment.Length > maxMessageSize) { segment = segment.Slice(segment.Start, maxMessageSize); overLength = true; } if (protocol.TryParseMessage(ref segment, binder, out var message)) { connection.StopClientTimeout(); // 我们接收到了消息,停止超时检查 messageReceived = true; // 将接收的消息,根据不同的类型进行分发处理 await _dispatcher.DispatchMessageAsync(connection, message); } else if (overLength) { throw new InvalidDataException($"The maximum message size of {maxMessageSize}B was exceeded. The message size can be configured in AddHubOptions."); } else { // No need to update the buffer since we didn't parse anything break; } // Update the buffer to the remaining segment buffer = buffer.Slice(segment.Start); } if (messageReceived) { connection.BeginClientTimeout(); } } } if (result.IsCompleted) { if (!buffer.IsEmpty) { throw new InvalidDataException("Connection terminated while reading a message."); } break; } } finally { // 缓冲区被分割到它被消耗的地方,所以我们可以直接开始。 我们把检查标记为缓冲。 结束,如果我们没有收到完整的帧,我们将等待更多的数据 再读一遍之前。 input.AdvanceTo(buffer.Start, buffer.End); } } SignalR针对用户发送消息

针对于群发消息,我们知道有一个组的容器,我们只要将大家添加到一个组中就可以了,那么我们想根据用户发送消息1:1的模式,SignalR源码中是怎么处理的呢?

在注册SignalR服务中我们可以看到这个services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));

public class DefaultUserIdProvider : IUserIdProvider { // 获取当前用户标识 public virtual string? GetUserId(HubConnectionContext connection) { // 这个也就是为什么我们在不做任何处理之下想使用SignalR用户模式,需要在Jwt中添加一个ClaimTypes.NameIdentifier了 return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value; } }

我们只需要自己定义一个实现类,将默认实现替换掉就可以了。

// 用户模式发送源码 public override Task SendUserAsync(string userId, string methodName, object?[] args, CancellationToken cancellationToken = default) { // connection.UserIdentifier就是执行了GetUserId()方法获取的用户标识 return SendToAllConnections(methodName, args, (connection, state) => string.Equals(connection.UserIdentifier, (string)state!, StringComparison.Ordinal), userId, cancellationToken); } SignalR项目使用设计

上面我们已经讲完,SignalR从连接==>处理消息以及用户模式的源码设计,相信小伙伴已经脑海中已经有点东西了,那么就开始项目中实践方式

我主要负责提供基础SignalR库,给到不同的部门进行使用,所以我首先需要考虑到一个高内聚,低耦合的设计,这里我首先不能掺杂业务逻辑,但是又需要所有业务聚合到我这边,然后通过不同的业务进行不同的处理。
设计思路:

  • 定义两个接口IReceiveMessageISendMessage,接口中分别有MessageType属性,HandlerAsync(input)方法
  • 定义一个公用的集线器注入IEnumerable<IReceiveMessage>IEnumerable<ISendMessage>添加Receive(input)Send(input)方法通过不同的入参中的MessageType属性,从注入集合中获取对应的消息实现进行处理

集线器伪代码

public class SignalRHub : Hub { private readonly IEnumerable<IReceiveMessage> _receiveMessages; private readonly IEnumerable<ISendMessage> _sendMessages; public SignalRHub(IEnumerable<IReceiveMessage> receiveMessages, IEnumerable<ISendMessage> sendMessages) { _receiveMessages = receiveMessages; _sendMessages = sendMessages; } public async Task Receive(SignalRReceiveMessage input) { await _receiveMessages.FirstOrDefault(x => string.Compare(x.MessageType, input.MessageType, true) == 0).HandlerAsync(input); } public async Task Send(SignalRSendMessage outInput) { await _sendMessages.FirstOrDefault(x => string.Compare(x.MessageType, outInput.MessageType, true) == 0).HandlerAsync(outInput); } }

业务实现示例

public class NotificationSendMessage : ISendMessage, ISingletonDependency { public string MessageType { get => SignalRSendMessageEnum.Notification.ToString(); } public Task HandlerAsync(SignalRSendMessage message) { //.......业务逻辑...... } }

这样我就只需要接收消息,进行转发给对应实现就可以了,我给同事提供了SignalR服务,又不干涉他们的业务。

我曾七次鄙视自己的灵魂:
第一次,当它本可进取时,却故作谦卑;
第二次,当它空虚时,用爱欲来填充;
第三次,在困难和容易之间,它选择了容易;
第四次,它犯了错,却借由别人也会犯错来宽慰自己;
第五次,它自由软弱,却把它认为是生命的坚韧;
第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;
第七次,它侧身于生活的污泥中虽不甘心,却又畏首畏尾。