Springboot Websocket Stomp 消息订阅推送

目录
  • 需求背景
  • websocket协议
  • stomp协议

需求背景

闲话不扯,直奔主题。需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议。

websocket协议

想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考。

pom文件
直接引入spring-boot-starter-websocket即可。

    	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

声明websocket endpoint

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @ClassName WebSocketConfig
 * @Author scott
 * @Date 2021/6/16
 * @Version V1.0
 **/
@Configuration
public class WebSocketConfig {

    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

websocket实现类,其中通过注解监听了各种事件,实现了推送消息等相关逻辑

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: DataTypePushWebSocket
 * @Author: scott
 * @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {

    private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);

    /**
     * 记录当前在线连接数
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
            .initialCapacity(10)
            .maximumSize(300)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token")String token) {
        String sessionId = session.getId();
        onlineCount.incrementAndGet(); // 在线数加1
        this.sendMessage("sessionId:" + sessionId +",已经和server建立连接", session);
        SESSION_CACHE.put(sessionId,session);
        log.info("有新连接加入:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session,@PathParam("token")String token) {
        onlineCount.decrementAndGet(); // 在线数减1
        SESSION_CACHE.invalidate(session.getId());
        log.info("有一连接关闭:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token")String token) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        this.sendMessage("服务端已收到推送消息:" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 服务端发送消息给客户端
     */
    private static void sendMessage(String message, Session toSession) {
        try {
            log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败:{}", e);
        }
    }

    public static AjaxResult sendMessage(String message, String sessionId){
        Session session = SESSION_CACHE.getIfPresent(sessionId);
        if(Objects.isNull(session)){
            return AjaxResult.error("token已失效");
        }
        sendMessage(message,session);
        return AjaxResult.success();
    }

    public static AjaxResult sendBroadcast(String message){
        long size = SESSION_CACHE.size();
        if(size <=0){
            return AjaxResult.error("当前没有在线客户端,无法推送消息");
        }
        ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
        Set<String> keys = sessionConcurrentMap.keySet();
        for (String key : keys) {
            Session session = SESSION_CACHE.getIfPresent(key);
            DataTypePushWebSocket.sendMessage(message,session);
        }

        return AjaxResult.success();

    }

}

至此websocket服务端代码已经完成。

stomp协议

前端代码.这个是在某个vue工程中写的js,各位大佬自己动手改改即可。其中Settings.wsPath是后端定义的ws地址例如ws://localhost:9003/ws

import Stomp from 'stompjs'
import Settings from '@/settings.js'

export default {
  // 是否启用日志 默认启用
  debug:true,
  // 客户端连接信息
  stompClient:{},
  // 初始化
  init(callBack){
    this.stompClient = Stomp.client(Settings.wsPath)
    this.stompClient.hasDebug = this.debug
    this.stompClient.connect({},suce =>{
      this.console("连接成功,信息如下 ↓")
      this.console(this.stompClient)
      if(callBack){
        callBack()
      }
    },err => {
      if(err) {
        this.console("连接失败,信息如下 ↓")
        this.console(err)
      }
    })
  },
  // 订阅
  sub(address,callBack){
    if(!this.stompClient.connected){
      this.console("没有连接,无法订阅")
      return
    }
    // 生成 id
    let timestamp= new Date().getTime() + address
    this.console("订阅成功 -> "+address)
    this.stompClient.subscribe(address,message => {
      this.console(address+" 订阅消息通知,信息如下 ↓")
      this.console(message)
      let data = message.body
      callBack(data)
    },{
      id: timestamp
    })
  },
  unSub(address){
    if(!this.stompClient.connected){
      this.console("没有连接,无法取消订阅 -> "+address)
      return
    }
    let id = ""
    for(let item in this.stompClient.subscriptions){
      if(item.endsWith(address)){
        id = item
        break
      }
    }
    this.stompClient.unsubscribe(id)
    this.console("取消订阅成功 -> id:"+ id + " address:"+address)
  },
  // 断开连接
  disconnect(callBack){
    if(!this.stompClient.connected){
      this.console("没有连接,无法断开连接")
      return
    }
    this.stompClient.disconnect(() =>{
      console.log("断开成功")
      if(callBack){
        callBack()
      }
    })
  },
  // 单位 秒
  reconnect(time){
    setInterval(() =>{
      if(!this.stompClient.connected){
        this.console("重新连接中...")
        this.init()
      }
    },time * 1000)
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  },
  // 向订阅发送消息
  send(address,msg) {
    this.stompClient.send(address,{},msg)
  }
}

后端stomp config,里面都有注释,写的很详细,并且我加入了和前端的心跳ping pong。

package com.cn.scott.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @ClassName: WebSocketStompConfig
 * @Author: scott
 * @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    private static long HEART_BEAT=10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //允许使用socketJs方式访问,访问点为webSocket,允许跨域
        //在网页上我们就可以通过这个链接
        //ws://127.0.0.1:port/ws来和服务器的WebSocket连接
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        //基于内存的STOMP消息代理来代替mq的消息代理
        //订阅Broker名称,/user代表点对点即发指定用户,/topic代表发布广播即群发
        //setHeartbeatValue 设置心跳及心跳时间
        registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
        //点对点使用的订阅前缀,不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user/");
    }
}

后端stomp协议接受、订阅等动作通知

package com.cn.scott.ws;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StompSocketHandler
 * @Author scott
 * @Date 2021/6/30
 * @Version V1.0
 **/
@RestController
public class StompSocketHandler {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
    * @MethodName: subscribeMapping
     * @Description: 订阅成功通知
     * @Param: [id]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>用户:"+id +",已订阅");
        SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
        sendToUser(param);
    }

    /**
    * @MethodName: test
     * @Description: 接收订阅topic消息
     * @Param: [id, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("收到客户端:" +id+",的消息");
        SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
        sendToUser(param);
    }

     @GetMapping("/refresh/{userId}")
    public void refresh(@PathVariable Long userId, String msg) {
        StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服务端向用户【%s】发送消息【%s】", userId,msg));
        sendToUser(param);
    }

    /**
    * @MethodName: sendToUser
     * @Description: 推送消息给订阅用户
     * @Param: [userId]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendToUser(SubscribeMsg screenChangeMsg){
        //这里可以控制权限等。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }

    /**
    * @MethodName: sendBroadCast
     * @Description: 发送广播,需要用户事先订阅广播
     * @Param: [topic, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendBroadCast(String topic,String msg){
        simpMessagingTemplate.convertAndSend(topic,msg);
    }

    /**
     * @ClassName: SubMsg
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }
}

连接展示

建立连接成功,这里可以看出是基于websocket协议

连接信息

ping pong

调用接口向订阅用户1发送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客户端控制台查看已经收到了消息。这个时候不同用户通过自己的userId可以区分订阅的主题,可以做到通过userId精准的往客户端推送消息。

还记得我们在后端配置的时候还指定了广播的订阅主题/topic,这时我们前端通过js只要订阅了这个主题,那么后端在像这个主题推送消息时,所有订阅的客户端都能收到,感兴趣的小伙伴可以自己试试,api我都写好了。

至此,实战完毕,喜欢的小伙伴麻烦关注加点赞。

springboot + stomp后端源码地址:https://gitee.com/ErGouGeSiBaKe/stomp-server

到此这篇关于Springboot Websocket Stomp 消息订阅推送的文章就介绍到这了,更多相关Springboot Websocket Stomp 消息订阅推送内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot websocket集群(stomp协议)连接时候传递参数

    最近在公司项目中接到个需求.就是后台跟前端浏览器要保持长连接,后台主动往前台推数据. 网上查了下,websocket stomp协议处理这个很简单.尤其是跟springboot 集成. 但是由于开始是单机玩的,很顺利. 但是后面部署到生产搞集群的话,就会出问题了. 假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到.但是B节点由于没有跟浏览器A建立连接.B节点发的消息浏览器就收不到了. 网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的. 还有很多思路都

  • Springboot Websocket Stomp 消息订阅推送

    目录 需求背景 websocket协议 stomp协议 需求背景 闲话不扯,直奔主题.需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议. websocket协议 想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考. pom文件 直接引入spring-bo

  • SpringBoot+WebSocket实现消息推送功能

    目录 背景 WebSocket简介 协议原理 WebSocket与HTTP协议的区别 WebSocket特点 应用场景 系统集成Websocket jar包引入 Websocket配置 具体实现 测试示例 页面请求websocket 测试效果 背景 项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询.comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够

  • 利用Socket.io 实现消息实时推送功能

    项目背景介绍 最近在写的项目中存在着社交模块,需要实现这样的一个功能:当发生了用户被点赞.评论.关注等操作时,需要由服务器向用户实时地推送一条消息.最终完成的项目地址为:https://github.com/noiron/socket-message-push,这里将介绍一下实现的思路及部分代码. 项目的流程中存在着这样的几个对象: 用 Java 实现的后端服务器 用 Node.js 实现的消息推送服务器 用户进行操作的客户端 事件处理的流程如下: 用户进行点赞操作时,后端服务器会进行处理,并向

  • 浅谈我是如何用redis做实时订阅推送的

    前阵子开发了公司领劵中心的项目,这个项目是以redis作为关键技术落地的. 先说一下领劵中心的项目吧,这个项目就类似京东app的领劵中心,当然图是截取京东的,公司的就不截了... 其中有一个功能叫做领劵的订阅推送.什么是领劵的订阅推送?就是用户订阅了该劵的推送,在可领取前的一分钟就要把提醒信息推送到用户的app中.本来这个订阅功能应该是消息中心那边做的,但他们说这个短时间内做不了.所以让我这个负责优惠劵的做了-.-!.具体方案就是到具体的推送时间点了,coupon系统调用消息中心的推送接口,把信

  • spring中websocket定时任务实现实时推送

    有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了.使用的定时任务方式为spring的TaskScheduler对象实现任务调度. TaskScheduler定时任务实现 TaskScheduler接口提供了多种调度方法来实现运行任务的执行. public interface TaskScheduler { //通过触发器来决定task是否执行 ScheduledFuture sche

  • PHP实现的消息实时推送功能【基于反ajax推送】

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style> .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} .content_box{text-align: cen

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • PHP实现长轮询消息实时推送功能代码实例讲解

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style> .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} .content_box{text-align: cen

  • Spring和Websocket相结合实现消息的推送

    本文主要有三个步骤 1.用户登录后建立websocket连接,默认选择websocket连接,如果浏览器不支持,则使用sockjs进行模拟连接 2.建立连接后,服务端返回该用户的未读消息 3.服务端进行相关操作后,推送给某一个用户或者所有用户新消息 相关环境 Spring4.0.6(要选择4.0+),tomcat7.0.55 Websocet服务端实现 WebSocketConfig.java @Configuration @EnableWebMvc @EnableWebSocket publi

  • iOS消息远程推送通知

    本文实例为大家分享了iOS消息推送.iOS远程通知代码,供大家参考,具体内容如下 消息推送 /* 要开发测试消息机制的程序,必须用真机测试 推送消息的类型 UIRemoteNotificationTypeNone 不接收推送消息 UIRemoteNotificationTypeBadge 接收图标数字 UIRemoteNotificationTypeSound 接收音频 UIRemoteNotificationTypeAlert 接收消息文字 UIRemoteNotificationTypeNe

随机推荐