laravel-queue-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseLaravel Queue Patterns
Laravel 队列模式
Job Structure
任务结构
php
<?php
namespace App\Jobs;
use App\Models\Order;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessOrder implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public readonly Order $order,
) {}
public function handle(PaymentGateway $gateway): void
{
$gateway->charge($this->order);
$this->order->update(['status' => 'processed']);
}
public function failed(\Throwable $exception): void
{
$this->order->update(['status' => 'failed']);
// Notify admin, log, etc.
}
}php
<?php
namespace App\Jobs;
use App\Models\Order;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessOrder implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public readonly Order $order,
) {}
public function handle(PaymentGateway $gateway): void
{
$gateway->charge($this->order);
$this->order->update(['status' => 'processed']);
}
public function failed(\Throwable $exception): void
{
$this->order->update(['status' => 'failed']);
// 通知管理员、记录日志等
}
}Dispatch Patterns
任务分发模式
php
// ✅ Standard dispatch
ProcessOrder::dispatch($order);
// ✅ Dispatch to specific queue/connection
ProcessOrder::dispatch($order)
->onQueue('payments')
->onConnection('redis');
// ✅ Delayed dispatch
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));
// ✅ Conditional dispatch
ProcessOrder::dispatchIf($order->isPaid(), $order);
ProcessOrder::dispatchUnless($order->isCancelled(), $order);
// ✅ Dispatch after database transaction commits
ProcessOrder::dispatch($order)->afterCommit();
// ❌ Dispatching inside a transaction without afterCommit
DB::transaction(function () use ($order) {
$order->save();
ProcessOrder::dispatch($order); // Job may run before commit
});
// ✅ Safe inside transactions
DB::transaction(function () use ($order) {
$order->save();
ProcessOrder::dispatch($order)->afterCommit();
});php
// ✅ 标准分发
ProcessOrder::dispatch($order);
// ✅ 分发到指定队列/连接
ProcessOrder::dispatch($order)
->onQueue('payments')
->onConnection('redis');
// ✅ 延迟分发
ProcessOrder::dispatch($order)->delay(now()->addMinutes(5));
// ✅ 条件分发
ProcessOrder::dispatchIf($order->isPaid(), $order);
ProcessOrder::dispatchUnless($order->isCancelled(), $order);
// ✅ 数据库事务提交后分发
ProcessOrder::dispatch($order)->afterCommit();
// ❌ 在事务内分发但未使用afterCommit
DB::transaction(function () use ($order) {
$order->save();
ProcessOrder::dispatch($order); // 任务可能在提交前执行
});
// ✅ 事务内安全分发
DB::transaction(function () use ($order) {
$order->save();
ProcessOrder::dispatch($order)->afterCommit();
});Job Middleware
任务中间件
php
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProcessOrder implements ShouldQueue
{
public function middleware(): array
{
return [
// Rate limit to 10 jobs per minute
new RateLimited('orders'),
// Prevent overlapping by order ID
(new WithoutOverlapping($this->order->id))
->releaseAfter(60)
->expireAfter(300),
// Throttle on exceptions - wait 5 min after 3 exceptions
(new ThrottlesExceptions(3, 5))
->backoff(5),
];
}
}
// Define rate limiter in AppServiceProvider
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
RateLimiter::for('orders', function ($job) {
return Limit::perMinute(10);
});php
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProcessOrder implements ShouldQueue
{
public function middleware(): array
{
return [
// 限制为每分钟10个任务
new RateLimited('orders'),
// 根据订单ID防止任务重叠
(new WithoutOverlapping($this->order->id))
->releaseAfter(60)
->expireAfter(300),
// 异常时限流 - 3次异常后等待5分钟
(new ThrottlesExceptions(3, 5))
->backoff(5),
];
}
}
// 在AppServiceProvider中定义限流器
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
RateLimiter::for('orders', function ($job) {
return Limit::perMinute(10);
});Job Chaining
任务链式调用
php
use Illuminate\Support\Facades\Bus;
// ✅ Sequential execution - next job runs only if previous succeeds
Bus::chain([
new ValidateOrder($order),
new ChargePayment($order),
new SendConfirmation($order),
new UpdateInventory($order),
])->onQueue('orders')->dispatch();
// ✅ With catch callback
Bus::chain([
new ValidateOrder($order),
new ChargePayment($order),
])->catch(function (\Throwable $e) use ($order) {
$order->update(['status' => 'failed']);
})->dispatch();php
use Illuminate\Support\Facades\Bus;
// ✅ 顺序执行 - 仅当前一个任务成功时才运行下一个
Bus::chain([
new ValidateOrder($order),
new ChargePayment($order),
new SendConfirmation($order),
new UpdateInventory($order),
])->onQueue('orders')->dispatch();
// ✅ 带有捕获回调
Bus::chain([
new ValidateOrder($order),
new ChargePayment($order),
])->catch(function (\Throwable $e) use ($order) {
$order->update(['status' => 'failed']);
})->dispatch();Job Batching
任务批处理
php
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([
new ProcessCsvChunk($file, 0, 1000),
new ProcessCsvChunk($file, 1000, 2000),
new ProcessCsvChunk($file, 2000, 3000),
])
->then(function (Batch $batch) {
// All jobs completed successfully
Notification::send($user, new ImportComplete());
})
->catch(function (Batch $batch, \Throwable $e) {
// First batch job failure detected
})
->finally(function (Batch $batch) {
// Batch finished (success or failure)
Storage::delete($file);
})
->name('CSV Import')
->onQueue('imports')
->allowFailures()
->dispatch();
// Check batch progress
$batch = Bus::findBatch($batchId);
echo $batch->progress(); // Percentage completeJobs in a batch must use the trait.
Illuminate\Bus\Batchablephp
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([
new ProcessCsvChunk($file, 0, 1000),
new ProcessCsvChunk($file, 1000, 2000),
new ProcessCsvChunk($file, 2000, 3000),
])
->then(function (Batch $batch) {
// 所有任务执行成功
Notification::send($user, new ImportComplete());
})
->catch(function (Batch $batch, \Throwable $e) {
// 检测到批处理中第一个任务失败
})
->finally(function (Batch $batch) {
// 批处理完成(成功或失败)
Storage::delete($file);
})
->name('CSV Import')
->onQueue('imports')
->allowFailures()
->dispatch();
// 检查批处理进度
$batch = Bus::findBatch($batchId);
echo $batch->progress(); // 完成百分比批处理中的任务必须使用 trait。
Illuminate\Bus\BatchableUnique Jobs
唯一任务
php
use Illuminate\Contracts\Queue\ShouldBeUnique;
class RecalculateReport implements ShouldQueue, ShouldBeUnique
{
public function __construct(
public readonly int $reportId,
) {}
// Unique for 1 hour
public int $uniqueFor = 3600;
// Custom unique ID
public function uniqueId(): string
{
return (string) $this->reportId;
}
}php
use Illuminate\Contracts\Queue\ShouldBeUnique;
class RecalculateReport implements ShouldQueue, ShouldBeUnique
{
public function __construct(
public readonly int $reportId,
) {}
// 1小时内唯一
public int $uniqueFor = 3600;
// 自定义唯一ID
public function uniqueId(): string
{
return (string) $this->reportId;
}
}Retry Strategies
重试策略
php
class ProcessWebhook implements ShouldQueue
{
// ✅ Fixed number of attempts
public int $tries = 5;
// ✅ Or retry until a time limit
public function retryUntil(): \DateTime
{
return now()->addHours(2);
}
// ✅ Max exceptions before marking failed (allows manual releases)
public int $maxExceptions = 3;
// ✅ Exponential backoff (seconds between retries)
public array $backoff = [10, 60, 300]; // 10s, 1m, 5m
// ✅ Timeout per attempt
public int $timeout = 120;
public function handle(): void
{
// If an unrecoverable error occurs, fail immediately
if ($this->isInvalid()) {
$this->fail('Invalid webhook payload.');
return;
}
// Process...
}
}php
class ProcessWebhook implements ShouldQueue
{
// ✅ 固定尝试次数
public int $tries = 5;
// ✅ 或重试直到达到时间限制
public function retryUntil(): \DateTime
{
return now()->addHours(2);
}
// ✅ 标记失败前的最大异常次数(允许手动释放)
public int $maxExceptions = 3;
// ✅ 指数退避(重试间隔秒数)
public array $backoff = [10, 60, 300]; // 10秒, 1分钟, 5分钟
// ✅ 每次尝试的超时时间
public int $timeout = 120;
public function handle(): void
{
// 如果发生不可恢复的错误,立即标记失败
if ($this->isInvalid()) {
$this->fail('Invalid webhook payload.');
return;
}
// 处理任务...
}
}Idempotency Patterns
幂等性模式
php
class ChargePayment implements ShouldQueue
{
public function handle(PaymentGateway $gateway): void
{
// ✅ Check if already processed before acting
if ($this->order->payment_id) {
return; // Already charged, skip
}
$payment = $gateway->charge($this->order->total);
// ✅ Use atomic update to prevent double processing
$affected = Order::where('id', $this->order->id)
->whereNull('payment_id')
->update(['payment_id' => $payment->id]);
if ($affected === 0) {
return; // Another worker already processed this
}
}
}php
class ChargePayment implements ShouldQueue
{
public function handle(PaymentGateway $gateway): void
{
// ✅ 执行前检查是否已处理
if ($this->order->payment_id) {
return; // 已完成支付,跳过
}
$payment = $gateway->charge($this->order->total);
// ✅ 使用原子更新防止重复处理
$affected = Order::where('id', $this->order->id)
->whereNull('payment_id')
->update(['payment_id' => $payment->id]);
if ($affected === 0) {
return; // 其他工作进程已处理该任务
}
}
}Testing Queues
队列测试
php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Queue;
// ✅ Assert job was dispatched
public function test_order_dispatches_processing_job(): void
{
Queue::fake();
$order = Order::factory()->create();
$order->process();
Queue::assertPushed(ProcessOrder::class, function ($job) use ($order) {
return $job->order->id === $order->id;
});
}
// ✅ Assert chain
public function test_order_dispatches_chain(): void
{
Bus::fake();
$order = Order::factory()->create();
$order->fulfill();
Bus::assertChained([
ValidateOrder::class,
ChargePayment::class,
SendConfirmation::class,
]);
}
// ✅ Assert batch
public function test_import_dispatches_batch(): void
{
Bus::fake();
(new CsvImporter)->import($file);
Bus::assertBatched(function ($batch) {
return $batch->jobs->count() === 3
&& $batch->jobs->every(fn ($job) => $job instanceof ProcessCsvChunk);
});
}
// ✅ Execute job to test handler logic
public function test_process_order_charges_payment(): void
{
$order = Order::factory()->create();
ProcessOrder::dispatchSync($order);
$this->assertNotNull($order->fresh()->payment_id);
}php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Queue;
// ✅ 断言任务已分发
public function test_order_dispatches_processing_job(): void
{
Queue::fake();
$order = Order::factory()->create();
$order->process();
Queue::assertPushed(ProcessOrder::class, function ($job) use ($order) {
return $job->order->id === $order->id;
});
}
// ✅ 断言链式任务
public function test_order_dispatches_chain(): void
{
Bus::fake();
$order = Order::factory()->create();
$order->fulfill();
Bus::assertChained([
ValidateOrder::class,
ChargePayment::class,
SendConfirmation::class,
]);
}
// ✅ 断言批处理任务
public function test_import_dispatches_batch(): void
{
Bus::fake();
(new CsvImporter)->import($file);
Bus::assertBatched(function ($batch) {
return $batch->jobs->count() === 3
&& $batch->jobs->every(fn ($job) => $job instanceof ProcessCsvChunk);
});
}
// ✅ 执行任务以测试处理器逻辑
public function test_process_order_charges_payment(): void
{
$order = Order::factory()->create();
ProcessOrder::dispatchSync($order);
$this->assertNotNull($order->fresh()->payment_id);
}Checklist
检查清单
- Jobs implement ShouldQueue and use standard traits
- Jobs accept only serializable data (models, primitives)
- Retry strategy configured ($tries, $backoff, retryUntil)
- failed() method handles cleanup and notifications
- afterCommit() used when dispatching inside transactions
- Job middleware used for rate limiting and overlap prevention
- Chains used for sequential dependent operations
- Batches used for parallel independent operations
- Jobs are idempotent (safe to run multiple times)
- ShouldBeUnique used to prevent duplicate jobs
- Queue and Bus fakes used in tests
- 任务实现ShouldQueue并使用标准trait
- 任务仅接受可序列化数据(模型、基本类型)
- 已配置重试策略($tries、$backoff、retryUntil)
- failed()方法处理清理和通知
- 在事务内分发时使用afterCommit()
- 使用任务中间件进行限流和防止重叠
- 对顺序依赖操作使用链式调用
- 对并行独立操作使用批处理
- 任务具备幂等性(可安全多次执行)
- 使用ShouldBeUnique防止重复任务
- 测试中使用Queue和Bus假对象