SpringBoot之WebSocket消息推送

WebSocket协议
WebSocket是一种在单个TCP连接上进行全双工通讯的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输

STOMP协议
STOMP是面向文本的消息传送协议。STOMP客户端与支持STOMP协议的消息代理进行通信。STOMP使用不同的命令,如连接,发送,订阅,断开等进行通信。

SockJS
SockJS是一个JavaScript库,提供跨浏览器JavaScript的API,创建了一个低延迟、全双工的浏览器和web服务器之间通信通道

使用websocket有两种方式:1是使用sockjs,2是使用h5的标准。使用Html5标准自然更方便简单,所以记录的是配合h5的使用方法。

1、pom引入

 核心是@ServerEndpoint这个注解。这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,只要在pom文件中引入javaee标准即可使用。

  1. javax
  2. javaee-api
  3. 7.0
  4. provided


但使用springboot的内置tomcat时,就不需要引入javaee-api了,spring-boot已经包含了。使用springboot的websocket功能首先引入springboot组件。

  1. org.springframework.boot
  2. spring-boot-starter-websocket

2、使用@ServerEndpoint创立websocket endpoint

  首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。

@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3、websocket的具体实现方法:

  1. import org.springframework.stereotype.Component;
  2. import javax.websocket.*;
  3. import javax.websocket.server.ServerEndpoint;
  4. import java.io.IOException;
  5. import java.util.concurrent.CopyOnWriteArraySet;
  6. @ServerEndpoint(value = "/websocket")
  7. @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理
  8. public class WebSocket {
  9. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  10. private static int onlineCount = 0;
  11. //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  12. private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
  13. //与某个客户端的连接会话,需要通过它来给客户端发送数据
  14. private Session session;
  15. /**
  16. * 连接建立成功调用的方法
  17. */
  18. @OnOpen
  19. public void onOpen(Session session) {
  20. this.session = session;
  21. webSocketSet.add(this); //加入set中
  22. addOnlineCount(); //在线数加1
  23. System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
  24. try {
  25. sendMessage("Hello world");
  26. } catch (IOException e) {
  27. System.out.println("IO异常");
  28. }
  29. }
  30. /**
  31. * 连接关闭调用的方法
  32. */
  33. @OnClose
  34. public void onClose() {
  35. webSocketSet.remove(this); //从set中删除
  36. subOnlineCount(); //在线数减1
  37. System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
  38. }
  39. /**
  40. * 收到客户端消息后调用的方法
  41. *
  42. * @param message 客户端发送过来的消息
  43. */
  44. @OnMessage
  45. public void onMessage(String message, Session session) {
  46. System.out.println("来自客户端的消息:" + message);
  47. //群发消息
  48. for (WebSocket item : webSocketSet) {
  49. try {
  50. item.sendMessage(message);
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. /**
  57. * 发生错误时调用
  58. */
  59. @OnError
  60. public void onError(Session session, Throwable error) {
  61. System.out.println("发生错误");
  62. error.printStackTrace();
  63. }
  64. public void sendMessage(String message) throws IOException {
  65. this.session.getBasicRemote().sendText(message);
  66. //this.session.getAsyncRemote().sendText(message);
  67. }
  68. /**
  69. * 群发自定义消息
  70. */
  71. public static void sendInfo(String message) throws IOException {
  72. for (WebSocket item : webSocketSet) {
  73. try {
  74. item.sendMessage(message);
  75. } catch (IOException e) {
  76. continue;
  77. }
  78. }
  79. }
  80. public static synchronized int getOnlineCount() {
  81. return onlineCount;
  82. }
  83. public static synchronized void addOnlineCount() {
  84. WebSocket.onlineCount++;
  85. }
  86. public static synchronized void subOnlineCount() {
  87. WebSocket.onlineCount--;
  88. }
  89. }

使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。

虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。

4、前端代码

  1. My WebSocket
  2. Welcome
  3. id="text" type="text" /> onclick="send()">Send onclick="closeWebSocket()">Close
  4. id="message">
  • type="text/javascript">
  • var websocket = null;
  • //判断当前浏览器是否支持WebSocket
  • if('WebSocket' in window){
  • websocket = new WebSocket("ws://localhost:8084/websocket");
  • }
  • else{
  • alert('Not support websocket')
  • }
  • //连接发生错误的回调方法
  • websocket.onerror = function(){
  • setMessageInnerHTML("error");
  • };
  • //连接成功建立的回调方法
  • websocket.onopen = function(event){
  • setMessageInnerHTML("open");
  • }
  • //接收到消息的回调方法
  • websocket.onmessage = function(event){
  • setMessageInnerHTML(event.data);
  • }
  • //连接关闭的回调方法
  • websocket.onclose = function(){
  • setMessageInnerHTML("close");
  • }
  • //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
  • window.onbeforeunload = function(){
  • websocket.close();
  • }
  • //将消息显示在网页上
  • function setMessageInnerHTML(innerHTML){
  • document.getElementById('message').innerHTML += innerHTML + '';
  • }
  • //关闭连接
  • function closeWebSocket(){
  • websocket.close();
  • }
  • //发送消息
  • function send(){
  • var message = document.getElementById('text').value;
  • websocket.send(message);
  • }
  • 以上代码,实现了websocket简单消息推送,可以实现两个页面间的消息显示,但是Java后台主动推送消息时,无法获取消息推送的websocket下的session,即无法实现websocket下session的共享。

    为解决主动推送的难题,需要在建立连接时,将websocket下的session与servlet下的HttpSession(或者其他session,我们这用到了shiro下的session)建立关联关系。

    webSocket配置Java类:

    1. import com.bootdo.common.utils.ShiroUtils;
    2. import org.apache.catalina.session.StandardSessionFacade;
    3. import org.apache.shiro.session.Session;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    7. import javax.servlet.http.HttpSession;
    8. import javax.websocket.HandshakeResponse;
    9. import javax.websocket.server.HandshakeRequest;
    10. import javax.websocket.server.ServerEndpointConfig;
    11. import javax.websocket.server.ServerEndpointConfig.Configurator;
    12. @Configuration
    13. public class WebSocketConfig extends Configurator {
    14. @Override
    15. public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    16. /*如果没有监听器,那么这里获取到的HttpSession是null*/
    17. StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
    18. if (ssf != null) {
    19. HttpSession httpSession = (HttpSession) request.getHttpSession();
    20. //关键操作
    21. sec.getUserProperties().put("sessionId", httpSession.getId());
    22. System.out.println("获取到的SessionID:" + httpSession.getId());
    23. }
    24. }
    25. /**
    26. * 引入shiro框架下的session,获取session信息
    27. */
    28. /*
    29. @Override
    30. public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    31. Session shiroSession = ShiroUtils.getSubjct().getSession();
    32. sec.getUserProperties().put("sessionId", shiroSession.getId());
    33. }
    34. */
    35. @Bean
    36. public ServerEndpointExporter serverEndpointExporter() {
    37. //这个对象说一下,貌似只有服务器是tomcat的时候才需要配置,具体我没有研究
    38. return new ServerEndpointExporter();
    39. }
    40. }

    webSocket消息实现类方法:

    1. import org.springframework.stereotype.Component;
    2. import javax.websocket.*;
    3. import javax.websocket.server.ServerEndpoint;
    4. import java.io.IOException;
    5. import java.util.concurrent.CopyOnWriteArraySet;
    6. //configurator = WebsocketConfig.class 该属性就是我上面配置的信息
    7. @ServerEndpoint(value = "/websocket", configurator = WebSocketConfig.class)
    8. @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理
    9. public class WebSocket {
    10. //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    11. private static int onlineCount = 0;
    12. //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    13. private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
    14. //与某个客户端的连接会话,需要通过它来给客户端发送数据
    15. private Session session;
    16. /**
    17. * 连接建立成功调用的方法
    18. *

    19. * config用来获取WebsocketConfig中的配置信息
    20. */
    21. @OnOpen
    22. public void onOpen(Session session, EndpointConfig config) {
    23. //获取WebsocketConfig.java中配置的“sessionId”信息值
    24. String httpSessionId = (String) config.getUserProperties().get("sessionId");
    25. this.session = session;
    26. webSocketSet.add(this); //加入set中
    27. addOnlineCount(); //在线数加1
    28. System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
    29. try {
    30. sendMessage("Hello world");
    31. } catch (IOException e) {
    32. System.out.println("IO异常");
    33. }
    34. }
    35. /**
    36. * 连接关闭调用的方法
    37. */
    38. @OnClose
    39. public void onClose() {
    40. webSocketSet.remove(this); //从set中删除
    41. subOnlineCount(); //在线数减1
    42. System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
    43. }
    44. /**
    45. * 收到客户端消息后调用的方法
    46. *
    47. * @param message 客户端发送过来的消息
    48. */
    49. @OnMessage
    50. public void onMessage(String message, Session session) {
    51. System.out.println("来自客户端的消息:" + message);
    52. //群发消息
    53. for (WebSocket item : webSocketSet) {
    54. try {
    55. item.sendMessage(message);
    56. } catch (IOException e) {
    57. e.printStackTrace();
    58. }
    59. }
    60. }
    61. /**
    62. * 发生错误时调用
    63. */
    64. @OnError
    65. public void onError(Session session, Throwable error) {
    66. System.out.println("发生错误");
    67. error.printStackTrace();
    68. }
    69. public void sendMessage(String message) throws IOException {
    70. this.session.getBasicRemote().sendText(message);
    71. //this.session.getAsyncRemote().sendText(message);
    72. }
    73. /**
    74. * 群发自定义消息
    75. */
    76. public static void sendInfo(String message) throws IOException {
    77. for (WebSocket item : webSocketSet) {
    78. try {
    79. item.sendMessage(message);
    80. } catch (IOException e) {
    81. continue;
    82. }
    83. }
    84. }
    85. public static synchronized int getOnlineCount() {
    86. return onlineCount;
    87. }
    88. public static synchronized void addOnlineCount() {
    89. WebSocket.onlineCount++;
    90. }
    91. public static synchronized void subOnlineCount() {
    92. WebSocket.onlineCount--;
    93. }
    94. }

    注意,有上面配置后,如果配置获取的信息为null,需加入监听实现类:

    1. import org.springframework.stereotype.Component;
    2. import javax.servlet.ServletRequestEvent;
    3. import javax.servlet.ServletRequestListener;
    4. import javax.servlet.http.HttpServletRequest;
    5. import javax.servlet.http.HttpSession;
    6. /**
    7. * 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去
    8. */
    9. @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理,相当于注册监听吧
    10. public class RequestListener implements ServletRequestListener {
    11. @Override
    12. public void requestInitialized(ServletRequestEvent sre) {
    13. //将所有request请求都携带上httpSession
    14. HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession();
    15. System.out.println("将所有request请求都携带上httpSession " + httpSession.getId());
    16. }
    17. public RequestListener() {
    18. }
    19. @Override
    20. public void requestDestroyed(ServletRequestEvent arg0) {
    21. }
    22. }

    对应的前端页面无需改变