JS 实现请求调度器

前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。

TLDR; 直接跳转『抽象和复用』章节。

为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:

const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// fetch 函数发送 HTTP 请求,返回 Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);

Promise.all(appPromises)
 // 通过 reduce 做累加
 .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
 .catch((error) => console.log(error));

上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:

<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>

如何解决呢?

一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunkchunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合

难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。

// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求
const task1 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(1);
 console.log('task1 executed');
 }, 1000);
});

const task2 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(2);
 console.log('task2 executed');
 }, 1000);
});

const task3 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(3);
 console.log('task3 executed');
 }, 1000);
});

// 聚合结果
let result = 0;

const resultPromise = [task1, task2, task3].reduce((current, next) =>
 current.then((number) => {
 console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve
 result += number;

 return next();
 }),

 Promise.resolve(0)) // 聚合初始值

 .then(function(last) {
 console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve

 result += last;

 console.log('all executed with result', result);

 return Promise.resolve(result);
 });

运行结果如图 1:

代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。

难点解决了,我们看看最终代码:

/**
 * 模拟 HTTP 请求
 * @param {String} url
 * @return {Promise}
 */
function fetch(url) {
 console.log(`Fetching ${url}`);
 return new Promise((resolve) => {
 setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
 });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
 /**
 * 入口方法,开启定时任务
 *
 * @return {Promise}
 */
 start() {
 return this.fetchAppIds()
 .then(ids => this.fetchAppsSerially(ids, 2))
 .then(apps => this.sumPv(apps))
 .catch(error => console.error(error));
 },

 /**
 * 获取所有应用的 ID
 *
 * @private
 *
 * @return {Promise}
 */
 fetchAppIds() {
 return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
 },

 promiseFactory(ids) {
 return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
 },

 /**
 * 获取所有应用的详情
 *
 * 一次并发请求 `concurrency` 个应用,称为一个 chunk
 * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕
 *
 * @private
 *
 * @param {[Number]} ids
 * @param {Number} concurrency 一次并发的请求数量
 * @return {[Object]}  所有应用的信息
 */
 fetchAppsSerially(ids, concurrency = 100) {
 // 分块
 let chunkOfIds = ids.splice(0, concurrency);
 const tasks = [];

 while (chunkOfIds.length !== 0) {
 tasks.push(this.promiseFactory(chunkOfIds));
 chunkOfIds = ids.splice(0, concurrency);
 }

 // 按块顺序执行
 const result = [];
 return tasks.reduce((current, next) => current.then((chunkOfApps) => {
 console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
 result.push(...chunkOfApps); // 拍扁数组
 return next();
 }), Promise.resolve([]))
 .then((lastchunkOfApps) => {
 console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');

 result.push(...lastchunkOfApps); // 再次拍扁它
 console.info('All chunks has been executed with result', result);
 return result;
 });
 },

 /**
 * 聚合所有应用的 PV
 *
 * @private
 *
 * @param {[]} apps
 * @return {[type]} [description]
 */
 sumPv(apps) {
 const initial = { pv: 0 };

 return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
 }
};

// 开始运行
aggregator.start().then(console.log);

运行结果如图 2:

抽象和复用

目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。

串行

先模拟一个 http get 请求。

/**
 * mocked http get.
 * @param {string} url
 * @returns {{ url: string; delay: number; }}
 */
function httpGet(url) {
 const delay = Math.random() * 1000;

 console.info('GET', url);

 return new Promise((resolve) => {
 setTimeout(() => {
 resolve({
 url,
 delay,
 at: Date.now()
 })
 }, delay);
 })
}

串行执行一批请求。

const ids = [1, 2, 3, 4, 5, 6, 7];

// 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的
const httpGetters = ids.map(id =>
 () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);

// 串行执行之
const tasks = await httpGetters.reduce((acc, cur) => {
 return acc.then(cur);

 // 简写,等价于
 // return acc.then(() => cur());
}, Promise.resolve());

tasks.then(() => {
 console.log('done');
});

注意观察控制台输出,应该串行输出以下内容:

GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7

分段串行,段中并行

重点来了。本文的请求调度器实现

/**
 * Schedule promises.
 * @param {Array<(...arg: any[]) => Promise<any>>} factories
 * @param {number} concurrency
 */
function schedulePromises(factories, concurrency) {
 /**
 * chunk
 * @param {any[]} arr
 * @param {number} size
 * @returns {Array<any[]>}
 */
 const chunk = (arr, size = 1) => {
 return arr.reduce((acc, cur, idx) => {
 const modulo = idx % size;

 if (modulo === 0) {
 acc[acc.length] = [cur];
 } else {
 acc[acc.length - 1].push(cur);
 }

 return acc;
 }, [])
 };

 const chunks = chunk(factories, concurrency);

 let resps = [];

 return chunks.reduce(
 (acc, cur) => {
 return acc
 .then(() => {
  console.log('---');
  return Promise.all(cur.map(f => f()));
 })
 .then((intermediateResponses) => {
  resps.push(...intermediateResponses);

  return resps;
 })
 },

 Promise.resolve()
 );
}

测试下,执行调度器:

// 分段串行,段中并行
schedulePromises(httpGetters, 3).then((resps) => {
 console.log('resps:', resps);
});

控制台输出:

---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7

resps: [
 {
 "url": "https://jsonplaceholder.typicode.com/posts/1",
 "delay": 733.010980640727,
 "at": 1615131322163
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/2",
 "delay": 594.5056229848931,
 "at": 1615131322024
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/3",
 "delay": 738.8230109146299,
 "at": 1615131322168
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/4",
 "delay": 525.4604386109747,
 "at": 1615131322698
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/5",
 "delay": 29.086379722201183,
 "at": 1615131322201
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/6",
 "delay": 592.2345027398272,
 "at": 1615131322765
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/7",
 "delay": 513.0684467560949,
 "at": 1615131323284
 }
]

总结

  1. 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
  2. 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
  3. 本文的精髓在于使用 reduce 作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce 精通见上篇 你终于用 Reduce 了🎉。

以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注我们其它相关文章!

(0)

相关推荐

  • 解决HttpPost+json请求---服务器中文乱码及其他问题

    好凌乱的题目,只是一些功能点的总结咯. 首先构造一个json对象用于存放数据,如果光加上header为utf-8就能解决中文就大错特错了... json对象可以put变量,也可以put对象.取的时候 obj.getJSONObject("people").getString("name") HttpClient httpClient = new DefaultHttpClient(); String url = "***"; HttpPost h

  • jQuery+Ajax+js实现请求json格式数据并渲染到html页面操作示例

    本文实例讲述了jQuery+Ajax+js实现请求json格式数据并渲染到html页面操作.分享给大家供大家参考,具体如下: 1.先给json格式的数据: [ {"id":1,"name":"stan"}, {"id":2,"name":"jack"}, {"id":3,"name":"lucy"}, {"id&quo

  • PHP使用Http Post请求发送Json对象数据代码解析

    因项目的需要,PHP调用第三方 Java/.Net 写好的 Restful Api,其中有些接口,需要 在发送 POST 请求时,传入对象. Http中传输对象,最好的表现形式莫过于JSON字符串了,但是作为参数的接收方,又是需要被告知传过来的是JSON! 其实这不难,只需要发送一个 http Content-Type头信息即可,即 "Content-Type: application/json; charset=utf-8",参考代码如下: <?php /** * PHP发送J

  • 原生js实现ajax请求和JSONP跨域请求操作示例

    本文实例讲述了原生js实现ajax请求和JSONP跨域请求.分享给大家供大家参考,具体如下: 直接上代码: const ajax = (params = {}) => { const nowJson = params.jsonp ? jsonp(params) : json(params); function jsonp(params){ //创建script标签并加入到页面中 var callbackName = params.jsonp; var head = document.getEle

  • 在vue中使用jsonp进行跨域请求接口操作

    前言: 这里我们使用的是第三方插件jsonp. github网址:https://github.com/webmodules/jsonp 1.安装 npm install jsonp -S 2.引入 一般新建一个js文件来引入原始jsonp插件,然后对原始插件进行封装,对跨域接口参数的拼接,封装好这个jsonp文件后export出去,之后在哪里用到就再在那里import. 1.新建jsonp.js文件来封装原始jsonp插件 // 引入原始jsonp插件 import originJsonp f

  • Javascript原生ajax请求代码实例

    这篇文章主要介绍了Javascript原生ajax请求代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码如下 class Ajax{ constructor(url, method, data, callback_suc, callback_err, callback_run){ this.RT = true;//默认为异步请求 this.url = url; this.method = method || "POST";

  • PHP实现chrome表单请求数据转换为接口使用的json数据

    为什么要写转换程序 最近在做旧版程序迁移,旧的架构为常规的MVC模式,新版架构全部改成restful架构. 由于改版数据是一致的,但是请求结构不一致,新版的请求全部以json形式提交,为了方便测试,之前一直都是直接在浏览器打开开发者工具,然后把请求内容复制过来,然后手动改成json形式,由于之前数据量比较少,暂时未发现测试时候不方便的情况,但是今天遇到了数据比较多的情况,于是我想,为啥不写一段转换程序呢? 本身来说,程序的初衷应该是提高工作效率,这也是我之前缺少反思的地方,因此写下这篇文章做个记

  • Spring中使用JSR303请求约束判空的实现

    1. 适用场景 有时候我们在表单里提交一系列参数, 到后台封装成一个对象, 要对对象的属性做各种字段值的约束; 这时候, 当然可以if-else一个一个的判断, 有更简洁的做法, 就是使用 JSR303+spring的validation: 2. 使用方法步骤(分3步) 实体类加字段约束注解 Controller类中@Valid标注启用(@Validated也兼容@Valid) BindingResult获取报错信息 2.1 实体类加字段约束注解 如我们要收集前端表单的字段数据到Person实体

  • JavaScript如何实现防止重复的网络请求的示例

    前言 在开发中,经常会遇到接口重复请求导致的各种问题. 对于重复的网络请求,会导致页面更新多次,发生页面抖动的现象,影响用户体验. 例如当前页面请求还未响应完成,就切换到其他路由,那么这些请求直到响应返回才会中止. 无论从用户体验或者从业务严谨方面来说,取消无用的请求确实是需要避免的. 实现思路 **  1.在发送请求前先拦截当前请求地址 (url + 方法 + 参数): **  2.开启一个请求队列用于保存 当前地址: **  3.每次请求查看请求队列里面有没有当前url地址: **  4.如

  • nodejs使用socket5进行代理请求的实现

    需要用到2个库, request socks5-http-client/lib/Agent/ 或 socks5-https-client/lib/Agent 因为一些已知原因,有时候,http.request请求无法拿到数据, 首先想到的是proxy,其次是socket5. 有了以上两个库,接下来的代码超级简单. const request = require('request'); var httpAgent = require('socks5-http-client/lib/Agent');

  • JavaScript/TypeScript 实现并发请求控制的示例代码

    场景 假设有 10 个请求,但是最大的并发数目是 5 个,并且要求拿到请求结果,这样就是一个简单的并发请求控制 模拟 利用 setTimeout 实行简单模仿一个请求 let startTime = Date.now(); const timeout = (timeout: number, ret: number) => { return (idx?: any) => new Promise((resolve) => { setTimeout(() => { const compa

  • JavaScript实现串行请求的示例代码

    使用async和await var fn = async function(promiseArr) { for(let i = 0,len = arr.length; i<len; i++) { currentPromise = (promiseArr[i] instanceOf Promise) ? promiseArr[i] : Promise.resolve(promiseArr[i]); var result = await currentPromise; console.log(res

随机推荐