PHP中使用协同程序实现合作多任务第1/2页

PHP5.5一个比较好的新功能是实现对生成器和协同程序的支持。对于生成器,PHP的文档和各种其他的博客文章(就像这一个或这一个)已经有了非常详细的讲解。协同程序相对受到的关注就少了,所以协同程序虽然有很强大的功能但也很难被知晓,解释起来也比较困难。

这篇文章指导你通过使用协同程序来实施任务调度,通过实例实现对技术的理解。我将在前三节做一个简单的背景介绍。如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节。

生成器

生成器最基本的思想也是一个函数,这个函数的返回值是依次输出,而不是只返回一个单独的值。或者,换句话说,生成器使你更方便的实现了迭代器接口。下面通过实现一个xrange函数来简单说明:

代码如下:

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

上面这个xrange()函数提供了和PHP的内建函数range()一样的功能。但是不同的是range()函数返回的是一个包含属组值从1到100万的数组(注:请查看手册)。而xrange()函数返回的是依次输出这些值的一个迭代器,而且并不会真正以数组形式计算。

这种方法的优点是显而易见的。它可以让你在处理大数据集合的时候不用一次性的加载到内存中。甚至你可以处理无限大的数据流。

当然,也可以不同通过生成器来实现这个功能,而是可以通过继承Iterator接口实现。通过使用生成器实现起来会更方便,而不用再去实现iterator接口中的5个方法了。

生成器为可中断的函数
要从生成器认识协同程序,理解它们内部是如何工作的非常重要:生成器是可中断的函数,在它里面,yield构成了中断点。

紧接着上面的例子,如果你调用xrange(1,1000000)的话,xrange()函数里代码没有真正地运行。相反,PHP只是返回了一个实现了迭代器接口的 生成器类实例:

代码如下:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

你对某个对象调用迭代器方法一次,其中的代码运行一次。例如,如果你调用$range->rewind(),那么xrange()里的代码运行到控制流 第一次出现yield的地方。在这种情况下,这就意味着当$i=$start时yield $i才运行。传递给yield语句的值是使用$range->current()获取的。

为了继续执行生成器中的代码,你必须调用$range->next()方法。这将再次启动生成器,直到yield语句出现。因此,连续调用next()和current()方法 你将能从生成器里获得所有的值,直到某个点没有再出现yield语句。对xrange()来说,这种情形出现在$i超过$end时。在这中情况下, 控制流将到达函数的终点,因此将不执行任何代码。一旦这种情况发生,vaild()方法将返回假,这时迭代结束。

协程

协程给上面功能添加的主要东西是回送数据给生成器的能力。这将把生成器到调用者的单向通信转变为两者之间的双向通信。
通过调用生成器的send()方法而不是其next()方法传递数据给协程。下面的logger()协程是这种通信如何运行的例子:

代码如下:

<?php

function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')

正如你能看到,这儿yield没有作为一个语句来使用,而是用作一个表达式。即它有一个返回值。yield的返回值是传递给send()方法的值。 在这个例子里,yield将首先返回"Foo",然后返回"Bar"。

上面的例子里yield仅作为接收者。混合两种用法是可能的,即既可接收也可发送。接收和发送通信如何进行的例子如下:

代码如下:

<?php

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())

马上理解输出的精确顺序有点困难,因此确定你知道为什按照这种方式输出。我愿意特别指出的有两点:第一点,yield表达式两边使用 圆括号不是偶然。由于技术原因(虽然我已经考虑为赋值增加一个异常,就像Python那样),圆括号是必须的。第二点,你可能已经注意到 调用current()之前没有调用rewind()。如果是这么做的,那么已经隐含地执行了rewind操作。

多任务协作

如果阅读了上面的logger()例子,那么你认为“为了双向通信我为什么要使用协程呢? 为什么我不能只用常见的类呢?”,你这么问完全正确。上面的例子演示了基本用法,然而上下文中没有真正的展示出使用协程的优点。这就是列举许多协程例子的理由。正如上面介绍里提到的,协程是非常强大的概念,不过这样的应用很稀少而且常常十分复杂。给出一些简单而真实的例子很难。

在这篇文章里,我决定去做的是使用协程实现多任务协作。我们尽力解决的问题是你想并发地运行多任务(或者“程序”)。不过处理器在一个时刻只能运行一个任务(这篇文章的目标是不考虑多核的)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。

多任务协作这个术语中的“协作”说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样它就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在Windows的早期版本(windows95)和Mac OS中有使用,不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动传回 控制的话,那么坏行为的软件将很容易为自身占用整个CPU,不与其他任务共享。

这个时候你应当明白协程和任务调度之间的联系:yield指令提供了任务中断自身的一种方法,然后把控制传递给调度器。因此协程可以运行多个其他任务。更进一步来说,yield可以用来在任务和调度器之间进行通信。

我们的目的是 对 “任务”用更轻量级的包装的协程函数:

代码如下:

<?php

class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

public function getTaskId() {
        return $this->taskId;
    }

public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

public function isFinished() {
        return !$this->coroutine->valid();
    }
}

一个任务是用 任务ID标记一个协程。使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。 run()函数确实没有做什么,除了调用send()方法的协同程序。要理解为什么添加beforeFirstYieldflag,需要考虑下面的代码片段:

代码如下:

<?php

function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!

通过添加 beforeFirstYieldcondition 我们可以确定 first yield 的值 被返回。

调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:

代码如下:

<?php

class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

public function __construct() {
        $this->taskQueue = new SplQueue();
    }

public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务映射数组里。接着它通过把任务放入任务队列里来实现对任务的调度。接着run()方法扫描任务队列,运行任务。如果一个任务结束了,那么它将从队列里删除,否则它将在队列的末尾再次被调度。
 让我们看看下面具有两个简单(并且没有什么意义)任务的调度器:

代码如下:

<?php

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:

代码如下:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的,接着第二个任务结束后,只有第一个任务继续运行。

与调度器之间通信

既然调度器已经运行了,那么我们就转向日程表的下一项:任务和调度器之间的通信。我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用。我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上。因此为了执行特权级别的操作(如杀死另一个进程),就不得不以某种方式把控制传回给内核,这样内核就可以执行所说的操作了。再说一遍,这种行为在内部是通过使用中断指令来实现的。过去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。

我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样久允许它做它想做的任何事),我们将通过给yield表达式传递信息来与系统调用通信。这儿yield即是中断,也是传递信息给调度器(和从调度器传递出信息)的方法。

为了说明系统调用,我将对可调用的系统调用做一个小小的封装:

代码如下:

<?php

class SystemCall {
    protected $callback;

public function __construct(callable $callback) {
        $this->callback = $callback;
    }

public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can't call it directly in PHP :/
        return $callback($task, $scheduler);
    }
}

它将像其他任何可调用那样(使用_invoke)运行,不过它要求调度器把正在调用的任务和自身传递给这个函数。为了解决这个问题 我们不得不微微的修改调度器的run方法:

代码如下:

<?php
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

第一个系统调用除了返回任务ID外什么都没有做:

代码如下:

<?php
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

这个函数确实设置任务id为下一次发送的值,并再次调度了这个任务。由于使用了系统调用,所以调度器不能自动调用任务,我们需要手工调度任务(稍后你将明白为什么这么做)。要使用这个新的系统调用的话,我们要重新编写以前的例子:

代码如下:

<?php

function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();

这段代码将给出与前一个例子相同的输出。注意系统调用同其他任何调用一样正常地运行,不过预先增加了yield。要创建新的任务,然后再杀死它们的话,需要两个以上的系统调用:  


代码如下:

<?php

function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

killTask函数需要在调度器里增加一个方法:

代码如下:

<?php

public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

unset($this->taskMap[$tid]);

// This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

return true;
}

当前1/2页 12下一页阅读全文

(0)

相关推荐

  • Python多线程结合队列下载百度音乐的方法

    本文实例讲述了Python多线程结合队列下载百度音乐的方法.分享给大家供大家参考.具体如下: 一直想做个下载音乐的脚本,后来决定就拿百度音乐开刀,经过多次分析,终于制作了一个下载百度音乐的脚本,目前只默认下载第一页,童鞋们可以自由拓展. 适用Windows和Linux平台.依赖BeautifulSoup这个库,主要对HTML进行解析 #!/usr/bin/python # -*- coding: utf-8 -*- ''' 百度中批量下载某歌手的歌(目前只下载第一页,可以自行拓展) @autho

  • C#多线程处理多个队列数据的方法

    本文实例讲述了C#多线程处理多个队列数据的方法.分享给大家供大家参考.具体实现方法如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Collections; using System.Windows.Forms; namespace ThredProcessQueue { //用于顯示狀態的代理

  • C#如何对多线程、多任务管理(demo)

    下面一段内容有项目需求有项目分析,通过一个小demo给大家展示下C#如何对多线程.多任务管理的. 项目需求:假设多个任务需要执行,每个任务不是一时半会能完成(需要能看到中间执行状况): 多个任务 根据条件不同 可能需要不同的处理 项目分析: 多线程并发执行多任务: 对任务进行管理,追踪中间执行状态: 运用策略模式抽象执行类: public enum TaskStatus { wait = 0, working = 1, stop = 2, suspend = 3, complete = 4, f

  • c#实现多线程局域网聊天系统

    觉得好有点帮助就顶一下啦. socke编程,支持多客户端,多线程操作避免界面卡死. 开启socket private void button1_Click(object sender, EventArgs e) { try { int port = int.Parse(txt_port.Text); string host = txt_ip.Text; //创建终结点 IPAddress ip = IPAddress.Parse(host); IPEndPoint ipe = new IPEnd

  • php多任务程序实例解析

    本文以实例简单解析了php多任务程序的实现方法,具体代码如下: <?php error_reporting(E_ALL); set_time_limit(0); /** * php多任务程序的实现 * 借助proc_open * 其实该叫进程(process) * 能启动多进程,你可以使用你的想象力做你想做的了,以后再写个能用的 * 如果你是在linux上跑php,并且启用pcntl模块后,使用pcntl函数该更好 * */ class Thread { protected $_pref; //

  • Python多线程和队列操作实例

    Python3,开一个线程,间隔1秒把一个递增的数字写入队列,再开一个线程,从队列中取出数字并打印到终端 复制代码 代码如下: #! /usr/bin/env python3 import time import threading import queue # 一个线程,间隔一定的时间,把一个递增的数字写入队列 # 生产者 class Producer(threading.Thread): def __init__(self, work_queue):         super().__in

  • C#实现多线程的Web代理服务器实例

    本文实例讲述了C#实现多线程的Web代理服务器.分享给大家供大家参考.具体如下: /** Proxy.cs: C# Programming Tips & Techniques by Charles Wright, Kris Jamsa Publisher: Osborne/McGraw-Hill (December 28, 2001) ISBN: 0072193794 */ // Proxy.cs -- Implements a multi-threaded Web proxy server /

  • PHP中使用协同程序实现合作多任务第1/2页

    PHP5.5一个比较好的新功能是实现对生成器和协同程序的支持.对于生成器,PHP的文档和各种其他的博客文章(就像这一个或这一个)已经有了非常详细的讲解.协同程序相对受到的关注就少了,所以协同程序虽然有很强大的功能但也很难被知晓,解释起来也比较困难. 这篇文章指导你通过使用协同程序来实施任务调度,通过实例实现对技术的理解.我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到"协同多任务处理"一节. 生成器 生成器最基本的思想也是一个函数,这个函数的返回值是依次输出,

  • Lua中的协同程序详解

    前言 协同程序与线程差不多,也就是一条执行序列,拥有自己独立的栈.局部变量和指令指针,同时又与其它协同程序共享全局变量和其它大部分东西.从概念上讲,线程与协同程序的主要区别在于,一个具有多个线程的程序可以同时运行几个线程,而协同程序却需要彼此协作的运行.就是说,一个具有多个协同程序的程序在任意时刻只能运行一个协同程序,并且正在运行的协同程序只会在其显式地要求挂起时,它的执行才会暂停. 协同程序基础 Lua将所有关于协同程序的函数放置在一个名为"coroutine"的table中.函数c

  • 举例详解Lua中的协同程序编程

    协同程序是协同的性质,可以把两个或更多的方法以可控制的方式执行.随着协同程序,在任何给定的时间,只有其协同程序运行之一,这在运行协同程序只能暂停其执行时,明确要求暂停. 上述定义可能看起来模糊.来告诉它更清楚,假设我们有两个方法,一个主程序方法和协同程序.当我们使用恢复功能调用协程,其开始执行,当我们调用yield功能,暂停执行.再次同协程可以继续从它被暂停的另一个恢复功能调用执行.这个过程可以继续,直到执行了协程的结束. 协同程序可用的功能 下表列出了在Lua协同程序及其相应的使用所有的可用功

  • Lua中的协同程序探究

    哎,周五晚上我都还这么努力看书,真是好孩子.(小若:不想吐槽了) 其实我都准备玩游戏看电影去的了,但是这书就摆在桌子上,而且正对着我,就想着,扫两眼吧. 结果一扫就不对劲了,因为这内容有点绕,有点小混乱,如果我现在不记录下来的话,下周一可能又要重新看一次了.   好吧,今天我们来聊聊协同程序. 1.什么是协同程序(coroutinue) 大家都知道线程吧?都知道多线程吧?协同程序就和这线程差不多,但是又有比较明显的区别. 多个协同程序在任意时刻只能执行一个,虽然线程在某种意义上也是这样,但这不是

  • Lua中的协同程序之resume-yield间的数据返回研究

    这次要介绍几个其实很简单,但是一定要小心的返回值规则. 1.resume的参数 resume函数除了第一个参数是协同程序外,还能继续传其他参数,如下代码: 复制代码 代码如下: local co = coroutine.create(function(name)         print(name);     end);     coroutine.resume(co, "resume param"); resume第二个参数为"resume parame",这个

  • Lua协同程序(COROUTINE)运行步骤分解

    这是一段分析 lua 协程(协同程序,coroutine)的代码,来自 Lua reference manual interface (略有修改): 复制代码 代码如下: function foo (a)     print("foo", a)     return coroutine.yield(2*a) end co = coroutine.create(function (a,b)    print("co-body1", a, b)    local r =

  • Lua协同程序函数coroutine使用实例

    协程是协同程序的简称,顾名思义,就是协同工作的程序.协程拥有自己独立的桟.局部变量和PC计数器,同时又与其他协同程序共享全局变量和其他大部分东西: 协程与线程的主要区别在于,一个多线程程序可以同时运行几个线程(并发执行.抢占),而协同程序却需要彼此协作地运行,即一个多协程程序在任意时刻只能运行一个协程,并且正在执行的协程只会在其显式地要求挂起(suspend)时,它的执行才会暂停(无抢占.无并发). Lua中所有与协程相关的函数都在coroutine(一个table)中: 函数create用于创

  • Lua协同程序coroutine的简介及优缺点

    什么是协同(coroutine)? Lua 协同程序(coroutine)与线程比较类似:拥有独立的堆栈,独立的局部变量,独立的指令指针,同时又与其它协同程序共享全局变量和其它大部分东西. 协同是非常强大的功能,但是用起来也很复杂. 线程和协同程序区别 协程是编译器级别的,线程是操作系统级别的,在多处理器情况下,多线程程序同时运行多个线程:而协同程序是通过协作来完成,在任一指定时刻只有一个协同程序在运行,并且这个正在运行的协同程序只在必要时才会被挂起.这样Lua的协程就不能利用现在多核技术了.

  • 从 HTA 中启动应用程序

    如何从 HTA 中启动应用程序? 问: 您好,脚本专家!对于 HTA,有没有什么可以替代 Wscript.Shell 命令?我需要运行某个应用程序并指定要打开的文件. -- DL 答: 您好,DL.是的,我们确实知道这样的命令,可以在 HTA 中使用并可以替代 Wscript.Shell 命令,我们一会儿就会向您介绍.不过,在介绍它之前,我们应注意到您实际上可以在 HTA 中使用 Wscript.Shell 对象.这是一个常会引发混淆之处:因为您在 HTA 中无法使用某些命令(如 Wscript

  • Android中引用其他程序的文本资源超简单方法

    在Android中引用其他程序的文本资源并不是很常见,但是有时候还是很是有需要的,通常引用的多半是系统的程序的文本资源. 下面以一个超简单的例子,来展示以下如何实现. 复制代码 代码如下: public void testUseAndroidString() { Context context = getContext();     Resources res = null;     try {         //I want to use the clear_activities strin

随机推荐