关于 Laravel Redis 多个进程同时取队列问题详解

前言

最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。

使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
autostart=true
autorestart=true
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/xxx.cn/worker.log

注意: numprocs = 8,代表开启 8 个进程来执行 command 中的命令。

如下:

PS C:\Users\tanteng\website\laradock> docker-compose exec php-worker sh
/etc/supervisor/conf.d # ps -ef | grep php
 7 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 8 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 9 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 10 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 11 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 12 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 13 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 14 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon
 44 root  0:00 grep php

Laravel 多进程读取队列内容是否会重复

在 Laravel 的某个控制器方法,一次放入多个任务队列:

public function index(Request $request)
{
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
 $this->dispatch((new SendFile3())->onQueue('sendfile'));
}

在队列处理的方法打印日志,打印处理的队列的 ID:

app/Jobs/SendFile3.php

public function handle()
{
 info('invoke SendFile3');
 dump('invoke handle');
 $rawbody = $this->job->getRawBody();
 $info = json_decode($rawbody, true);
 info('queue id:' . $info['id']);
}

Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:

{
 "job": "Illuminate\\Queue\\CallQueuedHandler@call",
 "data": {
 "commandName": "App\\Jobs\\SendFile3",
 "command": "O:18:\"App\\Jobs\\SendFile3\":4:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";N;}"
 },
 "id": "hadBcy3IpNsnOofQQdHohsa451OkQs88",
 "attempts": 1
}

请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:

这个时候开启 Supervisor 处理队列任务,并查看日志:

[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:JaClJzhDEvntzLCRIz6uRQkCVLbE8Y9C
[2017-12-23 19:01:01] local.INFO: queue id:ukHv0Li4P2VgPa55qU6yEOJM27Mo5YwJ
[2017-12-23 19:01:01] local.INFO: queue id:ObMpwDTmnaveBUkU7aan5abt3Agyt90l
[2017-12-23 19:01:01] local.INFO: queue id:fo2qZn2ftSdQtdnKOciMK7iJb4qlhRGE
[2017-12-23 19:01:01] local.INFO: queue id:uLjFMoOU7Wk7bOAd4zpHb3ccRMJHBtR6
[2017-12-23 19:01:01] local.INFO: queue id:87ULqPBObFmGr16nl5wxFVOi71zGCeRM
[2017-12-23 19:01:01] local.INFO: queue id:9UVl0muQLzBqlRI99rChGW2ElXwVEMIE
[2017-12-23 19:01:01] local.INFO: queue id:a0vgyZuz9HtmH7DGHEpXqesFTcQU3QAF
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:2cXuXxopPkgYiV4WO8gv9CJ6CwXeKtYL
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:9acTAYa8cxpJX6Q3Gb1sULokotP8reqZ
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:BPHQvBboChlv4gr2I0vyLVyw9bijtTYJ
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:Fm6tNajdxYKtdQbDMYDmwWJFLnNikRyg
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:nyAbcvSkBVPbaH3e2ItQkoLJlP1ficib
[2017-12-23 19:01:01] local.INFO: queue id:WBHsSVZtP43569UoPXxfLLJcvYmPW7cP
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:bliPnKcRSDApwVmKLNxEhaKelhm0RDEY
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:eOAoQucEIwRz9uZ64xm6IDKgiqj9Xc3W
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:lzise9EiqQqINrhALbmAI4qNg7qylpb2
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:WXYKvcfOhS1pPnwOwUTsenoMv5l5EUXe
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:XtH5JiwLgnrwWzI02Oyi70pihAOkuJUD
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:9ehmE5HImlpNubpY0xWN8UVrOzxeMqws
[2017-12-23 19:01:01] local.INFO: queue id:C1sK87cpZl47edLA0zhfo7PJ9MIEcoyx
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:2kwl51oH4lyyRrljCReGUCkNiJRDl7oe
[2017-12-23 19:01:01] local.INFO: queue id:ObRpoqrYTPYiyv2delMlOXu3sAPpWJlN
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:6qgu6W3TapLjSrt688yv9HRXvDDLxntz
[2017-12-23 19:01:01] local.INFO: queue id:wiTlERhwn7s9cQkfUF9lLlNADpXjKncI
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:ZSLW0VLFBDpL4wjTJzu3Yb3V45pNe807
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:qhZlXLGfGWRluIeNm7VbllmTJZYb2h5n
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:LUx1IByD3L2psNl9BZwHhk2knXyRPzW6
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:M2RESPjyo5hpAFxxL0EQbWwsUq4jpmWn
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:hUsGaiIAOO6ZfGQc5kGHGpsv5RpoRPYO
[2017-12-23 19:01:01] local.INFO: queue id:cEHJsOy6bLeZ4NbncPziaHqlarMeyyEF
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:w4bkFiJKMU5saqG2xKN3ZRL5BYXGATMk
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:0zBuwbxlrEhhxKfYBkVyTY4z35f154sI
[2017-12-23 19:01:01] local.INFO: queue id:mvoZvyDPvq4tcPjEy9G7PMtH3MwPkPik
[2017-12-23 19:01:01] local.INFO: invoke SendFile3
[2017-12-23 19:01:01] local.INFO: queue id:TLvF74eeidECWKtjZqWvW03UJTRPTL9r
[2017-12-23 19:01:01] local.INFO: queue id:me8wyPfgcz0nf9xvcXz0hf2xVxqa1FFS

这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。

分析一下 Laravel 队列的处理

Laravel 中入队列方法

public function pushRaw($payload, $queue = null, array $options = [])
{
 $this->getConnection()->rpush($this->getQueue($queue), $payload);

 return Arr::get(json_decode($payload, true), 'id');
}

用的是 Redis 的 rpush 命令。

Laravel 中取队列方法

public function pop($queue = null)
{
 $original = $queue ?: $this->default;
 $queue = $this->getQueue($queue);
 $this->migrateExpiredJobs($queue.':delayed', $queue);
 if (! is_null($this->expire)) {
  $this->migrateExpiredJobs($queue.':reserved', $queue);
 }
 list($job, $reserved) = $this->getConnection()->eval(
  LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
 );
 if ($reserved) {
  return new RedisJob($this->container, $this, $job, $reserved, $original);
 }
}

这里用的是 lua 脚本取队列,如下:

public static function pop()
{
 return <<<'LUA'
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}

那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • redis 队列操作的例子(php)

    入队操作 复制代码 代码如下: <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); while(True){ try{ $value = 'value_'.date('Y-m-d H:i:s'); $redis->LPUSH('key1',$value); sleep(rand()%3); echo $value."\n"; }catch(Exception $e){ echo $e->g

  • 详解Redis用链表实现消息队列

    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换.个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样. 链表实现消息队列 Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型.可以利用lpush和rpop来实现.但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源

  • php中使用redis队列操作实例代码

    例1,入队操作: 复制代码 代码如下: <?php$redis = new Redis();$redis->connect('127.0.0.1',6379);while(True){  try{    $value = 'value_'.date('Y-m-d H:i:s');    $redis->LPUSH('key1',$value);    sleep(rand()%3);    echo $value."\n";  }catch(Exception $e)

  • redis实现简单队列

    在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话,建议使用第三方封装好的 EasyNetQ,rabbitMQ比较适合对安全性,稳定性要求较高的地方,但有时我们也会有对这方面要求不是很高的场景,比如:文章阅读数,实时性要求不是很高的地方,所以我想到了用redis来做队列. redis 的List结构本身就是一个链表 (双向链表),所以符合我们

  • 关于 Laravel Redis 多个进程同时取队列问题详解

    前言 最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下. 使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下: [program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /var/www/xxx.cn/artisan queue:work --que

  • 使用Redis有序集合实现IP归属地查询详解

    工作中经常遇到一类需求,根据 IP 地址段来查找 IP 对应的归属地信息.如果把查询过程放到关系型数据库中,会带来很大的 IO 消耗,速度也不能满足,显然是不合适的. 那有哪些更好的办法呢?为此做了一些尝试,下面来详细说明. 构建索引文件 在 GitHub 上看到一个ip2region 项目,作者通过生成一个包含有二级索引的文件来实现快速查询,查询速度足够快,毫秒级别.但如果想更新地址段或归属地信息,每次都要重新生成文件,并不是很方便. 不过还是推荐大家看看这个项目,其中建索引的思想还是很值得学

  • Java进程内缓存框架EhCache详解

    目录 一:目录 二: 简介 2.1.基本介绍 2.2.主要的特性 2.3. 集成 2.4. ehcache 和 redis 比较 三:事例 3.1.在pom.xml中引入依赖 3.2.在src/main/resources/创建一个配置文件 ehcache.xml 3.3.测试类 3.4.缓存配置 一:xml配置方式: 二:编程方式配置 3.5.Ehcache API 四:Spring整合 4.1.pom.xml 引入spring和ehcache 4.2.在src/main/resources添

  • C语言中进程间通讯的方式详解

    目录 一.无名管道 1.1无名管道的原理 1.2功能 1.3无名管道通信特点 1.4无名管道的实例 二.有名管道 2.1有名管道的原理 2.2有名管道的特点 2.3有名管道实例 三.信号 3.1信号的概念 3.2发送信号的函数 3.3常用的信号 3.4实例 四.IPC进程间通信 4.1IPC进程间通信的种类 4.2查看IPC进程间通信的命令 4.3消息队列 4.4共享内存 4.5信号灯集合 一.无名管道 1.1无名管道的原理 无名管道只能用于亲缘间进程的通信,无名管道的大小是64K.无名管道是内

  • 微信小程序中form 表单提交和取值实例详解

    微信小程序中form 表单提交和取值实例详解 我们知道,如果我们直接给 input 添加 bindinput,比如:<input bindinput="onUsernameInput" />,那么可以在 onUsernameInput 中直接使用 e.detail.value,即: onUsernameInput : function(e) { e.detail.value; } 但是,如果有多个输入控件,我们不可能为每个控件添加 bindinput.bindchange

  • Java语言实现快速幂取模算法详解

    快速幂取模算法的引入是从大数的小数取模的朴素算法的局限性所提出的,在朴素的方法中我们计算一个数比如5^1003%31是非常消耗我们的计算资源的,在整个计算过程中最麻烦的就是我们的5^1003这个过程 缺点1:在我们在之后计算指数的过程中,计算的数字不都拿得增大,非常的占用我们的计算资源(主要是时间,还有空间) 缺点2:我们计算的中间过程数字大的恐怖,我们现有的计算机是没有办法记录这么长的数据的,所以说我们必须要想一个更加高效的方法来解决这个问题 当我们计算AB%C的时候,最便捷的方法就是调用Ma

  • 对numpy Array [: ,] 的取值方法详解

    NumPy数组是一个多维数组对象,称为ndarray 创建一个numpy数组,如下所示 import numpy as np x=np.array([[1,2,3],[4,5,6],[7,8,9]]) 使用的方法和python中的元表差不多 print(x[0:2]) >>>[[1 2 3] [4 5 6]] print(x[:]) >>>[[1 2 3] [4 5 6] [7 8 9]] #有些比较复杂逗号的用法,不一定需要掌握,知道是什么意思即可 #有时候希望只取某

  • 基于Python的Post请求数据爬取的方法详解

    为什么做这个 和同学聊天,他想爬取一个网站的post请求 观察 该网站的post请求参数有两种类型:(1)参数体放在了query中,即url拼接参数(2)body中要加入一个空的json对象,关于为什么要加入空的json对象,猜测原因为反爬虫.既有query参数又有空对象体的body参数是一件脑洞很大的事情. 一开始先在apizza网站 上了做了相关实验才发现上面这个规律的,并发现该网站的请求参数要为raw形式,要是直接写代码找规律不是一件容易的事情. 源码 import requests im

  • java通过Jsoup爬取网页过程详解

    这篇文章主要介绍了java通过Jsoup爬取网页过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一,导入依赖 <!--java爬虫--> <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.10.3</version> </depe

  • laravel Task Scheduling(任务调度)在windows下的使用详解

    前言 laravel的任务调度是很好用的,因为Laravel提供了平滑而又富有表现力地调度器,并且服务器上只需要一个Cron条目即可,这使我们从编写手动写crontab中解放出来,使得程序重新获得定时任务的控制权. 但是,我们发现laravel的文档中只提供了linux/unix下的解决方案,但是我们大多数时候本地开发环境都是在windows下搭建的,所以,我们需要解决这些问题. 问题 本篇博客主要是围绕着如下几个问题展开 1. windows下,怎么使用laravel任务调度? 2. wind

随机推荐