laravel-queue-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Laravel Queue Patterns

Laravel 队列模式

Job Structure

任务结构

php
<?php

namespace App\Jobs;

use App\Models\Order;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessOrder implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public readonly Order $order,
    ) {}

    public function handle(PaymentGateway $gateway): void
    {
        $gateway->charge($this->order);

        $this->order->update(['status' => 'processed']);
    }

    public function failed(\Throwable $exception): void
    {
        $this->order->update(['status' => 'failed']);

        // Notify admin, log, etc.
    }
}
php
<?php

namespace App\Jobs;

use App\Models\Order;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessOrder implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(
        public readonly Order $order,
    ) {}

    public function handle(PaymentGateway $gateway): void
    {
        $gateway->charge($this->order);

        $this->order->update(['status' => 'processed']);
    }

    public function failed(\Throwable $exception): void
    {
        $this->order->update(['status' => 'failed']);

        // 通知管理员、记录日志等
    }
}

Dispatch Patterns

任务分发模式

php
// ✅ Standard dispatch
ProcessOrder::dispatch($order);

// ✅ Dispatch to specific queue/connection
ProcessOrder::dispatch($order)
    ->onQueue('payments')
    ->onConnection('redis');

// ✅ Delayed dispatch
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));

// ✅ Conditional dispatch
ProcessOrder::dispatchIf($order->isPaid(), $order);
ProcessOrder::dispatchUnless($order->isCancelled(), $order);

// ✅ Dispatch after database transaction commits
ProcessOrder::dispatch($order)->afterCommit();

// ❌ Dispatching inside a transaction without afterCommit
DB::transaction(function () use ($order) {
    $order->save();
    ProcessOrder::dispatch($order); // Job may run before commit
});

// ✅ Safe inside transactions
DB::transaction(function () use ($order) {
    $order->save();
    ProcessOrder::dispatch($order)->afterCommit();
});
php
// ✅ 标准分发
ProcessOrder::dispatch($order);

// ✅ 分发到指定队列/连接
ProcessOrder::dispatch($order)
    ->onQueue('payments')
    ->onConnection('redis');

// ✅ 延迟分发
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));

// ✅ 条件分发
ProcessOrder::dispatchIf($order->isPaid(), $order);
ProcessOrder::dispatchUnless($order->isCancelled(), $order);

// ✅ 数据库事务提交后分发
ProcessOrder::dispatch($order)->afterCommit();

// ❌ 在事务内分发但未使用afterCommit
DB::transaction(function () use ($order) {
    $order->save();
    ProcessOrder::dispatch($order); // 任务可能在提交前执行
});

// ✅ 事务内安全分发
DB::transaction(function () use ($order) {
    $order->save();
    ProcessOrder::dispatch($order)->afterCommit();
});

Job Middleware

任务中间件

php
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProcessOrder implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            // Rate limit to 10 jobs per minute
            new RateLimited('orders'),

            // Prevent overlapping by order ID
            (new WithoutOverlapping($this->order->id))
                ->releaseAfter(60)
                ->expireAfter(300),

            // Throttle on exceptions - wait 5 min after 3 exceptions
            (new ThrottlesExceptions(3, 5))
                ->backoff(5),
        ];
    }
}

// Define rate limiter in AppServiceProvider
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

RateLimiter::for('orders', function ($job) {
    return Limit::perMinute(10);
});
php
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProcessOrder implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            // 限制为每分钟10个任务
            new RateLimited('orders'),

            // 根据订单ID防止任务重叠
            (new WithoutOverlapping($this->order->id))
                ->releaseAfter(60)
                ->expireAfter(300),

            // 异常时限流 - 3次异常后等待5分钟
            (new ThrottlesExceptions(3, 5))
                ->backoff(5),
        ];
    }
}

// 在AppServiceProvider中定义限流器
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

RateLimiter::for('orders', function ($job) {
    return Limit::perMinute(10);
});

Job Chaining

任务链式调用

php
use Illuminate\Support\Facades\Bus;

// ✅ Sequential execution - next job runs only if previous succeeds
Bus::chain([
    new ValidateOrder($order),
    new ChargePayment($order),
    new SendConfirmation($order),
    new UpdateInventory($order),
])->onQueue('orders')->dispatch();

// ✅ With catch callback
Bus::chain([
    new ValidateOrder($order),
    new ChargePayment($order),
])->catch(function (\Throwable $e) use ($order) {
    $order->update(['status' => 'failed']);
})->dispatch();
php
use Illuminate\Support\Facades\Bus;

// ✅ 顺序执行 - 仅当前一个任务成功时才运行下一个
Bus::chain([
    new ValidateOrder($order),
    new ChargePayment($order),
    new SendConfirmation($order),
    new UpdateInventory($order),
])->onQueue('orders')->dispatch();

// ✅ 带有捕获回调
Bus::chain([
    new ValidateOrder($order),
    new ChargePayment($order),
])->catch(function (\Throwable $e) use ($order) {
    $order->update(['status' => 'failed']);
})->dispatch();

Job Batching

任务批处理

php
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$batch = Bus::batch([
    new ProcessCsvChunk($file, 0, 1000),
    new ProcessCsvChunk($file, 1000, 2000),
    new ProcessCsvChunk($file, 2000, 3000),
])
->then(function (Batch $batch) {
    // All jobs completed successfully
    Notification::send($user, new ImportComplete());
})
->catch(function (Batch $batch, \Throwable $e) {
    // First batch job failure detected
})
->finally(function (Batch $batch) {
    // Batch finished (success or failure)
    Storage::delete($file);
})
->name('CSV Import')
->onQueue('imports')
->allowFailures()
->dispatch();

// Check batch progress
$batch = Bus::findBatch($batchId);
echo $batch->progress(); // Percentage complete
Jobs in a batch must use the
Illuminate\Bus\Batchable
trait.
php
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

$batch = Bus::batch([
    new ProcessCsvChunk($file, 0, 1000),
    new ProcessCsvChunk($file, 1000, 2000),
    new ProcessCsvChunk($file, 2000, 3000),
])
->then(function (Batch $batch) {
    // 所有任务执行成功
    Notification::send($user, new ImportComplete());
})
->catch(function (Batch $batch, \Throwable $e) {
    // 检测到批处理中第一个任务失败
})
->finally(function (Batch $batch) {
    // 批处理完成(成功或失败)
    Storage::delete($file);
})
->name('CSV Import')
->onQueue('imports')
->allowFailures()
->dispatch();

// 检查批处理进度
$batch = Bus::findBatch($batchId);
echo $batch->progress(); // 完成百分比
批处理中的任务必须使用
Illuminate\Bus\Batchable
trait。

Unique Jobs

唯一任务

php
use Illuminate\Contracts\Queue\ShouldBeUnique;

class RecalculateReport implements ShouldQueue, ShouldBeUnique
{
    public function __construct(
        public readonly int $reportId,
    ) {}

    // Unique for 1 hour
    public int $uniqueFor = 3600;

    // Custom unique ID
    public function uniqueId(): string
    {
        return (string) $this->reportId;
    }
}
php
use Illuminate\Contracts\Queue\ShouldBeUnique;

class RecalculateReport implements ShouldQueue, ShouldBeUnique
{
    public function __construct(
        public readonly int $reportId,
    ) {}

    // 1小时内唯一
    public int $uniqueFor = 3600;

    // 自定义唯一ID
    public function uniqueId(): string
    {
        return (string) $this->reportId;
    }
}

Retry Strategies

重试策略

php
class ProcessWebhook implements ShouldQueue
{
    // ✅ Fixed number of attempts
    public int $tries = 5;

    // ✅ Or retry until a time limit
    public function retryUntil(): \DateTime
    {
        return now()->addHours(2);
    }

    // ✅ Max exceptions before marking failed (allows manual releases)
    public int $maxExceptions = 3;

    // ✅ Exponential backoff (seconds between retries)
    public array $backoff = [10, 60, 300]; // 10s, 1m, 5m

    // ✅ Timeout per attempt
    public int $timeout = 120;

    public function handle(): void
    {
        // If an unrecoverable error occurs, fail immediately
        if ($this->isInvalid()) {
            $this->fail('Invalid webhook payload.');
            return;
        }

        // Process...
    }
}
php
class ProcessWebhook implements ShouldQueue
{
    // ✅ 固定尝试次数
    public int $tries = 5;

    // ✅ 或重试直到达到时间限制
    public function retryUntil(): \DateTime
    {
        return now()->addHours(2);
    }

    // ✅ 标记失败前的最大异常次数(允许手动释放)
    public int $maxExceptions = 3;

    // ✅ 指数退避(重试间隔秒数)
    public array $backoff = [10, 60, 300]; // 10秒, 1分钟, 5分钟

    // ✅ 每次尝试的超时时间
    public int $timeout = 120;

    public function handle(): void
    {
        // 如果发生不可恢复的错误,立即标记失败
        if ($this->isInvalid()) {
            $this->fail('Invalid webhook payload.');
            return;
        }

        // 处理任务...
    }
}

Idempotency Patterns

幂等性模式

php
class ChargePayment implements ShouldQueue
{
    public function handle(PaymentGateway $gateway): void
    {
        // ✅ Check if already processed before acting
        if ($this->order->payment_id) {
            return; // Already charged, skip
        }

        $payment = $gateway->charge($this->order->total);

        // ✅ Use atomic update to prevent double processing
        $affected = Order::where('id', $this->order->id)
            ->whereNull('payment_id')
            ->update(['payment_id' => $payment->id]);

        if ($affected === 0) {
            return; // Another worker already processed this
        }
    }
}
php
class ChargePayment implements ShouldQueue
{
    public function handle(PaymentGateway $gateway): void
    {
        // ✅ 执行前检查是否已处理
        if ($this->order->payment_id) {
            return; // 已完成支付,跳过
        }

        $payment = $gateway->charge($this->order->total);

        // ✅ 使用原子更新防止重复处理
        $affected = Order::where('id', $this->order->id)
            ->whereNull('payment_id')
            ->update(['payment_id' => $payment->id]);

        if ($affected === 0) {
            return; // 其他工作进程已处理该任务
        }
    }
}

Testing Queues

队列测试

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Queue;

// ✅ Assert job was dispatched
public function test_order_dispatches_processing_job(): void
{
    Queue::fake();

    $order = Order::factory()->create();
    $order->process();

    Queue::assertPushed(ProcessOrder::class, function ($job) use ($order) {
        return $job->order->id === $order->id;
    });
}

// ✅ Assert chain
public function test_order_dispatches_chain(): void
{
    Bus::fake();

    $order = Order::factory()->create();
    $order->fulfill();

    Bus::assertChained([
        ValidateOrder::class,
        ChargePayment::class,
        SendConfirmation::class,
    ]);
}

// ✅ Assert batch
public function test_import_dispatches_batch(): void
{
    Bus::fake();

    (new CsvImporter)->import($file);

    Bus::assertBatched(function ($batch) {
        return $batch->jobs->count() === 3
            && $batch->jobs->every(fn ($job) => $job instanceof ProcessCsvChunk);
    });
}

// ✅ Execute job to test handler logic
public function test_process_order_charges_payment(): void
{
    $order = Order::factory()->create();

    ProcessOrder::dispatchSync($order);

    $this->assertNotNull($order->fresh()->payment_id);
}
php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Queue;

// ✅ 断言任务已分发
public function test_order_dispatches_processing_job(): void
{
    Queue::fake();

    $order = Order::factory()->create();
    $order->process();

    Queue::assertPushed(ProcessOrder::class, function ($job) use ($order) {
        return $job->order->id === $order->id;
    });
}

// ✅ 断言链式任务
public function test_order_dispatches_chain(): void
{
    Bus::fake();

    $order = Order::factory()->create();
    $order->fulfill();

    Bus::assertChained([
        ValidateOrder::class,
        ChargePayment::class,
        SendConfirmation::class,
    ]);
}

// ✅ 断言批处理任务
public function test_import_dispatches_batch(): void
{
    Bus::fake();

    (new CsvImporter)->import($file);

    Bus::assertBatched(function ($batch) {
        return $batch->jobs->count() === 3
            && $batch->jobs->every(fn ($job) => $job instanceof ProcessCsvChunk);
    });
}

// ✅ 执行任务以测试处理器逻辑
public function test_process_order_charges_payment(): void
{
    $order = Order::factory()->create();

    ProcessOrder::dispatchSync($order);

    $this->assertNotNull($order->fresh()->payment_id);
}

Checklist

检查清单

  • Jobs implement ShouldQueue and use standard traits
  • Jobs accept only serializable data (models, primitives)
  • Retry strategy configured ($tries, $backoff, retryUntil)
  • failed() method handles cleanup and notifications
  • afterCommit() used when dispatching inside transactions
  • Job middleware used for rate limiting and overlap prevention
  • Chains used for sequential dependent operations
  • Batches used for parallel independent operations
  • Jobs are idempotent (safe to run multiple times)
  • ShouldBeUnique used to prevent duplicate jobs
  • Queue and Bus fakes used in tests
  • 任务实现ShouldQueue并使用标准trait
  • 任务仅接受可序列化数据(模型、基本类型)
  • 已配置重试策略($tries、$backoff、retryUntil)
  • failed()方法处理清理和通知
  • 在事务内分发时使用afterCommit()
  • 使用任务中间件进行限流和防止重叠
  • 对顺序依赖操作使用链式调用
  • 对并行独立操作使用批处理
  • 任务具备幂等性(可安全多次执行)
  • 使用ShouldBeUnique防止重复任务
  • 测试中使用Queue和Bus假对象