Node.js + Redis Sorted Set实现任务队列

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。

根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。

在设计任务队列的过程中需要考虑到的几个问题

  • 并行执行多个任务
  • 任务唯一性
  • 任务成功或失败后的处理

针对以上问题的解决方案

  • 并行执行多个任务利用 Promise.all 来实现
  • 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
  • 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

// remote_api.js 模拟第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
  setTimeout(() => {
    let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
  }, 3000);
});

app.listen('9001', () => {
  console.log('API 服务监听端口:9001');
});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
  // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
    if (error) {
      console.log(error);
    } else {
      if (task) {
        console.log('任务已存在,不新增相同任务');
        callback(null, task);
      } else {
        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, result);
          }
        });
      }
    }
  });
}

app.get('/', (req, res) => {
  let taskName = req.query['task-name'];
  addTaskToQueue(taskName, (error, result) => {
    if (error) {
      console.log(error);
    } else {
      res.status(200).send('正在查询中......');
    }
  });
});

app.listen(9002, () => {
  console.log('生产者服务监听端口:9002');
});
// consumer.js 定时获取任务并执行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量

function getTasksFromQueue(callback) {
  // 获取多个任务
  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
    } else {
      // 将任务分值设置为 0,表示正在处理
      if (tasks.length > 0) {
        let tmp = [];
        tasks.forEach((task) => {
          tmp.push(0);
          tmp.push(task);
        });
        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, tasks)
          }
        });
      }
    }
  });
}

function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  });
}

function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  })
}

function execTask(taskName) {
  return new Promise((resolve, reject) => {
    let requestOptions = {
      'url': 'http://127.0.0.1:9001',
      'method': 'GET',
      'timeout': 5000
    };
    request(requestOptions, (error, response, body) => {
      if (error) {
        resolve('failed');
        console.log(error);
        addFailedTaskToQueue(taskName, (error) => {
          if (error) {
            console.log(error);
          } else {

          }
        });
      } else {
        try {
          body = typeof body !== 'object' ? JSON.parse(body) : body;
        } catch (error) {
          resolve('failed');
          console.log(error);
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
          return;
        }
        if (body.status !== 200) {
          resolve('failed');
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        } else {
          resolve('succeed');
          removeSucceedTaskFromQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        }
      }
    });
  });
}

// 定时,每隔 5 秒获取新的任务来执行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('获取新任务');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
    } else {
      if (tasks.length > 0) {
        console.log(tasks);

        Promise.all(tasks.map(execTask))
        .then((results) => {
          console.log(results);
        })
        .catch((error) => {
          console.log(error);
        });

      }
    }
  });
});
(0)

相关推荐

  • php操作redis中的hash和zset类型数据的方法和代码例子

    前面一篇博客主要是string类型,list类型和set类型,下面hash类型和zset类型 1,hset 描述:将哈希表key中的域field的值设为value.如果key不存在,一个新的哈希表被创建并进行HSET操作.如果域field已经存在于哈希表中,旧值将被覆盖. 参数:key field value 返回值:如果field是哈希表中的一个新建域,并且值设置成功,返回1.如果哈希表中域field已经存在且旧值已被新值覆盖,返回0. 2,hsetnx 描述:将哈希表key中的域field的

  • Redis教程(五):Set数据类型

    一.概述: 在Redis中,我们可以将Set类型看作为没有排序的字符集合,和List类型一样,我们也可以在该类型的数据值上执行添加.删除或判断某一元素是否存在等操作.需要说明的是,这些操作的时间复杂度为O(1),即常量时间内完成次操作.Set可包含的最大元素数量是4294967295.       和List类型不同的是,Set集合中不允许出现重复的元素,这一点和C++标准库中的set容器是完全相同的.换句话说,如果多次添加相同元素,Set中将仅保留该元素的一份拷贝.和List类型相比,Set类

  • 详解Java使用Pipeline对Redis批量读写(hmset&hgetall)

    一般情况下,Redis Client端发出一个请求后,通常会阻塞并等待Redis服务端处理,Redis服务端处理完后请求命令后会将结果通过响应报文返回给Client. 感觉这有点类似于HBase的Scan,通常是Client端获取每一条记录都是一次RPC调用服务端. 在Redis中,有没有类似HBase Scanner Caching的东西呢,一次请求,返回多条记录呢? 有,这就是Pipline.官方介绍 http://redis.io/topics/pipelining 通过pipeline方

  • Redis Set 集合的实例详解

     Redis Set 集合的实例详解 Redis的Set是string类型的无序集合.集合成员是唯一的,这就意味着集合中不能出现重复的数据. redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1). 集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员). 实例 redis 127.0.0.1:6379> SADD runoobkey redis (integer) 1 redis 127.0.0.1:6379> SADD ru

  • C++访问Redis的mset 二进制数据接口封装方案

    需求 C++中使用hiredis客户端接口访问redis: 需要使用mset一次设置多个二进制数据 以下给出三种封装实现方案: 简单拼接方案 在redis-cli中,mset的语法是这样的: 复制代码 代码如下: /opt/colin$./redis-cli mset a 11 b 22 c 333 OK 按照这样的语法拼接后,直接使用hiredis字符串接口redisCommand传递: void msetNotBinary(redisContext *c, const vector<stri

  • Redis教程(六):Sorted-Sets数据类型

    一.概述: Sorted-Sets和Sets类型极为相似,它们都是字符串的集合,都不允许重复的成员出现在一个Set中.它们之间的主要差别是Sorted-Sets中的每一个成员都会有一个分数(score)与之关联,Redis正是通过分数来为集合中的成员进行从小到大的排序.然而需要额外指出的是,尽管Sorted-Sets中的成员必须是唯一的,但是分数(score)却是可以重复的.     在Sorted-Set中添加.删除或更新一个成员都是非常快速的操作,其时间复杂度为集合中成员数量的对数.由于So

  • Node.js + Redis Sorted Set实现任务队列

    需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据.为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求.然后定时从任务队列里中取出任务调用第三方 API,若返回状态为"

  • 在Node.js应用中读写Redis数据库的简单方法

    在开始本文之前请确保安装好 Redis 和 Node.js 以及 Node.js 的 Redis 扩展 -- node_redis 首先创建一个新文件夹并新建文本文件 app.js 文件内容如下: var redis = require("redis") , client = redis.createClient(); client.on("error", function (err) { console.log("Error " + err);

  • 在Node.js应用中使用Redis的方法简介

    在开始本文之前请确保安装好 Redis 和 Node.js 以及 Node.js 的 Redis 扩展 -- node_redis 首先创建一个新文件夹并新建文本文件 app.js 文件内容如下: var redis = require("redis") , client = redis.createClient(); client.on("error", function (err) { console.log("Error " + err);

  • Node.js与Sails redis组件的使用教程

    有段时间没写关于NodeJs的文章了,今天也是为了解决高并发的问题,而想起了这个东西,IIS的站点在并发量达到200时有了一个瓶颈,于是想到了这个对高并发支持比较好的框架,nodeJs在我之前写出一些文章,主要为sails框架为主,介绍了一些使用方法,今天主要说下redis组件! 项目:SailsMvc 开发工具:webstorm 语言:nodejs 框架:sails 包:redis 主要介绍几个用法,为string,set,hash和list的使用 测试redis组件的代码 index: fu

  • Node.js开发之访问Redis数据库教程

    大家要记住,Node.js主要用于构建高性能.高可伸缩性的服务器和客户端应用,它面向的是"实时Web". Node.js的目标是提供一个"以简单的方式构建可扩展的网络服务器",它受到来自Ruby语言的事件机(Event Machine)和来自Python的Twisted框架的影响. Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API.从2010年3月15日起,Redis的开发工作由

  • node.js使用redis储存session的方法

    转储session的原因 网上有许多session需要用数据库储存的原因,对我来说原因很简单,仅仅只是node的生产环境不允许将session存到服务器的内存中.会报一个内存溢出的风险警告.所以我决定将session转储到数据库中.而用于存储session的方案有许多,这里由于本人比较菜,所以选择了主流的redis来保存我的session状态 安装redis 首先既然要使用redis,那么第一步当然是将redis安装到服务器上,服务器一般都是linux的操作系统. 所以下面是linux的安装步骤

  • 提升node.js中使用redis的性能遇到的问题及解决方法

    问题初现 某基于node.js开发的业务系统向外提供了一个dubbo服务,提供向第三方缓存查询.设置多项业务数据并聚合操作结果.在QPS达到800时(两台虚拟机,每台机器4Core8G4node进程),在监控平台上出现了非常多的slow rt警告,平均接口响应达到60+ms,请求报警率达到80%+. 为找到造成该服务吞吐量过低的罪魁祸首,业务人员在请求日志中打点了所有查询缓存的操作,结果显示每个请求查询缓存耗时在50-100ms之间跳动.查询了redis-server的监控数据发现,不存在ser

  • node.js中 redis 的安装和基本操作示例

    本文实例讲述了node.js中 redis 的安装和基本操作.分享给大家供大家参考,具体如下: 一.win下安装redis https://github.com/MicrosoftArchive/redis/releases 下载Redis-x64-3.2.100.zip,然后解压,放到自定义目录. 然后打开命令行工具,进入到该目录下,运行安装redis服务. redis-server.exe --service-install redis.windows-service.conf --logl

  • 使用node.js 制作网站前台后台

    node.js  能做什么?我至今也不清楚,他在哪方面应用比较广泛,我没有机会接触到那样的项目.只是因为喜欢,业余时间做了一个网站和后台.深刻领悟到一个道理那就是如果你喜欢一项技术可以玩玩,但是如果用到项目中就必须花些时间去解决很多问题. 使用到的技术: express + jade sqlite + sequelize redis 1. 关于jade 支持include.  比如: include ./includes/header  header 是一个局部视图,类似asp.net  用户控

  • 深入分析node.js的异步API和其局限性

    用异步API的原因 异步的概念之所以首先在Web2.0中火起来,是因为在浏览器中Javascript在单线程上执行,而且他还与UI渲染公用一个线程.这意味着Javascript在执行的时候UI渲染和响应是处于停滞状态的.为了用户体验更好而采取异步的方式(当然,这在所谓的单线程语言中)不阻塞主线程继续响应用户操作.这属于用户体验的范畴. 同样的,如果有其他语言经验的工程师当然也明白,CPU在线程间切换是需要消耗大量的时间的(主要为上下文之间的切换和缓存),所以提高效率也是使用异步API的理由. 当

随机推荐