使用 Celery Once 来防止 Celery 重复执行同一个任务

在使用 Celery 的时候发现有的时候 Celery 会将同一个任务执行两遍,我遇到的情况是相同的任务在不同的 worker 中被分别执行,并且时间只相差几毫秒。这问题我一直以为是自己哪里处理的逻辑有问题,后来发现其他人 也有类似的问题,然后基本上出问题的都是使用 Redis 作为 Broker 的,而我这边一方面不想将 Redis 替换掉,就只能在 task 执行的时候加分布式锁了。

不过在 Celery 的 issue 中搜索了一下,有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。 大致看了一下 Celery Once ,发现非常符合现在的情况,就用了下。

Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={'graceful': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

总得来说,分为几步

第一步,安装

pip install -U celery_once

第二步,增加配置

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

第三步,修改 delay 方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

第四步,修改 task 参数

@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

参考链接 https://github.com/cameronmaske/celery-once

到此这篇关于使用 Celery Once 来防止 Celery 重复执行同一个任务的文章就介绍到这了,更多相关 Celery 重复执行同一个任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 聊聊通过celery_one避免Celery定时任务重复执行的问题

    在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因. 参考:https://blog.csdn.net/qq_41333582/article/details/83899884 有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once. Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现

  • 使用 Celery Once 来防止 Celery 重复执行同一个任务

    在使用 Celery 的时候发现有的时候 Celery 会将同一个任务执行两遍,我遇到的情况是相同的任务在不同的 worker 中被分别执行,并且时间只相差几毫秒.这问题我一直以为是自己哪里处理的逻辑有问题,后来发现其他人 也有类似的问题,然后基本上出问题的都是使用 Redis 作为 Broker 的,而我这边一方面不想将 Redis 替换掉,就只能在 task 执行的时候加分布式锁了. 不过在 Celery 的 issue 中搜索了一下,有人使用 Redis 实现了分布式锁,然后也有人使用了

  • 源码分析 Laravel 重复执行同一个队列任务的原因

    前言 laravel 的队列服务对各种不同的后台队列服务提供了统一的 API.队列允许你延迟执行消耗时间的任务,比如发送一封邮件.这样可以有效的降低请求响应的时间. 发现问题 在 Laravel 中使用 Redis 处理队列任务,框架提供的功能非常强大,但是最近遇到一个问题,就是发现一个任务被多次执行,这是为什么呢? 先说原因: 因为在 Laravel 中如果一个队列(任务)执行时间大于 60 秒,就会被认为执行失败并重新加入队列中,这样就会导致重复执行同一个任务. 这个任务的逻辑就是给用户推送

  • js定时器(执行一次、重复执行)

    1,只执行一次的定时器 复制代码 代码如下: <script> //定时器 异步运行 function hello(){ alert("hello"); } //使用方法名字执行方法 var t1 = window.setTimeout(hello,1000); var t2 = window.setTimeout("hello()",3000);//使用字符串执行方法 window.clearTimeout(t1);//去掉定时器 </scrip

  • javascript延时重复执行函数 lLoopRun.js

    function lLoopRun(sFuncLoop,sFuncEnd,nDelay) { //writen by http://fengyan.iecn.cn //sFuncLoop >> 字符串型,需要重复执行的Javascript函数或语句(多个函数或语句请用;分隔) //sFuncEnd >> 字符串型,用于中止重复执行动作(sFuncLoop)的Javascript函数或语句 //nDelay >> 数字型,重复执行的时间间隔(毫秒数) var vinter

  • 延时重复执行函数 lLoopRun.js

    公司的一个项目中,有许多地方需要延时执行一些可重复性的函数(动作),就写了下面这段函数. 呵呵,不知道取什么意思更为确切些,就定为了:LoopRun,意为"重复执行"  function lLoopRun(sFuncLoop,sFuncEnd,nDelay) {   var vintervalId = null;   var runString  = sFuncLoop;   var stopString  = sFuncEnd;   var delayTime  = nDelay;

  • JavaScript避免代码的重复执行经验技巧分享

    我喜欢到一些大型网站上去翻阅它们的原代码,期望能找到一些可以应用到自己的代码中的模式,或发现一些之前从未听说过的工具和技巧.可是,在我查看这些大型网站的源代码时,经常会发现一个问题,那就是重复的代码执行,重复的功能应用.下面就是一些在查看它们的源代码时发现一些问题,把这些分享给大家,希望能让你们更加简洁高效的写出JavaScript代码. 重复的收集元素 我在他们的JavaScript代码里看到的最常见的问题是重复的收集元素.虽然jQuery选择器引擎或querySelectorAll的执行速度

  • JavaScript中误用/g导致的正则test()无法正确重复执行的解决方案

    一个简单的利用正则判断输入是否为数字: input1 = '0281234567';input2 = '0282345678';var reg = /^\d+$/g; reg.test(input1); //true reg.test(input2); //false 发现第二次test的时候返回的值不对了.排除书写错误,取值失败等等各种干扰因素后,发现仅仅是reg正则第二次执行的时候就无法正确执行了.之前并未遇到过这个问题,便搜索了一下相关信息. 原来这个问题实际上是/g导致的,而此时也才发现

  • android中ListView多次刷新重复执行getView的解决方法

    以前倒是没有注意listview的getView会重复执行多次,这次因为布局比较复杂,所以在测试的时候去断点跟踪,发现同一条数据不断的重复执行.觉得很奇怪,于是上网搜索了一下.网上的解释基本一致,就是ListView布局时height和width都不是fill_parent,导致不断计算高度,不断刷新.或者说它的父容器没有设置成fill_parent. 可以布局太复杂的情况下,全部按照fill_parent去调整不现实.所以想了另一种方案,就是动态固定高度. 在程序运行后,固定ListView的

  • Shell脚本避免重复执行的方法

    很多用cron定时执行的shell脚本可能会由于各种原因执行很久,会有必要在运行的时候先检查一下自身是否还在运行.本文提供的linux shell脚本用以检查以命令sh ...来执行的shell脚本.要对其他东西进行唯一性检查,可以稍微修改一下源代码. 复制代码 代码如下: # 检查通过sh命令执行的shell脚本是不是还在执行当中,避免重复执行. # 把这段代码放在需要保证唯一性的程序头部即可 # 注意,如果直接把此脚本放到cron里面执行的话,必须再grep -v " -c sh "

随机推荐