源码分析 Laravel 重复执行同一个队列任务的原因

前言

laravel 的队列服务对各种不同的后台队列服务提供了统一的 API。队列允许你延迟执行消耗时间的任务,比如发送一封邮件。这样可以有效的降低请求响应的时间。

发现问题

在 Laravel 中使用 Redis 处理队列任务,框架提供的功能非常强大,但是最近遇到一个问题,就是发现一个任务被多次执行,这是为什么呢?

先说原因:

因为在 Laravel 中如果一个队列(任务)执行时间大于 60 秒,就会被认为执行失败并重新加入队列中,这样就会导致重复执行同一个任务。

这个任务的逻辑就是给用户推送内容,需要根据队列内容取出用户并遍历,通过请求后端 HTTP 接口发送。比如有 10000 个用户,在用户数量多或接口处理速度没那么快的情况下,执行时间肯定会大于 60 秒,于是这个任务就被重新加入队列。情况更糟糕一点,前面的任务如果都没有在 60 秒执行完,就都会重新加入队列,这样同一个任务就不止重复执行一次了,而是多次。

下面从 Laravel 源代码找一下罪魁祸首。

源代码文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php

/**
 * The expiration time of a job.
 *
 * @var int|null
 */
protected $expire = 60;

这个 $expire 成员变量是一个固定的值,Laravel 认为一个队列再怎么 60 秒也该执行完了吧。取队列方法:

public function pop($queue = null)
{
 $original = $queue ?: $this->default;
 $queue = $this->getQueue($queue);
 $this->migrateExpiredJobs($queue.':delayed', $queue);
 if (! is_null($this->expire)) {
  $this->migrateExpiredJobs($queue.':reserved', $queue);
 }
 list($job, $reserved) = $this->getConnection()->eval(
  LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
 );
 if ($reserved) {
  return new RedisJob($this->container, $this, $job, $reserved, $original);
 }
}

取队列有几步操作,因为队列执行失败,或执行超时等都会放入另外的集合保存起来,以便重试,过程如下:

1.把因执行失败的队列从 delayed 集合重新 rpush 到当前执行的队列中。

2.把因执行超时的队列从 reserved 集合重新 rpush 到当前执行的队列中。

3.然后才是从队列中取任务开始执行,同时把队列放入 reserved 的有序集合。

这里使用了 eval 命令执行这个过程,用到了几个 lua 脚本。

从要执行的队列中取任务:

local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
 reserved = cjson.decode(job)
 reserved['attempts'] = reserved['attempts'] + 1
 reserved = cjson.encode(reserved)
 redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}

可以看到 Laravel 在取 Redis 要执行的队列的时候,同时会放一份到一个有序集合中,并使用过期时间戳作为分值。

只有当这个任务完成后,再把有序集合中这个任务移除。从这个有序集合移除队列的代码就省略,我们看一下 Laravel 如何处理执行时间大于 60 秒的队列。

也就是这段 lua 脚本执行的操作:

local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if(next(val) ~= nil) then
 redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
 for i = 1, #val, 100 do
  redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
 end
end
return true

这里 zrangebyscore 找出分值从无限小到当前时间戳的元素,也就是 60 秒之前加入到集合的任务,然后通过 zremrangebyrank 从集合移除这些元素并 rpush 到队列中。

看到这里应该就恍然大悟了。

如果一个队列 60 秒没执行完,那么进程在取队列的时候从 reserved 集合中把这些任务又重新 rpush 到队列中。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • Laravel 4.2 中队列服务(queue)使用感受

    这半个月,我参与重写了一个微信公众号后端系统,首次使用了laravel 4.2,以及laravel引以为傲的队列服务(queue). 由于整个系统涉及到多端交互,又有大量语音传输.处理的业务,我们在一些地方发现响应时间过长.之前的系统基于node.js和mongoDB,由于node天生就是异步,有守护进程,所以并没有出现过这个问题,而这次重写必然要引入异步流程了.Queue进入了我们的视线. 根据这一页几乎还全是英文的"中文文档" ,laravel恰好在4.2版本中刚刚引入了redis

  • 浅析Laravel5中队列的配置及使用

    前言 队列常常用于两种场景,一种是高并发的情况,一种是耗时的操作,可以将任务放到队列中去,消费者从队列取任务执行,当然还有失败的情况如何处理,以及延迟,重试,更复杂的情况还有优先级的实现. 在Laravel 5中使用队列非常简单,并且失败处理,延迟,重试的方法都已经实现,下面简单尝试了一下Laravel的队列服务. Laravel默认支持以下几种队列服务:sync, database, beanstalkd, sqs, redis,本例使用redis作为队列服务,需先配置好Redis服务. 1.

  • PHP的Laravel框架中使用消息队列queue及异步队列的方法

    queue配置 首先说明一下我之前的项目中如何使用queue的. 我们现在的项目都是用的symfony,老一点的项目用的symfony1.4,新一点的项目用的都是symfony2.symfony用起来整体感觉还是很爽的,尤其symfony2,整体上来讲使用了很多java里面框架的设计思想.但是他不支持queue.在symfony,我们使用queue也经历了几个过程.最开始使用张堰同学的httpsqs.这个简单使用,但是存在单点.毕竟我们的项目还是正式对外服务的,所以我们研究了Apache旗下的开

  • Laravel使用消息队列需要注意的一些问题

    前言 消息队列对于大型的Web项目来说是必不可少的一个模块,通过消息队列可以解决大并发和多种语言通信接口等问题.对于大并发的问题,可以将耗时的任务或者不能同时大量并行的任务封装起来传输到消息队列中,由处理程序不断从消息队列中提取消息并进行处理,这样通过消息队列的缓冲可以使得在大并发情况下不再阻塞,如果性能不够用还可以添加多个处理任务从消息队列中获取消息进行处理.比如数据库的操作,当对数据库的读.写操作过多时就会存在锁表等问题,读的问题可以通过缓存等方案解决,写的问题就需要消息队列来解决.而且,在

  • Laravel中利用队列发送邮件的方法示例

    前言 本文主要给大家介绍了关于Laravel中队列发送邮件的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: 批量处理任务的场景在我们开发中是经常使用的,比如邮件群发,消息通知,短信,秒杀等等,我们需要将这个耗时的操作放在队列中来处理,从而大幅度缩短Web请求和相应的时间.下面讲解下Laravel中队列的使用 1.配置文件 config/queue.php <?php return [ 'default' => env('QUEUE_DRIVER', 'sync'),

  • 浅谈Laravel队列实现原理解决问题记录

    问题 公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis.在使用laravel中的队列时,产生冲突干扰. 查找问题原因 在laravel 队列的操作类Illuminate\Queue\RedisQueue.php中可以看到pushRaw()方法: // 将一任务推入队列中 public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()-

  • 源码分析 Laravel 重复执行同一个队列任务的原因

    前言 laravel 的队列服务对各种不同的后台队列服务提供了统一的 API.队列允许你延迟执行消耗时间的任务,比如发送一封邮件.这样可以有效的降低请求响应的时间. 发现问题 在 Laravel 中使用 Redis 处理队列任务,框架提供的功能非常强大,但是最近遇到一个问题,就是发现一个任务被多次执行,这是为什么呢? 先说原因: 因为在 Laravel 中如果一个队列(任务)执行时间大于 60 秒,就会被认为执行失败并重新加入队列中,这样就会导致重复执行同一个任务. 这个任务的逻辑就是给用户推送

  • laravel源码分析队列Queue方法示例

    目录 前言 队列任务的创建 队列任务的分发 前言 队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间.本文我们就来分析下队列创建和执行的源码. 队列任务的创建 先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php. > php artisan make:job DemoJob > Job created suc

  • Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

    通过前面三篇的分析,我们深入了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列.我们还是拿公共厕所做比喻,同步队列是主要的排队区,如果公共厕所没开放,所有想要进入厕所的人都得在这里排队.而条件队列主要是为条件等待设置的,我们想象一下如果一个人通过排队终于成功获取锁进入了厕所,但在方便之前发现自己没带手纸,碰到这种情况虽然很无奈,但是它

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

  • Netty源码分析NioEventLoop执行select操作入口

    分析完了selector的创建和优化的过程, 这一小节分析select相关操作 select操作的入口,NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SEL

  • jQuery 1.9.1源码分析系列(十四)之常用jQuery工具

    为了给下一章分析动画处理做准备,先来看一下一些工具.其中队列工具在动画处理中被经常使用. jQuery.fn. queue(([ queueName ] [, newQueue ]) || ([ queueName ,] callback ))(获取或设置当前匹配元素上待执行的函数队列. 如果当前jQuery对象匹配多个元素:获取队列时,只获取第一个匹配元素上的队列:设置队列(替换队列.追加函数)时,则为每个匹配元素都分别进行设置.如果需要移除并执行队列中的第一个函数,请使用dequeue()函

  • Java并发系列之ReentrantLock源码分析

    在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

  • Java并发编程之Condition源码分析(推荐)

    Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

  • futuretask源码分析(推荐)

    FutureTask只实现RunnableFuture接口: 该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性. 1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) ). 2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成.也可以中断执行,判断执行状态等. FutureTask是一个支持取消行为的异

  • Java并发系列之ConcurrentHashMap源码分析

    我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别.Java为我们提供了一个现成的哈希结构,那就是HashMap类,在前面的文章中我曾经介绍过HashMap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的.为此,Java为我们提供了另外一个HashTable类,它对于多线程同步的处理非常简单粗暴,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁.这种方法虽然简单,但导致了一个问题,那就是在同一时间

随机推荐