PHP并发多进程处理利器Gearman使用介绍

工作中我们有时候会遇到比如需要同时发布数据到多个个服务器上,或者同时处理多个任务。可以使用PHP的curl_multi的方式并发处理请求,但是由于网络和数据以及各个服务器等等的一些情况导致这种并发处理的响应时间很慢,因为在并发请求的过程中还包括记录日志,处理数据等逻辑,等待处理结果并返回,所以也不能友好的满足后台操作的体验。

现在有另外一种方案,利Gearman来实现并发的需求。通过Client将请求发送到Gearman的Jobs,在每个Work中来再来进行curl_multi和数据处理和日志等一些操作,同时用supervisor 来监控Gearman以及Works的进程,这样可以实现一个并行的多进程和负载均衡的方案。

Gearman可以做什么:

异步处理:图片处理,订单处理,批量邮件/通知之类的
要求高CPU或内存的处理:大容量的数据处理,MapReduce运算,日志聚集,视频编码
分布式和并行的处理
定时处理:增量更新,数据复制
限制速率的FIFO处理
分布式的系统监控任务

Gearman工作原理:
使用Gearman的应用通常有三部分组成:一个Client、一个Worker、一个 任务服务器。 Client的作用是提出一个 Job 任务 交给 Job Server 任务服务器。Job Server 会去寻找一个 合适的 Worker 来完成这项任务。Worker 执行由 Client 发送过来的 Job,并且将结果通过 Job Server 返回给 Client。Gearman 提供了 Client 和 Worker 的 API,利用这些API 应用可以同 Gearman Job Server来进行通信。Gearman 内部 Client 和 Worker 之间的通信都是通过 TCP 连接来进行的。

Gearman可以将工作的负载分担到不同的机器中。

安装:

代码如下:

rpm -ivh http://dl.iuscommunity.org/pub/ius/stable/Redhat/6/x86_64/epel-release-6-5.noarch.rpm
yum install -y gearmand

启动:
gearmand -d

安装PHP Gearman扩展
我都是用pcel来安装的,你也可以下载源码包来编译安装,但是记得要先安装libgearman和re2c,不然扩展编译安装会出错。

pecl install gearman #不成功并提示版本问题可以试试 pecl install gearman-1.0.3,默认好像是1.1.2
编译安装也很简单

代码如下:

wget -c http://pecl.php.net/get/gearman-1.1.1.tgz
tar zxvf gearman-1.1.1.tgz
phpize
./configure
make && make install
echo "extension=gearman.so" >> /etc/php.ini

PHP接口函数
Gearman提供很多完善的扩展函数,包括GearmanClient,GearmanJob,GearmanTask,GearmanWorker,具体可以查看PHP官方手册.
这是官方提供的Example其中的一个,相当与一个并发的分发任务处理的例子

<?php

$client = new GearmanClient();
$client->addServer();

// initialize the results of our 3 "query results" here
$userInfo = $friends = $posts = null;

// This sets up what gearman will callback to as tasks are returned to us.
// The $context helps us know which function is being returned so we can
// handle it correctly.
$client->setCompleteCallback(function(GearmanTask $task, $context) use (&$userInfo, &$friends, &$posts) {
switch ($context)
{
case 'lookup_user':
$userInfo = $task->data();
break;
case 'baconate':
$friends = $task->data();
break;
case 'get_latest_posts_by':
$posts = $task->data();
break;
}
});

// Here we queue up multiple tasks to be execute in *as much* parallelism as gearmand can give us
$client->addTask('lookup_user', 'joe@joe.com', 'lookup_user');
$client->addTask('baconate', 'joe@joe.com', 'baconate');
$client->addTask('get_latest_posts_by', 'joe@joe.com', 'get_latest_posts_by');

echo "Fetching...\n";
$start = microtime(true);
$client->runTasks();
$totaltime = number_format(microtime(true) - $start, 2);

echo "Got user info in: $totaltime seconds:\n";
var_dump($userInfo, $friends, $posts);

gearman_work.php

<?php

$worker = new GearmanWorker();
$worker->addServer();

$worker->addFunction('lookup_user', function(GearmanJob $job) {
// normally you'd so some very safe type checking and query binding to a database here.
// ...and we're gonna fake that.
sleep(3);
return 'The user requested (' . $job->workload() . ') is 7 feet tall and awesome';
});

$worker->addFunction('baconate', function(GearmanJob $job) {
sleep(3);
return 'The user (' . $job->workload() . ') is 1 degree away from Kevin Bacon';
});

$worker->addFunction('get_latest_posts_by', function(GearmanJob $job) {
sleep(3);
return 'The user (' . $job->workload() . ') has no posts, sorry!';
});

while ($worker->work());

我在3个终端中都执行了gearman_work.php

ryan@ryan-lamp:~$ ps aux | grep gearman* | grep -v grep
gearman 1504 0.0 0.1 60536 1264 ? Ssl 11:06 0:00 /usr/sbin/gearmand --pid-file=/var/run/gearman/gearmand.pid --user=gearman --daemon --log-file=/var/log/gearman-job-server/gearman.log --listen=127.0.0.1
ryan 2992 0.0 0.8 43340 9036 pts/0 S+ 14:05 0:00 php /var/www/gearmand_work.php
ryan 3713 0.0 0.8 43340 9036 pts/1 S+ 14:05 0:00 php /var/www/gearmand_work.php
ryan 3715 0.0 0.8 43340 9036 pts/2 S+ 14:05 0:00 php /var/www/gearmand_work.php

来查看下执行gearman_work.php的结果shell

代码如下:

Fetching...
Got user info in: 3.03 seconds:
string(59) "The user requested (joe@joe.com) is 7 feet tall and awesome"
string(56) "The user (joe@joe.com) is 1 degree away from Kevin Bacon"
string(43) "The user (joe@joe.com) has no posts, sorry!"

看到上面的3.03 seconds,说明client请求过去的任务被并行分发执行了。
在实际的生产环境中,为了监测gearmand和work的进程没有被意外退出,我们可以借助Supervisor这个工具.

(0)

相关推荐

  • PHP多进程之pcntl_fork的实例详解

    PHP多进程编之pcntl_fork的实例详解 其实PHP是支持并发的,只是平时很少使用而已.平时使用最多的应该是使用PHP-FMP调度php进程了吧. 但是,PHP的使用并不局限于做Web,我们完全也可以使用PHP来进行系统工具类的编程,做监控或者是运维.在使用这些方向的时候,我们可以使用到PHP的更多特性,例如并发(多进程).socket编程等. 那么接下来就说说我遇到的PHP多进程的编程.这个多进程的使用是有一个背景的,下面模糊描述一下背景. 我需要一个监控系统,当然使用PHP语言,监控系

  • 以实例全面讲解PHP中多进程编程的相关函数的使用

    PHP有一组进程控制函数(编译时需要–enable-pcntl与posix扩展),使得php能实现跟c一样的创建子进程.使用exec函数执行程序.处理信号等功能. <?php header('content-type:text/html;charset=utf-8' ); // 必须加载扩展 if (!function_exists("pcntl_fork")) { die("pcntl extention is must !"); } //总进程的数量 $t

  • PHP多进程编程实例详解

    本文实例讲述了PHP多进程编程.分享给大家供大家参考,具体如下: 第一步: $ php -m  命令查看php是否安装pcntl 和 posix扩展,若没有则安装 使用场景: 1. 要进行大量的网络耗时的操作 2. 要做大量的运算,并且,系统有多个cpu,为了让用户有更快的体验,把一个任务,分成几个小任务,最后合并. 多进程常用函数: pcntl_alarm - 为进程设置一个alarm闹钟信号 pcntl_errno - 别名 pcntl_strerror pcntl_exec - 在当前进程

  • 分享PHP-pcntl 实现多进程代码

    PHP使用PCNTL系列的函数也能做到多进程处理一个事务.比如我需要从数据库中获取80w条的数据,再做一系列后续的处理,这个时候,用单进程?你可以等到明年今天了...所以应该使用pcntl函数了. 下面我们来看个实例 代码 <?php $arChildId = array(); for($i = 0; $i < 10; $i++) { $iPid = pcntl_fork(); if($iPid == -1) { die('can\'t be forked.'); } if($iPid) {

  • PHP多进程编程之僵尸进程问题的理解

    PHP多进程编程之僵尸进程问题的理解 使用pcntl_fork函数可以让PHP实现多进程并发或者异步处理的效果:http://www.jb51.net/article/125789.htm 那么问题是我们产生的进程需要去控制,而不能置之不理.最基本的方式就是fork进程和杀死进程. 通过利用pcntl_fork函数,我们已经有了新的子进程,而子进程接下来完成我们需要处理的内容,那么我们就暂且叫做service()吧,而且我们需要很多个service()进行处理,再次参照我们之前的需求,父进程需要

  • PHP基于文件锁解决多进程同时读写一个文件问题示例

    本文实例讲述了PHP基于文件锁解决多进程同时读写一个文件问题.分享给大家供大家参考,具体如下: 首先PHP是支持进程的而不支持多线程(这个先搞清楚了),如果是对于文件操作,其实你只需要给文件加锁就能解决,不需要其它操作,PHP的flock已经帮你搞定了. 用flock在写文件前先锁上,等写完后解锁,这样就实现了多线程同时读写一个文件避免冲突.大概就是下面这个流程 /* *flock(file,lock,block) *file 必需,规定要锁定或释放的已打开的文件 *lock 必需.规定要使用哪

  • PHP多进程编程总结(推荐)

    1. 准备 在动手之前,请确定你用的不是M$ Windows平台(因为我没有Windows).Linux / BSD / Unix应该都是没问题的.确认好了工作环境以后一起来看看我们需要的PHP模块是否都有.打开终端输入下面的命令: $ php -m 这个命令检查并打印当前PHP所有开启的扩展,看一下pcntl和posix是否在输出的列表中. 1.1. pcntl 如果找不到pcntl,八成是编译的时候没把这个扩展编译进去.如果你和我一样是编译安装的PHP,那么需要重新编译安装PHP.在配置的时

  • PHP并发多进程处理利器Gearman使用介绍

    工作中我们有时候会遇到比如需要同时发布数据到多个个服务器上,或者同时处理多个任务.可以使用PHP的curl_multi的方式并发处理请求,但是由于网络和数据以及各个服务器等等的一些情况导致这种并发处理的响应时间很慢,因为在并发请求的过程中还包括记录日志,处理数据等逻辑,等待处理结果并返回,所以也不能友好的满足后台操作的体验. 现在有另外一种方案,利Gearman来实现并发的需求.通过Client将请求发送到Gearman的Jobs,在每个Work中来再来进行curl_multi和数据处理和日志等

  • Java 高并发二:多线程基础详细介绍

    本系列基于炼数成金课程,为了更好的学习,做了系列的记录. 本文主要介绍 1.什么是线程 2.线程的基本操作 3.守护线程 4.线程优先级 5.基本的线程同步操作 1. 什么是线程 线程是进程内的执行单元 某个进程当中都有若干个线程. 线程是进程内的执行单元. 使用线程的原因是,进程的切换是非常重量级的操作,非常消耗资源.如果使用多进程,那么并发数相对来说不会很高.而线程是更细小的调度单元,更加轻量级,所以线程会较为广泛的用于并发设计. 在Java当中线程的概念和操作系统级别线程的概念是类似的.事

  • Java 高并发四:无锁详细介绍

    在[高并发Java 一] 前言中已经提到了无锁的概念,由于在jdk源码中有大量的无锁应用,所以在这里介绍下无锁. 1 无锁类的原理详解 1.1 CAS CAS算法的过程是这样:它包含3个参数CAS(V,E,N).V表示要更新的变量,E表示预期值,N表示新值.仅当V 值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么 都不做.最后,CAS返回当前V的真实值.CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成 操作.当多个线程同时使用CAS操

  • Java并发编程之栅栏(CyclicBarrier)实例介绍

    栅栏类似闭锁,但是它们是有区别的. 1.闭锁用来等待事件,而栅栏用于等待其他线程.什么意思呢?就是说闭锁用来等待的事件就是countDown事件,只有该countDown事件执行后所有之前在等待的线程才有可能继续执行;而栅栏没有类似countDown事件控制线程的执行,只有线程的await方法能控制等待的线程执行. 2.CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着. 场景分析:10个人去春游,规定达到一个地点后才能继续前行.代码如下 复制代码 代码如

  • Java并发容器介绍

    目录 1.原子类 2.锁 3.并发容器 4.List接口下 5.Map接口下 6.Set接口下 7.Queue接口下 Java并发包(concurrent)是Java用来处理并发问题的利器,该并发包中主要有原子类,锁(lock),并发容器类等等.本系列博客主要就是介绍并发包中一些常用的并发容器,常用的类.那么就让我们一起来揭开并发包的面纱吧. 环境: 基于JDK1.8 1.原子类 首先登场的就是我们的原子类.啥是原子类?原子类用啥用? 第一个问题,啥是原子类:操作具有原子性的类,我们称之为原子类

  • 关于Java 并发的 CAS

    目录 一.为什么要无锁 二.什么是CAS? 三.Java 中的CAS 四.CAS存在的问题 1.自旋的劣势 2.ABA 问题 3.尝试应用 4.CAS 源码 一.为什么要无锁 我们一想到在多线程下保证安全的方式头一个要拎出来的肯定是锁,不管从硬件.操作系统层面都或多或少在使用锁.锁有什么缺点吗?当然有了,不然 JDK 里为什么出现那么多各式各样的锁,就是因为每一种锁都有其优劣势. 使用锁就需要获得锁.释放锁,CPU 需要通过上下文切换和调度管理来进行这个操作,对于一个 「独占锁」 而言一个线程在

  • 基于存储过程的详细介绍

    存储过程简介 --------------------------------------------------------------------------------什么是存储过程:存储过程可以说是一个记录集吧,它是由一些T-SQL语句组成的代码块,这些T-SQL语句代码像一个方法一样实现一些功能(对单表或多表的增删改查),然后再给这个代码块取一个名字,在用到这个功能的时候调用他就行了. 存储过程的好处: 1.由于数据库执行动作时,是先编译后执行的.然而存储过程是一个编译过的代码块,所以

  • Python tornado队列示例-一个并发web爬虫代码分享

    Queue Tornado的tornado.queue模块为基于协程的应用程序实现了一个异步生产者/消费者模式的队列.这与python标准库为多线程环境实现的queue模块类似. 一个协程执行到yieldqueue.get会暂停,直到队列中有条目.如果queue有上限,一个协程执行yieldqueue.put将会暂停,直到队列中有空闲的位置. 在一个queue内部维护了一个未完成任务的引用计数,每调用一次put操作便会增加引用计数,而调用task_done操作将会减少引用计数. 下面是一个简单的

  • python如何使用socketserver模块实现并发聊天

    这篇文章主要介绍了python如何使用socketserver模块实现并发聊天,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 利用socketserver模块很容易实现并发功能,下面的server.py和client.py程序实现了这一功能. 代码如下 #server.pyimport socketserver class MyServer(socketserver.BaseRequestHandler): def handle(self):

  • 如何使用Python多线程测试并发漏洞

    这篇文章主要介绍了如何使用Python多线程测试并发漏洞,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 需求介绍 有时候想看看Web应用在代码或者数据库层有没有加锁,比如在一些支付.兑换类的场景,通过多线程并发访问的测试方式可以得到一个结论. 步骤 1. Burp Suite安装插件 安装一个Copy As Python-Requests插件,提高编码效率: 2. 拦截包并拷贝发包的代码 打开一个文本编辑器,右键粘贴出来: import req

随机推荐