Java实现一个简单的长轮询的示例代码

目录
  • 分析一下长轮询的实现方式
    • 长轮询与短轮询
    • 配置中心长轮询设计
  • 配置中心长轮询实现
    • 客户端实现
    • 服务端实现

分析一下长轮询的实现方式

现在各大中间件都使用了长轮询的数据交互方式,目前比较流行的例如Nacos的配置中心,RocketMQ Pull(拉模式)消息等,它们都是采用了长轮询方的式实现。就例如Nacos的配置中心,如何做到服务端感知配置变化实时推送给客户端的呢?

长轮询与短轮询

说到长轮询,肯定存在和它相对立的,我们暂且叫它短轮询吧,我们简单介绍一下短轮询:

短轮询也是拉模式。是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。如果配置中心使用这样的方式,会存在以下问题:

由于配置数据并不会频繁变更,若是一直发请求,势必会对服务端造成很大压力。还会造成推送数据的延迟,比如:每10s请求一次配置,如果在第11s时配置更新了,那么推送将会延迟9s,等待下一次请求;

无法在推送延迟和服务端压力两者之间中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。

长轮询为了解决短轮询存在的问题,客户端发起长轮询,如果服务端的数据没有发生变更,会hold住请求,直到服务端的数据发生变化,或者等待一定时间超时才会返回。返回后,客户端再发起下一次长轮询请求监听。

这样设计的好处:

  • 相对于低延时,客户端发起长轮询,服务端感知到数据发生变更后,能立刻返回响应给客户端。
  • 服务端的压力减小,客户端发起长轮询,如果数据没有发生变更,服务端会hold住此次客户端的请求,hold住请求的时间一般会设置到30s或者60s,并且服务端hold住请求不会消耗太多服务端的资源。

下面借用图片来说明一下流程:

  • 首先客户端发起长轮询请求,服务端收到客户端的请求,这时会挂起客户端的请求,如果在服务端设计的30s之内都没有发生变更,服务端会响应回客户端数据没有变更,客户端会继续发送请求。
  • 如果在30s之内服务数据发生了变更,服务端会推送变更的数据到客户端。

配置中心长轮询设计

上面我们已经介绍了整个思路,下面我们用代码实现一下:

  • 首先客户端发送一个HTTP请求到服务端;服务端会开启一个异步线程,如果一直没有数据变更会挂起当前请求(一个 Tomcat 也就 200 个线程,长轮询也不应该阻塞 Tomcat 的业务线程,所以需要配置中心在实现长轮询时往往采用异步响应的方式来实现,而比较方便实现异步 HTTP 的常见手段便是 Servlet3.0 提供的 AsyncContext 机制。)
  • 在服务端设置的超时时间内仍然没有数据变更,那就返回客户端一个没有变更的标识。例如响应304状态码;
  • 在服务端设置的超时时间内有数据变更了,就返回客户端变更的内容;

配置中心长轮询实现

下面用代码实现长轮询:

客户端实现

 @Slf4j
 public class ConfigClientWorker {
 ​
     private final CloseableHttpClient httpClient;
 ​
     private final ScheduledExecutorService executorService;
 ​
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 ​
         // ① httpClient 客户端超时时间要大于长轮询约定的超时时间
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 ​
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 ​
     class LongPollingRunnable implements Runnable {
 ​
         private final String url;
         private final String dataId;
 ​
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 ​
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             CloseableHttpResponse response = httpClient.execute(request);
             switch (response.getStatusLine().getStatusCode()) {
                 case 200: {
                     BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                             .getContent()));
                     StringBuilder result = new StringBuilder();
                     String line;
                     while ((line = rd.readLine()) != null) {
                         result.append(line);
                     }
                     response.close();
                     String configInfo = result.toString();
                     log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                     break;
                 }
                 // ② 304 响应码标记配置未变更
                 case 304: {
                     log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                     break;
                 }
                 default: {
                     throw new RuntimeException("unExcepted HTTP status code");
                 }
             }
             executorService.execute(this);
         }
     }
 ​
     public static void main(String[] args) throws IOException {
 ​
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }
  • httpClient 客户端超时时间要大于长轮询约定的超时时间,不然还没等到服务端返回,客户端自己就超时了。
  • 304 响应码标记配置未变更;
  • http://127.0.0.1:8080/listener 是服务端地址;

服务端实现

 @RestController
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 ​
     @Data
     private static class AsyncTask {
         // 长轮询请求的上下文,包含请求和响应体
         private AsyncContext asyncContext;
         // 超时标记
         private boolean timeout;
 ​
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 ​
     // guava 提供的多值 Map,一个 key 可以对应多个 value
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 ​
     private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
             .build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 ​
     // 配置监听接入点
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
 ​
         String dataId = request.getParameter("dataId");
 ​
         // 开启异步!!!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 ​
         // 维护 dataId 和异步请求上下文的关联
         dataIdContext.put(dataId, asyncTask);
 ​
         // 启动定时器,30s 后写入 304 响应
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
               // 标志此次异步线程完成结束!!!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 ​
     // 配置发布接入点
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 ​
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }
  • 客户端请求过来,首先开启一个异步线程request.startAsync(request, response);保证不占用Tomcat线程。此时Tomcat线程以及释放。配合asyncContext.complete()使用。
  • dataIdContext.put(dataId, asyncTask);会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文
  • Multimap<String, AsyncTask> dataIdContext它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为 Map<String,List<AsyncTask>>
  • timeoutChecker.schedule() 启动定时器,30s 后写入 304 响应
  • @RequestMapping("/publishConfig") ,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。
  • asyncTask.getAsyncContext().complete();表示这次异步请求结束了。

启动配置监听

先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码

 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld

服务端打印日志:

 2022-08-25 16:45:56.663  INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer         : publish configInfo dataId: [user], configInfo: helloworld

到此这篇关于Java实现一个简单的长轮询的示例代码的文章就介绍到这了,更多相关Java长轮询内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java如何使用ReentrantLock实现长轮询

    Java代码 1. ReentrantLock 加锁阻塞,一个condition对应一个线程,以便于唤醒时使用该condition一定会唤醒该线程 /** * 获取探测点数据,长轮询实现 * @param messageId * @return */ public JSONObject getToutData(String messageId) { Message message = toutMessageCache.get(messageId); if (message == null) {

  • Java servlet通过事件驱动进行高性能长轮询详解

    目录 servlet3.0的异步原理 使用servlet3.0实现长轮询 长轮询实现 servlet3.0的异步原理 servlet基础就不做介绍了,这里就介绍servlet3.0的一个重要的新特性:异步. servlet3.0原理图: tomcat接收到客户端的请求后会将请求AsyncContext交给业务线程,这样tomcat工作线程就能释放出来处理其它请求的连接. 业务线程池接收到AsyncContext后,就可以处理请求业务,完成业务逻辑后,根据AsyncContext获取respons

  • 基于springboot 长轮询的实现操作

    springboot 长轮询实现 基于 @EnableAsync , @Sync @SpringBootApplication @EnableAsync public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } } @RequestMapping("/async") @RestControlle

  • Java实现一个简单的长轮询的示例代码

    目录 分析一下长轮询的实现方式 长轮询与短轮询 配置中心长轮询设计 配置中心长轮询实现 客户端实现 服务端实现 分析一下长轮询的实现方式 现在各大中间件都使用了长轮询的数据交互方式,目前比较流行的例如Nacos的配置中心,RocketMQ Pull(拉模式)消息等,它们都是采用了长轮询方的式实现.就例如Nacos的配置中心,如何做到服务端感知配置变化实时推送给客户端的呢? 长轮询与短轮询 说到长轮询,肯定存在和它相对立的,我们暂且叫它短轮询吧,我们简单介绍一下短轮询: 短轮询也是拉模式.是指不管

  • Java实现一个简单的文件上传案例示例代码

    Java实现一个简单的文件上传案例 实现流程: 1.客户端从硬盘读取文件数据到程序中 2.客户端输出流,写出文件到服务端 3.服务端输出流,读取文件数据到服务端中 4.输出流,写出文件数据到服务器硬盘中 下面上代码 上传单个文件 服务器端 package FileUpload; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.Serve

  • Python实现一个简单的毕业生信息管理系统的示例代码

    写在前面: 从昨晚的梦里回忆起数据管理的作业: 实现一个自己的选题---- 毕业生信息管理系统,实现学生个人信息基本的增删改查, 我想了想前段时间刚学习的列表,这个简单啊 ,设计一个学生信息列表,然后列表里面再存每个学生详细信息的列表,然后来实现一个基本的增删查改,这个不难啊!直接开始撸代码! 上代码! def Menu():##菜单主界面 print('*'*22) print("* 查看毕业生列表输入: 1 *") print("* 添加毕业生信息输入: 2 *"

  • python实现一个简单的并查集的示例代码

    并查集是一种树型的数据结构,用于处理一些不相交集合的合并及查询问题.常常在使用中以森林来表示. 并查集有三种基本操作,获得根节点,判断两节点是否连通,以及将两不连通的节点相连(相当于将两节点各自的集合合并) 用UnionFind类来表示一个并查集,在构造函数中,初始化一个数组parent,parent[i]表示的含义为,索引为i的节点,它的直接父节点为parent[i].初始化时各个节点都不相连,因此初始化parent[i]=i,让自己成为自己的父节点,从而实现各节点不互连. def __ini

  • 如何使用proxy实现一个简单完整的MVVM库的示例代码

    前言 MVVM 是当前时代前端日常业务开发中的必备模式(相关框架如react,vue,angular 等), 使用 MVVM 可以将开发者的精力更专注于业务上的逻辑,而不需要关心如何操作 dom.虽然现在都 9012 年了,mvvm 相关原理的介绍已经烂大街了,但出于学习基础知识的目的(使用 proxy 实现的 vue3.0 还在开发中), 在参考了之前 vue.js 的整体思路之后,自己动手实现了一个简易的通过 proxy 实现的 mvvm. 本项目代码已经开源在github,项目正在持续完善

  • 一个简单的JS时间控件示例代码(JS时分秒时间控件)

    自己在网上找了半天没找到只有 "时分秒"的控件, 就自己做了个,发在这里方便有人用到 鼠标点击 后 的效果 SetTime.js 复制代码 代码如下: /**//************************************ 使用说明:* 首先把本控件包含到页面 * <script src="XXX/setTime.js" type="text/javascript"></script>* 控件调用函数:_Set

  • java 常规轮询长轮询Long polling实现示例详解

    目录 正文 常规轮询 长轮询 正文 长轮询是与服务器保持持久连接的最简单的方式,它不使用任何特定的协议,例如 WebSocket 或者 Server Sent Event. 它很容易实现,在很多场景下也很好用. 常规轮询 从服务器获取新信息的最简单的方式是定期轮询.也就是说,定期向服务器发出请求:“你好,我在这儿,你有关于我的任何信息吗?”例如,每 10 秒一次. 作为响应,服务器首先通知自己,客户端处于在线状态,然后 —— 发送目前为止的消息包. 这可行,但是也有些缺点: 消息传递的延迟最多为

随机推荐