spring中websocket定时任务实现实时推送

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。

TaskScheduler定时任务实现

TaskScheduler接口提供了多种调度方法来实现运行任务的执行。

public interface TaskScheduler {

 	//通过触发器来决定task是否执行
    ScheduledFuture schedule(Runnable task, Trigger trigger); 

 	//在starttime的时候执行一次
    ScheduledFuture schedule(Runnable task, Date startTime);
    ScheduledFuture schedule(Runnable task, Instant startTime); 

 	//从starttime开始每个period时间段执行一次task
    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); 

 	//每隔period执行一次
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);  

 	//从startTime开始每隔delay长时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); 

 	//每隔delay时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}

简单测试一下

import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

/**
 * The type Task scheduler test.
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:45:17
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {

    private final TaskScheduler taskScheduler;

    @Bean
    public void test() {
        //每隔3秒执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //每隔1秒执行一次
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }

    private class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
        }
    }

}

效果就是每个3秒执行一次

websocket+定时任务实时推送

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现

TestWebsocket.java

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

/**
 * 测试websocket
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 14:55:29
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {

    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();

    /**
     * 定时任务集合
     */
    Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * taskScheduler
     */
    private final TaskScheduler taskScheduler;

    /**
     * 建立连接后操作
     *
     * @param session 连接session信息
     * @throws Exception exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
        WEB_SOCKET_SESSIONS.add(session);
        //设置定时任务,每隔3s执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //开启一个定时任务
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        //根据session连接id定时任务线程存到map中
        stringScheduledFutureMap.put(session.getId(), schedule);
    }

    private class CustomizeTask implements Runnable {
        private final String sessionId;

        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 接收到消息后的处理
     *
     * @param session 连接session信息
     * @param message 信息
     * @throws Exception exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
    }

    /**
     * ws连接出错时调用
     *
     * @param session   session连接信息
     * @param exception exception
     * @throws Exception exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }

    /**
     * 连接关闭后调用
     *
     * @param session     session连接信息
     * @param closeStatus 关闭状态
     * @throws Exception exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            //暂停对应session的开启的定时任务
            scheduledFuture.cancel(true);
            //集合移除
            stringScheduledFutureMap.remove(sessionId);
        }
    }

    /**
     * 是否支持分片消息
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 群发发送消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }

    /**
     * 发给指定连接消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}

websocket绑定URL

import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * websocket配置
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:10:11
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private TestWebsocket testWebsocket;

    /**
     * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
     *
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}

websocket与定时任务同时存在时,需要加入配置定义线程池进行线程的管理

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * 当定时任务和websocket同时存在时报错解决
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {

    /**
     * Schedule本身是单线程执行的
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}

效果如下
连接上以后服务每隔3秒会向客户端实时推送消息

到此这篇关于spring中websocket定时任务实现实时推送的文章就介绍到这了,更多相关spring websocket实时推送内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 基于spring实现websocket实时推送实例

    基于spring框架来写的,websocket实时推送例子,具体内容如下 第一步:自己搭建一个springmvc项目,很简单,网上百度都有:pom文件添加以下: <!-- WebSocket --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.2.4.RELE

  • SpringBoot集成WebSocket【基于纯H5】进行点对点[一对一]和广播[一对多]实时推送

    之前实现WebSocket基于STOMP的,觉得SpringBoot封装的太高,不怎么灵活,现在实现一个纯H5的,也大概了解webSocket在内部是怎么传输的. 1.环境搭建 因为在上一篇基于STOMP协议实现的WebSocket里已经有大概介绍过Web的基本情况了,所以在这篇就不多说了,我们直接进入正题吧,在SpringBoot中,我们还是需要导入WebSocket的包. 在pox.xml加上对springBoot对WebSocket的支持: <!-- webSocket --> <

  • spring中websocket定时任务实现实时推送

    有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了.使用的定时任务方式为spring的TaskScheduler对象实现任务调度. TaskScheduler定时任务实现 TaskScheduler接口提供了多种调度方法来实现运行任务的执行. public interface TaskScheduler { //通过触发器来决定task是否执行 ScheduledFuture sche

  • Flask使用SocketIO实现WebSocket与Vue进行实时推送

    目录 前言 核心问题 Flask的原生WebSocket(flask-sockets)与封装SocketIO 1.Flask-SocketIO(封装写法) 2.Flask-Sockets(原生Websocket写法) 3.Bug 1:控制台输出没有Running on 127.0.0.1以及没有输出日志 4.Bug 2:显示连接错误. 前言 本文旨在记录使用Flask框架过程中与前端Vue对接过程中,存在WebSocket总是连接失败导致前端取不到数据的问题.以及在使用WebSocket相关功能

  • 利用Socket.io 实现消息实时推送功能

    项目背景介绍 最近在写的项目中存在着社交模块,需要实现这样的一个功能:当发生了用户被点赞.评论.关注等操作时,需要由服务器向用户实时地推送一条消息.最终完成的项目地址为:https://github.com/noiron/socket-message-push,这里将介绍一下实现的思路及部分代码. 项目的流程中存在着这样的几个对象: 用 Java 实现的后端服务器 用 Node.js 实现的消息推送服务器 用户进行操作的客户端 事件处理的流程如下: 用户进行点赞操作时,后端服务器会进行处理,并向

  • Django Channel实时推送与聊天的示例代码

    先来看一下最终的效果吧 开始聊天,输入消息并点击发送消息就可以开始聊天了 点击 "获取后端数据"开启实时推送 先来简单了解一下 Django Channel Channels是一个采用Django并将其功能扩展到HTTP以外的项目,以处理WebSocket,聊天协议,IoT协议等.它基于称为ASGI的Python规范构建. 它以Django的核心为基础,并在其下面分层了一个完全异步的层,以同步模式运行Django本身,但异步处理了连接和套接字,并提供了以两种方式编写的选择,从而实现了这

  • Springboot Websocket Stomp 消息订阅推送

    目录 需求背景 websocket协议 stomp协议 需求背景 闲话不扯,直奔主题.需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议. websocket协议 想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考. pom文件 直接引入spring-bo

  • WebSocket简介与消息推送

    目录 一.Socket简介 TCP/IP协议 UDP协议 二.WebSocket简介与消息推送 三.WebSocket客户端 四.WebSocket服务器端 五.测试运行 六.小结与消息推送框架 6.1.开源Java消息推送框架 Pushlet 6.2.开源DotNet消息推送框架SignalR 七.代码下载 7.1.Java实现的服务器端代码与客户端代码下载 7.2.DotNet服务器端手动连接实现代码下载 7.3.DotNet下使用SuperWebSocket三方库实现代码下载 B/S结构的

  • 百度实时推送api接口应用示例

    网站质量不错的网站可以在百度站长平台/数据提交/sitemap栏目下看到实时推送的功能, 目前这个工具是邀请开放, 百度的实时推送的api接口可以实时推送我们新发布的文章, 保证百度在第一时间收录.   百度站长平台 http://zhanzhang.baidu.com/ 打开百度站长平台, 点开实时推送的添加新数据接口获得带token的api推送地址:     http://ping.baidu.com/sitemap?site=www.yourdomain.com&resource_name

  • SpringBoot+WebSocket+Netty实现消息推送的示例代码

    上一篇文章讲了Netty的理论基础,这一篇讲一下Netty在项目中的应用场景之一:消息推送功能,可以满足给所有用户推送,也可以满足给指定某一个用户推送消息,创建的是SpringBoot项目,后台服务端使用Netty技术,前端页面使用WebSocket技术. 大概实现思路: 前端使用webSocket与服务端创建连接的时候,将用户ID传给服务端 服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中 如果需要给所有用户发送消息,直接执行channel组的writ

  • PHP实现的消息实时推送功能【基于反ajax推送】

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> <title>反ajax推送</title> <style> .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} .content_box{text-align: cen

随机推荐