基于Python实现配置热加载的方法详解

目录
  • 背景
  • 如何实现
  • 使用多进程实现配置热加载
    • 使用signal信号量来实现热加载
    • 采用multiprocessing.Event 来实现配置热加载
  • 结语

背景

由于最近工作需求,需要在已有项目添加一个新功能,实现配置热加载的功能。所谓的配置热加载,也就是说当服务收到配置更新消息之后,我们不用重启服务就可以使用最新的配置去执行任务。

如何实现

下面我分别采用多进程、多线程、协程的方式去实现配置热加载。

使用多进程实现配置热加载

如果我们代码实现上使用多进程, 主进程1来更新配置并发送指令,任务的调用是进程2,如何实现配置热加载呢?

使用signal信号量来实现热加载

当主进程收到配置更新的消息之后(配置读取是如何收到配置更新的消息的? 这里我们暂不讨论), 主进程就向进子程1发送kill信号,子进程1收到kill的信号就退出,之后由信号处理函数来启动一个新的进程,使用最新的配置文件来继续执行任务。

main 函数

def main():
    # 启动一个进程执行任务
    p1 = Process(target=run, args=("p1",))
    p1.start()

    monitor(p1, run) # 注册信号
    processes["case100"] = p1 #将进程pid保存
    num = 0
    while True: # 模拟获取配置更新
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        sleep(2)
        if num == 4:
            kill_process(processes["case100"]) # kill 当前进程
        if num == 8:
            kill_process(processes["case100"]) # kill 当前进程
        if num == 12:
            kill_process(processes["case100"]) # kill 当前进程
        num += 1

signal_handler 函数

def signal_handler(process: Process, func, signum, frame):
    # print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGCHILD
        # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)

        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")

完整代码如下

#! /usr/local/bin/python3.8
from multiprocessing import Process
from typing import Dict
import signal
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
import multiprocessing
from multiprocessing import Process
from typing import Callable
from data import processes
import sys
from functools import partial
import time

processes: Dict[str, Process] = {}
counts = 2

def run(process: Process):
    while True:
        print(f"{process} running...")
        time.sleep(1)

def kill_process(process: Process):
    print(f"kill {process}")
    process.terminate()

def monitor(process: Process, func: Callable):
    for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
        # SIGTERM is kill signal.
        # No SIGCHILD is not trigger singnal_handler,
        # No SIGINT is not handler ctrl+c,
        # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>
        signal.signal(signame, partial(signal_handler, process, func))

def signal_handler(process: Process, func, signum, frame):
    print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGTERM
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")

def main():
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)
    processes["case100"] = p1
    num = 0
    while True:
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])
        if num == 8:
            kill_process(processes["case100"])
        if num == 12:
            kill_process(processes["case100"])
        num += 1

if __name__ == '__main__':
    main()

执行结果如下

multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

p1 running...
p1 running...
kill <Process name='Process-1' pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
kill <Process name='Process-2' pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1

processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>}

p3 running...
p3 running...
multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1

总结

好处:使用信号量可以处理多进程之间通信的问题。

坏处:代码不好写,写出来代码不好理解。信号量使用必须要很熟悉,不然很容易自己给自己写了一个bug.(所有初学者慎用,老司机除外。)

还有一点不是特别理解的就是process.terminate() 发送出信号是SIGTERM number是15,但是第一次signal_handler收到信号却是number=17,如果我要去处理15的信号,就会导致前一个进程不能kill掉的问题。欢迎有对信号量比较熟悉的大佬,前来指点迷津,不甚感谢。

采用multiprocessing.Event 来实现配置热加载

实现逻辑是主进程1 更新配置并发送指令。进程2启动调度任务。

这时候当主进程1更新好配置之后,发送指令给进程2,这时候的指令就是用Event一个异步事件通知。

直接上代码

scheduler 函数

def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")

        task_schedule_event.set() # 设置set之后, is_set 为True

        print(f"Schedule will start ...")
        while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
            run(case_configurations)

        print("Clearing all scheduling job ...")

event_scheduler 函数

def event_scheduler(case_config):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear() # clear之后,is_set 为False
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")

完整代码如下

import multiprocessing
import time

scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()

def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)

def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()

        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()

        print(f"Schedule will start ...")
        while task_schedule_event.is_set():
            run(case_configurations)

        print("Clearing all scheduling job ...")

def event_scheduler(case_config: str):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear()
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")

def main():
    scheduler_notify_queue.put('1')
    p = multiprocessing.Process(target=scheduler)
    p.start()

    count = 1
    print(f'{count=}')
    while True:
        if count == 5:
            event_scheduler('100')
        if count == 10:
            event_scheduler('200')
        count += 1
        time.sleep(1)

if __name__ == '__main__':
    main()

执行结果如下

wait message...
Got case configurations case_configurations='1'...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='100'...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='200'...
Schedule will start ...
200 running...
200 running...

总结

使用Event事件通知,代码不易出错,代码编写少,易读。相比之前信号量的方法,推荐大家多使用这种方式。

使用多线程或协程的方式,其实和上述实现方式一致。唯一区别就是调用了不同库中,queue和 event.

# threading
scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()

结语

具体的实现的方式有很多,也各自有各自的优劣势。我们需要去深刻理解到需求本身,才去做技术选型。

以上就是基于Python实现配置热加载的方法详解的详细内容,更多关于Python配置热加载的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python实现加载及解析properties配置文件的方法

    本文实例讲述了Python实现加载及解析properties配置文件的方法.分享给大家供大家参考,具体如下: 这里参考前面一篇:http://www.jb51.net/article/137393.htm 我们都是在java里面遇到要解析properties文件,在python中基本没有遇到这中情况,今天用python跑深度学习的时候,发现有些参数可以放在一个global.properties全局文件中,这样使用的时候更加方便.原理都是加载文件,然后用line方法进行解析判断"=",自

  • 利用python进行数据加载

    前言 最近参加了datawhale的组队学习活动,在组队学习动员下,开始通过强迫自己输出来实现更好的输入与处理,6-15开始自己的第一次文章发布,我会把自己这个真的很小白遇到的问题写出来,希望能给屏幕前小白的你带来帮助. 工作中大量繁琐的自动化,把以前在学校摸过的python重新捡起来,不成体系的.拼图一样把需要的工作搭建起来,工作暂时是可用上了,每天节省了至少3个小时的数据处理工作,手里拿着python这个锤子,看什么都像钉子. 首先,你要先学会安装软件,anaconda软件,安装成功后,你点

  • Python实现从文件中加载数据的方法详解

    前几篇都是手动录入或随机函数产生的数据.实际有许多类型的文件,以及许多方法,用它们从文件中提取数据来图形化. 比如之前python基础(12)介绍打开文件的方式,可直接读取文件中的数据,扩大了我们的数据来源.下面,将展示几种方法. 我们将使用内置的 csv 模块加载CSV文件 CSV文件是一种特殊的文本文件,文件中的数据以逗号作为分隔符,很适合进行数据的解析.先用excle建立如下表格和数据,另存为csv格式文件,放到代码目录下. 包含在Python标准库中自带CSV 模块,我们只需要impor

  • 一行代码实现Python动态加载依赖

    目录 快速开始 通过 pip 安装运行 注入代码运行 前几天在一个开源项目里遇到好多用户反馈,不会安装依赖,或者执行 pip install -r requirements.txt 没有反应. 可能造成的原因有很多种,一一排查起来也很麻烦. 想一劳永逸解决这个问题,一般大家都是到 site-packages 里面把所需要的包导出来,放到项目根目录. 但这样终究太过粗糙,不符合Python优雅的个性. 所以我就想,能不能动态引入包,如果没有的话,再调用 pip 下载.最后也差不多实现了我的设想.

  • python selenium禁止加载某些请求的实现

    目录 问题描述 解决方案 参考 问题描述 通过selenium请求目标网站时候, 真实数据(我这里是验证码图片)已经加载出来, 由于网站做了第三方上报所以得等待很久, 但是上报这个请求不是必须的. 例如 验证码已经加载完成, 但是huatuo.qq.com响应时间过长 , webdriver.get()的机制是等待请求的url响应全部完成才进行下一步. 显示等待和隐式等待的作用是每隔多少秒来检测一下这个地址是否加载完成, 所以此处不生效. 那我要做的是: 当请求目标url时候, 希望webdri

  • 基于Python实现配置热加载的方法详解

    目录 背景 如何实现 使用多进程实现配置热加载 使用signal信号量来实现热加载 采用multiprocessing.Event 来实现配置热加载 结语 背景 由于最近工作需求,需要在已有项目添加一个新功能,实现配置热加载的功能.所谓的配置热加载,也就是说当服务收到配置更新消息之后,我们不用重启服务就可以使用最新的配置去执行任务. 如何实现 下面我分别采用多进程.多线程.协程的方式去实现配置热加载. 使用多进程实现配置热加载 如果我们代码实现上使用多进程, 主进程1来更新配置并发送指令,任务的

  • Go 热加载之fresh详解

    正文 热加载是指可以在不重启服务的情况下,保存后即可让更改的代码生效的一种开发模式.热加载可以显著的提升开发和调试的效率,有了热加载后,说明你不用重新再编译.再执行了. 特别是涉及到效果渲染,如前端的开发中,如果每次改了代码后都要重新编译再执行,那开发效率太低了. 最完美的就是:代码改了啥,我一保存,前端页面马上就能显示我改了啥,这样就十分方便. 同样 Go 里面也有这种热加载的机制,Go 语言具有 部署简单.并发性好.上手快 的优势,如 Docker.Kubnernetes 等都是用 Go 开

  • mybatis xml文件热加载实现示例详解

    目录 引言 一.xml 文件热加载实现原理 1.1 xml 文件怎么样解析 1.2 实现思路 二.mybatis-xmlreload-spring-boot-starter 登场 2.1 核心代码 2.2 安装方式 2.3 使用配置 最后 引言 本文博主给大家带来一篇 mybatis xml 文件热加载的实现教程,自博主从事开发工作使用 Mybatis 以来,如果需要修改 xml 文件的内容,通常都需要重启项目,因为不重启的话,修改是不生效的,Mybatis 仅仅会在项目初始化的时候将 xml

  • PHP 7.4中使用预加载的方法详解

    前言 PHP 7.4增加了预加载支持,这一功能可以显着提高代码的性能. 这是一个简单的预加载: 为了预加载文件,您需要编写自定义PHP脚本 此脚本在服务器启动时执行一次 所有预加载的文件都可在内存中用于所有请求 在重新启动服务器之前,对源文件所做的更改不会产生任何影响 让我们深入研究一下. Opcache,但更多 虽然预加载是在顶级操作opcache上构建的,但它并不完全相同.Opcache将获取您的PHP源文件,将其编译为"操作码",并将这些编译后的文件存储在磁盘上. 您可以将&qu

  • Android ListView异步加载图片方法详解

    本文实例讲述了Android ListView异步加载图片方法.分享给大家供大家参考,具体如下: 先说说这篇文章的优点把,开启线程异步加载图片,然后刷新UI显示图片,而且通过弱引用缓存网络加载的图片,节省了再次连接网络的开销. 这样做无疑是非常可取的方法,但是加载图片时仍然会感觉到轻微的卡屏现象,特别是listview里的item在进行快速滑动的时候. 我找了一下原因,可能是在listview快速滑动屏幕的时候划过的item太多 而且每次调用getView方法后就会异步的在过去某个时间内用han

  • PHP从零开始打造自己的MVC框架之类的自动加载实现方法详解

    本文实例讲述了PHP从零开始打造自己的MVC框架之类的自动加载实现方法.分享给大家供大家参考,具体如下: 前面介绍了MVC框架的入口文件,接下来我们希望完成一个"自动加载类"的功能,我们把这个功能放到Imooc这个基础类当中. core\imooc.php: <?php namespace core; class Imooc { public static $classMap = array(); static public function run() { p('ok'); $

  • 微信小程序实现瀑布流布局与无限加载的方法详解

    前言 瀑布流布局是一种比较流行的页面布局方式,最典型的就是Pinterest.com,每个卡片的高度不都一样,形成一种参差不齐的美感. 在HTML5中,我们可以找到很多基于jQuery之类实现的瀑布流布局插件,轻松做出这样的布局形式.在微信小程序中,我们也可以做出这样的效果,不过由于小程序框架的一些特性,在实现思路上还是有一些差别的. 今天我们就来看一下如何在小程序中去实现这种瀑布流布局: 小程序瀑布流布局 我们要实现的是一个固定2列的布局,然后将图片数据动态加载进这两列中(而加载进来的图片,会

  • Kubernetes中Nginx配置热加载的全过程

    目录 前言 使用方法 总结 前言 Nginx本身是支持热更新的,通过nginx -s reload指令,实际通过向进程发送HUB信号实现不停服重新加载配置,然而在Docker或者Kubernetes中,每次都需要进容器执行nginx -s reload指令,单docker容器还好说,可以在外面通过exec指定容器执行该指令进行热加载,Kubernetes的话,就比较难受了 今天介绍一下Kubernetes中Nginx热加载配置的处理方法——reloader reloader地址:https://

  • 详解tomcat热部署和热加载的方法

    详解tomcat热部署和热加载的方法 我在项目开发过程中,经常要改动Java/JSP 文件,但是又不想从新启动服务器(服务器从新启动花时间),想直接获得(debug)结果.有两种方式热部署 和热加载: 1.热加载:在server.xml -> context 属性中 设置 reloadable="true" <Context docBase="xxx" path="/xxx" reloadable="true"/&

  • JS图片懒加载库VueLazyLoad详解

    目录 背景 说明 实现原理 1. placeholder 的实现很细致和灵活 2. 添加图片缓存 3. 事件监听使用节流 4. 监听事件不止滚动事件 5. 事件列队的方式来处理懒加载 6. 支持 data-srcset 7. 自定义控制可视区的判定范围 待完善 1. 没有解决布局抖动 2. 跳过已经加载图片的判断方式 3. 局部懒加载 4. 性能不是很好 5. observer 模式配置简单 6. SEO 不友好 总结 背景 上篇<图片懒加载原理方案详解>中详细解析了图片懒加载的原理和方案.主

随机推荐