Spring Cloud微服务如何通过WebSocket实现长连接通信?

2026-04-17 23:521阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计900个文字,预计阅读时间需要4分钟。

Spring Cloud微服务如何通过WebSocket实现长连接通信?

WebSocket简介:WebSocket是一种在单个TCP连接上进行全双工通信的协议,允许双向数据推送。与一般微服务提供的RESTful API不同,它允许前后端进行实时通信。

WebSocket长连接是一种在单一TCP连接上实现全双工通信的协议,允许双向数据实时推送。

一、webSocket简介

webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。

二、网关配置

spring cloud 的网关组件有zuul和getway

1、getway

配置网关的时候注意添加ws协议

spring: cloud: gateway: discovery: locator: lowerCaseServiceId: true enabled: true routes: - id: zhgsgl-warning-websocket # 路由的唯一标识 uri: lb:ws://zhgsgl-warning # 修改点 predicates: - Path=/ws/warning/** filters: - StripPrefix=2 # 修改点 - id: zhgsgl-data-websocket# 路由的唯一标识 uri: lb:ws://zhgsgl-data # 修改点 predicates: - Path=/ws/data/** filters: - StripPrefix=2 #修改点 # 安全配置 security: # 不校验白名单 ignore: whites: - /ws/** # 修改点 2、zuul

zuul只能管理192.168.2.137:9403/websocket?user=张三 // 通过网关模块的连接地址 ws://192.168.2.137:8080/ws/websocket?user=张三 // 测试发送消息格式:{"content":"内容","targetId":"0"} registry .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/websocket") // 指定自定义拦截器 .addInterceptors(new WebSocketInterceptor()) // 允许跨域 .setAllowedOrigins("*"); // sockjs通道 registry .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/sock-js") .addInterceptors(new WebSocketInterceptor()) .setAllowedOrigins("*") // 开启sockJs支持 .withSockJS(); } }

科普:websocket与sockJS的区别

Spring Cloud微服务如何通过WebSocket实现长连接通信?

实际上就是前端的区别,一些浏览器不支持websocket,则用sockJS库来处理ws连接,一般都是用sockJS

3.添加处理器

import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class WebSocketHandler extends AbstractWebSocketHandler { /** * 存储sessionId和webSocketSession * 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储 * 在分布式系统中,要想别的办法实现webSocketSession共享 */ private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>(); private static Map<String, String> userMap = new ConcurrentHashMap<>(); /** * webSocket连接创建后调用 */ @Override public void afterConnectionEstablished(WebSocketSession session) { // 获取参数 String user = String.valueOf(session.getAttributes().get("user")); userMap.put(user, session.getId()); sessionMap.put(session.getId(), session); log.info("############### [ws : 连接成功] ###############"); } /** * 接收到消息会调用 */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { log.info("############### [ws消息:{}] ###############", message.getPayload().toString()); JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString()); String content = jsonObject.getString("content"); String targetAdminId = jsonObject.getString("targetId"); if ("0".equals(targetAdminId)) { // 推送给所有人 userMap.forEach((key, value) -> { try { this.sendMessage(key, content); } catch (IOException e) { e.printStackTrace(); } }); } else { sendMessage("1", content); } log.info("// ############### [ws 处理消息成功] ###############"); } /** * 连接出错会调用 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { sessionMap.remove(session.getId()); } /** * 连接关闭会调用 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { sessionMap.remove(session.getId()); } @Override public boolean supportsPartialMessages() { return false; } /** * 后端发送消息 */ public void sendMessage(String user, String message) throws IOException { String sessionId = userMap.get(user); if (StringUtils.isEmpty(sessionId)) { return; } WebSocketSession session = sessionMap.get(sessionId); if (session == null) { return; } session.sendMessage(new TextMessage(message)); } } 4.添加拦截器

import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; public class WebSocketInterceptor implements HandshakeInterceptor { /** * handler处理前调用,attributes属性最终在WebSocketSession里, * 可能通过webSocketSession.getAttributes().get(key值)获得 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; // 获取请求路径携带的参数 String user = serverHttpRequest.getServletRequest().getParameter("user"); attributes.put("user", user); return true; } else { return false; } } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) { } }

测试连接,亲测使用若依cloud框架是可以使用的,就不展示测试代码

四、参考文献

原文地址

微服务springcloud环境下基于Netty搭建websocket集群实现高并发,高性能,高可用的服务器消息推送--经典案例(已在工作中实战应用)netty是yyds!

IDEA 中 同一个微服务 按照多个端口启动

本文共计900个文字,预计阅读时间需要4分钟。

Spring Cloud微服务如何通过WebSocket实现长连接通信?

WebSocket简介:WebSocket是一种在单个TCP连接上进行全双工通信的协议,允许双向数据推送。与一般微服务提供的RESTful API不同,它允许前后端进行实时通信。

WebSocket长连接是一种在单一TCP连接上实现全双工通信的协议,允许双向数据实时推送。

一、webSocket简介

webSocket长连接是一种在单个tcp连接上进行全双工通信的协议,允许双向数据推送。一般微服务提供的restful API只是对前端请求做出相应。使用webSocket可以实现后端主动向前端推送消息。

二、网关配置

spring cloud 的网关组件有zuul和getway

1、getway

配置网关的时候注意添加ws协议

spring: cloud: gateway: discovery: locator: lowerCaseServiceId: true enabled: true routes: - id: zhgsgl-warning-websocket # 路由的唯一标识 uri: lb:ws://zhgsgl-warning # 修改点 predicates: - Path=/ws/warning/** filters: - StripPrefix=2 # 修改点 - id: zhgsgl-data-websocket# 路由的唯一标识 uri: lb:ws://zhgsgl-data # 修改点 predicates: - Path=/ws/data/** filters: - StripPrefix=2 #修改点 # 安全配置 security: # 不校验白名单 ignore: whites: - /ws/** # 修改点 2、zuul

zuul只能管理192.168.2.137:9403/websocket?user=张三 // 通过网关模块的连接地址 ws://192.168.2.137:8080/ws/websocket?user=张三 // 测试发送消息格式:{"content":"内容","targetId":"0"} registry .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/websocket") // 指定自定义拦截器 .addInterceptors(new WebSocketInterceptor()) // 允许跨域 .setAllowedOrigins("*"); // sockjs通道 registry .addHandler(new com.jtsmartway.zhgsgl.data.config.WebSocketHandler(), "/sock-js") .addInterceptors(new WebSocketInterceptor()) .setAllowedOrigins("*") // 开启sockJs支持 .withSockJS(); } }

科普:websocket与sockJS的区别

Spring Cloud微服务如何通过WebSocket实现长连接通信?

实际上就是前端的区别,一些浏览器不支持websocket,则用sockJS库来处理ws连接,一般都是用sockJS

3.添加处理器

import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class WebSocketHandler extends AbstractWebSocketHandler { /** * 存储sessionId和webSocketSession * 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储 * 在分布式系统中,要想别的办法实现webSocketSession共享 */ private static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>(); private static Map<String, String> userMap = new ConcurrentHashMap<>(); /** * webSocket连接创建后调用 */ @Override public void afterConnectionEstablished(WebSocketSession session) { // 获取参数 String user = String.valueOf(session.getAttributes().get("user")); userMap.put(user, session.getId()); sessionMap.put(session.getId(), session); log.info("############### [ws : 连接成功] ###############"); } /** * 接收到消息会调用 */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { log.info("############### [ws消息:{}] ###############", message.getPayload().toString()); JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString()); String content = jsonObject.getString("content"); String targetAdminId = jsonObject.getString("targetId"); if ("0".equals(targetAdminId)) { // 推送给所有人 userMap.forEach((key, value) -> { try { this.sendMessage(key, content); } catch (IOException e) { e.printStackTrace(); } }); } else { sendMessage("1", content); } log.info("// ############### [ws 处理消息成功] ###############"); } /** * 连接出错会调用 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { sessionMap.remove(session.getId()); } /** * 连接关闭会调用 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { sessionMap.remove(session.getId()); } @Override public boolean supportsPartialMessages() { return false; } /** * 后端发送消息 */ public void sendMessage(String user, String message) throws IOException { String sessionId = userMap.get(user); if (StringUtils.isEmpty(sessionId)) { return; } WebSocketSession session = sessionMap.get(sessionId); if (session == null) { return; } session.sendMessage(new TextMessage(message)); } } 4.添加拦截器

import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; public class WebSocketInterceptor implements HandshakeInterceptor { /** * handler处理前调用,attributes属性最终在WebSocketSession里, * 可能通过webSocketSession.getAttributes().get(key值)获得 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; // 获取请求路径携带的参数 String user = serverHttpRequest.getServletRequest().getParameter("user"); attributes.put("user", user); return true; } else { return false; } } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) { } }

测试连接,亲测使用若依cloud框架是可以使用的,就不展示测试代码

四、参考文献

原文地址

微服务springcloud环境下基于Netty搭建websocket集群实现高并发,高性能,高可用的服务器消息推送--经典案例(已在工作中实战应用)netty是yyds!

IDEA 中 同一个微服务 按照多个端口启动