详解PHP多进程消费队列

目录
  • 引言
  • nginx进程模型
  • 进程设计
    • 进程信号量设计
    • PHP安装修信号量
    • 信号量和系统调用
    • daemon(守护)进程
  • 命令设计
    • 启动命令
    • 强制停止命令
    • 强制重启命令
    • 平滑停止命令
    • 平滑重启命令
    • 查看进程状态

引言

最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理不过来了......

这种方式每次都要修改crontab,如果进程挂掉了,不会及时的启动,要等到下次crontab执行的时候才会启动。关闭(重启)进程的时候用的是kill,这可能会丢失正在处理的数据,比如下面这个例子,我们假设sleep过程就是处理逻辑,这里为了明显看出效果,将处理时间放大到10s:

<?php
$i = 1;
while (1) {
    echo "开始第[{$i}]次循环\n";
    sleep(10);
    echo "结束第[{$i}]次循环\n";
    $i++;
}

当我们运行脚本之后,等到循环开始之后,给进程发送kill {$pid},默认发送的是编号为15的SIGTERM信号。假设$i是从队列拿到的,拿到2的时候,正在处理,我们给程序发送了kill信号,和队列数据丢失一样,问题比较大,因此我要想办法解决这些问题。

开始第[1]次循环

结束第[1]次循环

开始第[2]次循环

[1]    28372 terminated  php t.php

nginx进程模型

这时候我想到了nginx,nginx作为高性能服务器的中流砥柱,为成千上万的企业和个人服务,他的进程模型比较经典,如下所示:

管理员通过master进程和nginx进行交互,从/path/to/nginx.pid读取nginx master进程的pid,发送信号给master进程,master根据不同的信号做出不同的处理,然后反馈信息给管理员。worker是master进程fork出来的,master负责管理worker,不会去处理业务,worker才是具体业务的处理者,master可以控制worker的退出、启动,当worker意外退出,master会收到子进程退出的消息,也会重新启动新的worker进程补充上来,不让业务处理受影响。nginx还可以平滑退出,不丢失任何一个正在处理的数据,更新配置时nginx可以做到不影响线上服务来加载新的配置,这在请求量很大的时候特别有用。

进程设计

看了nginx的进模型,我们完全可以开发一个类似的类库来满足处理mcq数据的需求,做到单文件控制所有进程、可以平滑退出、可以查看子进程状态。不需要太复杂,因为我们处理队列数据接收一定的延迟,做到nginx那样不间断服务比较麻烦,费时费力,意义不是很大。设计的进程模型跟nginx类似,更像是nginx的简化版本。

进程信号量设计

信号量是进程间通讯的一种方式,比较简单,单功能也比较弱,只能发送信号给进程,进程根据信号做出不同的处理。

master进程启动的时候保存pid到文件/path/to/daeminze.pid,管理员通过信号和master进程通讯,master进程安装3种信号,碰到不同的信号,做出不同的处理,如下所示:

SIGINT => 平滑退出,处理完正在处理的数据再退出

SIGTERM => 暴力退出,无论进程是否正在处理数据直接退出

SIGUSR1 => 查看进程状态,查看进程占用内存,运行时间等信息

master进程通过信号和worker进程通讯,worker进程安装了2个信号,如下所示:

SIGINT => 平滑退出

SIGUSR1 => 查看worker进程自身状态

为什么worker进程只安装2个信号呢,少了个SIGTERM,因为master进程收到信号SIGTERM之后,向worker进程发送SIGKILL信号,默认强制关闭进程即可。

worker进程是通过master进程fork出来的,这样master进程可以通过pcntl_wait来等待子进程退出事件,当有子进程退出的时候返回子进程pid,做处理并启动新的进程补充上来。

master进程也通过pcntl_wait来等待接收信号,当有信号到达的时候,会返回-1,这个地方还有些坑,在下文中会详细讲。

PHP中有2种信号触发的方式,第一种方式是declare(ticks = 1);,这种效率不高,Zend每执行一次低级语句,都会去检查进程中是否有未处理的信号,现在已经很少使用了,PHP 5.3.0及之前的版本可能会用到这个。

第二种是通过pcntl_signal_dispatch来调用未处理的信号,PHP 5.4.0及之后的版本适用,可以巧妙的将该函数放在循环中,性能上基本没什么损失,现在推荐适用。

PHP安装修信号量

PHP通过pcntl_signal安装信号,函数声明如下所示:

bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )

第三个参数restart_syscalls不太好理解,找了很多资料,也没太查明白,经过试验发现,这个参数对pcntl_wait函数接收信号有影响,当设置为缺省值true的时候,发送信号,进程用pcntl_wait收不到,必须设置为false才可以,看看下面这个例子:

<?php
$i = 0;
while ($i<5) {
    $pid = pcntl_fork();
    $random = rand(10, 50);
    if ($pid == 0) {
        sleep($random);
        exit();
    }
    echo "child {$pid} sleep {$random}\n";
    $i++;
}

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
});

while (1) {
    $pid = pcntl_wait($status);
    var_dump($pid);
    pcntl_signal_dispatch();
}

运行之后,我们对父进程发送kill -SIGINT {$pid}信号,发现pcntl_wait没有反应,等到有子进程退出的时候,发送过的SIGINT会一个个执行,比如下面结果:

child 29643 sleep 48

child 29644 sleep 24

child 29645 sleep 37

child 29646 sleep 20

child 29647 sleep 31

int(29643)

Ctrl + C

Ctrl + C

Ctrl + C

Ctrl + C

int(29646)

这是运行脚本之后马上给父进程发送了四次SIGINT信号,等到一个子进程推出的时候,所有信号都会触发。

但当把安装信号的第三个参数设置为false:

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

这时候给父进程发送SIGINT信号,pcntl_wait会马上返回-1,信号对应的事件也会触发。

所以第三个参数大概意思就是,是否重新注册此信号,如果为false只注册一次,触发之后就返回,pcntl_wait就能收到消息,如果为true,会重复注册,不会返回,pcntl_wait收不到消息。

信号量和系统调用

信号量会打断系统调用,让系统调用立刻返回,比如sleep,当进程正在sleep的时候,收到信号,sleep会马上返回剩余sleep秒数,比如:

<?php
pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

while (true) {
	pcntl_signal_dispatch();
    echo "123\n";
    $limit = sleep(2);
	echo "limit sleep [{$limit}] s\n";
}

运行之后,按Ctrl + C,结果如下所示:

123

^Climit sleep [1] s

Ctrl + C

123

limit sleep [0] s

123

^Climit sleep [1] s

Ctrl + C

123

^Climit sleep [2] s

daemon(守护)进程

这种进程一般设计为daemon进程,不受终端控制,不与终端交互,长时间运行在后台,而对于一个进程,我们可以通过下面几个步骤把他升级为一个标准的daemon进程:

protected function daemonize()
{
    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } elseif ($pid != 0) {
        exit(0);
    }
    if (-1 == posix_setsid()) {
        throw new Exception("新建立session会话失败");
    }

    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork进程失败");
    } else if($pid != 0) {
        exit(0);
    }

    umask(0);
    chdir("/");
}

拢共分五步:

1.fork子进程,父进程退出。

2.设置子进程为会话组长,进程组长。

3.再次fork,父进程退出,子进程继续运行。

4.恢复文件掩码为0。

5.切换当前目录到根目录/。

第2步是为第1步做准备,设置进程为会话组长,必要条件是进程非进程组长,因此做第一次fork,进程组长(父进程)退出,子进程通过posix_setsid()设置为会话组长,同时也为进程组长。

第3步是为了不让进程重新控制终端,因为一个进程控制一个终端的必要条件是会话组长(pid=sid)。

第4步是为了恢复默认的文件掩码,避免之前做的操作对文件掩码做了设置,带来不必要的麻烦。关于文件掩码, linux中,文件掩码在创建文件、文件夹的时候会用到,文件的默认权限为666,文件夹为777,创建文件(夹)的时候会用默认值减去掩码的值作为创建文件(夹)的最终值,比如掩码022下创建文件666 - 222 = 644,创建文件夹777 - 022 = 755:

掩码 新建文件权限 新建文件夹权限
umask(0) 666 (-rw-rw-rw-) 777 (drwxrwxrwx)
umask(022) 644 (-rw-r--r--) 755 (drwxr-xr-x)

第5步是切换了当前目录到根目录/,网上说避免起始运行他的目录不能被正确卸载,这个不是太了解。

对应5步,每一步的各种id变化信息:

操作后 pid ppid pgid sid
开始 17723 31381 17723 31381
第一次fork 17723 1 17723 31381
posix_setsid() 17740 1 17740 17740
第二次fork 17840 1 17740 17740

另外,会话、进程组、进程的关系如下图所示,这张图有助于更好的理解。

至此,你也可以轻松地造出一个daemon进程了。

命令设计

我准备给这个类库设计6个命令,如下所示:

1.start 启动命令

2.restart 强制重启

3.stop 平滑停止

4.reload 平滑重启

5.quit 强制停止

6.status 查看进程状态

启动命令

启动命令就是默认的流程,按照默认流程走就是启动命令,启动命令会检测pid文件中是否已经有pid,pid对应的进程是否健康,是否需要重新启动。

强制停止命令

管理员通过入口文件结合pid给master进程发送SIGTERM信号,master进程给所有子进程发送SIGKILL信号,等待所有worker进程退出后,master进程也退出。

强制重启命令

强制停止命令+启动命令

平滑停止命令

平滑停止命令,管理员给master进程发送SIGINT信号,master进程给所有子进程发送SIGINT,worker进程将自身状态标记为stoping,当worker进程下次循环的时候会根据stoping决定停止,不在接收新的数据,等所有worker进程退出之后,master进程也退出。

平滑重启命令

平滑停止命令+启动命令

查看进程状态

查看进程状态这个借鉴了workerman的思路,管理员给master进程发送SIGUSR1信号,告诉主进程,我要看所有进程的信息,master进程,master进程将自身的进程信息写入配置好的文件路径A中,然后发送SIGUSR1,告诉worker进程把自己的信息也写入文件A中,由于这个过程是异步的,不知道worker进程啥时候写完,所以master进程在此处等待,等所有worker进程都写入文件之后,格式化所有的信息输出,最后输出的内容如下所示:

➜/dir /usr/local/bin/php DaemonMcn.php status

Daemon [DaemonMcn] 信息:

-------------------------------- master进程状态 --------------------------------

pid       占用内存       处理次数       开始时间                 运行时间

16343     0.75M          --             2018-05-15 09:42:45      0 天 0 时 3 分

12 slaver

-------------------------------- slaver进程状态 --------------------------------

任务task-mcq:

16345     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分

16346     0.75M          236            2018-05-15 09:42:45      0 天 0 时 3 分

--------------------------------------------------------------------------------

任务test-mcq:

16348     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16350     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16358     0.75M          49             2018-05-15 09:42:45      0 天 0 时 3 分

16449     0.75M          1              2018-05-15 09:46:40      0 天 0 时 0 分

--------------------------------------------------------------------------------

等待worker进程将进程信息写入文件的时候,这个地方用了个比较trick的方法,每个worker进程输出一行信息,统计文件的行数,达到worker进程的行数之后表示所有worker进程都将信息写入完毕,否则,每个1s检测一次。

以上就是详解PHP多进程消费队列的详细内容,更多关于PHP多进程消费队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • php多进程并发编程防止出现僵尸进程的方法分析

    本文实例讲述了php多进程并发编程防止出现僵尸进程的方法.分享给大家供大家参考,具体如下: 对于用PHP进行多进程并发编程,不可避免要遇到僵尸进程的问题. 僵尸进程是指的父进程已经退出,而该进程dead之后没有进程接受,就成为僵尸进程(zombie)进程.任何进程在退出前(使用exit退出) 都会变成僵尸进程(用于保存进程的状态等信息),然后由init进程接管.如果不及时回收僵尸进程,那么它在系统中就会占用一个进程表项,如果这种僵尸进程过多,最后系统就没有可以用的进程表项,于是也无法再运行其它的

  • PHP 多进程与信号中断实现多任务常驻内存管理实例方法

    本文章基于 pcntl 扩展做的多进程测试. 进程调度策略 父子进程的调度由操作系统来负责,具体先调度子进程还是父进程由系统的调度算法决定,当然可以在父进程加上延时或是调用进程回收函数 pcntl_wait 可以先让子进程先运行,进程回收的目的是释放进程创建时占用的内存空间,防止变成僵尸进程. 信号: 信号称为软中断系统或是叫软中断,功能是向进程发送异步事件通知. 信号编号: [源码基于 SIGINT,SIGTERM,SIGUSR1 信号,含义请自行查看 kill 命令手册,不在描述] linu

  • php 的多进程操作实践案例分析

    本文实例讲述了php 的多进程操作.分享给大家供大家参考,具体如下: php的多进程处理依赖于pcntl扩展,通过pcntl_fork创建子进程来进行并行处理. 例1如下: <?php $pid = pcntl_fork(); if($pid == -1) { //错误处理:创建子进程失败时返回-1. die('fork error'); } else if ($pid) { //父进程会得到子进程号,所以这里是父进程执行的逻辑 echo "parent \n"; //等待子进程

  • PHP多进程简单实例小结

    本文实例讲述了PHP多进程.分享给大家供大家参考,具体如下: PHP创建多进程需要使用到pcntl模块 在编译时加上--enable-pcntl打开进程控制支持,不是Unix类系统不支持此模块 php官网介绍http://php.net/manual/zh/book.pcntl.php,创建子进程需要使用到pcntl_fork(),文档上介绍该函数说 ,pcntl_fork - 在当前进程当前位置产生分支(子进程). 译注:fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继

  • php swoole多进程/多线程用法示例【基于php7nts版】

    本文实例讲述了php swoole多进程/多线程用法.分享给大家供大家参考,具体如下: swoole的多线程其实就是多进程,进程创建太多切换的开销很大,如果能用上pthreads建议用pthreads,因为我用的是php7nts版本没办法用pthreads swoole实例如下: <?php /** * 创建多进程 */ $worker_num = 6; // 默认进程数 $workers = []; // 进程保存 $redirect_stdout = false; // 重定向输出 ; 这个

  • php 多进程编程父进程的阻塞与非阻塞实例分析

    本文实例讲述了php 多进程编程父进程的阻塞与非阻塞.分享给大家供大家参考,具体如下: php中进程的阻塞,主要是父进程等待子进程退出. 1.php代码如下: <?php //定义进程数量 define('FORK_NUMS', 5); //用于保存进程pid $pids = array(); //我们创建5个子进程 for ($i = 0; $i < FORK_NUMS; ++$i) { $pids[$i] = pcntl_fork(); if ($pids[$i] == -1) { die

  • php实现的简单多进程服务器类完整示例

    本文实例讲述了php实现的简单多进程服务器类.分享给大家供大家参考,具体如下: php写的一个简单的多进程服务器. <?php class server { public $port; public $ip; protected $server; public function __construct($ip = '0.0.0.0', $port) { $this->ip = $ip; $this->port = $port; $this->createSocket(); //创建

  • php多进程中的阻塞与非阻塞操作实例分析

    本文实例讲述了php多进程中的阻塞与非阻塞操作.分享给大家供大家参考,具体如下: 我们通过pcntl_fork来创建子进程,使用pcntl_wait和pcntl_waitpid来回收子进程. 子进程退出后,父进程没有及时回收,就会产生僵尸进程. 例1: <?php define('FORK_NUMS', 5); $pids = array(); //我们创建5个子进程 for($i = 0; $i < FORK_NUMS; ++$i) { $pids[$i] = pcntl_fork(); i

  • PHP基于swoole多进程操作示例

    本文实例讲述了PHP基于swoole多进程操作.分享给大家供大家参考,具体如下: 多个任务同时执行 将顺序执行的任务,转化为并行执行(任务在逻辑上可以并行执行) 比如,我们要对已知的用户数据进行判断,是否需要发送邮件和短信,如果需要发送则发送. 不使用多进程时,我们首先判断是否发送邮件,如果需要则发送:然后再判断是否需要发送短信,如果需要则发送.如果发送邮件耗时2s,发送短信耗时2s,那么我们完成任务大概需要4s左右的时间. 如果我们使用多线程的话,可以开两个线程,一个用于处理邮件,一个用于处理

  • 详解PHP多进程消费队列

    目录 引言 nginx进程模型 进程设计 进程信号量设计 PHP安装修信号量 信号量和系统调用 daemon(守护)进程 命令设计 启动命令 强制停止命令 强制重启命令 平滑停止命令 平滑重启命令 查看进程状态 引言 最近开发一个小功能,用到了队列mcq,启动一个进程消费队列数据,后边发现一个进程处理不过来了,又加了一个进程,过了段时间又处理不过来了...... 这种方式每次都要修改crontab,如果进程挂掉了,不会及时的启动,要等到下次crontab执行的时候才会启动.关闭(重启)进程的时候

  • 详解Java七大阻塞队列之SynchronousQueue

    目录 分析 其实SynchronousQueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量. 直接看一个例子: package dongguabai.test.juc.test; import java.util.concurrent.SynchronousQueue; /** * @author Dongguabai * @description * @date 2021-09-01 21:52 */ public class TestSynchronousQu

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • 详解IOS串行队列与并行队列进行同步或者异步的实例

    详解IOS串行队列与并行队列进行同步或者异步的实例 IOS中GCD的队列分为串行队列和并行队列,任务分为同步任务和异步任务,他们的排列组合有四种情况,下面分析这四种情况的工作方式. 同步任务,使用GCD dispatch_sync 进行派发任务 - (void)testSync { dispatch_queue_t serialQueue = dispatch_queue_create("com.zyt.queue", DISPATCH_QUEUE_SERIAL); dispatch_

  • 详解python数据结构之队列Queue

    一.前言 队列Queue是一种先进先出(FIFO,First In First Out)的线性表.允许一端进行插入(rear),对应的另一段进行删除(front). 本篇包含以下内容: (1)Queue的基本格式 (2)入队列en_queue (3)删除数据函数 de_queue 二.Queue的基本格式 class Queue(): def __init__(self,size): self.size = size self.front = -1 #设置front初始值,每出队列一个数据就加

  • 详解SpringBoot集成消息队列的案例应用

    目录 背景 方案规划 统一设计 集成Redis消息队列 集成ActiveMQ消息队列 使用示例 背景 最近在对公司开发框架进行优化,框架内涉及到多处入库的日志记录,例如登录日志/操作日志/访问日志/业务执行日志,集成在业务代码中耦合度较高且占用业务操作执行时间,所以准备集成相关消息队列进行代码解耦 方案规划 现有的成熟消息队列组件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考虑到业务并发量不高且框架已经应用于多个项目平稳运行,准备提供基于Redis的消息队列和集成ActiveM

  • 详解Java线程池队列中的延迟队列DelayQueue

    目录 DelayQueue延迟队列 DelayQueue使用场景 DelayQueue属性 DelayQueue构造方法 实现Delayed接口使用示例 DelayQueue总结 在阻塞队里中,除了对元素进行增加和删除外,我们可以把元素的删除做一个延迟的处理,即使用DelayQueue的方法.本文就来和大家聊聊Java线程池队列中的DelayQueue—延迟队列 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQ

  • 详解如何构建Promise队列实现异步函数顺序执行

    场景 有a.b.c三个异步任务,要求必须先执行a,再执行b,最后执行c 且下一次任务必须要拿到上一次任务执行的结果,才能做操作 思路 我们需要实现一个队列,将这些异步函数添加进队列并且管理它们的执行,队列具有First In First Out的特性,也就是先添加进去的会被先执行,接着才会执行下一个(注意跟栈作区别) 大家也可以类比一下jQuery的animate方法,添加多个动画也会按顺序执行 解决 模拟3个异步函数 // 异步函数a var a = function () { return

  • Python多进程multiprocessing.Pool类详解

    multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程.该进程可以允许放在Python程序内部编写的函数中.该Process对象与Thread对象的用法相同,拥有is_alive().join([timeout]).run().start().terminate()等方法.属性有:authkey.daemon(要通过start()设置)

  • 进程间通信之深入消息队列的详解

    最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处. 关于线程的管理(互斥量和条件变量)见:Linux线程管理必备:解析互斥量与条件变量的详解 一.消息队列的特点 1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.    2.消息队列允许一个或多个进程向它写入与读取消息.    3.管道和命名管道都是通信数据都是先进先出的原则.    4.消息队列可以

随机推荐