laravel源码分析队列Queue方法示例

目录
  • 前言
  • 队列任务的创建
  • 队列任务的分发

前言

队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。本文我们就来分析下队列创建和执行的源码。

队列任务的创建

先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php。

> php artisan make:job DemoJob

> Job created successfully.

下面我们来分析一下 Job 类的具体生成过程。

执行 php artisan make:job DemoJob 后,会触发调用如下方法。

laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

/**
 * Register the command.
 * [A] make:job 时触发的方法
 * @return void
 */
protected function registerJobMakeCommand()
{
    $this->app->singleton('command.job.make', function ($app) {
        return new JobMakeCommand($app['files']);
    });
}

接着我们来看下 JobMakeCommand 这个类,这个类里面没有过多的处理逻辑,处理方法在其父类中。

 class JobMakeCommand extends GeneratorCommand

我们直接看父类中的处理方法,GeneratorCommand->handle(),以下是该方法中的主要方法。

public function handle()
{
    // 获取类名
    $name = $this->qualifyClass($this->getNameInput());
    // 获取文件路径
    $path = $this->getPath($name);
    // 创建目录和文件
    $this->makeDirectory($path);
    // buildClass() 通过模板获取新类文件的内容
    $this->files->put($path, $this->buildClass($name));
    // $this->type 在子类中定义好了,例如 JobMakeCommand 中 type = 'Job'
    $this->info($this->type.' created successfully.');
}

方法就是通过目录和文件,创建对应的类文件,至于新文件的内容,都是基于已经设置好的模板来创建的,具体的内容在 buildClass($name) 方法中。

 protected function buildClass($name)
{
    // 得到类文件模板,getStub() 在子类中有实现,具体看 JobMakeCommand
    $stub = $this->files->get($this->getStub());
    // 用实际的name来替换模板中的内容,都是关键词替换
    return $this->replaceNamespace($stub, $name)->replaceClass($stub, $name);
}

获取模板文件

protected function getStub()
{
    return $this->option('sync')
                    ? __DIR__.'/stubs/job.stub'
                    : __DIR__.'/stubs/job-queued.stub';
}

job.stub

 <?php
/**
* job 类的生成模板
*/
namespace DummyNamespace;

use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;

class DummyClass
{
    use Dispatchable, Queueable;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

job-queued.stub

<?php
/**
* job 类的生成模板
*/
namespace DummyNamespace;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class DummyClass implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

下面看一下前面我们创建的一个Job类,DemoJob.php,就是来源于模板 job-queued.stub。

<?php
/**
* job 类的生成模板
*/
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class DemoJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

至此,我们已经大致明白了队列任务类是如何创建的了。下面我们来分析下其是如何生效运行的。

队列任务的分发

任务类创建后,我们就可以在需要的地方进行任务的分发,常见的方法如下:

DemoJob::dispatch(); // 任务分发
DemoJob::dispatchNow(); // 同步调度,队列任务不会排队,并立即在当前进程中进行

下面先以 dispatch() 为例分析下分发过程。

trait Dispatchable
{
    public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }
}
 class PendingDispatch
{
    protected $job;

    public function __construct($job)
    {   echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL;
        $this->job = $job;
    }
    public function __destruct()
    {   echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL;
        app(Dispatcher::class)->dispatch($this->job);
    }
}

重点是 app(Dispatcher::class)->dispatch($this->job) 这部分。

我们先来分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自带的 BusServiceProvider 中向 $app 中注入的。

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });
    }
}

看一下 Dispatcher 的构造方法,至此,我们已经知道前半部分 app(Dispatcher::class) 是如何来的了。

class Dispatcher implements QueueingDispatcher
{
    protected $container;
    protected $pipeline;
    protected $queueResolver;

    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        /**
         * Illuminate/Bus/BusServiceProvider.php->register()中
         * $queueResolver 传入的是一个闭包
         * function ($connection = null) use ($app) {
         *   return $app[QueueFactoryContract::class]->connection($connection);
         * }
         */
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }
    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        		// 将 $command 存入队列
            return $this->dispatchToQueue($command);
        }
        return $this->dispatchNow($command);
    }
}

BusServiceProvider 中注册了 Dispatcher::class ,然后 app(Dispatcher::class)->dispatch($this->job) 调用的即是 Dispatcher->dispatch()。

 public function dispatchToQueue($command)
{
    // 获取任务所属的 connection
    $connection = $command->connection ?? null;
    /*
     * 获取队列实例,根据config/queue.php中的配置
     * 此处我们配置 QUEUE_CONNECTION=redis 为例,则获取的是RedisQueue
     * 至于如何通过 QUEUE_CONNECTION 的配置获取 queue ,此处先跳过,本文后面会具体分析。
     */
    $queue = call_user_func($this->queueResolver, $connection);
    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }
    // 我们创建的DemoJob无queue方法,则不会调用
    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }
    // 将 job 放入队列
    return $this->pushCommandToQueue($queue, $command);
}
protected function pushCommandToQueue($queue, $command)
{
    // 在指定了 queue 或者 delay 时会调用不同的方法,基本大同小异
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    // 此处我们先看最简单的无参数时的情况,调用push()
    return $queue->push($command);
}

笔者的配置是 QUEUE_CONNECTION=redis ,估以此来分析,其他类型的原理基本类似。

配置的是 redis 时, $queue 是 RedisQueue 实例,下面我们看下 RedisQueue->push() 的内容。

Illuminate/Queue/RedisQueue.php

public function push($job, $data = '', $queue = null)
{
    /**
     * 获取队列名称
     * var_dump($this->getQueue($queue));
     * 创建统一的 payload,转成 json
     * var_dump($this->createPayload($job, $this->getQueue($queue), $data));
     */
    // 将任务和数据存入队列
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
    // 写入redis中
    $this->getConnection()->eval(
        LuaScripts::push(), 2, $this->getQueue($queue),
        $this->getQueue($queue).':notify', $payload
    );
    // 返回id
    return json_decode($payload, true)['id'] ?? null;
}

至此,我们已经分析完了任务是如何被加入到队列中的。

以上就是laravel源码分析队列Queue方法示例的详细内容,更多关于laravel源码分析队列Queue方法的资料请关注我们其它相关文章!

(0)

相关推荐

  • Laravel 4.2 中队列服务(queue)使用感受

    这半个月,我参与重写了一个微信公众号后端系统,首次使用了laravel 4.2,以及laravel引以为傲的队列服务(queue). 由于整个系统涉及到多端交互,又有大量语音传输.处理的业务,我们在一些地方发现响应时间过长.之前的系统基于node.js和mongoDB,由于node天生就是异步,有守护进程,所以并没有出现过这个问题,而这次重写必然要引入异步流程了.Queue进入了我们的视线. 根据这一页几乎还全是英文的"中文文档" ,laravel恰好在4.2版本中刚刚引入了redis

  • PHP的Laravel框架中使用消息队列queue及异步队列的方法

    queue配置 首先说明一下我之前的项目中如何使用queue的. 我们现在的项目都是用的symfony,老一点的项目用的symfony1.4,新一点的项目用的都是symfony2.symfony用起来整体感觉还是很爽的,尤其symfony2,整体上来讲使用了很多java里面框架的设计思想.但是他不支持queue.在symfony,我们使用queue也经历了几个过程.最开始使用张堰同学的httpsqs.这个简单使用,但是存在单点.毕竟我们的项目还是正式对外服务的,所以我们研究了Apache旗下的开

  • Laravel使用Queue队列的技巧汇总

    前言 Laravel 队列为不同的后台队列服务提供统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列.队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和相应的时间. 队列配置文件存放在 config/queue.php .每一种队列驱动的配置都可以在该文件中找到,包括数据库,Beanstalkd ,Amazon SQS,Redis,以及同步(本地使用)驱动.其中还包含了一个 null 队列驱动用于那些放弃队列的任

  • Laravel框架中队列和工作(Queues、Jobs)操作实例详解

    在我们的web应用中,经常会遇到这样的情况: 用户在进行了某项操作后,我们需要在后台完成一个耗时且耗费资源的任务,以对应用户的操作. 通常来说,web应用中的操作都是同步的(synchronous),即用户的操作可以立即得到回馈. 但是在以上情况下,同步等待操作结果将是灾难性的.比如用户点击了申请密码重置邮件,倘若我们让用户一直停滞在等待页面,直至邮件发送成功,那么用户体验将非常地不好,因为有时候可能需要很长的时间才能将邮件发送完成. 从另一个角度来说,如果我们服务器处于高负荷的情况,当多个用户

  • laravel源码分析队列Queue方法示例

    目录 前言 队列任务的创建 队列任务的分发 前言 队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间.本文我们就来分析下队列创建和执行的源码. 队列任务的创建 先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php. > php artisan make:job DemoJob > Job created suc

  • Vue3源码分析reactivity实现方法示例

    目录 深入分析对于map.set.weakMap.weakSet的响应式拦截 (1).mutableInstrumentations (2).shallowInstrumentations (3).readonlyInstrumentations (4).shallowReadonlyInstrumentations ref.computed等方法的实现 (1).ref与shallowRef源码解析 (2).toRefs (4).computed (5)其他api源码 最后总结: 深入分析对于m

  • java 源码分析Arrays.asList方法详解

    最近,抽空把java Arrays 工具类的asList 方法做了源码分析,在网上整理了相关资料,记录下来,希望也能帮助读者! Arrays工具类提供了一个方法asList, 使用该方法可以将一个变长参数或者数组转换成List . 其源代码如下: /** * Returns a fixed-size list backed by the specified array. (Changes to * the returned list "write through" to the arr

  • jQuery插件-jRating评分插件源码分析及使用方法

    该插件被广泛应用于各种需要评分的页面当中,今天作为学习,把源码拿出来分析一下,顺便学习其使用方法. 一.插件使用一览. 复制代码 代码如下: <div> <div>第一个例子</div> <div id="16_1" class="myRating"></div> </div> 复制代码 代码如下: <link href="Script/jRating/jRating.jquer

  • 解析ConcurrentHashMap: put方法源码分析

    上一章:预热(内部一些小方法分析) put()方法是并发HashMap源码分析的重点方法,这里涉及到并发扩容,桶位寻址等等- JDK1.8 ConcurrentHashMap结构图: 1.put方法源码解析 // 向并发Map中put一个数据 public V put(K key, V value) { return putVal(key, value, false); } // 向并发Map中put一个数据 // Key: 数据的键 // value:数据的值 // onlyIfAbsent:

  • Netty启动步骤绑定端口示例方法源码分析

    目录 绑定端口 我们继续跟第一小节的最初的doBind()方法 第二步, 获得channel 重点关注下doBind(localAddress)方法 最终会走到这一步, pipeline.fireChannelActive() 章节总结 前文传送门:Netty启动流程注册多路复用源码解析 绑定端口 上一小节我们学习了channel注册在selector的步骤, 仅仅做了注册但并没有监听事件, 事件是如何监听的呢? 我们继续跟第一小节的最初的doBind()方法 private ChannelFu

  • java编程Reference核心原理示例源码分析

    带着问题,看源码针对性会更强一点.印象会更深刻.并且效果也会更好.所以我先卖个关子,提两个问题(没准下次跳槽时就被问到). 我们可以用ByteBuffer的allocateDirect方法,申请一块堆外内存创建一个DirectByteBuffer对象,然后利用它去操作堆外内存.这些申请完的堆外内存,我们可以回收吗?可以的话是通过什么样的机制回收的? 大家应该都知道WeakHashMap可以用来实现内存相对敏感的本地缓存,为什么WeakHashMap合适这种业务场景,其内部实现会做什么特殊处理呢?

  • Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

    通过前面三篇的分析,我们深入了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列.我们还是拿公共厕所做比喻,同步队列是主要的排队区,如果公共厕所没开放,所有想要进入厕所的人都得在这里排队.而条件队列主要是为条件等待设置的,我们想象一下如果一个人通过排队终于成功获取锁进入了厕所,但在方便之前发现自己没带手纸,碰到这种情况虽然很无奈,但是它

  • jQuery源码解读之addClass()方法分析

    本文较为详细的分析了jQuery源码解读之addClass()方法.分享给大家供大家参考.具体分析如下: 给jQuery原型对象扩展addClass功能,jQuery.fn就是jQuery.prototype 复制代码 代码如下: jQuery.fn.extend({ /* 可以看出这是一个函数名叫addClass的插件方法. */     addClass: function( value ) {         var classes, elem, cur, clazz, j, finalV

  • jQuery源码解读之hasClass()方法分析

    本文较为详细的分析了jQuery源码解读之hasClass()方法.分享给大家供大家参考.具体分析如下: 复制代码 代码如下: jQuery.fn.extend({     hasClass: function( selector ) { //将要检查的类名selector赋值给className, l为选择器选择的当前要检查的jQuery对象数组的长度.         var className = " " + selector + " ",          

随机推荐