Spring中的Web访问:WebSocket支持
本文涵盖了 Servlet 堆栈支持、WebSocket 消息传递(包括原始 WebSocket 交互)、通过 SockJS 的 WebSocket 模拟以及通过 STOMP 作为 WebSocket 上的子协议的发布-订阅消息传递的内容。
# WebSocket 简介
WebSocket 协议,RFC 6455 (opens new window),提供了一种标准化的方法,用于在客户端和服务器之间通过单个 TCP 连接建立全双工、双向通信通道。它是一种与 HTTP 不同的 TCP 协议,但设计为通过 HTTP 工作,使用端口 80 和 443,并允许重用现有的防火墙规则。
WebSocket 交互始于一个 HTTP 请求,该请求使用 HTTP Upgrade
标头来升级,或者在这种情况下,切换到 WebSocket 协议。以下示例显示了这种交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket # (1)
Connection: Upgrade # (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
- (1)
Upgrade
标头。 - (2) 使用
Upgrade
连接。
具有 WebSocket 支持的服务器不返回通常的 200 状态代码,而是返回类似于以下内容的输出:
HTTP/1.1 101 Switching Protocols # (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
- (1) 协议切换
成功握手后,HTTP 升级请求的基础 TCP 套接字保持打开状态,客户端和服务器都可以继续发送和接收消息。
完整介绍 WebSocket 的工作原理超出了本文档的范围。请参阅 RFC 6455、HTML5 的 WebSocket 章节或 Web 上的许多介绍和教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如,nginx)后面运行,您可能需要配置它以将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查云提供商的与 WebSocket 支持相关的说明。
# HTTP 与 WebSocket
即使 WebSocket 设计为与 HTTP 兼容并且以 HTTP 请求开始,但重要的是要理解这两种协议会导致非常不同的架构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为许多 URL。为了与应用程序交互,客户端访问这些 URL,采用请求-响应风格。服务器根据 HTTP URL、方法和标头将请求路由到适当的处理程序。
相比之下,在 WebSockets 中,通常只有一个 URL 用于初始连接。随后,所有应用程序消息都在同一 TCP 连接上流动。这指向完全不同的异步、事件驱动的消息传递架构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定消息内容的任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。
WebSocket 客户端和服务器可以通过 HTTP 握手请求中的 Sec-WebSocket-Protocol
标头协商使用更高级别的消息传递协议(例如,STOMP)。在没有它的情况下,他们需要提出自己的约定。
# 何时使用 WebSockets
WebSockets 可以使网页具有动态性和交互性。但是,在许多情况下,AJAX 和 HTTP 流式传输或长轮询的组合可以提供简单有效的解决方案。
例如,新闻、邮件和社交 Feed 需要动态更新,但每隔几分钟执行一次可能完全可以。另一方面,协作、游戏和金融应用程序需要更接近实时。
延迟本身不是决定性因素。如果消息量相对较低(例如,监视网络故障),HTTP 流式传输或轮询可以提供有效的解决方案。低延迟、高频率和高容量的组合是使用 WebSocket 的最佳理由。
还要记住,在 Internet 上,您无法控制的限制性代理可能会排除 WebSocket 交互,原因可能是它们未配置为传递 Upgrade
标头,或者因为它们关闭了看似空闲的长期连接。这意味着在防火墙内的内部应用程序中使用 WebSocket 比面向公众的应用程序更直接。
# WebSocket API
Spring Framework 提供了 WebSocket API,你可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。
# WebSocketHandler
创建 WebSocket 服务器就像实现 WebSocketHandler
,或者更常见的是,继承 TextWebSocketHandler
或 BinaryWebSocketHandler
一样简单。以下示例使用 TextWebSocketHandler
:
public class MyHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
}
Spring 提供了专用的 WebSocket 编程配置和 XML 命名空间支持,用于将前面的 WebSocket 处理程序映射到特定的 URL,如以下示例所示:
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler");
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.docs.web.websocket.websocketserverhandler.MyHandler"/>
</beans>
前面的示例用于 Spring MVC 应用程序,应包含在 DispatcherServlet
的配置中。但是,Spring 的 WebSocket 支持不依赖于 Spring MVC。借助 WebSocketHttpRequestHandler
,将 WebSocketHandler
集成到其他 HTTP 服务环境中相对简单。
当直接使用 WebSocketHandler
API 而不是间接使用时(例如,通过 STOMP 消息传递),应用程序必须同步消息的发送,因为底层标准 WebSocket 会话 (JSR-356) 不允许并发发送。一种选择是使用 ConcurrentWebSocketSessionDecorator
包装 WebSocketSession
。
# WebSocket 握手
自定义初始 HTTP WebSocket 握手请求的最简单方法是通过 HandshakeInterceptor
,它公开了“before”和“after”握手的方法。你可以使用这样的拦截器来阻止握手或使任何属性可用于 WebSocketSession
。以下示例使用内置拦截器将 HTTP 会话属性传递给 WebSocket 会话:
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyHandler(), "/myHandler")
.addInterceptors(new HttpSessionHandshakeInterceptor());
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
<websocket:handshake-interceptors>
<bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
</websocket:handshake-interceptors>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.docs.web.websocket.websocketserverhandler.MyHandler"/>
</beans>
一个更高级的选项是扩展 DefaultHandshakeHandler
,它执行 WebSocket 握手的步骤, 包括验证客户端来源,协商子协议和其他详细信息。如果应用程序需要配置自定义的 RequestUpgradeStrategy
,以便适应尚未支持的WebSocket服务器引擎和版本,则可能还需要使用此选项(有关此主题的更多信息,请参见 Deployment 章节)。Java 配置和 XML 命名空间都可以配置自定义的 HandshakeHandler
。
提示:
Spring 提供了一个
WebSocketHandlerDecorator
基类,你可以使用它来使用其他行为装饰WebSocketHandler
。使用 WebSocket Java 配置或 XML 命名空间时,默认情况下会提供并添加日志记录和异常处理实现。ExceptionWebSocketHandlerDecorator
捕获从任何WebSocketHandler
方法产生的所有未捕获的异常,并使用状态1011
关闭 WebSocket 会话,表明服务器错误。
# 部署
Spring WebSocket API 易于集成到 Spring MVC 应用程序中,其中 DispatcherServlet
同时为 HTTP WebSocket 握手和其他 HTTP 请求提供服务。通过调用 WebSocketHttpRequestHandler
,也可以轻松地将其集成到其他 HTTP 处理方案中。这很方便且易于理解。但是,对于 JSR-356 运行时,有一些特殊的考虑事项。
Jakarta WebSocket API (JSR-356) 提供了两种部署机制。第一种涉及启动时 Servlet 容器类路径扫描(Servlet 3 功能)。另一种是用于 Servlet 容器初始化的注册 API。这些机制都无法为所有 HTTP 处理使用单个“前端控制器” —— 包括 WebSocket 握手和所有其他 HTTP 请求 —— 例如 Spring MVC 的 DispatcherServlet
。
这是 JSR-356 的一个重大限制,即使在 JSR-356 运行时运行,Spring 的 WebSocket 支持也可以通过服务器特定的 RequestUpgradeStrategy
实现来解决。此类策略当前存在于 Tomcat、Jetty、GlassFish、WebLogic、WebSphere 和 Undertow(和 WildFly)中。从 Jakarta WebSocket 2.1 开始,可以使用标准的请求升级策略,Spring 会在基于 Jakarta EE 10 的 Web 容器(如 Tomcat 10.1 和 Jetty 12)上选择该策略。
第二个考虑因素是,具有 JSR-356 支持的 Servlet 容器应执行 ServletContainerInitializer
(SCI) 扫描,这会减慢应用程序的启动速度 —— 在某些情况下,会大大减慢。如果在升级到具有 JSR-356 支持的 Servlet 容器版本后观察到重大影响,则应可以通过使用 web.xml
中的 <absolute-ordering />
元素有选择地启用或禁用 Web 片段(和 SCI 扫描),如以下示例所示:
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/web-app_5_0.xsd"
version="5.0">
<absolute-ordering/>
</web-app>
然后,你可以按名称选择性地启用 Web 片段,例如 Spring 自己的 SpringServletContainerInitializer
,它为 Servlet 3 Java 初始化 API 提供支持。以下示例显示了如何执行此操作:
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/web-app_5_0.xsd"
version="5.0">
<absolute-ordering>
<name>spring_web</name>
</absolute-ordering>
</web-app>
# 配置服务器
你可以配置底层 WebSocket 服务器,例如输入消息缓冲区大小、空闲超时等。
对于 Jakarta WebSocket 服务器,你可以将 ServletServerContainerFactoryBean
添加到你的配置中。例如:
@Configuration
public class WebSocketConfiguration {
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
<bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
<property name="maxTextMessageBufferSize" value="8192"/>
<property name="maxBinaryMessageBufferSize" value="8192"/>
</bean>
注意:
对于客户端 Jakarta WebSocket 配置,请在编程配置中使用
ContainerProvider.getWebSocketContainer()
,或在 XML 中使用WebSocketContainerFactoryBean
。
对于 Jetty,你可以提供一个回调来配置 WebSocket 服务器:
@Configuration
@EnableWebSocket
public class JettyWebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(echoWebSocketHandler(), "/echo").setHandshakeHandler(handshakeHandler());
}
@Bean
public WebSocketHandler echoWebSocketHandler() {
return new MyEchoHandler();
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
JettyRequestUpgradeStrategy strategy = new JettyRequestUpgradeStrategy();
strategy.addWebSocketConfigurer(configurable -> {
configurable.setInputBufferSize(8192);
configurable.setIdleTimeout(Duration.ofSeconds(600));
});
return new DefaultHandshakeHandler(strategy);
}
}
提示:
使用 STOMP over WebSocket 时,你还需要配置 STOMP WebSocket transport 属性。
# 允许的来源
从 Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是仅接受同源请求。也可以允许所有来源或指定的来源列表。此检查主要针对浏览器客户端。没有任何措施可以阻止其他类型的客户端修改 Origin
标头值(有关更多详细信息,请参阅 RFC 6454:Web 来源概念 (opens new window))。
三种可能的行为是:
- 仅允许同源请求(默认):在此模式下,启用 SockJS 时,Iframe HTTP 响应标头
X-Frame-Options
设置为SAMEORIGIN
,并且禁用 JSONP 传输,因为它不允许检查请求的来源。因此,启用此模式后,不支持 IE6 和 IE7。 - 允许指定的来源列表:每个允许的来源必须以
http://
或https://
开头。在此模式下,启用 SockJS 时,将禁用 IFrame 传输。因此,启用此模式后,不支持 IE6 到 IE9。 - 允许所有来源:要启用此模式,你应该提供
*
作为允许的来源值。在此模式下,所有传输均可用。
你可以配置 WebSocket 和 SockJS 允许的来源,如以下示例所示:
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("https://mydomain.com");
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers allowed-origins="https://mydomain.com">
<websocket:mapping path="/myHandler" handler="myHandler" />
</websocket:handlers>
<bean id="myHandler" class="org.springframework.docs.web.websocket.websocketserverhandler.MyHandler" />
</beans>
# SockJS 支持
在公共互联网上,不受你控制的限制性代理可能会阻止 WebSocket 交互,原因可能是它们未配置为传递 Upgrade
标头,或者它们会关闭看似空闲的长时间连接。
解决这个问题的方法是 WebSocket 模拟,即首先尝试使用 WebSocket,然后在必要时回退到基于 HTTP 的技术,这些技术模拟 WebSocket 交互并公开相同的应用程序级别 API。
在 Servlet 栈上,Spring Framework 提供了对 SockJS 协议的服务器端(以及客户端)支持。
# 概述
SockJS 的目标是让应用程序使用 WebSocket API,但在运行时必要时回退到非 WebSocket 替代方案,而无需更改应用程序代码。
SockJS 包括:
- 以可执行测试形式定义的 SockJS 协议 (opens new window)。
- SockJS JavaScript 客户端 (opens new window) — 一个用于浏览器的客户端库。
- SockJS 服务器实现,包括 Spring Framework
spring-websocket
模块中的一个。 spring-websocket
模块中的 SockJS Java 客户端(自 4.1 版本起)。
SockJS 专为在浏览器中使用而设计。它使用多种技术来支持各种浏览器版本。有关 SockJS 传输类型和浏览器的完整列表,请参见 SockJS 客户端 (opens new window) 页面。传输方式分为三大类:WebSocket、HTTP Streaming 和 HTTP Long Polling。有关这些类别的概述,请参见 此博客文章 (opens new window)。
SockJS 客户端首先发送 GET /info
以从服务器获取基本信息。之后,它必须决定使用哪种传输方式。如果可能,则使用 WebSocket。如果不可行,则在大多数浏览器中,至少有一个 HTTP streaming 选项。如果没有,则使用 HTTP (long) polling。
所有传输请求都具有以下 URL 结构:
https://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
其中:
{server-id}
对于在集群中路由请求很有用,但在其他情况下不使用。{session-id}
关联属于 SockJS 会话的 HTTP 请求。{transport}
指示传输类型(例如,websocket
、xhr-streaming
等)。
WebSocket 传输只需要一个 HTTP 请求即可进行 WebSocket 握手。之后,所有消息都在该套接字上交换。
HTTP 传输需要更多请求。例如,Ajax/XHR streaming 依赖于一个长时间运行的请求来传输服务器到客户端的消息,并依赖于额外的 HTTP POST 请求来传输客户端到服务器的消息。Long polling 与此类似,只是它在每次服务器到客户端发送后都会结束当前请求。
SockJS 添加了最少的消息帧。例如,服务器最初发送字母 o
(“open” 帧),消息作为 a["message1","message2"]
(JSON 编码的数组)发送,如果没有消息流动 25 秒(默认情况下),则发送字母 h
(“heartbeat” 帧),并发送字母 c
(“close” 帧)以关闭会话。
要了解更多信息,请在浏览器中运行示例并观察 HTTP 请求。SockJS 客户端允许固定传输列表,因此可以一次查看每种传输。SockJS 客户端还提供了一个调试标志,该标志可在浏览器控制台中启用有用的消息。在服务器端,你可以为 org.springframework.web.socket
启用 TRACE
日志记录。要获得更多详细信息,请参见 SockJS 协议 带注释的测试 (opens new window)。
# 启用 SockJS
你可以通过配置启用 SockJS,如以下示例所示:
@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").withSockJS();
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
<websocket:sockjs/>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.docs.web.websocket.websocketserverhandler.MyHandler"/>
</beans>
前面的示例用于 Spring MVC 应用程序,应包含在 DispatcherServlet
的配置中。但是,Spring 的 WebSocket 和 SockJS 支持不依赖于 Spring MVC。借助 SockJsHttpRequestHandler
(opens new window),相对容易地将其集成到其他 HTTP 服务环境中。
在浏览器端,应用程序可以使用 sockjs-client
(opens new window)(版本 1.0.x)。它模拟 W3C WebSocket API 并与服务器通信,以根据其运行的浏览器选择最佳传输选项。请参见 sockjs-client (opens new window) 页面以及浏览器支持的传输类型列表。客户端还提供了多个配置选项,例如,指定要包含哪些传输。
# IE 8 和 9
Internet Explorer 8 和 9 仍在被使用。它们是使用 SockJS 的一个主要原因。本节介绍有关在这些浏览器中运行的重要注意事项。
SockJS 客户端通过使用 Microsoft 的 XDomainRequest
(opens new window) 在 IE 8 和 9 中支持 Ajax/XHR streaming。这可以跨域工作,但不支持发送 Cookie。Cookie 对于 Java 应用程序通常至关重要。但是,由于 SockJS 客户端可以与多种服务器类型一起使用(不仅仅是 Java 服务器),因此它需要知道 Cookie 是否重要。如果是这样,SockJS 客户端会首选 Ajax/XHR for streaming。否则,它依赖于基于 iframe 的技术。
来自 SockJS 客户端的第一个 /info
请求是用于请求信息的请求,该信息会影响客户端对传输方式的选择。其中一个细节是服务器应用程序是否依赖 Cookie(例如,用于身份验证目的或使用粘性会话进行集群)。Spring 的 SockJS 支持包括一个名为 sessionCookieNeeded
的属性。默认情况下启用该属性,因为大多数 Java 应用程序都依赖于 JSESSIONID
Cookie。如果你的应用程序不需要它,则可以关闭此选项,SockJS 客户端应在 IE 8 和 9 中选择 xdr-streaming
。
如果确实使用基于 iframe 的传输,请记住,可以通过将 HTTP 响应标头 X-Frame-Options
设置为 DENY
、SAMEORIGIN
或 ALLOW-FROM <origin>
来指示浏览器阻止在给定页面上使用 IFrame。这用于防止 点击劫持 (opens new window)。
Spring Security 3.2+ 提供了在每个响应上设置
X-Frame-Options
的支持。默认情况下,Spring Security Java 配置将其设置为DENY
。在 3.2 中,Spring Security XML 命名空间默认情况下未设置该标头,但可以配置为这样做。将来,它可能会默认设置它。有关如何配置
X-Frame-Options
标头的设置的详细信息,请参见 Spring Security 文档的 默认安全标头 (opens new window)。你还可以参见 gh-2718 (opens new window) 以获取其他背景信息。
如果你的应用程序添加了 X-Frame-Options
响应标头(应该这样做!),并且依赖于基于 iframe 的传输,则需要将标头值设置为 SAMEORIGIN
或 ALLOW-FROM <origin>
。Spring SockJS 支持还需要知道 SockJS 客户端的位置,因为它从 iframe 加载。默认情况下,iframe 设置为从 CDN 位置下载 SockJS 客户端。最好配置此选项以使用与应用程序相同来源的 URL。
以下示例显示了如何在 Java 配置中执行此操作:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS()
.setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");
}
// ...
}
XML 命名空间通过 <websocket:sockjs>
元素提供类似的选项。
在初始开发期间,请启用 SockJS 客户端
devel
模式,该模式可防止浏览器缓存 SockJS 请求(如 iframe),否则这些请求将被缓存。有关如何启用它的详细信息,请参见 SockJS 客户端 (opens new window) 页面。
# 心跳
SockJS 协议要求服务器发送心跳消息,以防止代理得出连接挂起的结论。Spring SockJS 配置具有一个名为 heartbeatTime
的属性,可以使用该属性自定义频率。默认情况下,如果在该连接上没有发送其他消息,则在 25 秒后发送心跳。此 25 秒的值符合以下 IETF 建议 (opens new window),适用于公共互联网应用程序。
当使用 STOMP over WebSocket 和 SockJS 时,如果 STOMP 客户端和服务器协商要交换的心跳,则会禁用 SockJS 心跳。
Spring SockJS 支持还允许你配置 TaskScheduler
以安排心跳任务。任务调度程序由线程池支持,默认设置基于可用处理器的数量。你应该考虑根据你的特定需求自定义设置。
# 客户端断开连接
HTTP streaming 和 HTTP long polling SockJS 传输需要连接保持打开的时间比平时更长。有关这些技术的概述,请参见 此博客文章 (opens new window)。
在 Servlet 容器中,这是通过 Servlet 3 异步支持完成的,该支持允许退出 Servlet 容器线程、处理请求并从另一个线程继续写入响应。
一个具体的问题是,Servlet API 不提供有关客户端已断开连接的通知。参见 eclipse-ee4j/servlet-api#44 (opens new window)。但是,Servlet 容器会在后续尝试写入响应时引发异常。由于 Spring 的 SockJS Service 支持服务器发送的心跳(默认情况下每 25 秒一次),这意味着通常会在该时间段内检测到客户端断开连接(或者如果消息发送得更频繁,则更早)。
因此,由于客户端已断开连接,可能会发生网络 I/O 故障,这可能会用不必要的堆栈跟踪填充日志。Spring 会尽最大努力识别代表客户端断开连接的此类网络故障(特定于每个服务器),并通过使用专用日志类别
DISCONNECTED_CLIENT_LOG_CATEGORY
(在AbstractSockJsSession
中定义)记录最少的消息。如果需要查看堆栈跟踪,可以将该日志类别设置为 TRACE。
# SockJS 和 CORS
如果允许跨域请求(请参见 允许的来源),则 SockJS 协议使用 CORS 在 XHR streaming 和 polling 传输中提供跨域支持。因此,会自动添加 CORS 标头,除非检测到响应中存在 CORS 标头。因此,如果应用程序已经配置为提供 CORS 支持(例如,通过 Servlet 过滤器),则 Spring 的 SockJsService
会跳过此部分。
也可以通过在 Spring 的 SockJsService 中设置 suppressCors
属性来禁用添加这些 CORS 标头。
SockJS 需要以下标头和值:
Access-Control-Allow-Origin
:从Origin
请求标头的值初始化。Access-Control-Allow-Credentials
:始终设置为true
。Access-Control-Request-Headers
:从等效请求标头中的值初始化。Access-Control-Allow-Methods
:传输支持的 HTTP 方法(请参见TransportType
枚举)。Access-Control-Max-Age
:设置为 31536000(1 年)。
有关确切的实现,请参见 AbstractSockJsService
中的 addCorsHeaders
和源代码中的 TransportType
枚举。
或者,如果 CORS 配置允许,请考虑排除带有 SockJS 端点前缀的 URL,从而让 Spring 的 SockJsService
处理它。
# SockJsClient
Spring 提供了一个 SockJS Java 客户端,用于连接到远程 SockJS 端点,而无需使用浏览器。当需要在公共网络上的两台服务器之间进行双向通信时(也就是说,网络代理可能会阻止使用 WebSocket 协议),这尤其有用。SockJS Java 客户端对于测试目的(例如,模拟大量并发用户)也非常有用。
SockJS Java 客户端支持 websocket
、xhr-streaming
和 xhr-polling
传输。其余的只在浏览器中有意义。
你可以使用以下内容配置 WebSocketTransport
:
- JSR-356 运行时的
StandardWebSocketClient
。 - 通过使用 Jetty 9+ 原生 WebSocket API 的
JettyWebSocketClient
。 - Spring 的
WebSocketClient
的任何实现。
根据定义,XhrTransport
支持 xhr-streaming
和 xhr-polling
,因为从客户端的角度来看,除了用于连接到服务器的 URL 之外,没有其他区别。目前有两个实现:
RestTemplateXhrTransport
使用 Spring 的RestTemplate
进行 HTTP 请求。JettyXhrTransport
使用 Jetty 的HttpClient
进行 HTTP 请求。
以下示例显示了如何创建 SockJS 客户端并连接到 SockJS 端点:
List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");
SockJS 使用 JSON 格式化的数组来传递消息。默认情况下,使用 Jackson 2,并且需要在类路径上。或者,你可以配置
SockJsMessageCodec
的自定义实现,并在SockJsClient
上配置它。
要使用 SockJsClient
模拟大量并发用户,你需要配置底层 HTTP 客户端(对于 XHR 传输)以允许足够数量的连接和线程。以下示例显示了如何使用 Jetty 执行此操作:
HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));
以下示例显示了服务器端 SockJS 相关属性(有关详细信息,请参见 javadoc),你也应该考虑自定义:
@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/sockjs").withSockJS()
.setStreamBytesLimit(512 * 1024) // (1) 设置 streamBytesLimit 属性为 512KB (默认是 128KB — 128 * 1024).
.setHttpMessageCacheSize(1000) // (2) 设置 httpMessageCacheSize 属性为 1,000 (默认是 100).
.setDisconnectDelay(30 * 1000); // (3) 设置 disconnectDelay 属性为 30 秒 (默认是 5 秒 — 5 * 1000).
}
// ...
}
# STOMP
WebSocket 协议定义了两种类型的消息(文本和二进制),但它们的内容是未定义的。该协议定义了一种机制,供客户端和服务器协商一个子协议(即,一个更高级别的消息传递协议),以便在 WebSocket 之上使用,从而定义每种消息可以发送什么类型,格式是什么,每条消息的内容等等。 使用子协议是可选的,但无论如何,客户端和服务器都需要就一些协议达成一致,以定义消息内容。
# 概述
STOMP (opens new window)(Simple Text Oriented Messaging Protocol,简单文本定向消息协议)最初是为脚本语言(如 Ruby、Python 和 Perl)连接到企业消息代理而创建的。它旨在解决常用消息传递模式的最小子集。STOMP 可以通过任何可靠的双向流网络协议(如 TCP 和 WebSocket)使用。虽然 STOMP 是一种面向文本的协议,但消息负载可以是文本或二进制。
STOMP 是一种基于帧的协议,其帧以 HTTP 为模型。以下列表显示了 STOMP 帧的结构:
COMMAND
header1:value1
header2:value2
Body^@
客户端可以使用 SEND
或 SUBSCRIBE
命令来发送或订阅消息,以及一个 destination
标头,用于描述消息的内容以及谁应该接收它。 这实现了一个简单的发布-订阅机制,你可以使用它来通过代理将消息发送到其他连接的客户端,或者将消息发送到服务器以请求执行某些工作。
当使用 Spring 的 STOMP 支持时,Spring WebSocket 应用程序充当客户端的 STOMP 代理。 消息被路由到 @Controller
消息处理方法或简单的内存中代理,该代理跟踪订阅并将消息广播给订阅的用户。 你也可以配置 Spring 以与专用的 STOMP 代理(例如 RabbitMQ、ActiveMQ 等)一起使用,以进行消息的实际广播。 在这种情况下,Spring 维护与代理的 TCP 连接,将消息中继到代理,并将来自代理的消息传递到连接的 WebSocket 客户端。 因此,Spring Web 应用程序可以依靠统一的基于 HTTP 的安全性、通用验证和熟悉的消息处理编程模型。
以下示例显示了客户端订阅接收股票报价,服务器可能会定期发出股票报价(例如,通过计划任务,通过 SimpMessagingTemplate
将消息发送到代理):
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@
以下示例显示了客户端发送交易请求,服务器可以通过 @MessageMapping
方法处理该请求:
SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@
执行后,服务器可以将交易确认消息和详细信息广播到客户端。
目标地的含义在 STOMP 规范中有意保持不透明。 它可以是任何字符串,并且完全取决于 STOMP 服务器来定义其支持的目标地的语义和语法。 然而,非常常见的是,目的地是类似路径的字符串,其中 /topic/..
意味着发布-订阅(一对多),而 /queue/
意味着点对点(一对一)消息交换。
STOMP 服务器可以使用 MESSAGE
命令将消息广播到所有订阅者。 以下示例显示了服务器将股票报价发送到已订阅的客户端:
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@
服务器无法发送未经请求的消息。 来自服务器的所有消息必须是对特定客户端订阅的响应,并且服务器消息的 subscription
标头必须与客户端订阅的 id
标头匹配。
前面的概述旨在提供对 STOMP 协议的最基本理解。 我们建议完整阅读协议规范 (opens new window)。
# 优势
使用STOMP作为子协议,相较于直接使用WebSocket,Spring Framework和Spring Security可以提供更丰富的编程模型。这与HTTP相对于原始TCP的作用类似,HTTP使得Spring MVC和其他Web框架能够提供丰富的功能。以下列出了一些优势:
- 无需发明自定义的消息协议和消息格式。
- STOMP客户端可用,包括Spring Framework中的 Java客户端。
- 您可以(可选地)使用消息代理(例如RabbitMQ、ActiveMQ等)来管理订阅和广播消息。
- 应用程序逻辑可以组织在任意数量的
@Controller
实例中,并且可以根据STOMP目标头将消息路由到它们,而无需使用单个WebSocketHandler
为给定连接处理原始WebSocket消息。 - 您可以使用Spring Security来基于STOMP目标和消息类型来保护消息。
# 启用 STOMP
spring-messaging
和 spring-websocket
模块提供了基于 WebSocket 的 STOMP 支持。添加这些依赖后,你可以通过 WebSocket 暴露一个 STOMP 端点,如下例所示:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// /portfolio 是一个 HTTP URL, WebSocket (或 SockJS) 客户端需要连接到这个端点才能进行 WebSocket 握手
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 目标 header 以 /app 开头的 STOMP 消息会被路由到 @Controller 类中的 @MessageMapping 方法
config.setApplicationDestinationPrefixes("/app");
// 使用内置的消息代理来进行订阅和广播,并将目标 header 以 /topic 或 /queue 开头的消息路由到代理
config.enableSimpleBroker("/topic", "/queue");
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio" />
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
注意: 对于内置的简单代理,/topic
和 /queue
前缀没有任何特殊含义。它们仅仅是一种约定,用于区分发布-订阅(即,多个订阅者)与点对点消息传递(即,一个消费者)。当你使用外部代理时,请查看代理的 STOMP 文档,以了解它支持哪种 STOMP 目标和前缀。
要从浏览器连接 STOMP,可以使用 stomp-js/stompjs (opens new window),这是目前维护最活跃的 JavaScript 库。
以下示例代码基于该库:
const stompClient = new StompJs.Client({
brokerURL: 'ws://domain.com/portfolio',
onConnect: () => {
// ...
}
});
或者,如果通过 SockJS 进行连接,可以在服务器端使用 registry.addEndpoint("/portfolio").withSockJS()
启用 SockJS Fallback,并在 JavaScript 端按照 这些说明 (opens new window) 操作。
请注意,在前面的示例中,stompClient
不需要指定 login
和 passcode
headers。即使指定了,它们也会在服务器端被忽略(或者说,被覆盖)。有关身份验证的更多信息,请参阅 连接到代理 和 身份验证。
更多示例代码请参考:
- 使用 WebSocket 构建交互式 Web 应用程序 (opens new window) - 入门指南。
- 股票投资组合 (opens new window) - 示例应用程序。
# WebSocket Transport
本节介绍如何配置底层的 WebSocket 服务器传输。
对于 Jakarta WebSocket 服务器,在配置中添加一个 ServletServerContainerFactoryBean
。例如,请参阅 WebSocket 部分下的配置服务器。
对于 Jetty WebSocket 服务器,请按如下方式自定义 JettyRequestUpgradeStrategy
:
@Configuration
@EnableWebSocketMessageBroker
public class JettyWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
JettyRequestUpgradeStrategy strategy = new JettyRequestUpgradeStrategy();
strategy.addWebSocketConfigurer(configurable -> {
configurable.setInputBufferSize(4 * 8192);
configurable.setIdleTimeout(Duration.ofSeconds(600));
});
return new DefaultHandshakeHandler(strategy);
}
}
除了 WebSocket 服务器属性外,还有 STOMP WebSocket 传输属性可以自定义,如下所示:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(4 * 8192);
registry.setTimeToFirstMessage(30000);
}
}
# 消息的流程
一旦暴露了 STOMP 端点,Spring 应用程序就成为了连接客户端的 STOMP 代理。本节描述了服务端消息的流程。
spring-messaging
模块包含对消息传递应用程序的基础支持,这些应用程序起源于 Spring Integration (opens new window),后来被提取并合并到 Spring Framework 中,以便在许多 Spring 项目 (opens new window) 和应用场景中更广泛地使用。以下列表简要描述了一些可用的消息传递抽象:
- Message (opens new window): 消息的简单表示,包括header和payload。
- MessageHandler (opens new window): 处理消息的契约。
- MessageChannel (opens new window): 用于发送消息的契约,可以实现生产者和消费者之间的松耦合。
- SubscribableChannel (opens new window): 具有
MessageHandler
订阅者的MessageChannel
。 - ExecutorSubscribableChannel (opens new window): 使用
Executor
传递消息的SubscribableChannel
。
Java 配置(即 @EnableWebSocketMessageBroker
)和 XML 命名空间配置(即 <websocket:message-broker>
)都使用上述组件来组装消息工作流。下图显示了启用简单的内置消息代理时使用的组件:
!message flow simple broker
上图显示了三个消息通道:
clientInboundChannel
: 用于传递从 WebSocket 客户端接收的消息。clientOutboundChannel
: 用于向 WebSocket 客户端发送服务器消息。brokerChannel
: 用于从服务器端应用程序代码向消息代理发送消息。
下图显示了配置外部代理(例如 RabbitMQ)来管理订阅和广播消息时使用的组件:
!message flow broker relay
上述两个图的主要区别在于使用了“代理中继”通过 TCP 将消息传递到外部 STOMP 代理,以及将消息从代理传递到订阅客户端。
当从 WebSocket 连接接收到消息时,它们被解码为 STOMP 帧,转换为 Spring Message
表示,并发送到 clientInboundChannel
以进行进一步处理。 例如,目标 header 以 /app
开头的 STOMP 消息可能会路由到带注解的控制器中的 @MessageMapping
方法,而 /topic
和 /queue
消息可能会直接路由到消息代理。
处理来自客户端的 STOMP 消息的带注解的 @Controller
可以通过 brokerChannel
向消息代理发送消息,并且代理通过 clientOutboundChannel
将消息广播到匹配的订阅者。 相同的控制器也可以响应 HTTP 请求执行相同的操作,因此客户端可以执行 HTTP POST,然后 @PostMapping
方法可以向消息代理发送消息以广播给订阅的客户端。
我们可以通过一个简单的例子来追踪流程。 考虑以下设置服务器的示例:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "" + getTimestamp() + ": " + greeting;
}
private String getTimestamp() {
return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
}
}
上述示例支持以下流程:
- 客户端连接到
http://localhost:8080/portfolio
,一旦建立 WebSocket 连接,STOMP 帧就开始在其上传输。 - 客户端发送一个 SUBSCRIBE 帧,其目标 header 为
/topic/greeting
。 接收并解码后,该消息被发送到clientInboundChannel
,然后路由到消息代理,该代理存储客户端订阅。 - 客户端向
/app/greeting
发送一个 SEND 帧。/app
前缀有助于将其路由到带注解的控制器。 在剥离/app
前缀后,目标中的剩余/greeting
部分将映射到GreetingController
中的@MessageMapping
方法。 - 从
GreetingController
返回的值将转换为一个 SpringMessage
,其 payload 基于返回值,并且默认目标 header 为/topic/greeting
(从输入目标派生,其中/app
替换为/topic
)。 生成的消息被发送到brokerChannel
并由消息代理处理。 - 消息代理找到所有匹配的订阅者,并通过
clientOutboundChannel
向每个订阅者发送一个 MESSAGE 帧,消息从那里编码为 STOMP 帧并在 WebSocket 连接上发送。
下一节提供了关于带注解的方法的更多细节,包括支持的参数和返回值类型。
# 注解式控制器
应用程序可以使用注解式的 @Controller
类来处理来自客户端的消息。这些类可以声明 @MessageMapping
、@SubscribeMapping
和 @ExceptionHandler
方法,如下所述:
@MessageMapping
@SubscribeMapping
@MessageExceptionHandler
# @MessageMapping
可以使用 @MessageMapping
注解方法,以便根据消息的目标地址来路由消息。它支持方法级别和类型级别。在类型级别,@MessageMapping
用于表达控制器中所有方法的共享映射。
默认情况下,映射值是 Ant 风格的路径模式(例如,/thing*
、/thing/**
),包括对模板变量的支持(例如,/thing/{id}
)。可以通过 @DestinationVariable
方法参数引用这些值。应用程序也可以切换到使用点分隔的目标地址约定进行映射,如[使用点作为分隔符中所述。
# 支持的方法参数
下表描述了支持的方法参数:
方法参数 | 描述 |
---|---|
Message | 用于访问完整消息。 |
MessageHeaders | 用于访问 Message 中的消息头。 |
MessageHeaderAccessor ,SimpMessageHeaderAccessor 和 StompHeaderAccessor | 用于通过类型化的访问器方法访问消息头。 |
@Payload | 用于访问消息的负载,该负载由配置的 MessageConverter 进行转换(例如,从 JSON 转换)。 |
默认情况下,如果未找到其他匹配的参数,则假定使用此注解,因此可以不添加此注解。 | |
可以使用 @jakarta.validation.Valid 或 Spring 的 @Validated 注解负载参数,以便自动验证负载参数。 | |
@Header | 用于访问特定的消息头值,并根据需要使用 org.springframework.core.convert.converter.Converter 进行类型转换。 |
@Headers | 用于访问消息中的所有消息头。此参数必须可以分配给 java.util.Map 。 |
@DestinationVariable | 用于访问从消息目标地址中提取的模板变量。如有必要,这些值将转换为声明的方法参数类型。 |
java.security.Principal | 反映 WebSocket HTTP 握手时登录的用户。 |
# 返回值
默认情况下,@MessageMapping
方法的返回值通过匹配的 MessageConverter
序列化为负载,并作为 Message
发送到 brokerChannel
,然后从该通道广播给订阅者。出站消息的目标地址与入站消息的目标地址相同,但前缀为 /topic
。
可以使用 @SendTo
和 @SendToUser
注解来自定义出站消息的目标地址。@SendTo
用于自定义目标地址或指定多个目标地址。@SendToUser
用于将出站消息仅定向到与入站消息关联的用户。请参阅用户目标地址。
可以在同一方法上同时使用 @SendTo
和 @SendToUser
,并且两者都支持在类级别使用,在这种情况下,它们充当该类中方法的默认值。但是,请记住,任何方法级别的 @SendTo
或 @SendToUser
注解都会覆盖类级别的任何此类注解。
消息可以异步处理,并且 @MessageMapping
方法可以返回 ListenableFuture
、CompletableFuture
或 CompletionStage
。
请注意,@SendTo
和 @SendToUser
仅仅是为了方便使用 SimpMessagingTemplate
发送消息。如有必要,对于更高级的场景,@MessageMapping
方法可以退回到直接使用 SimpMessagingTemplate
。可以代替返回值或除了返回值之外使用此方法。请参阅发送消息。
# @SubscribeMapping
@SubscribeMapping
类似于 @MessageMapping
,但将映射范围缩小到仅订阅消息。它支持与 @MessageMapping
相同的方法参数。但是对于返回值,默认情况下,消息直接发送到客户端(通过 clientOutboundChannel
,以响应订阅),而不是发送到消息代理(通过 brokerChannel
,作为对匹配订阅的广播)。添加 @SendTo
或 @SendToUser
会覆盖此行为,并将消息发送到消息代理。
这在什么情况下有用?假设消息代理映射到 /topic
和 /queue
,而应用程序控制器映射到 /app
。在此设置中,消息代理存储所有针对 /topic
和 /queue
的订阅,这些订阅旨在进行重复广播,并且不需要应用程序参与。客户端还可以订阅某个 /app
目标地址,控制器可以返回值以响应该订阅,而无需消息代理存储或再次使用该订阅(实际上是一次性请求-响应交换)。这种情况的一个用例是在启动时使用初始数据填充 UI。
这在什么情况下没有用?除非您希望消息代理和控制器独立处理消息(包括订阅),否则请勿尝试将消息代理和控制器映射到相同目标地址前缀。入站消息是并行处理的。无法保证消息代理或控制器是否先处理给定的消息。如果目标是在订阅存储并准备好进行广播时收到通知,则如果服务器支持,客户端应请求回执(简单消息代理不支持)。例如,使用 Java STOMP 客户端,您可以执行以下操作来添加回执:
@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// 在初始化期间..
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// 订阅的时候..
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
// 订阅准备就绪...
});
服务器端的选项是在 brokerChannel
上注册 ExecutorChannelInterceptor
并实现 afterMessageHandled
方法,该方法在处理完消息(包括订阅)后调用。
# @MessageExceptionHandler
应用程序可以使用 @MessageExceptionHandler
方法来处理来自 @MessageMapping
方法的异常。您可以在注解本身中声明异常,或者通过方法参数声明异常(如果您想访问异常实例)。以下示例通过方法参数声明异常:
@Controller
public class MyController {
// ...
@MessageExceptionHandler
public ApplicationError handleException(MyException exception) {
// ...
return appError;
}
}
@MessageExceptionHandler
方法支持灵活的方法签名,并支持与 @MessageMapping
方法相同的方法参数类型和返回值。
通常,@MessageExceptionHandler
方法适用于声明它们的 @Controller
类(或类层次结构)中。如果您希望此类方法更全局地应用(跨控制器),则可以在标记为 @ControllerAdvice
的类中声明它们。这与 Spring MVC 中提供的类似支持相当。
# 发送消息
如果你想从应用程序的任何部分向连接的客户端发送消息该怎么办?任何应用程序组件都可以向 brokerChannel
发送消息。最简单的方法是注入一个 SimpMessagingTemplate
并使用它来发送消息。通常,你会按类型注入它,如下例所示:
@Controller
public class GreetingController {
private SimpMessagingTemplate template;
@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}
}
但是,如果存在相同类型的另一个 bean,你也可以按其名称(brokerMessagingTemplate
)限定它。
# Simple Broker
内置的简单消息代理处理来自客户端的订阅请求,将它们存储在内存中,并将消息广播到具有匹配目标地址的已连接客户端。该代理支持类似路径的目标地址,包括对 Ant 风格的目标地址模式的订阅。
注意:应用程序也可以使用点分隔(而不是斜杠分隔)的目标地址。请参阅 Dots as Separators。
如果配置了任务调度器,简单代理支持 STOMP 心跳 (opens new window)。要配置调度器,您可以声明自己的 TaskScheduler
Bean,并通过 MessageBrokerRegistry
设置它。或者,您可以使用内置 WebSocket 配置中自动声明的那个,但是,您需要使用 @Lazy
来避免内置 WebSocket 配置和您的 WebSocketMessageBrokerConfigurer
之间的循环依赖。例如:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000}) // 设置心跳值,发送间隔10秒,接收间隔20秒
.setTaskScheduler(this.messageBrokerTaskScheduler); // 设置任务调度器
// ...
}
}
# 外部Broker
简单的broker非常适合入门,但仅支持STOMP命令的一个子集(它不支持acks、receipts以及其他一些特性),依赖于简单的消息发送循环,并且不适合集群。作为一种替代方案,您可以升级您的应用程序以使用功能完备的消息broker。
请参考您选择的消息broker的STOMP文档(例如 RabbitMQ (opens new window)、ActiveMQ (opens new window) 等),安装broker,并在启用STOMP支持的情况下运行它。然后,您可以在Spring配置中启用STOMP broker relay(而不是简单的broker)。
以下示例配置启用了功能完备的broker:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
}
上述配置中的STOMP broker relay是一个Spring MessageHandler
,它通过将消息转发到外部消息broker来处理消息。为此,它会建立到broker的TCP连接,将所有消息转发给它,然后通过WebSocket会话将从broker收到的所有消息转发给客户端。本质上,它充当一个“relay”,在两个方向上转发消息。
注意:
添加
io.projectreactor.netty:reactor-netty
和io.netty:netty-all
依赖到你的项目里,以支持TCP连接管理。
此外,应用程序组件(例如HTTP请求处理方法、业务服务等)也可以向broker relay发送消息,如 发送消息 中所述,以便向订阅的WebSocket客户端广播消息。
实际上,broker relay实现了健壮且可扩展的消息广播。
# 连接到 Broker
STOMP Broker Relay 维护一个到 Broker 的“系统” TCP 连接。此连接仅用于来自服务器端应用程序的消息,不用于接收消息。您可以为此连接配置 STOMP 凭据(即 STOMP 帧的 login
和 passcode
头信息)。这在 XML 命名空间和 Java 配置中都以 systemLogin
和 systemPasscode
属性的形式公开,默认值为 guest
和 guest
。
STOMP Broker Relay 还为每个连接的 WebSocket 客户端创建一个单独的 TCP 连接。您可以配置用于代表客户端创建的所有 TCP 连接的 STOMP 凭据。这在 XML 命名空间和 Java 配置中都以 clientLogin
和 clientPasscode
属性的形式公开,默认值为 guest
和 guest
。
注意: STOMP Broker Relay 始终在代表客户端转发到 Broker 的每个 CONNECT
帧上设置 login
和 passcode
头信息。因此,WebSocket 客户端无需设置这些头信息,它们会被忽略。正如 身份验证 章节所解释的,WebSocket 客户端应该依赖 HTTP 身份验证来保护 WebSocket 端点并建立客户端身份。
STOMP Broker Relay 还会通过“系统” TCP 连接向消息 Broker 发送和接收心跳。您可以配置发送和接收心跳的间隔(默认情况下均为 10 秒)。如果与 Broker 的连接丢失,Broker Relay 将每 5 秒尝试重新连接,直到成功为止。
任何 Spring Bean 都可以实现 ApplicationListener<BrokerAvailabilityEvent>
接口,以便在与 Broker 的“系统”连接丢失和重新建立时接收通知。例如,一个广播股票报价的股票报价服务可以在没有活动的“系统”连接时停止尝试发送消息。
默认情况下,STOMP Broker Relay 始终连接到同一主机和端口,并在连接丢失时根据需要重新连接。如果您希望提供多个地址,可以在每次尝试连接时配置一个地址提供器,而不是固定的主机和端口。以下示例展示了如何做到这一点:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.remoteAddress(() -> new InetSocketAddress(0)),
new StompReactorNettyCodec());
}
}
您还可以使用 virtualHost
属性配置 STOMP Broker Relay。此属性的值设置为每个 CONNECT
帧的 host
头信息,并且可能很有用(例如,在云环境中,建立 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机不同)。
# 使用点作为分隔符
当消息被路由到 @MessageMapping
方法时,它们使用 AntPathMatcher
进行匹配。默认情况下,模式应使用斜杠(/
)作为分隔符。这在 Web 应用程序中是一种很好的约定,类似于 HTTP URL。但是,如果您更习惯于消息传递约定,则可以切换为使用点(.
)作为分隔符。
以下示例展示了如何实现:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
<websocket:stomp-endpoint path="/stomp"/>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
<bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
<constructor-arg index="0" value="."/>
</bean>
</beans>
之后,控制器可以在 @MessageMapping
方法中使用点(.
)作为分隔符,如下例所示:
@Controller
@MessageMapping("red")
public class RedController {
@MessageMapping("blue.{green}")
public void handleGreen(@DestinationVariable String green) {
// ...
}
}
客户端现在可以向 /app/red.blue.green123
发送消息。
在前面的示例中,我们没有更改“broker relay”上的前缀,因为这些前缀完全取决于外部消息代理。 请参阅您使用的代理的 STOMP 文档页面,以了解它支持的目标 header 的约定。
另一方面,“simple broker”确实依赖于配置的 PathMatcher
,因此,如果您切换分隔符,则该更改也适用于代理以及代理将消息中的目标与订阅中的模式进行匹配的方式。
# Authentication
每个基于 WebSocket 的 STOMP 消息会话都以 HTTP 请求开始。这可能是一个升级到 WebSocket 的请求(即 WebSocket 握手),或者,在 SockJS Fallback 的情况下,是一系列 SockJS HTTP 传输请求。
许多 Web 应用程序已经具备身份验证和授权机制,以保护 HTTP 请求。通常,用户通过 Spring Security 使用某种机制(如登录页面、HTTP 基本身份验证或其他方式)进行身份验证。已验证用户的安全上下文保存在 HTTP 会话中,并与同一基于 Cookie 的会话中的后续请求相关联。
因此,对于 WebSocket 握手或 SockJS HTTP 传输请求,通常已经存在一个可以通过 HttpServletRequest#getUserPrincipal()
访问的已验证用户。Spring 会自动将该用户与为其创建的 WebSocket 或 SockJS 会话相关联,并随后通过 user header 与通过该会话传输的所有 STOMP 消息相关联。
简而言之,一个典型的 Web 应用程序无需在其已有的安全措施之外做任何其他事情。用户在 HTTP 请求级别通过安全上下文进行身份验证,该安全上下文通过基于 Cookie 的 HTTP 会话维护(然后与为该用户创建的 WebSocket 或 SockJS 会话相关联),并导致 user header 被标记在流经应用程序的每个 Message
上。
STOMP 协议确实在 CONNECT
帧上具有 login
和 passcode
header。这些最初是为基于 TCP 的 STOMP 设计的,并且也是必需的。但是,对于基于 WebSocket 的 STOMP,默认情况下,Spring 会忽略 STOMP 协议级别的身份验证 header,并假定用户已经在 HTTP 传输层进行了身份验证。期望是 WebSocket 或 SockJS 会话包含已验证的用户。
# Token 认证
Spring Security OAuth (opens new window) 提供了对基于 Token 的安全性的支持,包括 JSON Web Token (JWT)。你可以将其用作 Web 应用中的身份验证机制,包括通过 WebSocket 的 STOMP 交互,如前一节所述(即,通过基于 Cookie 的会话来维护身份)。
然而,基于 Cookie 的会话并不总是最佳选择(例如,在不维护服务器端会话的应用程序中,或者在移动应用程序中,通常使用 Header 进行身份验证)。
WebSocket 协议,RFC 6455 (opens new window) "没有规定服务器可以在 WebSocket 握手期间对客户端进行身份验证的任何特定方式"。然而,在实践中,浏览器客户端只能使用标准身份验证 Header(即,基本的 HTTP 身份验证)或 Cookie,而不能(例如)提供自定义 Header。同样,SockJS JavaScript 客户端也没有提供通过 SockJS 传输请求发送 HTTP Header 的方法。请参阅 sockjs-client issue 196 (opens new window)。虽然它允许发送可用于发送 Token 的查询参数,但这也有其自身的缺点(例如,Token 可能会在服务器日志中与 URL 一起被无意中记录下来)。
注意:
上面的限制适用于基于浏览器的客户端,不适用于基于 Spring Java 的 STOMP 客户端,后者支持通过 WebSocket 和 SockJS 请求发送 Header。
因此,希望避免使用 Cookie 的应用程序可能没有在 HTTP 协议级别进行身份验证的任何好的替代方案。它们可能更喜欢在 STOMP 消息协议级别使用 Header 进行身份验证,而不是使用 Cookie。这样做需要两个简单的步骤:
- 使用 STOMP 客户端在连接时传递身份验证 Header。
- 使用
ChannelInterceptor
处理身份验证 Header。
下面的例子使用服务器端配置来注册一个自定义的身份验证拦截器。请注意,拦截器只需要验证身份并在 CONNECT Message
上设置 user Header。Spring 会记录并保存已验证的用户,并将其与同一会话上的后续 STOMP 消息关联。下面的例子显示了如何注册一个自定义的身份验证拦截器:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// Access authentication header(s) and invoke accessor.setUser(user)
// 访问身份验证 Header 并调用 accessor.setUser(user)
}
return message;
}
});
}
}
另外,请注意,当你使用 Spring Security 的消息授权时,目前你需要确保身份验证 ChannelInterceptor
配置的顺序在 Spring Security 之前。最好的方法是在 WebSocketMessageBrokerConfigurer
的自身实现中声明自定义拦截器,并用 @Order(Ordered.HIGHEST_PRECEDENCE + 99)
标记它。
# Authorization
Spring Security 提供了 WebSocket 子协议授权 (opens new window),它使用 ChannelInterceptor
来基于消息中的用户头部信息对消息进行授权。此外,Spring Session 提供了 WebSocket 集成 (opens new window),以确保用户的 HTTP 会话在 WebSocket 会话仍然处于活动状态时不会过期。
# 用户目标(User Destinations)
应用程序可以发送针对特定用户的消息,Spring的STOMP支持识别以此为目的,并使用 user/
为前缀的目标地址。例如,客户端可以订阅 user/queue/position-updates
目标地址。UserDestinationMessageHandler
会处理此目标地址,并将其转换为用户会话唯一的地址(例如 queue/position-updates-user123
)。这提供了订阅通用命名目标的便利性,同时确保与其他订阅相同目标的用户没有冲突,以便每个用户都可以收到唯一的股票头寸更新。
提示: 使用用户目标时,配置broker和application destination prefixes 非常重要,如 Enable STOMP 所示,否则 broker 会处理本应仅由
UserDestinationMessageHandler
处理的、带有 "/user" 前缀的消息。
在发送端,消息可以发送到类似 /user/{username}/queue/position-updates
的目标地址,然后由 UserDestinationMessageHandler
将其转换为一个或多个目标地址,每个目标地址对应于与用户关联的会话。这让应用程序中的任何组件都可以发送针对特定用户的消息,而无需知道除了用户名和通用目标地址之外的任何信息。这也可以通过注解和消息传递模板来实现。
消息处理方法可以通过 @SendToUser
注解(也支持在类级别上使用,以共享一个公共目标地址)向与正在处理的消息关联的用户发送消息,如下例所示:
@Controller
public class PortfolioController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
如果用户有多个会话,默认情况下,所有订阅了给定目标地址的会话都将成为目标。但是,有时可能需要仅以发送正在处理的消息的会话为目标。可以通过将 broadcast
属性设置为 false
来实现这一点,如下例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// 在这里抛出 MyBusinessException 异常
}
@MessageExceptionHandler
@SendToUser(destinations="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
注意: 虽然用户目标通常意味着已验证的用户,但这不是严格要求的。未与已验证用户关联的 WebSocket 会话可以订阅用户目标。在这种情况下,
@SendToUser
注解的行为与broadcast=false
完全相同(也就是说,仅以发送正在处理的消息的会话为目标)。
可以通过例如注入由 Java 配置或 XML 命名空间创建的 SimpMessagingTemplate
(如果需要使用 @Qualifier
进行限定,则 bean 名称为 brokerMessagingTemplate
)从任何应用程序组件向用户目标发送消息。以下示例展示了如何做到这一点:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
}
注意: 当您使用用户目标与外部消息代理时,应该检查 broker 文档,了解如何管理非活动队列,以便在用户会话结束时删除所有唯一的用户队列。例如,当您使用诸如
/exchange/amq.direct/position-updates
之类的目标地址时,RabbitMQ 会创建自动删除队列。因此,在这种情况下,客户端可以订阅/user/exchange/amq.direct/position-updates
。类似地,ActiveMQ 具有用于清除非活动目标地址的配置选项。
在多应用服务器场景中,用户目标可能因为用户连接到不同的服务器而无法解析。在这种情况下,您可以配置目标地址来广播未解析的消息,以便其他服务器有机会尝试。这可以通过 Java 配置中的 MessageBrokerRegistry
的 userDestinationBroadcast
属性和 XML 中的 message-broker
元素的 user-destination-broadcast
属性来完成。
# 消息顺序
从broker发出的消息会被发布到clientOutboundChannel
,然后从这里写入到WebSocket会话中。由于该通道由ThreadPoolExecutor
支持,消息在不同的线程中处理,因此客户端收到的消息顺序可能与发布的顺序不完全一致。
要启用有序发布,请按如下所示设置setPreservePublishOrder
标志:
@Configuration
@EnableWebSocketMessageBroker
public class PublishOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker preserve-publish-order="true">
<!-- ... -->
</websocket:message-broker>
</beans>
设置此标志后,同一客户端会话中的消息将一次一个地发布到clientOutboundChannel
,从而保证发布顺序。请注意,这会产生少量的性能开销,因此仅在需要时才应启用它。
同样,来自客户端的消息也會發送到clientInboundChannel
,然后根据其目标前缀进行处理。 由于该通道由ThreadPoolExecutor
支持,因此消息在不同的线程中处理,并且处理的结果顺序可能与接收到的顺序不完全匹配。
要启用有序接收,请按如下所示设置setPreserveReceiveOrder
标志:
@Configuration
@EnableWebSocketMessageBroker
public class ReceiveOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.setPreserveReceiveOrder(true);
}
}
# Events
一些 ApplicationContext
事件会被发布,可以通过实现 Spring 的 ApplicationListener
接口来接收:
BrokerAvailabilityEvent
: 指示 Broker 何时可用或不可用。虽然“简单” Broker 在启动时立即可用,并在应用程序运行时保持可用,但 STOMP “broker relay” 可能会失去与功能齐全的 Broker 的连接(例如,如果 Broker 重新启动)。Broker Relay 具有重新连接逻辑,并在 Broker 恢复时重新建立与 Broker 的“系统”连接。因此,每当状态从已连接变为已断开或反之时,都会发布此事件。使用SimpMessagingTemplate
的组件应订阅此事件,并避免在 Broker 不可用时发送消息。在任何情况下,他们都应该准备好在发送消息时处理MessageDeliveryException
。SessionConnectEvent
: 当收到新的 STOMP CONNECT 时发布,以指示新的客户端会话的开始。该事件包含表示连接的消息,包括会话 ID、用户信息(如果有)以及客户端发送的任何自定义标头。这对于跟踪客户端会话非常有用。订阅此事件的组件可以使用SimpMessageHeaderAccessor
或StompMessageHeaderAccessor
包装包含的消息。SessionConnectedEvent
: 在SessionConnectEvent
之后不久发布,当 Broker 发送 STOMP CONNECTED 帧以响应 CONNECT 时发布。此时,可以认为 STOMP 会话已完全建立。SessionSubscribeEvent
: 当收到新的 STOMP SUBSCRIBE 时发布。SessionUnsubscribeEvent
: 当收到新的 STOMP UNSUBSCRIBE 时发布。SessionDisconnectEvent
: 当 STOMP 会话结束时发布。DISCONNECT 可能是从客户端发送的,也可能是在 WebSocket 会话关闭时自动生成的。在某些情况下,每个会话可能会多次发布此事件。组件对于多个断开连接事件应该是幂等的。
Note: 当您使用功能齐全的 Broker 时,如果 Broker 暂时不可用,STOMP “broker relay” 会自动重新连接“系统”连接。但是,客户端连接不会自动重新连接。假设启用了心跳,客户端通常会在 10 秒内注意到 Broker 没有响应。客户端需要实现自己的重新连接逻辑。
# Interception
事件(Events)为 STOMP 连接的生命周期提供通知,但不是为每个客户端消息都提供。应用程序还可以注册一个 ChannelInterceptor
来拦截任何消息,并且可以在处理链的任何部分进行拦截。以下示例显示如何拦截来自客户端的入站消息:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}
自定义 ChannelInterceptor
可以使用 StompHeaderAccessor
或 SimpMessageHeaderAccessor
来访问有关消息的信息,如下例所示:
public class MyChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
// ...
return message;
}
}
应用程序还可以实现 ExecutorChannelInterceptor
,它是 ChannelInterceptor
的一个子接口,在处理消息的线程中具有回调。虽然 ChannelInterceptor
对于发送到通道的每个消息都调用一次,但 ExecutorChannelInterceptor
在订阅来自通道的消息的每个 MessageHandler
的线程中提供钩子。
请注意,与前面描述的 SessionDisconnectEvent
一样,DISCONNECT 消息可能来自客户端,也可能在 WebSocket 会话关闭时自动生成。在某些情况下,拦截器可能会为每个会话多次拦截此消息。组件对于多个断开连接事件应该是幂等的。
# STOMP 客户端
Spring 提供了基于 WebSocket 的 STOMP 客户端和基于 TCP 的 STOMP 客户端。
首先,你可以创建并配置 WebSocketStompClient
,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // 用于心跳
在前面的例子中,你可以将 StandardWebSocketClient
替换为 SockJsClient
,因为它也是 WebSocketClient
的一个实现。SockJsClient
可以使用 WebSocket 或基于 HTTP 的传输作为备选方案。更多详细信息,请参见 SockJsClient。
接下来,你可以建立连接,并为 STOMP 会话提供一个处理器,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话准备好使用时,会通知处理器,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,任何有效载荷都可以被发送,并使用配置的 MessageConverter
进行序列化,如下例所示:
session.send("/topic/something", "payload");
你也可以订阅目的地。subscribe
方法需要一个用于订阅消息的处理器,并返回一个你可以用来取消订阅的 Subscription
句柄。对于每个接收到的消息,处理器可以指定有效载荷应该被反序列化的目标 Object
类型,如下例所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用 STOMP 心跳,你可以使用 TaskScheduler
配置 WebSocketStompClient
,并可选择自定义心跳间隔(10 秒用于写入不活动,这将导致发送心跳;10 秒用于读取不活动,这将关闭连接)。
WebSocketStompClient
仅在不活动时(即没有其他消息发送时)才发送心跳。当使用外部代理时,这可能会带来挑战,因为目标不是代理的消息表示活动,但实际上并没有转发到代理。在这种情况下,你可以在初始化 外部代理 时配置一个 TaskScheduler
,以确保即使仅发送目标不是代理的消息时,心跳也会转发到代理。
注意: 当你使用
WebSocketStompClient
进行性能测试以从同一台机器模拟数千个客户端时,请考虑关闭心跳,因为每个连接都会调度自己的心跳任务,并且这对于在同一台机器上运行的大量客户端来说没有优化。
STOMP 协议还支持回执,客户端必须添加一个 receipt
标头,服务器在处理完发送或订阅后会使用 RECEIPT 帧进行响应。为了支持这一点,StompSession
提供了 setAutoReceipt(boolean)
,这会导致在每个后续的发送或订阅事件上添加一个 receipt
标头。或者,你也可以手动将回执标头添加到 StompHeaders
。发送和订阅都会返回一个 Receiptable
的实例,你可以使用它来注册回执成功和失败的回调。对于此功能,你必须使用 TaskScheduler
和回执过期前的时间量(默认为 15 秒)配置客户端。
请注意,StompSessionHandler
本身是一个 StompFrameHandler
,除了处理消息的异常 handleException
回调和传输级别的错误(包括 ConnectionLostException
)之外,它还可以处理 ERROR 帧。
你可以使用 WebSocketStompClient
的 inboundMessageSizeLimit
和 outboundMessageSizeLimit
属性来限制入站和出站 WebSocket 消息的最大大小。当出站 STOMP 消息超过限制时,它会被拆分为部分帧,接收者必须重新组装这些帧。默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超过配置的限制时,会抛出 StompConversionException
。入站消息的默认大小限制为 64KB
。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB
# WebSocket Scope
每个 WebSocket 会话都有一组属性。该 Map 作为 header 附加到 inbound 客户端消息,并且可以从控制器方法中访问,如下例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handle(SimpMessageHeaderAccessor headerAccessor) {
Map<String, Object> attrs = headerAccessor.getSessionAttributes();
// ...
}
}
你可以在 websocket
作用域中声明一个由 Spring 管理的 Bean。你可以将 WebSocket 作用域的 Bean 注入到控制器和注册在 clientInboundChannel
上的任何通道拦截器中。这些通常是单例,并且比任何单个 WebSocket 会话的生命周期都长。因此,你需要为 WebSocket 作用域的 Bean 使用作用域代理模式,如下例所示:
@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
@PostConstruct
public void init() {
// 依赖注入完成后调用
}
// ...
@PreDestroy
public void destroy() {
// WebSocket 会话结束时调用
}
}
@Controller
public class MyController {
private final MyBean myBean;
@Autowired
public MyController(MyBean myBean) {
this.myBean = myBean;
}
@MessageMapping("/action")
public void handle() {
// this.myBean 来自当前 WebSocket 会话
}
}
与任何自定义作用域一样,Spring 在首次从控制器访问 MyBean
实例时,会初始化一个新的 MyBean
实例,并将该实例存储在 WebSocket 会话属性中。随后会返回同一个实例,直到会话结束。WebSocket 作用域的 Bean 会调用所有 Spring 生命周期方法,如前面的示例所示。
# 性能
在性能方面并没有银弹。许多因素都会影响性能,包括消息的大小和数量,应用程序方法是否执行需要阻塞的工作,以及外部因素(例如网络速度和其他问题)。本节的目标是概述可用的配置选项,并提供一些关于如何考虑扩展的想法。
在一个消息传递应用程序中,消息通过通道进行异步执行,这些通道由线程池支持。配置这样的应用程序需要对通道和消息流有很好的了解。因此,建议阅读 消息流。
最明显的起点是配置支持 clientInboundChannel
和 clientOutboundChannel
的线程池。默认情况下,两者都配置为可用处理器数量的两倍。
如果带注解的方法中处理消息主要是 CPU 密集型操作,那么 clientInboundChannel
的线程数应该保持接近处理器数量。如果他们所做的工作更多是 IO 密集型,并且需要阻塞或等待数据库或其他外部系统,那么线程池大小可能需要增加。
注意:
ThreadPoolExecutor
具有三个重要属性:核心线程池大小、最大线程池大小以及用于存储没有可用线程的任务的队列的容量。一个常见的混淆点是,配置核心池大小(例如,10)和最大池大小(例如,20)会导致一个具有 10 到 20 个线程的线程池。实际上,如果容量保持其默认值
Integer.MAX_VALUE
,则线程池永远不会超出核心池大小,因为所有附加任务都会排队。请参阅
ThreadPoolExecutor
的 javadoc 以了解这些属性如何工作以及了解各种排队策略。
在 clientOutboundChannel
端,所有操作都是关于将消息发送到 WebSocket 客户端。如果客户端位于快速网络上,则线程数应保持接近可用处理器数量。如果它们速度较慢或带宽较低,则它们需要更长的时间来消耗消息并给线程池带来负担。因此,增加线程池大小变得必要。
虽然 clientInboundChannel
的工作负载是可以预测的(毕竟,它是基于应用程序所做的事情),但如何配置 "clientOutboundChannel" 更加困难,因为它基于超出应用程序控制范围的因素。因此,有两个附加属性与消息的发送有关:sendTimeLimit
和 sendBufferSizeLimit
。您可以使用这些方法来配置允许发送花费的时间以及发送消息到客户端时可以缓冲多少数据。有关重要的其他详细信息,请参阅 javadoc 和 XML 模式的文档。
总的思路是,在任何给定时间,只能使用单个线程来向客户端发送消息。与此同时,所有其他消息都会被缓冲,您可以使用这些属性来决定允许发送消息花费的时间以及在此期间可以缓冲多少数据。
以下示例显示了一种可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}
// ...
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport send-timeout="15000" send-buffer-size="524288" />
<!-- ... -->
</websocket:message-broker>
</beans>
您还可以使用前面显示的 WebSocket 传输配置来配置传入 STOMP 消息的最大允许大小。从理论上讲,WebSocket 消息的大小几乎没有限制。但在实践中,WebSocket 服务器会施加限制(例如,Tomcat 上为 8K,Jetty 上为 64K)。因此,诸如 stomp-js/stompjs
(opens new window) 之类的 STOMP 客户端和其他客户端会在 16K 边界处拆分较大的 STOMP 消息,并将它们作为多个 WebSocket 消息发送,这要求服务器缓冲和重新组装。
Spring 的 STOMP-over-WebSocket 支持可以做到这一点,因此应用程序可以配置 STOMP 消息的最大大小,而无需考虑 WebSocket 服务器特定的消息大小。请记住,如有必要,WebSocket 消息大小会自动调整,以确保它们至少可以承载 16K WebSocket 消息。
以下示例显示了一种可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class MessageSizeLimitWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024);
}
// ...
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport message-size="131072" />
<!-- ... -->
</websocket:message-broker>
</beans>
关于扩展的一个重要点是使用多个应用程序实例。目前,您无法使用简单的 Broker 做到这一点。但是,当您使用功能齐全的 Broker(例如 RabbitMQ)时,每个应用程序实例都会连接到 Broker,并且从一个应用程序实例广播的消息可以通过 Broker 广播到通过任何其他应用程序实例连接的 WebSocket 客户端。
# Monitoring
当使用@EnableWebSocketMessageBroker
或<websocket:message-broker>
时,关键的基础设施组件会自动收集统计数据和计数器,这些数据可以让你深入了解应用程序的内部状态。该配置还会声明一个类型为WebSocketMessageBrokerStats
的 bean,它在一个地方收集所有可用的信息,并默认每 30 分钟在 INFO
级别记录一次。这个 bean 可以通过 Spring 的 MBeanExporter
导出到 JMX,以便在运行时查看(例如,通过 JDK 的 jconsole
)。以下列表总结了可用的信息:
客户端 WebSocket 会话
当前
指示当前有多少客户端会话,该计数进一步细分为 WebSocket 会话与基于 HTTP 流和轮询的 SockJS 会话。
总计
指示已建立的会话总数。
异常关闭
连接失败
已建立但未在 60 秒内收到任何消息后关闭的会话。这通常是代理或网络问题的指示。
超出发送限制
超过配置的发送超时或发送缓冲区限制后关闭的会话,这可能发生在客户端速度较慢的情况下(请参阅上一节)。
传输错误
在发生传输错误后关闭的会话,例如无法读取或写入 WebSocket 连接或 HTTP 请求或响应。
STOMP 帧
已处理的 CONNECT、CONNECTED 和 DISCONNECT 帧的总数,指示在 STOMP 级别上有多少客户端已连接。请注意,当会话异常关闭或客户端在未发送 DISCONNECT 帧的情况下关闭时,DISCONNECT 计数可能会较低。
STOMP 代理 Relay
TCP 连接
指示代表客户端 WebSocket 会话建立到代理的 TCP 连接数。这应该等于客户端 WebSocket 会话的数量 + 1 个额外的共享“系统”连接,用于从应用程序内部发送消息。
STOMP 帧
代表客户端转发到代理或从代理接收的 CONNECT、CONNECTED 和 DISCONNECT 帧的总数。请注意,无论客户端 WebSocket 会话如何关闭,都会将 DISCONNECT 帧发送到代理。因此,较低的 DISCONNECT 帧计数表明代理正在主动关闭连接(可能是因为心跳未及时到达、无效的输入帧或其他问题)。
客户端入站通道
来自支持
clientInboundChannel
的线程池的统计信息,可让你深入了解传入消息处理的健康状况。在此处排队等待的任务表明应用程序可能处理消息的速度太慢。如果存在 I/O 绑定任务(例如,缓慢的数据库查询、对第三方 REST API 的 HTTP 请求等),请考虑增加线程池大小。客户端出站通道
来自支持
clientOutboundChannel
的线程池的统计信息,可让你深入了解向客户端广播消息的健康状况。在此处排队等待的任务表明客户端消耗消息的速度太慢。解决此问题的一种方法是增加线程池大小,以适应预期的并发慢速客户端数量。另一种选择是减少发送超时和发送缓冲区大小限制(请参阅上一节)。SockJS 任务调度器
来自 SockJS 任务调度器的线程池的统计信息,该调度器用于发送心跳。请注意,当在 STOMP 级别协商心跳时,SockJS 心跳将被禁用。
# Testing
本文主要介绍在使用Spring的STOMP-over-WebSocket支持时,测试应用程序的两种主要方法:编写服务器端测试以验证控制器及其注解消息处理方法的的功能,以及编写包含运行客户端和服务器的完整端到端测试。
这两种方法并非互斥,相反,每种方法在整体测试策略中都有其用武之地。服务器端测试更集中,编写和维护更容易。另一方面,端到端集成测试更完整,测试的内容更多,但编写和维护也更复杂。
服务器端测试最简单的形式是编写控制器单元测试。但是,这还不够有用,因为控制器的许多工作都依赖于其注解。纯粹的单元测试无法测试这一点。
理想情况下,应该像运行时那样调用被测控制器,就像使用Spring MVC Test框架测试处理HTTP请求的控制器的方法一样——即,不运行Servlet容器,而是依靠Spring Framework来调用带注解的控制器。与Spring MVC Test一样,这里有两种可能的选择,可以使用“基于上下文”或“独立”设置:
- 借助Spring TestContext框架加载实际的Spring配置,将
clientInboundChannel
作为测试字段注入,并使用它来发送消息,以便由控制器方法处理。 - 手动设置调用控制器所需的最少Spring框架基础设施(即
SimpAnnotationMethodMessageHandler
),并将消息直接传递给控制器。
这两种设置场景都在股票投资组合 (opens new window)示例应用程序的测试中进行了演示。
第二种方法是创建端到端集成测试。为此,您需要在嵌入模式下运行WebSocket服务器,并作为WebSocket客户端连接到该服务器,该客户端发送包含STOMP帧的WebSocket消息。 股票投资组合示例应用 (opens new window)程序的测试也通过使用Tomcat作为嵌入式WebSocket服务器和用于测试目的的简单STOMP客户端来演示了这种方法。
祝你变得更强!
- 01
- Spring中的Web访问:响应式栈 WebFlux06-28
- 02
- Spring中的Web访问:Servlet API支持06-02