利用 JavaScript 实现并发控制的示例代码

一、前言

  在开发过程中,有时会遇到需要控制任务并发执行数量的需求。

  例如一个爬虫程序,可以通过限制其并发任务数量来降低请求频率,从而避免由于请求过于频繁被封禁问题的发生。

  接下来,本文介绍如何实现一个并发控制器。

二、示例

const task = timeout => new Promise((resolve) => setTimeout(() => {
  resolve(timeout);
 }, timeout))

 const taskList = [1000, 3000, 200, 1300, 800, 2000];

 async function startNoConcurrentControl() {
  console.time(NO_CONCURRENT_CONTROL_LOG);
  await Promise.all(taskList.map(item => task(item)));
  console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
 }

 startNoConcurrentControl();

  上述示例代码利用 Promise.all 方法模拟6个任务并发执行的场景,执行完所有任务的总耗时为 3000 毫秒。

  下面会采用该示例来验证实现方法的正确性。

三、实现

  由于任务并发执行的数量是有限的,那么就需要一种数据结构来管理不断产生的任务。

  队列的「先进先出」特性可以保证任务并发执行的顺序,在 JavaScript 中可以通过「数组来模拟队列」

class Queue {
  constructor() {
   this._queue = [];
  }

  push(value) {
   return this._queue.push(value);
  }

  shift() {
   return this._queue.shift();
  }

  isEmpty() {
   return this._queue.length === 0;
  }
 }

  对于每一个任务,需要管理其执行函数和参数:

class DelayedTask {
  constructor(resolve, fn, args) {
   this.resolve = resolve;
   this.fn = fn;
   this.args = args;
  }
 }

  接下来实现核心的 TaskPool 类,该类主要用来控制任务的执行:

class TaskPool {
  constructor(size) {
   this.size = size;
   this.queue = new Queue();
  }

  addTask(fn, args) {
   return new Promise((resolve) => {
    this.queue.push(new DelayedTask(resolve, fn, args));
    if (this.size) {
     this.size--;
     const { resolve: taskResole, fn, args } = this.queue.shift();
     taskResole(this.runTask(fn, args));
    }
   })
  }

  pullTask() {
   if (this.queue.isEmpty()) {
    return;
   }

   if (this.size === 0) {
    return;
   }

   this.size++;
   const { resolve, fn, args } = this.queue.shift();
   resolve(this.runTask(fn, args));
  }

  runTask(fn, args) {
   const result = Promise.resolve(fn(...args));

   result.then(() => {
    this.size--;
    this.pullTask();
   }).catch(() => {
    this.size--;
    this.pullTask();
   })

   return result;
  }
 }

TaskPool 包含三个关键方法:

  • addTask: 将新的任务放入队列当中,并触发任务池状态检测,如果当前任务池非满载状态,则从队列中取出任务放入任务池中执行。
  • runTask: 执行当前任务,任务执行完成之后,更新任务池状态,此时触发主动拉取新任务的机制。
  • pullTask: 如果当前队列不为空,且任务池不满载,则主动取出队列中的任务执行。

  接下来,将前面示例的并发数控制为2个:

 const cc = new ConcurrentControl(2);

 async function startConcurrentControl() {
  console.time(CONCURRENT_CONTROL_LOG);
  await Promise.all(taskList.map(item => cc.addTask(task, [item])))
  console.timeEnd(CONCURRENT_CONTROL_LOG);
 }

 startConcurrentControl();

  执行流程如下:

  最终执行任务的总耗时为 5000 毫秒。

四、高阶函数优化参数传递

await Promise.all(taskList.map(item => cc.addTask(task, [item])))

  手动传递每个任务的参数的方式显得非常繁琐,这里可以通过「高阶函数实现参数的自动透传」

addTask(fn) {
  return (...args) => {
   return new Promise((resolve) => {
    this.queue.push(new DelayedTask(resolve, fn, args));

    if (this.size) {
     this.size--;
     const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();
     taskResole(this.runTask(taskFn, taskArgs));
    }
   })
  }
 }

改造之后的代码显得简洁了很多:

 await Promise.all(taskList.map(cc.addTask(task)))

五、优化出队操作

  数组一般都是基于一块「连续内存」来存储,当调用数组的 shift 方法时,首先是删除头部元素(时间复杂度 O(1)),然后需要将未删除元素左移一位(时间复杂度 O(n)),所以 shift 操作的时间复杂度为 O(n)。

  由于 JavaScript 语言的特性,V8 在实现 JSArray 的时候给出了一种空间和时间权衡的解决方案,在不同的场景下,JSArray 会在 FixedArray 和 HashTable 两种模式间切换。

  在 hashTable 模式下,shift 操作省去了左移的时间复杂度,其时间复杂度可以降低为 O(1),即使如此,shift 仍然是一个耗时的操作。

  在数组元素比较多且需要频繁执行 shift 操作的场景下,可以通过「reverse + pop」的方式优化。

const Benchmark = require('benchmark');
 const suite = new Benchmark.Suite;

 suite.add('shift', function() {
  let count = 10;
  const arr = generateArray(count);
  while (count--) {
   arr.shift();
  }
 })
 .add('reverse + pop', function() {
  let count = 10;
  const arr = generateArray(count);
  arr.reverse();
  while (count--) {
   arr.pop();
  }
 })
 .on('cycle', function(event) {
  console.log(String(event.target));
 })
 .on('complete', function() {
  console.log('Fastest is ' + this.filter('fastest').map('name'));
  console.log('\n')
 })
 .run({
  async: true
 })

通过 benchmark.js 跑出的基准测试数据,可以很容易地看出哪种方式的效率更高:

  回顾之前 Queue 类的实现,由于只有一个数组来存储任务,直接使用 reverse + pop 的方式,必然会影响任务执行的次序。

  这里就需要引入双数组的设计,一个数组负责入队操作,一个数组负责出队操作。

 class HighPerformanceQueue {
  constructor() {
   this.q1 = []; // 用于 push 数据
   this.q2 = []; // 用于 shift 数据
  }

  push(value) {
   return this.q1.push(value);
  }

  shift() {
   let q2 = this.q2;
   if (q2.length === 0) {
    const q1 = this.q1;
    if (q1.length === 0) {
     return;
    }
    q2 = this.q2 = q1.reverse();
   }

   return q2.pop();
  }

  isEmpty() {
   if (this.q1.length === 0 && this.q2.length === 0) {
    return true;
   }
   return false;
  }
 }

最后通过基准测试来验证优化的效果:

到此这篇关于利用 JavaScript 实现并发控制的示例代码的文章就介绍到这了,更多相关js 并发控制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Nodejs实战心得之eventproxy模块控制并发

    目标 建立一个 lesson4 项目,在其中编写代码. 代码的入口是 app.js,当调用 node app.js 时,它会输出 CNode(https://cnodejs.org/ ) 社区首页的所有主题的标题,链接和第一条评论,以 json 的格式. 输出示例: [ { "title": "[公告]发招聘帖的同学留意一下这里", "href": "http://cnodejs.org/topic/541ed2d05e28155f24

  • js异步接口并发数量控制的方法示例

    请实现如下的函数(发请求的函数可以直接使用fetch) 可以批量请求数据,所有的URL地址在urls参数中 同时可以通过max参数 控制请求的并发度 当所有的请求结束后,需要执行callback回调 function sendRequest (urls: string[], max: number, callback: () => void) {} fetch 函数返回的是一个promise,promise对象在实例化的时候就已经开始执行了. 简易实现 function fetch(url) {

  • Nodejs爬虫进阶教程之异步并发控制

    之前写了个现在看来很不完美的小爬虫,很多地方没有处理好,比如说在知乎点开一个问题的时候,它的所有回答并不是全部加载好了的,当你拉到回答的尾部时,点击加载更多,回答才会再加载一部分,所以说如果直接发送一个问题的请求链接,取得的页面是不完整的.还有就是我们通过发送链接下载图片的时候,是一张一张来下的,如果图片数量太多的话,真的是下到你睡完觉它还在下,而且我们用nodejs写的爬虫,却竟然没有用到nodejs最牛逼的异步并发的特性,太浪费了啊. 思路 这次的的爬虫是上次那个的升级版,不过呢,上次那个虽

  • 利用 JavaScript 实现并发控制的示例代码

    一.前言   在开发过程中,有时会遇到需要控制任务并发执行数量的需求.   例如一个爬虫程序,可以通过限制其并发任务数量来降低请求频率,从而避免由于请求过于频繁被封禁问题的发生.   接下来,本文介绍如何实现一个并发控制器. 二.示例 const task = timeout => new Promise((resolve) => setTimeout(() => { resolve(timeout); }, timeout)) const taskList = [1000, 3000,

  • 利用javascript打开模态对话框(示例代码)

    1. 标准的方法 复制代码 代码如下: <script type="text/javascript">   function openWin(src, width, height, showScroll){   window.showModalDialog (src,"","location:No;status:No;help:No;dialogWidth:"+width+";dialogHeight:"+heig

  • 分享javascript计算时间差的示例代码

    在实际应用中,需要计算两个时间点之间的差距,一般来说都是计算当前时间和一个指定时间点之间的差距,并且有时候需要精确到天.小时.分钟和秒,下面就简单介绍一下如何实现此效果. 效果图: 距离新年: 代码如下: <html> <head> <title>javascript计算时间差</title> <style type="text/css"> #thenceThen { font-size:2em; } </style&g

  • 利用python生成照片墙的示例代码

    PIL(Python Image Library)是python的第三方图像处理库,但是由于其强大的功能与众多的使用人数,几乎已经被认为是python官方图像处理库了.其官方主页为:PIL. PIL历史悠久,原来是只支持python2.x的版本的,后来出现了移植到python3的库pillow,pillow号称是friendly fork for PIL,其功能和PIL差不多,但是支持python3.本文只使用了PIL那些最常用的特性与用法,主要参考自:http://www.effbot.org

  • 利用Pygame绘制圆环的示例代码

    目录 三角函数 弧度和角度的关系 基本包和事件捕捉 主程序 全部代码 三角函数 如果我们以OP作为圆的半径r,以o点作为圆的圆心,圆上的点的x坐标就是r * cos a ,y坐标就是 r * sin a. python中提供math.cos() 和 math.sin(),要求参数为弧度. 弧度和角度的关系 PI代表180度,PI就是圆周率:3.1415926 535 897392 23846,python提供了角度和弧度的转化 math.degress() 弧度转角度 math.radiens(

  • 利用Python实现网络测试的示例代码

    Speedtest CLI 专为软件开发人员.系统管理员和计算机爱好者等打造,是 Ookla® 提供技术支持的首款正式 Linux 本机 Speedtest 应用程序. Speedtest CLI是使用python语言开发的,不仅可以直接在命令行运行.也可以作为python模块在python IDE中直接调用. 首先,看一下如何在python应用中进行调用,使用pip直接安装. pip install speedtest-cli 将该模块直接导入到我们当前的代码块中. import speedt

  • 利用OpenLayer绘制扇形的示例代码

    目录 创建openlayers地图 绘制扇形方法 我在网上看了很多说是绘制扇形的方法,但是我用的时候都不是很好玩,所以说呢,我自己整理了一下,符合了我想要的效果,尽管我能力有限,还是决定分享一下,因为找资料太难了! 我比较懒,就不废话了,直接上代码! 创建openlayers地图 包我就不复制了,根据官网提供的API自己引用吧! openlayers API地址 创建地图 // 实例化Map map = new Map({ // 创建一个地图 layers: [new TileLayer({ s

  • Python利用Turtle绘制Technoblade的示例代码

    在刚过去不久的6月30日那天,国外一位在YouTube拥有上千万粉丝的我的世界游戏主播Technoblade因癌症与世长辞,年仅23岁,他并没有离开我们,只是用另外一种方式活在了这个世界上. 为了纪念他,特地写了这篇文章,教大家用Turtle绘制出Technoblade,运行效果如下: 代码如下: 首先,用坐标表示每个像素块的颜色,定义一个列表 POS=[ [4,0,(213,164,9)], [6,0,(213,164,9)], [9,0,(213,164,9)], [11,0,(213,16

  • Java客户端利用Jedis操作redis缓存示例代码

    前言 Redis是一个开源的Key-Value数据缓存,和Memcached类似.Redis多种类型的value,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型). Jedis 是 Redis 官方首选的 Java 客户端开发包.下面就来给大家详细关于Java客户端利用Jedis操作redis缓存的相关内容,话不多说,直接来看示例代码吧. 示例代码: //连接redis ,redis的默认端口是6379 Jedis

  • 利用python获取Ping结果示例代码

    前言 本文主要跟大家分享了关于利用python获取Ping结果的相关内容,分享出来供大家参考学习,下面话不多说,来一起看看详细的介绍吧. 示例代码: # -*- coding: utf-8 -*- import subprocess import re def get_ping_result(ip_address): p = subprocess.Popen(["ping.exe", ip_address], stdin = subprocess.PIPE, stdout = subp

随机推荐