SpringBoot整合WebSocket的客户端和服务端的实现代码

目录
  • 一、项目中服务端的创建
  • 二、java充当客户端链接ws
    • 1、ws客户端的配置
    • 2、配置信息需要在项目启动的时候去启用和链接ws服务
    • 3、接收服务端推送的消息进行权限过滤demo
    • 4、ws客户端推送消息,推送消息和上面服务端类似。

本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录。

此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层 产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差别接收这些消息,A的服务端:根据地址ip去订阅。用户通过订阅A的ws,同时记录下自己的信息,项目B推送的消息,项目A接收到之后通过当初订阅的逻辑和一些权限过滤条件对项目B产生的消息进行过滤再推送到用户客户端上。

一、项目中服务端的创建

首先引入maven仓库

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

websocket的服务端搭建

同时注意springboot要开启ws服务

启动类加上@EnableScheduling

简要解读demo

/webSocket/{id}:链接的id是业务上的一个id,这边之前做过类似拍卖的,相当于一个服务端或者业务上的一个标识,是客户端指明链接到哪一个拍卖间的标识

@ServerEndpoint:作为服务端的注解。

package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    /** 创建一个map存放   产生的ws链接推送 */
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    /** 创建一个map存放   当前接入的客户端 */
    private static Map<String, String> idMap = new ConcurrentHashMap<>();

    private Session session;
    /** 链接进入的一个场景id */
    private String id;
    /** 每一个链接的一个唯一标识 */
    private String userNo;
    /**
    * @Description: 第三方文接入当前项目websocket后的记录信息
    * @DateTime: 2021/7/5 10:02
    * @Author: GHH
    * @Params: [id, session]
    * @Return void
    */
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) throws IOException {
        log.info("已连接到id:{}竞拍场,当前竞拍场人数:{}", id, getUserNosById(id).size());
        this.id = id;
        this.session = session;
        // 生成一个随机序列号来存储一个id下的所有用户
        this.userNo = UUID.fastUUID().toString();
        addOnlineCount();
        //根据随机序列号存储一个socket连接
        clients.put(userNo, this);
        idMap.put(userNo, id);
    }
    /**
    * @Description: 关闭连接
    * @DateTime: 2021/7/5 10:02
    * @Author: GHH
    * @Params: []
    * @Return void
    */
    @OnClose
    public void onClose() throws IOException {
        clients.remove(userNo);
        idMap.remove(userNo);
        subOnlineCount();
    }
    /**
    * @Description: 客户端发送消息调用此方法
    * @DateTime: 2021/6/16 15:35
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    @OnMessage
    public void onMessage(String message) throws IOException {
//        JSONObject jsonTo = JSONObject.parseObject(message);
//        String mes = (String) jsonTo.get("message");
//        if (!("All").equals(jsonTo.get("To"))) {
//            sendMessageTo(mes, jsonTo.get("To").toString());
//        } else {
//            sendMessageAll(message);
//        }
        log.info("onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    /**
    * @Description: 推送到指定的id值的记录
    * @DateTime: 2021/6/15 17:11
    * @Author: GHH
    * @Params: [message, id]
    * @Return void
    */
    public static void sendMessageToById(String message, String id) {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        //根据id获取所有的userNo链接的用户
        List<String> userNos = getUserNosById(id);
        for (WebSocket item : clients.values()) {
            //遍历链接的value值,如果当前传入的id中链接的用户包含value值,则推送。
            if (userNos.contains(item.userNo)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    /**
    * @Description: 推送所有开启的信息
    * @DateTime: 2021/6/15 17:13
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
    /**
    * @Description: 根据相应场景的一些逻辑处理
    * @DateTime: 2021/7/5 10:03
    * @Author: GHH
    * @Params: [id]
    * @Return java.util.List<java.lang.String>
    */
    public static List<String> getUserNosById(String id) {
        ArrayList<String> userNos = new ArrayList<>();
        for (Map.Entry<String, String> entry : idMap.entrySet()) {
            if (entry.getValue().equals(id)) {
                userNos.add(entry.getKey());
            }
        }
        return userNos;
    }
}

 demo中模拟的是定时器推送,第一个参数是消息内容,第二个是推送到哪一个拍卖间或者其他业务上的内容。方法的具体内容上一段代码有详细解释,有通过id,或者发送给全部ws链接的客户端

WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?")
    public void job1(){
        log.info("测试生成次数:{}",count);
        redisTemplate.opsForValue().set("测试"+count, ""+count++);
        if (count%2==0){
            WebSocket.sendMessageToById(""+count,2+"");
        }else {
            WebSocket.sendMessageToById(""+count,1+"");
        }

        log.info("websocket发送"+count);
    }

二、java充当客户端链接ws

上述是java作为ws服务端推送当前业务信息的一个demo。我们项目目前做的是一个通讯层的概念,只能够推送数据内容,却无法根据用户权限去推送不同的数据。

ws客户端的搭建,首先链接ws服务端。首先是我们另外一个服务的ws配置信息,我这边demo是模拟链接上面的ws服务

1、ws客户端的配置

package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
/**
 * @author ghh
 * @date 2019-08-16 16:02
 */
@Component
@Slf4j
public class WSClient {
    public static Session session;
    public static void startWS() {
        try {
            if (WSClient.session != null) {
                WSClient.session.close();
            }
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            //设置消息大小最大为10M
            container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
            container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
            // 客户端,开启服务端websocket。
            String uri = "ws://192.168.0.108:8082/webSocket/1";
            Session session = container.connectToServer(WSHandler.class, URI.create(uri));
            WSClient.session = session;
        } catch (Exception ex) {
            log.info(ex.getMessage());
        }
    }
}

2、配置信息需要在项目启动的时候去启用和链接ws服务

package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebsocketReciveApplication.class, args);
    }
    @PostConstruct
    public void init(){
        log.info("初始化应用程序");     // 初始化ws,链接服务端
        WSClient.startWS();
    }
}

3、接收服务端推送的消息进行权限过滤demo

@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。

package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService=redisTemplate;
    }
    @OnOpen
    public void onOpen(Session session) {
        WSClient.session = session;
    }
    @OnMessage
    public void processMessage(String message) {
        log.info("websocketRecive接收推送消息"+message);
        int permission = Integer.parseInt(message)%5;
        //查询所有订阅的客户端的ip。
        Set<String> keys = redisTemplateService.keys("ip:*");
        for (String key : keys) {
            // 根据登录后存储的客户端ip,获取权限地址
            String s = redisTemplateService.opsForValue().get(key);
            String[] split = s.split(",");
            for (String s1 : split) {
                //向含有推送过来的数据权限地址的客户端推送告警数据。
                if (s1.equals(permission+"")){
                    WebSocket.sendMessageToByIp(message,key.split(":")[1]);
                }
            }
        }
    }
    @OnError
    public void processError(Throwable t) {
        WSClient.session = null;
        try {
            Thread.sleep(5000);
            startWS();
        } catch (InterruptedException e) {
            log.error("---websocket processError InterruptedException---", e);
        }
        log.error("---websocket processError error---", t);
    }
    @OnClose
    public void processClose(Session session, CloseReason closeReason) {
        log.error(session.getId() + closeReason.toString());
    }
    public void send(String sessionId, String message) {
        try {
            log.info("send Msg:" + message);
            if (Objects.nonNull(WSClient.session)) {
                WSClient.session.getBasicRemote().sendText(message);
            } else {
                log.info("---websocket error----");
            }
        } catch (Exception e) {
            log.error("---websocket send error---", e);
        }
    }
}

4、ws客户端推送消息,推送消息和上面服务端类似。

这边是根据ip

package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    private Session session;
    /** 当前连接服务端的客户端ip */
    private String ip;
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService = redisTemplate;
    }
    @OnOpen
    public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
        log.info("ip:{}客户端已连接:,当前客户端数量:{}", ip, onlineCount+1);
        this.ip = ip;
        this.session = session;
        // 接入一个websocket则生成一个随机序列号
        addOnlineCount();
        //根据随机序列号存储一个socket连接
        clients.put(ip, this);
    }
    @OnClose
    public void onClose() throws IOException {
        clients.remove(ip);
        onlineCount--;
        subOnlineCount();
    }
    /**
    * @Description: 客户端发送消息调用此方法
    * @DateTime: 2021/6/16 15:35
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("客户端发送消onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    /**
    * @Description: 推送到指定的ip值的记录
    * @DateTime: 2021/6/15 17:11
    * @Author: GHH
    * @Params: [message, id]
    * @Return void
    */
    public static void sendMessageToByIp(String message, String ip) {
        for (WebSocket item : clients.values()) {
            //遍历链接的value值,如果当前传入的ip中链接的用户包含value值,则推送。
            if (item.ip.equals(ip)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    /**
    * @Description: 推送所有开启的信息
    * @DateTime: 2021/6/15 17:13
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
}

概述:

至此,简易的demo搭建完成,项目gitee网址:https://gitee.com/ghhNB/study.git

到此这篇关于SpringBoot整合WebSocket的客户端和服务端的实现的文章就介绍到这了,更多相关SpringBoot整合WebSocket内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot整合websocket最基础入门使用教程详解

    项目最终的文件结构 1 添加maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <

  • SpringBoot整合websocket实现即时通信聊天

    目录 一.技术介绍 1.1 客户端WebSocket 1.1.1 函数 1.1.2 事件 1.2 服务端WebSocket 二.实战 2.1.服务端 2.1.1引入maven依赖 2.1.2 编写配置类 2.1.3 编写WebSocketService服务类 2.1.4 建立连接 2.1.5 关闭连接 2.1.6 发送消息 2.1.7 监听错误 2.2 客户端 2.2.1 主页面 2.2.1 聊天页面 三.开源地址 四.参考文献 一.技术介绍 线上演示地址:http://chat.breez.w

  • springboot整合websocket实现群聊思路代码详解

    实现思路 发送者向服务器发送大家早上好.其它客户端可以收到对应消息. 项目展示 通过springboot引入websocket,实现群聊,通过在线websocket测试进行展示. 核心代码 pom引入jar <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2

  • SpringBoot2.0整合WebSocket代码实例

    这篇文章主要介绍了SpringBoot2.0整合WebSocket代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 之前公司的某个系统为了实现推送技术,所用的技术都是Ajax轮询,这种方式浏览器需要不断的向服务器发出请求,显然这样会浪费很多的带宽等资源,所以研究了下WebSocket,本文将详细介绍下. 一.什么是WebSocket? WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议,能更好的节省服务器资

  • 使用springboot整合websocket实现群聊教程

    目录 先上效果图: 先来准备工作导入依赖 导入依赖后扫描启用 接收前端传回数据 其中重点就是4个注解 **@OnOpen,@OnClose,@OnMessage,@OnError** 前端页面代码 模板引擎代码如下 最后效果图如下 先上效果图: 相对来说更好看那么一点但是,实现代码都是一样的. 先来准备工作导入依赖 <!--websocket依赖--> <dependency> <groupId>org.springframework.boot</groupId&

  • SpringBoot整合Netty实现WebSocket的示例代码

    目录 一.pom.xml依赖配置 二.代码 2.1.NettyServer 类 2.2.SocketHandler 类 2.3.ChannelHandlerPool 类 2.4.Application启动类 三.测试 一.pom.xml依赖配置 <!-- netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <v

  • 通过实例讲解springboot整合WebSocket

    一.背景 我们都知道 http 协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息.想要实现浏览器的主动推送有两种主流实现方式: 轮询:缺点很多,但是实现简单 websocket:在浏览器和服务器之间建立 tcp 连接,实现全双工通信 springboot 使用 websocket 有两种方式,一种是实现简单的 websocket,另外一种是实现STOMP协议.这一篇实现简单的 websocket,STOMP 下一篇在讲. 注意:如下都是针对使用 springboot

  • SpringBoot整合WebSocket的客户端和服务端的实现代码

    目录 一.项目中服务端的创建 二.java充当客户端链接ws 1.ws客户端的配置 2.配置信息需要在项目启动的时候去启用和链接ws服务 3.接收服务端推送的消息进行权限过滤demo 4.ws客户端推送消息,推送消息和上面服务端类似. 本文是项目中使用了websocket进行一些数据的推送,对比项目做了一个demo,ws的相关问题不做细数,仅做一下记录. 此demo针对ws的搭建主要逻辑背景是一个服务端B:通讯层 产生消息推送出去,另外一个项目A充当客户端和服务端,A的客户端:是接收通讯层去无差

  • SpringBoot整合WebSocket实现后端向前端发送消息的实例代码

    一.什么是 websocket 接口 使用 websocket 建立长连接,服务端和客户端可以互相通信,服务端只要有数据更新,就可以主动推给客户端. WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输.在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道.两者之

  • java Nio使用NioSocket客户端与服务端交互实现方式

    NioSocket 客户端与服务端交互实现 java Nio是jdk1.4新增的io方式-–nio(new IO),这种方式在目前来说算不算new,更合适的解释应该是non-block IO. non-block是相对于传统的io方式来讲的.传统的Io方式是阻塞的,我们拿网络io来举例,传统的io模型如下: 服务端主线程负责不断地server.accept(),如果没有客户端请求主线程就会阻塞,当客户端请求时,主线程会通过线程池创建一个新的线程执行. 简单解释就是一个线程负责一个客户端的sock

  • SpringBoot整合Spring Boot Admin实现服务监控的方法

    目录 1. Server端服务开发 1.1. 引入核心依赖 1.2. application.yml配置文件 1.3. Security配置文件 1.4. 主启动类 2. Client端服务开发 2.1. 引入核心依赖 2.2. application.yml配置文件 2.3. logback-spring.xml文件 2.4. 主启动类 3. 验证 4. 配置邮件告警 4.1. 引入核心依赖 4.3. 通知配置文件 4.4. 验证 Spring Boot Admin用于管理和监控一个或多个Sp

  • Python警察与小偷的实现之一客户端与服务端通信实例

    本文实例讲述了Python警察与小偷的实现之一客户端与服务端通信,分享给大家供大家参考.具体方法分析如下: 该实例来源于ISCC 2012 破解关第四题 目的是通过逆向police,实现一个thief,能够与police进行通信 实际上就是一个RSA加密通信的例子,我们通过自己编写客户端和服务端来实现上面的thief和police的功能. 要通信,这们这次先通过python写出可以进行网络连接的客户端与服务端. 服务端代码如下: #!/usr/bin/env python import Sock

  • python Socket之客户端和服务端握手详解

    简单的学习下利用socket来建立客户端和服务端之间的连接并且发送数据 1. 客户端socketClient.py代码 import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立连接: s.connect(('127.0.0.1', 9999)) # 接收欢迎消息: print(s.recv(1024).decode('utf-8')) for data in [b'Michael', b'Tracy', b'

随机推荐