聊聊通过celery_one避免Celery定时任务重复执行的问题

在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。

参考:https://blog.csdn.net/qq_41333582/article/details/83899884

有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。

Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base

@task(base=QueueOnce, once={'graceful': True})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。

另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

解决步骤

Celery One允许你将Celery任务排队,防止多次执行

安装

pip install -U celery_once

要求,需要Celery4.0,老版本可能运行,但不是官方支持的。

使用celery_once,tasks需要继承一个名为QueueOnce的抽象base tasks

Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')

# 一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

# 在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
    sleep(30)
    return "Done!"

要确定配置,需要取决于使用哪个backend进行锁定,查看Backends

在后端,这将覆盖apply_async和delay。它不影响直接调用任务。

在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。

example.delay(10)
example.delay(10)
Traceback (most recent call last):
    ..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
    ..
AlreadyQueued()

graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。

from celery_once import AlreadyQueued
# Either catch the exception,
try:
    example.delay(10)
except AlreadyQueued:
    pass
# Or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
    sleep(30)
    return "Done!"

其他功能请访问:https://pypi.org/project/celery_once/

到此这篇关于通过celery_one避免Celery定时任务重复执行的文章就介绍到这了,更多相关Celery定时任务重复执行内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python Celery定时任务的示例

    本文介绍了python Celery定时任务的示例,分享给大家,具体如下: 配置 启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE . Celery的定时任务都由celery beat来进行调度.celery beat默认按照settings.py之中的时区时间来调度定时任务. 创建定时任务 一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE: #每30秒调用task.add from datetime import timedelta CELERYBEA

  • Django+Celery实现动态配置定时任务的方法示例

    哈喽,今天给大家分享一篇Django+Celery实现动态配置定时任务,因为最近也是无意间看到一位大佬关于这块的文章,然后自己觉得不错,也想学习写一下,然后最终实现功能是在前端页面统一管理计划任务,大家可以在admin管理页面设置,也可以在自己写的前端页面删除添加编辑,实时生效,还可以监控这些监控任务是否运行成功失败. 补充:如果大家对celery不熟悉的话,建议先学习celery 一.安装 1.在Linux系统上安装模块 celery (3.1.26.post2) celery-with-re

  • django+xadmin+djcelery实现后台管理定时任务

    继上一篇中间表的数据是动态的,图表展示的数据才比较准确.这里用到一个新的模块Djcelery,安装配置步骤如下: 1.安装 redis==2.10.6 celery==3.1.23 django-celery==3.1.17 flower==0.9.2 supervisor==3.3.4 flower用于监控定时任务,supervisor管理进程,可选 2.配置 settings.py中添加以下几行: #最顶头加上 from __future__ import absolute_import #

  • Django配置celery(非djcelery)执行异步任务和定时任务

    所有演示均基于Django2.0 celery是一个基于python开发的简单.灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度.采用典型的生产者-消费者模型,主要由三部分组成: 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis.rabbitmq等作为broker 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程 存储结果的bac

  • celery实现动态设置定时任务

    本文实例为大家分享了celery动态设置定时任务的具体代码,供大家参考,具体内容如下 首先celery是一种异步任务队列,如果还不熟悉这个开源软件的请先看看官方文档,快速入门. 这里讲的动态设置定时任务的方法不使用数据库保存定时任务的信息,所以是项目重启后定时任务配置就会丢失,如果想保存成永久配置,可以考虑保存到数据库.redis或者使用pickle.json保存成文件,在项目启动时自动载入. 方法原理介绍 先来看一下celery的beat运行过程. 上图是beat的主要组成结构,beat中包含

  • celery4+django2定时任务的实现代码

    网上有很多celery + django实现定时任务的教程,不过它们大多数是基于djcelery + celery3的: 或者是使用django_celery_beat配置较为繁琐的. 显然简洁而高效才是我们最终的追求,而celery4已经不需要额外插件即可与django结合实现定时任务了,原生的celery beat就可以很好的实现定时任务功能. 当然使用原生方案的同时有几点插件所带来的好处被我们放弃了: 插件提供的定时任务管理将不在可用,当我们只需要任务定期执行而不需要人为调度的时候这点忽略

  • 聊聊通过celery_one避免Celery定时任务重复执行的问题

    在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因. 参考:https://blog.csdn.net/qq_41333582/article/details/83899884 有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once. Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现

  • Springboot定时任务Scheduled重复执行操作

    今天用scheduled写定时任务的时候发现定时任务一秒重复执行一次,而我的cron表达式为 * 0/2 * * * * . 在源码调试的过程中,发现是我的定时任务执行过程太短导致的. 于是我另外写了个简单的定时任务 @Component public class TestJob { @Scheduled(cron = "* 0/2 * * * *") public void test() { System.out.println("测试开始"); System.o

  • 使用 Celery Once 来防止 Celery 重复执行同一个任务

    在使用 Celery 的时候发现有的时候 Celery 会将同一个任务执行两遍,我遇到的情况是相同的任务在不同的 worker 中被分别执行,并且时间只相差几毫秒.这问题我一直以为是自己哪里处理的逻辑有问题,后来发现其他人 也有类似的问题,然后基本上出问题的都是使用 Redis 作为 Broker 的,而我这边一方面不想将 Redis 替换掉,就只能在 task 执行的时候加分布式锁了. 不过在 Celery 的 issue 中搜索了一下,有人使用 Redis 实现了分布式锁,然后也有人使用了

  • Linux中crontab定时任务不执行的原因

    最近在linux中遇到了个crontab定时任务不执行的case,在这给大家分享一下,避免踩到我遇到的坑. 先贴脚本吧 为了方便展示,把脚本入参,都写死了 #!/usr/bin/env bash # 1输出文件,到log 例如: bg # 2 目标文件目录 不带最后的/ 例如:/opt/flume/home/hdp_lbg_ectech/rawdata/xxx/web_vip_bg # 3 执行的时候选择前几天的日志 例如:1 LAST_DAY=$(date +"%Y-%m-%d" -

  • Python Celery定时任务详细讲解

    目录 前言 一.Celery定时任务是什么? 二.使用步骤 1.代码结构 2.启动定时任务 3.执行结果 总结 前言 Celery在python中的应用除了实现异步任务(async task)外也可以执行定时任务(beat) 一.Celery定时任务是什么? Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发,而Beat进程正是负责此类任务,能够自动触发定时/周期性任务. Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即

  • js定时器(执行一次、重复执行)

    1,只执行一次的定时器 复制代码 代码如下: <script> //定时器 异步运行 function hello(){ alert("hello"); } //使用方法名字执行方法 var t1 = window.setTimeout(hello,1000); var t2 = window.setTimeout("hello()",3000);//使用字符串执行方法 window.clearTimeout(t1);//去掉定时器 </scrip

  • javascript延时重复执行函数 lLoopRun.js

    function lLoopRun(sFuncLoop,sFuncEnd,nDelay) { //writen by http://fengyan.iecn.cn //sFuncLoop >> 字符串型,需要重复执行的Javascript函数或语句(多个函数或语句请用;分隔) //sFuncEnd >> 字符串型,用于中止重复执行动作(sFuncLoop)的Javascript函数或语句 //nDelay >> 数字型,重复执行的时间间隔(毫秒数) var vinter

  • 延时重复执行函数 lLoopRun.js

    公司的一个项目中,有许多地方需要延时执行一些可重复性的函数(动作),就写了下面这段函数. 呵呵,不知道取什么意思更为确切些,就定为了:LoopRun,意为"重复执行"  function lLoopRun(sFuncLoop,sFuncEnd,nDelay) {   var vintervalId = null;   var runString  = sFuncLoop;   var stopString  = sFuncEnd;   var delayTime  = nDelay;

  • JavaScript避免代码的重复执行经验技巧分享

    我喜欢到一些大型网站上去翻阅它们的原代码,期望能找到一些可以应用到自己的代码中的模式,或发现一些之前从未听说过的工具和技巧.可是,在我查看这些大型网站的源代码时,经常会发现一个问题,那就是重复的代码执行,重复的功能应用.下面就是一些在查看它们的源代码时发现一些问题,把这些分享给大家,希望能让你们更加简洁高效的写出JavaScript代码. 重复的收集元素 我在他们的JavaScript代码里看到的最常见的问题是重复的收集元素.虽然jQuery选择器引擎或querySelectorAll的执行速度

随机推荐