Skip to content

队列

介绍

在构建 Web 应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。幸运的是,Laravel 允许您轻松创建可以在后台处理的队列作业。通过将耗时的任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。

Laravel 队列提供了一个统一的队列 API,支持多种不同的队列后端,例如 Amazon SQSRedis 或关系数据库。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到每个队列驱动程序的连接配置,这些驱动程序包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及一个同步驱动程序,该驱动程序将在本地开发期间立即执行作业。还包括一个 null 队列驱动程序,该驱动程序会丢弃队列作业。

lightbulb

Laravel 现在提供了 Horizon,这是一个用于 Redis 驱动队列的漂亮仪表板和配置系统。有关更多信息,请查看完整的 Horizon 文档

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别非常重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的堆栈或队列作业的堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列中:

php
use App\Jobs\ProcessPodcast;

// 该作业被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 该作业被发送到默认连接的“emails”队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您按优先级指定应处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者,给予它们更高的处理优先级:

shell
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table Artisan 命令。创建迁移后,您可以使用 migrate 命令迁移数据库:

shell
php artisan queue:table

php artisan migrate

最后,不要忘记通过更新应用程序的 .env 文件中的 QUEUE_CONNECTION 变量来指示应用程序使用 database 驱动程序:

php
QUEUE_CONNECTION=database

Redis

要使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

exclamation

redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据队列负载调整此值比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,以指示驱动程序在等待作业可用时应阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],
exclamation

block_for 设置为 0 将导致队列工作者无限期阻塞,直到作业可用。这也将阻止信号(如 SIGTERM)在处理下一个作业之前被处理。

其他驱动程序先决条件

以下依赖项是列出的队列驱动程序所需的。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 或 phpredis PHP 扩展

创建作业

生成作业类

默认情况下,应用程序的所有可队列作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时将创建它:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列以异步运行。

lightbulb

作业存根可以使用 存根发布 进行自定义。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假设我们管理一个播客发布服务,并且需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到队列作业的构造函数中。由于作业使用的 SerializesModels trait,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。

如果您的队列作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业负载发送到队列驱动程序。

handle 方法依赖注入

handle 方法在作业被队列处理时调用。请注意,我们能够在作业的 handle 方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以自由地以任何方式调用 handle 方法。通常,您应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
exclamation

二进制数据(如原始图像内容)应在传递给队列作业之前通过 base64_encode 函数进行处理。否则,作业在放入队列时可能无法正确序列化为 JSON。

队列关系

由于所有加载的 Eloquent 模型关系在作业被队列时也会被序列化,因此序列化的作业字符串有时可能会变得相当大。此外,当作业被反序列化并且模型关系从数据库中重新检索时,它们将被完整检索。在作业队列过程中应用的任何先前关系约束在作业反序列化时将不再适用。因此,如果您希望处理给定关系的子集,您应该在队列作业中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时调用模型的 withoutRelations 方法。此方法将返回一个没有加载关系的模型实例:

php
/**
 * 创建一个新的作业实例。
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}

如果您使用 PHP 构造函数属性提升,并且希望指示 Eloquent 模型不应序列化其关系,可以使用 WithoutRelations 属性:

php
use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的作业实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast
) {
}

如果作业接收 Eloquent 模型的集合或数组而不是单个模型,则集合中的模型在作业反序列化和执行时将不会恢复其关系。这是为了防止处理大量模型的作业使用过多资源。

唯一作业

exclamation

唯一作业需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批处理中的作业。

有时,您可能希望确保在任何时候队列中只有一个特定作业实例。您可以通过在作业类上实现 ShouldBeUnique 接口来实现。这一接口不需要您在类上定义任何其他方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

在上面的示例中,UpdateSearchIndex 作业是唯一的。因此,如果队列中已经有另一个作业实例并且尚未完成处理,则不会调度该作业。

在某些情况下,您可能希望定义一个特定的“键”来使作业唯一,或者您可能希望指定一个超时,以便作业不再保持唯一性。为此,您可以在作业类上定义 uniqueIduniqueFor 属性或方法:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Product
     */
    public $product;

    /**
     * 作业唯一锁将被释放的秒数。
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 获取作业的唯一 ID。
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 作业通过产品 ID 唯一。因此,具有相同产品 ID 的作业的新调度将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未处理,唯一锁将被释放,并且可以将另一个具有相同唯一键的作业调度到队列。

exclamation

如果您的应用程序从多个 Web 服务器或容器调度作业,您应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定作业是否唯一。

保持作业唯一直到处理开始

默认情况下,唯一作业在作业完成处理或失败所有重试尝试后被“解锁”。然而,在某些情况下,您可能希望在作业处理之前立即解锁作业。为此,您的作业应实现 ShouldBeUniqueUntilProcessing 合约,而不是 ShouldBeUnique 合约:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一作业锁

在后台,当调度 ShouldBeUnique 作业时,Laravel 尝试获取一个带有 uniqueId 键的 。如果未获取到锁,则不会调度作业。此锁在作业完成处理或失败所有重试尝试时释放。默认情况下,Laravel 将使用默认缓存驱动程序获取此锁。然而,如果您希望使用其他驱动程序获取锁,可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动程序:

php
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * 获取唯一作业锁的缓存驱动程序。
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}
lightbulb

如果您只需要限制作业的并发处理,请使用 WithoutOverlapping 作业中间件。

加密作业

Laravel 允许您通过 加密 确保作业数据的隐私和完整性。要开始,只需将 ShouldBeEncrypted 接口添加到作业类中。一旦将此接口添加到类中,Laravel 将在将作业推送到队列之前自动加密您的作业:

php
<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

作业中间件

作业中间件允许您在队列作业的执行过程中包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,每五秒钟只允许一个作业处理一次:

php
use Illuminate\Support\Facades\Redis;

/**
 * 执行作业。
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('获得锁...');

        // 处理作业...
    }, function () {
        // 无法获得锁...

        return $this->release(5);
    });
}

虽然此代码有效,但 handle 方法的实现变得嘈杂,因为它被 Redis 速率限制逻辑所干扰。此外,必须为我们希望速率限制的任何其他作业复制此速率限制逻辑。

相反,我们可以定义一个处理速率限制的作业中间件,而不是在 handle 方法中进行速率限制。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware 目录中:

php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理队列作业。
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // 获得锁...

                    $next($job);
                }, function () use ($job) {
                    // 无法获得锁...

                    $job->release(5);
                });
    }
}

如您所见,像 路由中间件 一样,作业中间件接收正在处理的作业和一个回调,该回调应被调用以继续处理作业。

创建作业中间件后,可以通过从作业的 middleware 方法返回它们来将它们附加到作业上。此方法在 make:job Artisan 命令生成的作业中不存在,因此您需要手动将其添加到作业类中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}
lightbulb

作业中间件也可以分配给可队列的事件监听器、邮件和通知。

速率限制

虽然我们刚刚演示了如何编写自己的速率限制作业中间件,但 Laravel 实际上包括一个速率限制中间件,您可以利用它来限制作业的速率。像 路由速率限制器 一样,作业速率限制器是使用 RateLimiter facade 的 for 方法定义的。

例如,您可能希望允许用户每小时备份一次数据,而对高级客户没有此类限制。为此,您可以在 AppServiceProviderboot 方法中定义一个 RateLimiter

php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 启动任何应用程序服务。
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个每小时的速率限制;然而,您可以轻松地使用 perMinute 方法定义基于分钟的速率限制。此外,您可以将任何值传递给速率限制的 by 方法;然而,此值最常用于按客户分段速率限制:

php
return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,您可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到作业上。每次作业超过速率限制时,此中间件将根据速率限制持续时间将作业释放回队列,并适当延迟。

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将速率限制的作业释放回队列仍会增加作业的总 attempts 数量。您可能希望相应地调整作业类上的 triesmaxExceptions 属性。或者,您可能希望使用 retryUntil 方法 来定义作业不再尝试的时间。

如果您不希望在速率限制时重试作业,可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}
lightbulb

如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本速率限制中间件更高效。

防止作业重叠

Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许您根据任意键防止作业重叠。这在队列作业修改一个应该只由一个作业一次修改的资源时非常有用。

例如,假设您有一个队列作业更新用户的信用评分,并且您希望防止相同用户 ID 的信用评分更新作业重叠。为此,您可以从作业的 middleware 方法返回 WithoutOverlapping 中间件:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

相同类型的任何重叠作业将被释放回队列。您还可以指定必须经过的秒数,然后才会再次尝试释放的作业:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果您希望立即删除任何重叠作业,以便它们不会被重试,可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的作业可能会意外失败或超时,以至于锁未被释放。因此,您可以使用 expireAfter 方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在作业开始处理三分钟后释放 WithoutOverlapping 锁:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
exclamation

WithoutOverlapping 中间件需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。

在作业类之间共享锁键

默认情况下,WithoutOverlapping 中间件只会防止相同类的作业重叠。因此,尽管两个不同的作业类可能使用相同的锁键,但它们不会被阻止重叠。然而,您可以使用 shared 方法指示 Laravel 在作业类之间应用键:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...


    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

节流异常

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您节流异常。一旦作业抛出给定数量的异常,所有进一步的作业执行尝试将被延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,假设一个队列作业与第三方 API 交互,该 API 开始抛出异常。要节流异常,您可以从作业的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现 基于时间的尝试 的作业配对:

php
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5)];
}

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(5);
}

中间件接受的第一个构造函数参数是作业可以抛出的异常数量,第二个构造函数参数是作业在被节流后应等待的分钟数。在上面的代码示例中,如果作业在 5 分钟内抛出 10 个异常,我们将等待 5 分钟,然后再尝试作业。

当作业抛出异常但异常阈值尚未达到时,作业通常会立即重试。然而,您可以通过在将中间件附加到作业时调用 backoff 方法来指定此类作业应延迟的分钟数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

在内部,此中间件使用 Laravel 的缓存系统实现速率限制,并且作业的类名用作缓存“键”。您可以在将中间件附加到作业时调用 by 方法来覆盖此键。如果您有多个作业与同一第三方服务交互,并且希望它们共享一个公共节流“桶”,这可能会很有用:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10))->by('key')];
}
lightbulb

如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本异常节流中间件更高效。

调度作业

编写作业类后,您可以使用作业本身的 dispatch 方法调度它。传递给 dispatch 方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果您希望有条件地调度作业,可以使用 dispatchIfdispatchUnless 方法:

php
ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台同步执行作业,这在本地开发期间通常很方便。如果您希望实际开始将作业排队以进行后台处理,可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动程序。

延迟调度

如果您希望指定作业在队列工作者处理之前不立即可用,可以在调度作业时使用 delay 方法。例如,让我们指定作业在调度后 10 分钟内不可用:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}
exclamation

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在响应发送到浏览器后调度

或者,dispatchAfterResponse 方法会延迟调度作业,直到 HTTP 响应发送到用户的浏览器(如果您的 Web 服务器使用 FastCGI)。这仍然允许用户开始使用应用程序,即使队列作业仍在执行。通常,这仅应用于大约需要一秒钟的作业,例如发送电子邮件。由于它们在当前 HTTP 请求中处理,因此以这种方式调度的作业不需要队列工作者运行即可处理:

php
use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

您还可以 dispatch 一个闭包,并将 afterResponse 方法链接到 dispatch 助手,以便在 HTTP 响应发送到浏览器后执行闭包:

php
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您希望立即(同步)调度作业,可以使用 dispatchSync 方法。使用此方法时,作业不会被排队,而是立即在当前进程中执行:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

作业与数据库事务

虽然在数据库事务中调度作业是完全可以的,但您应该特别注意确保您的作业能够成功执行。在事务中调度作业时,可能会在父事务提交之前由工作者处理作业。当这种情况发生时,您在数据库事务中对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。

幸运的是,Laravel 提供了几种方法来解决此问题。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项为 true 时,您可以在数据库事务中调度作业;然而,Laravel 将等待打开的父数据库事务提交后再实际调度作业。当然,如果当前没有打开的数据库事务,作业将立即调度。

如果由于事务期间发生的异常而回滚事务,则在该事务期间调度的作业将被丢弃。

lightbulb

after_commit 配置选项设置为 true 还会导致任何队列事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后调度。

内联指定提交调度行为

如果您未将 after_commit 队列连接配置选项设置为 true,您仍然可以指示特定作业在所有打开的数据库事务提交后调度。为此,您可以将 afterCommit 方法链接到您的调度操作:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,您可以指示特定作业立即调度,而无需等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定在主作业成功执行后应按顺序运行的队列作业列表。如果序列中的一个作业失败,则不会运行其余作业。要执行队列作业链,您可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是构建在队列作业调度之上的较低级别组件:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接作业类实例,您还可以链接闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();
exclamation

使用 $this->delete() 方法删除作业不会阻止处理链接的作业。链只会在链中的作业失败时停止执行。

链连接和队列

如果您希望指定应为链接作业使用的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定队列连接和队列名称,除非队列作业明确分配了不同的连接/队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

链失败

在链接作业时,您可以使用 catch 方法指定在链中的作业失败时应调用的闭包。给定的回调将接收导致作业失败的 Throwable 实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的作业失败...
})->dispatch();
exclamation

由于链回调在稍后由 Laravel 队列序列化并执行,因此您不应在链回调中使用 $this 变量。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”队列作业,甚至可以优先考虑分配给各种队列的工作者数量。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在调度作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,您可以通过在作业的构造函数中调用 onQueue 方法来指定作业的队列:

php
<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果您的应用程序与多个队列连接交互,可以使用 onConnection 方法指定将作业推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

您可以将 onConnectiononQueue 方法链接在一起,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用 onConnection 方法来指定作业的连接:

php
<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大作业尝试次数/超时值

最大尝试次数

如果您的队列作业遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定作业可以尝试的最大次数或时间。

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将适用于工作者处理的所有作业,除非正在处理的作业指定了可以尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请查阅 失败作业文档。如果 --tries=0 提供给 queue:work 命令,作业将无限期重试。

您可以通过在作业类本身上定义作业可以尝试的最大次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

如果您需要对特定作业的最大尝试次数进行动态控制,可以在作业上定义一个 tries 方法:

php
/**
 * 确定作业可以尝试的次数。
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义作业可以尝试的最大次数的替代方法,您可以定义作业不再尝试的时间。这允许在给定时间范围内尝试任意次数的作业。要定义作业不再尝试的时间,请在作业类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:

php
use DateTime;

/**
 * 确定作业应超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}
lightbulb

您还可以在 队列事件监听器 上定义 tries 属性或 retryUntil 方法。

最大异常

有时您可能希望指定作业可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接通过 release 方法释放),则应失败。为此,您可以在作业类上定义一个 maxExceptions 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 允许的最大未处理异常数。
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获得锁,处理播客...
        }, function () {
            // 无法获得锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获得 Redis 锁,作业将释放十秒钟,并将继续重试最多 25 次。然而,如果作业抛出三个未处理异常,它将失败。

超时

通常,您大致知道队列作业的预期执行时间。因此,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由 服务器上配置的进程管理器 自动重启。

可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。

您还可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以运行的最大秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不尊重您指定的超时。因此,在使用这些功能时,您应始终尝试使用其 API 指定超时值。例如,在使用 Guzzle 时,您应始终指定连接和请求超时值。

exclamation

必须安装 pcntl PHP 扩展才能指定作业超时。此外,作业的“超时”值应始终小于其 “重试后” 值。否则,作业可能会在实际完成执行或超时之前被重新尝试。

超时失败

如果您希望指示作业在超时时被标记为 失败,可以在作业类上定义 $failOnTimeout 属性:

php
/**
 * 指示作业在超时时是否应标记为失败。
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到

手动释放作业

有时您可能希望手动将作业释放回队列,以便稍后可以再次尝试。您可以通过调用 release 方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release 方法会将作业释放回队列以立即处理。然而,您可以指示队列在给定秒数经过之前不使作业可用,以便通过将整数或日期实例传递给 release 方法:

php
$this->release(10);

$this->release(now()->addSeconds(10));

手动失败作业

有时您可能需要手动将作业标记为“失败”。为此,您可以调用 fail 方法:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果您希望由于捕获的异常而将作业标记为失败,可以将异常传递给 fail 方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常:

php
$this->fail($exception);

$this->fail('出了点问题。');
lightbulb

有关失败作业的更多信息,请查看 处理作业失败的文档

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在作业批处理完成执行时执行某些操作。在开始之前,您应该创建一个数据库迁移,以构建一个表,其中包含有关作业批处理的元信息,例如其完成百分比。可以使用 queue:batches-table Artisan 命令生成此迁移:

shell
php artisan queue:batches-table

php artisan migrate

定义可批处理的作业

要定义可批处理的作业,您应该像往常一样 创建一个可队列作业;然而,您应该将 Illuminate\Bus\Batchable trait 添加到作业类中。此 trait 提供了一个 batch 方法,可用于检索作业正在执行的当前批处理:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // 确定批处理是否已取消...

            return;
        }

        // 导入 CSV 文件的一部分...
    }
}

调度批处理

要调度一批作业,您应该使用 Bus facade 的 batch 方法。当然,批处理在与完成回调结合使用时最有用。因此,您可以使用 thencatchfinally 方法为批处理定义完成回调。每个回调在调用时都会接收一个 Illuminate\Bus\Batch 实例。在此示例中,我们将假设我们正在排队一批作业,每个作业处理 CSV 文件中的给定行数:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // 批处理已创建,但尚未添加作业...
})->progress(function (Batch $batch) {
    // 单个作业已成功完成...
})->then(function (Batch $batch) {
    // 所有作业均成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到第一个批处理作业失败...
})->finally(function (Batch $batch) {
    // 批处理已完成执行...
})->dispatch();

return $batch->id;

批处理的 ID,可以通过 $batch->id 属性访问,可用于 查询 Laravel 命令总线 以获取有关批处理的信息。

exclamation

由于批处理回调在稍后由 Laravel 队列序列化并执行,因此您不应在回调中使用 $this 变量。

命名批处理

某些工具(如 Laravel Horizon 和 Laravel Telescope)可能会为批处理提供更用户友好的调试信息。如果批处理被命名,可以通过在定义批处理时调用 name 方法来为批处理分配任意名称:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均成功完成...
})->name('Import CSV')->dispatch();

批处理连接和队列

如果您希望指定应为批处理作业使用的连接和队列,可以使用 onConnectiononQueue 方法。所有批处理作业必须在同一连接和队列中执行:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();

链和批处理

您可以通过将链接作业放在数组中来在批处理中定义一组 链接作业。例如,我们可以并行执行两个作业链,并在两个作业链完成处理时执行回调:

php
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

相反,您可以通过在链中定义批处理来在 中运行批处理作业。例如,您可以先运行一批作业以发布多个播客,然后再运行一批作业以发送发布通知:

php
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

向批处理中添加作业

有时,在批处理作业中添加其他作业可能很有用。这种模式在您需要批处理数千个作业时很有用,因为在 Web 请求期间调度这些作业可能需要太长时间。因此,您可能希望调度一批初始“加载器”作业,以便为批处理添加更多作业:

php
$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // 所有作业均成功完成...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 作业为批处理添加其他作业。为此,我们可以使用作业的 batch 方法访问的批处理实例上的 add 方法:

php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}
exclamation

您只能在属于同一批处理的作业中向批处理中添加作业。

检查批处理

提供给批处理完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,可帮助您与给定的作业批处理进行交互和检查:

php
// 批处理的 UUID...
$batch->id;

// 批处理的名称(如果适用)...
$batch->name;

// 分配给批处理的作业数量...
$batch->totalJobs;

// 尚未被队列处理的作业数量...
$batch->pendingJobs;

// 失败的作业数量...
$batch->failedJobs;

// 到目前为止已处理的作业数量...
$batch->processedJobs();

// 批处理的完成百分比(0-100)...
$batch->progress();

// 指示批处理是否已完成执行...
$batch->finished();

// 取消批处理的执行...
$batch->cancel();

// 指示批处理是否已取消...
$batch->cancelled();

从路由返回批处理

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由返回它们,以检索包含有关批处理的信息的 JSON 负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批处理完成进度的信息变得方便。

要通过其 ID 检索批处理,可以使用 Bus facade 的 findBatch 方法:

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批处理

有时您可能需要取消给定批处理的执行。这可以通过调用 Illuminate\Bus\Batch 实例上的 cancel 方法来实现:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

正如您在前面的示例中可能注意到的,批处理作业通常应在继续执行之前确定其对应的批处理是否已取消。然而,为了方便起见,您可以将 SkipIfBatchCancelled 中间件 分配给作业。顾名思义,此中间件将指示 Laravel 如果其对应的批处理已取消,则不处理作业:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取作业应通过的中间件。
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批处理失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅在批处理中的第一个作业失败时调用。

允许失败

当批处理中的作业失败时,Laravel 将自动将批处理标记为“已取消”。如果您愿意,可以禁用此行为,以便作业失败不会自动将批处理标记为已取消。这可以通过在调度批处理时调用 allowFailures 方法来实现:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业均成功完成...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批处理的所有失败作业。queue:retry-batch 命令接受应重试其失败作业的批处理的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不进行修剪,job_batches 表可能会迅速积累记录。为了解决这个问题,您应该 计划 queue:prune-batches Artisan 命令每天运行:

php
$schedule->command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批处理将被修剪。您可以在调用命令时使用 hours 选项来确定保留批处理数据的时间。例如,以下命令将删除所有在 48 小时前完成的批处理:

php
$schedule->command('queue:prune-batches --hours=48')->daily();

有时,您的 jobs_batches 表可能会积累从未成功完成的批处理的批处理记录,例如作业失败且该作业从未成功重试的批处理。您可以指示 queue:prune-batches 命令使用 unfinished 选项修剪这些未完成的批处理记录:

php
$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的 jobs_batches 表也可能会积累已取消批处理的批处理记录。您可以指示 queue:prune-batches 命令使用 cancelled 选项修剪这些已取消的批处理记录:

php
$schedule->command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批处理

Laravel 还提供了在 DynamoDB 中存储批处理元信息的支持,而不是关系数据库。然而,您需要手动创建一个 DynamoDB 表来存储所有批处理记录。

通常,此表应命名为 job_batches,但您应根据应用程序的 queue 配置文件中的 queue.batching.table 配置值命名表。

DynamoDB 批处理表配置

job_batches 表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含应用程序的名称,如应用程序的 app 配置文件中的 name 配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的作业批处理。

此外,如果您希望利用 自动批处理修剪,可以为表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应在批处理配置数组中定义 keysecretregion 配置选项。这些选项将用于 AWS 身份验证。使用 dynamodb 驱动程序时,不需要 queue.batching.database 配置选项:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批处理

在使用 DynamoDB 存储作业批处理信息时,通常用于修剪存储在关系数据库中的批处理的修剪命令将不起作用。相反,您可以利用 DynamoDB 的本机 TTL 功能 自动删除旧批处理的记录。

如果您在 DynamoDB 表中定义了 ttl 属性,可以定义配置参数以指示 Laravel 如何修剪批处理记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性名称,而 queue.batching.ttl 配置值定义了相对于上次更新记录的时间,批处理记录可以从 DynamoDB 表中删除的秒数:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 天...
],

队列闭包

除了将作业类调度到队列之外,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速简单任务非常有用。在将闭包调度到队列时,闭包的代码内容会被加密签名,以防止在传输过程中被修改:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

使用 catch 方法,您可以提供一个闭包,如果队列闭包在耗尽所有 配置的重试尝试 后未能成功完成,则应执行该闭包:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 此作业已失败...
});
exclamation

由于 catch 回调在稍后由 Laravel 队列序列化并执行,因此您不应在 catch 回调中使用 $this 变量。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动队列工作者并在作业推送到队列时处理新作业。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work
lightbulb

要使 queue:work 进程永久在后台运行,您应该使用 Supervisor 等进程监视器,以确保队列工作者不会停止运行。

如果您希望在调用 queue:work 命令时包含处理的作业 ID,可以包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重启队列工作者。此外,请记住,应用程序创建或修改的任何静态状态在作业之间不会自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重启工作者;然而,此命令的效率明显低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作者

要为队列分配多个工作者并并发处理作业,您只需启动多个 queue:work 进程。这可以在本地通过终端中的多个选项卡完成,也可以在生产环境中使用进程管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接的默认队列的作业。然而,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项指示工作者仅处理队列中的一个作业:

shell
php artisan queue:work --once

可以使用 --max-jobs 选项指示工作者处理给定数量的作业,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理给定数量的作业后自动重启,释放它们可能积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

可以使用 --stop-when-empty 选项指示工作者处理所有作业,然后优雅地退出。如果您希望在队列为空后关闭容器,此选项在 Docker 容器中处理 Laravel 队列时可能很有用:

shell
php artisan queue:work --stop-when-empty

处理给定秒数的作业

可以使用 --max-time 选项指示工作者处理作业的给定秒数,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理作业给定时间后自动重启,释放它们可能积累的任何内存:

shell
# 处理作业一小时然后退出...
php artisan queue:work --max-time=3600

工作者休眠时长

当队列中有作业可用时,工作者将继续处理作业而不在作业之间延迟。然而,sleep 选项决定了如果没有作业可用,工作者将“休眠”多少秒。当然,在休眠期间,工作者不会处理任何新作业:

shell
php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于 维护模式 时,不会处理任何队列作业。应用程序退出维护模式后,作业将继续正常处理。

要强制队列工作者在启用维护模式时处理作业,可以使用 --force 选项:

shell
php artisan queue:work --force

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,您应该在处理完图像后使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。然而,偶尔您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个验证所有 high 队列作业在继续处理 low 队列的作业之前已处理的工作者,请将队列名称的逗号分隔列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长时间运行的进程,它们不会注意到代码的更改而无需重启。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重启工作者。您可以通过发出 queue:restart 命令优雅地重启所有工作者:

shell
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地退出,以便不会丢失现有作业。由于在执行 queue:restart 命令时队列工作者将退出,您应该运行一个进程管理器,如 Supervisor,以自动重启队列工作者。

lightbulb

队列使用 缓存 存储重启信号,因此在使用此功能之前,您应该验证应用程序已正确配置缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被释放或删除,作业将被释放回队列。通常,您应该将 retry_after 值设置为作业合理完成处理的最大秒数。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由 服务器上配置的进程管理器 自动重启:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项不同,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。

exclamation

--timeout 值应始终至少比 retry_after 配置值短几秒钟。这将确保在作业重试之前,处理冻结作业的工作者始终被终止。如果您的 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,您需要一种方法来保持 queue:work 进程运行。queue:work 进程可能会因多种原因停止运行,例如超出工作者超时或执行 queue:restart 命令。

因此,您需要配置一个进程监视器,可以检测到您的 queue:work 进程何时退出并自动重启它们。此外,进程监视器可以让您指定希望并发运行的 queue:work 进程数量。Supervisor 是一个常用于 Linux 环境的进程监视器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果它们失败,将自动重启您的 queue:work 进程。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

shell
sudo apt-get install supervisor
lightbulb

如果自己配置和管理 Supervisor 听起来很复杂,请考虑使用 Laravel Forge,它将自动为您的生产 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监视所有进程,如果它们失败,将自动重启它们。您应该更改配置的 command 指令以反映您所需的队列连接和工作者选项。

exclamation

您应该确保 stopwaitsecs 的值大于最长运行作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其终止。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时您的队列作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来 指定作业应尝试的最大次数。在异步作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步调度的作业 失败时不会存储在此表中,其异常会立即由应用程序处理。

在新的 Laravel 应用程序中,通常已经存在一个创建 failed_jobs 表的迁移。然而,如果您的应用程序不包含此表的迁移,您可以使用 queue:failed-table 命令创建迁移:

shell
php artisan queue:failed-table

php artisan migrate

在运行 队列工作者 进程时,您可以使用 queue:work 命令上的 --tries 开关指定作业应尝试的最大次数。如果您未指定 --tries 选项的值,作业将仅尝试一次或作业类的 $tries 属性指定的次数:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,您可以指定 Laravel 在重试遇到异常的作业之前应等待的秒数。默认情况下,作业会立即释放回队列,以便可以再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果您希望在每个作业上配置 Laravel 在重试遇到异常的作业之前应等待的秒数,可以通过在作业类上定义一个 backoff 属性来实现:

php
/**
 * 重试作业之前应等待的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果您需要更复杂的逻辑来确定作业的回退时间,可以在作业类上定义一个 backoff 方法:

php
/**
* 计算重试作业之前应等待的秒数。
*/
public function backoff(): int
{
    return 3;
}

您可以通过从 backoff 方法返回一个回退值数组来轻松配置“指数”回退。在此示例中,重试延迟将为第一次重试 1 秒,第二次重试 5 秒,第三次重试 10 秒,如果还有更多尝试剩余,则每次后续重试 10 秒:

php
/**
* 计算重试作业之前应等待的秒数。
*
* @return array<int, int>
*/
public function backoff(): array
{
    return [1, 5, 10];
}

清理失败的作业

当特定作业失败时,您可能希望向用户发送警报或撤销作业部分完成的任何操作。为此,您可以在作业类上定义一个 failed 方法。导致作业失败的 Throwable 实例将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }

    /**
     * 处理作业失败。
     */
    public function failed(?Throwable $exception): void
    {
        // 发送用户失败通知等...
    }
}
exclamation

在调用 failed 方法之前,将实例化作业的新实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的作业

要查看已插入到 failed_jobs 数据库表中的所有失败作业,可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列、失败时间和有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如果需要,您可以将多个 ID 传递给命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败作业:

shell
php artisan queue:retry --queue=name

要重试所有失败的作业,请执行 queue:retry 命令并传递 all 作为 ID:

shell
php artisan queue:retry all

如果您希望删除失败的作业,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

使用 Horizon 时,您应该使用 horizon:forget 命令删除失败的作业,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的作业,可以使用 queue:flush 命令:

shell
php artisan queue:flush

忽略缺失的模型

在作业中注入 Eloquent 模型时,模型在放入队列之前会自动序列化,并在作业处理时从数据库中重新检索。然而,如果模型在作业等待工作者处理时被删除,您的作业可能会因 ModelNotFoundException 而失败。

为了方便起见,您可以选择通过将作业的 deleteWhenMissingModels 属性设置为 true 来自动删除缺失模型的作业。当此属性设置为 true 时,Laravel 将安静地丢弃作业而不引发异常:

php
/**
 * 如果模型不再存在,则删除作业。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

修剪失败的作业

您可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败作业记录将被修剪。如果您为命令提供 --hours 选项,则仅保留在过去 N 小时内插入的失败作业记录。例如,以下命令将删除所有在 48 小时前插入的失败作业记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还提供了在 DynamoDB 中存储失败作业记录的支持,而不是关系数据库表。然而,您必须手动创建一个 DynamoDB 表来存储所有失败作业记录。通常,此表应命名为 failed_jobs,但您应根据应用程序的 queue 配置文件中的 queue.failed.table 配置值命名表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含应用程序的名称,如应用程序的 app 配置文件中的 name 配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的失败作业。

此外,请确保安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,您应在失败作业配置数组中定义 keysecretregion 配置选项。这些选项将用于 AWS 身份验证。使用 dynamodb 驱动程序时,不需要 queue.failed.database 配置选项:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败作业存储

您可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的作业而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量实现:

ini
QUEUE_FAILED_DRIVER=null

失败作业事件

如果您希望注册一个在作业失败时调用的事件监听器,可以使用 Queue facade 的 failing 方法。例如,我们可以从 Laravel 附带的 AppServiceProviderboot 方法中附加一个闭包到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除作业

lightbulb

使用 Horizon 时,您应该使用 horizon:clear 命令清除队列中的作业,而不是 queue:clear 命令。

如果您希望从默认连接的默认队列中删除所有作业,可以使用 queue:clear Artisan 命令:

shell
php artisan queue:clear

您还可以提供 connection 参数和 queue 选项以从特定连接和队列中删除作业:

shell
php artisan queue:clear redis --queue=emails
exclamation

从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。

监控队列

如果您的队列突然涌入作业,可能会导致队列不堪重负,导致作业完成的等待时间过长。如果您愿意,Laravel 可以在队列作业计数超过指定阈值时提醒您。

要开始,您应该计划 queue:monitor 命令 每分钟运行一次。该命令接受您希望监控的队列名称以及您期望的作业计数阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅调度此命令不足以触发通知,提醒您队列的超负荷状态。当命令遇到作业计数超过阈值的队列时,将调度一个 Illuminate\Queue\Events\QueueBusy 事件。您可以在应用程序的 EventServiceProvider 中监听此事件,以便向您或您的开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * 为应用程序注册任何其他事件。
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

测试

在测试调度作业的代码时,您可能希望指示 Laravel 不实际执行作业本身,因为作业的代码可以直接测试,并且与调度它的代码分开测试。当然,要测试作业本身,您可以实例化作业实例并直接在测试中调用 handle 方法。

您可以使用 Queue facade 的 fake 方法来防止队列作业实际推送到队列。在调用 Queue facade 的 fake 方法后,您可以断言应用程序尝试将作业推送到队列:

php
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // 执行订单发货...

        // 断言没有作业被推送...
        Queue::assertNothingPushed();

        // 断言作业被推送到给定队列...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // 断言作业被推送两次...
        Queue::assertPushed(ShipOrder::class, 2);

        // 断言作业未被推送...
        Queue::assertNotPushed(AnotherJob::class);

        // 断言闭包被推送到队列...
        Queue::assertClosurePushed();

        // 断言推送的作业总数...
        Queue::assertCount(3);
    }
}

您可以将闭包传递给 assertPushedassertNotPushed 方法,以断言推送的作业通过给定的“真值测试”。如果至少有一个作业被推送并通过给定的真值测试,则断言将成功:

php
Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

伪造部分作业

如果您只需要伪造特定作业,同时允许其他作业正常执行,可以将需要伪造的作业类名传递给 fake 方法:

php
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // 执行订单发货...

    // 断言作业被推送两次...
    Queue::assertPushed(ShipOrder::class, 2);
}

您可以使用 except 方法伪造除指定作业之外的所有作业:

php
Queue::fake()->except([
    ShipOrder::class,
]);

测试作业链

要测试作业链,您需要利用 Bus facade 的伪造功能。Bus facade 的 assertChained 方法可用于断言已调度 作业链assertChained 方法接受一个链作业数组作为其第一个参数:

php
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

如上例所示,链作业数组可以是作业的类名数组。然而,您也可以提供实际作业实例的数组。这样做时,Laravel 将确保作业实例属于相同的类,并且具有与应用程序调度的链作业相同的属性值:

php
Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

您可以使用 assertDispatchedWithoutChain 方法断言作业被推送而没有作业链:

php
Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链批处理

如果您的作业链 包含一批作业,您可以通过在链断言中插入 Bus::chainedBatch 定义来断言链批处理符合您的期望:

php
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

测试作业批处理

Bus facade 的 assertBatched 方法可用于断言已调度 作业批处理。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,可用于检查批处理中的作业:

php
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

您可以使用 assertBatchCount 方法断言已调度的批处理数量:

php
Bus::assertBatchCount(3);

您可以使用 assertNothingBatched 断言没有批处理被调度:

php
Bus::assertNothingBatched();

测试作业/批处理交互

此外,您可能偶尔需要测试单个作业与其底层批处理的交互。例如,您可能需要测试作业是否取消了其批处理的进一步处理。为此,您需要通过 withFakeBatch 方法为作业分配一个伪造批处理。withFakeBatch 方法返回一个包含作业实例和伪造批处理的元组:

php
[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

作业事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在队列作业处理之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计信息的好机会。通常,您应该从 服务提供者boot 方法中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动任何应用程序服务。
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 Queue facade 上的 looping 方法,您可以指定在工作者尝试从队列获取作业之前执行的回调。例如,您可以注册一个闭包以回滚任何由先前失败的作业留下的未完成事务:

php
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});