swoole_process实现进程池的方法示例

swoole —— 重新定义PHP

swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process 的研究在swoole中显得尤为重要。

预备知识

IO多路复用

swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数。

  • epoll 模型下会持续监听自己名下的素有socket 描述符 fd
  • 当触发了 socket 监听的事件时,epoll 函数才会响应,并返回所有监听该时间的 socket 集合
  • epoll 的本质是阻塞IO,它的优点在于能同事处理大量socket连接

Event loop 事件循环

swoole 对 epoll 实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)

  • Event loop 是一个Reactor线程,其中运行了一个epoll实例。
  • 通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数
  • 不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。

swoole_process

  • 基于C语言封装的进程管理模块,方便php来调用
  • 内置管道、消息队列接口,方便实现进程间通信

我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。

  • 静态模式 即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。
  • 动态模式 指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求

接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。

假如有个定时投递的任务队列:

<?php

/**
 * 动态进程池,类似fpm
 * 动态新建进程
 * 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数
 */

// 一个进程定时投递任务

/**
 * 1. tick
 * 2. process及其管道通讯
 * 3. event loop 事件循环
 */
class processPool
{
  private $pool;

  /**
   * @var swoole_process[] 记录所有worker的process对象
   */
  private $workers = [];

  /**
   * @var array 记录worker工作状态
   */
  private $used_workers = [];

  /**
   * @var int 最小进程数
   */
  private $min_woker_num = 5;

  /**
   * @var int 初始进程数
   */
  private $start_worker_num = 10;

  /**
   * @var int 最大进程数
   */
  private $max_woker_num = 20;

  /**
   * 进程闲置销毁秒数
   * @var int
   */
  private $idle_seconds = 5;

  /**
   * @var int 当前进程数
   */
  private $curr_num;

  /**
   * 闲置进程时间戳
   * @var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // 循环建立worker进程
      for ($i = 0; $i < $this->start_worker_num; $i++) {
        $this->createWorker();
      }
      echo '初始化进程数:' . $this->curr_num . PHP_EOL;
      // 每秒定时往闲置的worker的管道中投递任务
      swoole_timer_tick(1000, function ($timer_id) {
        static $count = 0;
        $count++;
        $need_create = true;
        foreach ($this->used_workers as $pid => $used) {
          if ($used == 0) {
            $need_create = false;
            $this->workers[$pid]->write($count . ' job');
            // 标记使用中
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break;
          }
        }
        foreach ($this->used_workers as $pid => $used)
          // 如果所有worker队列都没有闲置的,则新建一个worker来处理
          if ($need_create && $this->curr_num < $this->max_woker_num) {
            $new_pid = $this->createWorker();
            $this->workers[$new_pid]->write($count . ' job');
            $this->used_workers[$new_pid] = 1;
            $this->active_time[$new_pid] = time();
          }

        // 闲置超过一段时间则销毁进程
        foreach ($this->active_time as $pid => $timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
            // 销毁该进程
            if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write('exit');
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break;
            }
          }
        }

        echo "任务{$count}/{$this->curr_num}\n";

        if ($count == 20) {
          foreach ($this->workers as $pid => $worker) {
            $worker->write('exit');
          }
          // 关闭定时器
          swoole_timer_clear($timer_id);
          // 退出进程池
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    while ($ret = swoole_process::wait()) {
      $pid = $ret['pid'];
      echo "process {$pid} existed\n";
    }
  }

  /**
   * 创建一个新进程
   * @return int 新进程的pid
   */
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // 给子进程管道绑定事件
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data == 'exit') {
          $worker->exit(0);
          exit();
        }
        echo "{$worker->pid} 正在处理 {$data}\n";
        sleep(5);
        // 返回结果,表示空闲
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // 给父进程管道绑定事件
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data == 'complete') {
        // 标记为空闲
//        echo "{$worker_process->pid} 空闲了\n";
        $this->used_workers[$worker_process->pid] = 0;
      }
    });

    // 保存process对象
    $this->workers[$worker_pid] = $worker_process;
    // 标记为空闲
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid;
  }

}

new processPool();

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解PHP swoole process的使用方法

    引入背景:假如我们每天有10000个订单生成,需要同步到仓储系统中去,以前做法是开启一个crontab去跑这些任务,但是发现总有感觉同步效率低,间隔时间都是分钟级别的. 解决方案测试:我们将同步订单的任务表添加一个hash作为key,作为分发条件,因为mysql中select如果做mod函数是用不到索引的,所以我们自己做随机hash,但是务必不需要范围太大,以免服务器资源不够,方法是根据hashkey投放到不同的进程中进行同步,测试代码如下 <?php /** * Created by PhpS

  • swoole_process实现进程池的方法示例

    swoole -- 重新定义PHP swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process 的研究在swoole中显得尤为重要. 预备知识 IO多路复用 swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数. epoll 模型下会持续监听自己名下的素有socket 描述符 fd 当触发了 socket 监听的事件时,epoll 函数才会响应,并返回所有监听该时间的 socket

  • java基于quasar实现协程池的方法示例

    业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍.所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多.不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~) 一个线程可以多个协程,一个进程也可以单独拥有多个协程. 线程进程都是同步机制,而协程则是异步. 协程能保留上一次调用时的状态,每次过程重入时,就相当于进

  • Python获取系统所有进程PID及进程名称的方法示例

    本文实例讲述了Python获取系统所有进程PID及进程名称的方法.分享给大家供大家参考,具体如下: psutil模块中提供了进程管理方法,引用其中的几个方法就能够获得进程的相关信息.简单写个小脚本测试一下,顺便看看本机Windows7系统中到底运行着多少个进程,进程都是什么. 代码: # -*- coding: utf-8 -*- #! python2 #!/usr/bin/python import psutil pids = psutil.pids() for pid in pids: p

  • springboot配置druid连接池的方法示例

    Druid的简介 Druid首先是一个数据库连接池.Druid是目前最好的数据库连接池,在功能.性能.扩展性方面,都超过其他数据库连接池,包括DBCP.C3P0.BoneCP.Proxool.JBoss DataSource.Druid已经在阿里巴巴部署了超过600个应用,经过一年多生产环境大规模部署的严苛考验.Druid是阿里巴巴开发的号称为监控而生的数据库连接池! Druid的功能 1.替换DBCP和C3P0.Druid提供了一个高效.功能强大.可扩展性好的数据库连接池. 2.可以监控数据库

  • java 注解实现一个可配置线程池的方法示例

    前言 项目需要多线程执行一些Task,为了方便各个服务的使用.特意封装了一个公共工具类,下面直接撸代码: PoolConfig(线程池核心配置参数): /** * <h1>线程池核心配置(<b style="color:#CD0000">基本线程池数量.最大线程池数量.队列初始容量.线程连接保持活动秒数(默认60s)</b>)</h1> * * <blockquote><code> * <table bord

  • PHP 进程池与轮询调度算法实现多任务的示例代码

    phper 请了解进程调度策略,CPU 时间片,进程控制[创建,销毁,回收,进程信号]与及进程运行流程和基本的进程组,信号中断原理,以及进程之间的关系. 关于进程的更多内容可参考本人前面撸过的文章或是百度了解. 进程的通信: 匿名管道,命名管道,消息队列,内存共享,socketpair 请自行撸代码测试哦 进程的调度算法: 轮询,随机分发,计分板等策略或是搞个优先极或是队列,或是堆栈等基本的算法[自己去发挥哦] 进程池: 撸过 tcp 的话应该知道要能处理多个客户端,就得用 IO 复用技术[事件

  • python进程池实现的多进程文件夹copy器完整示例

    本文实例讲述了python进程池实现的多进程文件夹copy器.分享给大家供大家参考,具体如下: 应用:文件夹copy器(多进程版) import multiprocessing import os import time import random def copy_file(queue, file_name,source_folder_name, dest_folder_name): """copy文件到指定的路径""" f_read = op

  • python自动下载图片的方法示例

    近日闲来无事,总有一种无形的力量萦绕在朕身边,让朕精神涣散,昏昏欲睡. 可是,像朕这么有职业操守的社畜怎么能在上班期间睡瞌睡呢,我不禁陷入了沉思.... 突然旁边的IOS同事问:'嘿,兄弟,我发现一个网站的图片很有意思啊,能不能帮我保存下来提升我的开发灵感?' 作为一个坚强的社畜怎么能说自己不行呢,当时朕就不假思索的答应:'oh, It's simple. Wait for me a few minute.' 点开同事给的图片网站, 网站大概长这样: 在朕翻看了几十页之后,朕突然觉得有点上头.心

  • python 多线程实现多任务的方法示例

    目录 1 多线程实现多任务 1.1 什么是线程? 1.2 一个程序实现多任务的方法 1.3 多线程的创建方式 1.3.1 创建threading.Thread对象 1.3.2 继承threading.Thread,并重写run 1.4 线程何时开启,何时结束 1.5 线程的 join() 方法 1.6 多线程共享全局变量出现的问题 1.7 互斥锁可以弥补部分线程安全问题.(互斥锁和GIL锁是不一样的东西!) 1.8 线程池ThreadPoolExecutor 1.8.1 创建线程池 1.8.2 

  • Python语法学习之进程池与进程锁详解

    目录 进程池 什么是进程池 进程池的创建模块 - multiprocessing 创建进程池函数 - Pool 进程池的常用方法 apply_async 函数演示案例 close 函数与 join 函数 演示 进程锁 进程锁的概念 进程锁的加锁与解锁 NICE!大家好,在上一章节,我们学习了 multiprocessing 模块 的关于进程的创建与进场常用的方法的相关知识. 通过在一个主进程下创建多个子进程可以帮助我们加速程序的运行,并且提高工作效率.不过上一章节文末我们也说过进程的问题,由于每

随机推荐