spring中websocket定时任务实现实时推送
有时候业务要求在websocket连接建立之后,服务端每隔一段时间就主动把数据推送给客户端,这时就需要将websocket与定时任务结合起来,实现向客户端实时推送数据。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。
TaskScheduler定时任务实现
TaskScheduler接口提供了多种调度方法,用来安排任务的执行。
public interface TaskScheduler { //通过触发器来决定task是否执行 ScheduledFuture schedule(Runnable task, Trigger trigger); //在starttime的时候执行一次 ScheduledFuture schedule(Runnable task, Date startTime); ScheduledFuture schedule(Runnable task, Instant startTime); //从starttime开始每个period时间段执行一次task ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); //每隔period执行一次 ScheduledFuture scheduleAtFixedRate(Runnable task, long period); ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period); //从startTime开始每隔delay长时间执行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); //每隔delay时间执行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); }
简单测试一下
import cn.hutool.core.date.DateUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; /** * The type Task scheduler test. * * @author yjj * @version 1.0 * @since 2022 -12-28 15:45:17 */ @Slf4j @Component @RequiredArgsConstructor public class TaskSchedulerTest { private final TaskScheduler taskScheduler; @Bean public void test() { //每隔3秒执行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //每隔1秒执行一次 //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS); taskScheduler.schedule(new MyThread(), trigger); } private class MyThread implements Runnable { @Override public void run() { log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date()); } } }
效果就是每隔3秒执行一次
websocket+定时任务实时推送
实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现
TestWebsocket.java
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; /** * 测试websocket * * @author yjj * @version 1.0 * @since 2022 -12-28 14:55:29 */ @Slf4j @Component @RequiredArgsConstructor public class TestWebsocket implements WebSocketHandler { protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>(); /** * 定时任务集合 */ Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>(); /** * taskScheduler */ private final TaskScheduler taskScheduler; /** * 建立连接后操作 * * @param session 连接session信息 * @throws Exception exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { sendMessage("连接成功~~~~~~,sessionId=" + session.getId()); WEB_SOCKET_SESSIONS.add(session); //设置定时任务,每隔3s执行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //开启一个定时任务 ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger); //根据session连接id定时任务线程存到map中 stringScheduledFutureMap.put(session.getId(), schedule); } private class CustomizeTask implements Runnable { private final String sessionId; CustomizeTask(String sessionId) { this.sessionId = sessionId; } @Override public void run() { try { String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), 
DateUtil.date()); sendMessage(JSONUtil.toJsonStr(message), sessionId); } catch (IOException e) { e.printStackTrace(); } } } /** * 接收到消息后的处理 * * @param session 连接session信息 * @param message 信息 * @throws Exception exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】"); } /** * ws连接出错时调用 * * @param session session连接信息 * @param exception exception * @throws Exception exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if (session.isOpen()) { sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); } /** * 连接关闭后调用 * * @param session session连接信息 * @param closeStatus 关闭状态 * @throws Exception exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { if (session.isOpen()) { sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); String sessionId = session.getId(); ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class); if (scheduledFuture != null) { //暂停对应session的开启的定时任务 scheduledFuture.cancel(true); //集合移除 stringScheduledFutureMap.remove(sessionId); } } /** * 是否支持分片消息 */ @Override public boolean supportsPartialMessages() { return false; } /** * 群发发送消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message) throws IOException { if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { webSocketSession.sendMessage(new TextMessage(message)); } } } /** * 发给指定连接消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message, String sessionId) throws IOException { if 
(CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { if (sessionId.equals(webSocketSession.getId())) { webSocketSession.sendMessage(new TextMessage(message)); } } } } }
websocket绑定URL
import com.yjj.test.websocket.TestWebsocket; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.annotation.Resource; /** * websocket配置 * * @author yjj * @version 1.0 * @since 2022 -12-28 15:10:11 */ @EnableWebSocket @Configuration public class WebSocketConfig implements WebSocketConfigurer { @Resource private TestWebsocket testWebsocket; /** * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired. * * @param registry */ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*"); } }
websocket与定时任务同时存在时,需要额外加入一个配置类,定义TaskScheduler线程池来统一管理调度线程
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Declares the application's {@link TaskScheduler} bean.
 *
 * <p>According to the original author, defining this bean resolves the error
 * that occurs when scheduled tasks and websocket are used together.
 *
 * @author yjj
 * @version 1.0
 * @since 2022-04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {

    /** Number of threads given to the scheduler pool. */
    private static final int SCHEDULER_POOL_SIZE = 20;

    /**
     * Builds a pool-backed scheduler. The original author notes that scheduling
     * is single-threaded by itself, hence the explicit pool size.
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        final ThreadPoolTaskScheduler pooledScheduler = new ThreadPoolTaskScheduler();
        pooledScheduler.setPoolSize(SCHEDULER_POOL_SIZE);
        return pooledScheduler;
    }
}
效果如下
连接上以后服务每隔3秒会向客户端实时推送消息
到此这篇关于spring中websocket定时任务实现实时推送的文章就介绍到这了。更多相关spring websocket实时推送的内容,请搜索我们以前的文章或继续浏览下面的相关文章,希望大家以后多多支持我们!
赞 (0)