# Oban Thinking
Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.
## Part 1: Oban (Non-Pro)
### The Iron Law: JSON Serialization
**JOB ARGS ARE JSON. ATOMS BECOME STRINGS.** This single fact causes most Oban debugging headaches.

```elixir
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})

# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
  ...
end
```
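You can see the conversion by round-tripping a map through JSON directly. A minimal sketch, assuming `Jason` as the configured JSON library:

```elixir
# Atom keys in, string keys out - exactly what happens to job args
%{user_id: 123}
|> Jason.encode!()  # => ~s({"user_id":123})
|> Jason.decode!()  # => %{"user_id" => 123}
```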
### Error Handling: Let It Crash
Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.
Why?
- Automatic logging: Oban logs the full error with stacktrace
- Automatic retries: Jobs retry with exponential backoff
- Visibility: Failed jobs appear in Oban Web dashboard
- Consistency: Error states are tracked in the database
#### Anti-Pattern

```elixir
# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} ->
      Logger.error("Failed: #{reason}")
      {:ok, :failed} # Silently marks as complete!
  end
end
```
#### Correct Pattern

```elixir
# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
  result = do_work!(job.args) # Raises on failure
  {:ok, result}
end

# Or return an error tuple - Oban treats it as a failure
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} -> {:error, reason} # Oban will retry
  end
end
```
### When to Catch Errors
Only catch errors when you need custom retry logic or want to mark a job as permanently failed:
```elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found} # Don't retry
    {:error, :rate_limited} -> {:snooze, 60} # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry} # Normal retry
  end
end
```

### Snoozing for Polling
Use `{:snooze, seconds}` for polling external state instead of manual retry logic:

```elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5} # Check again in 5 seconds
  end
end
```

### Simple Job Chaining
For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:
```elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)

  # Enqueue next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()
  {:ok, result}
end
```

Don't reach for Oban Pro Workflows for linear chains.
### Unique Jobs
Prevent duplicate jobs with the `unique` option:

```elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60] # Only one job with same args per 60 seconds

# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]
```

**Gotcha:** Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
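To make the gotcha concrete, a sketch using the `unique: [period: 60]` config above:

```elixir
MyWorker.new(%{user_id: 123}) |> Oban.insert() # t = 0s: inserted
Process.sleep(61_000)                          # wait past the 60-second window
MyWorker.new(%{user_id: 123}) |> Oban.insert() # t = 61s: inserted again - both jobs run
```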
### High Throughput: Chunking
For millions of records, chunk work into batches rather than one job per item:
```elixir
# Bad: One job per contact (millions of jobs = database strain)
Enum.each(contacts, &(ContactWorker.new(%{id: &1.id}) |> Oban.insert()))

# Good: Chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(&(BatchWorker.new(%{contact_ids: Enum.map(&1, fn c -> c.id end)}) |> Oban.insert()))
```
Use bulk inserts without uniqueness constraints for maximum throughput.
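A sketch of the bulk-insert variant, reusing the hypothetical `BatchWorker` from above:

```elixir
# Build all changesets up front, then insert them in a single call
jobs =
  contacts
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)})
  end)

Oban.insert_all(jobs)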
---

## Part 2: Oban Pro
### Cascade Context: Erlang Term Serialization
Unlike regular job args, cascade context preserves atoms:
```elixir
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})

# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
  ...
end

# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end
```
### Serialization Summary
| | Creating | Processing |
|---|---|---|
| Regular jobs | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok |
### When to Use Workflows
Reserve Workflows for:
- Complex dependency graphs (not just linear chains)
- Fan-out/fan-in patterns
- When you need recorded values across steps
- Conditional branching based on runtime state
Don't use Workflows for simple A → B → C chains.
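For contrast, a dependency graph that does earn a Workflow looks like this. A minimal fan-out/fan-in sketch with hypothetical workers:

```elixir
# :score_a and :score_b fan out from :fetch and run in parallel;
# :combine fans back in once both complete
Workflow.new()
|> Workflow.add(:fetch, FetchWorker.new(%{}))
|> Workflow.add(:score_a, ScoreWorker.new(%{model: "a"}), deps: [:fetch])
|> Workflow.add(:score_b, ScoreWorker.new(%{model: "b"}), deps: [:fetch])
|> Workflow.add(:combine, CombineWorker.new(%{}), deps: [:score_a, :score_b])
|> Oban.insert_all()
```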
### Workflow Composition with Graft
When you need a parent workflow to wait for a sub-workflow to complete before continuing, use `add_graft` instead of `add_workflow`.

#### Key Differences
| Method | Sub-workflow completes before deps run? | Output accessible? |
|---|---|---|
| `add_workflow` | No - just inserts jobs | No |
| `add_graft` | Yes - waits for all jobs | Yes, via recorded values |
#### Pattern: Composing Independent Concerns
Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:
```elixir
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
  def workflow(score_run_id) do
    Workflow.new()
    |> Workflow.add(:aggregate, AggregateJob.new(...))
    |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place!
  end
end

# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
  def workflow(site_url, opts) do
    notification_opts = build_notification_opts(opts)

    Workflow.new()
    |> Workflow.put_context(%{notification_opts: notification_opts})
    |> Workflow.add_graft(:scoring, &graft_full_run/1)
    |> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
  end

  defp graft_full_run(context) do
    # Sub-workflow doesn't know about notifications
    FullRun.workflow(context.site_url, context.opts)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
```
### Recording Values for Dependent Steps
For a grafted workflow's output to be available to dependent steps, the final job must use `recorded: true`:

```elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    # Return value becomes available in context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
```

### Dynamic Workflow Appending
Add jobs to a running workflow with `Workflow.append/2`:

```elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end

  {:ok, :done}
end
```

**Caveat:** Cannot override context or add dependencies to already-running jobs. For complex dynamic scenarios, check external state in the job itself.
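The "check external state" alternative can be as simple as a conditional inside `perform/1`. A sketch with hypothetical helpers (`Runs.notifications_enabled?/1`, `send_notification/1`):

```elixir
# Let the job decide from current state instead of mutating a running workflow
def perform(%Oban.Job{args: %{"run_id" => run_id}}) do
  if Runs.notifications_enabled?(run_id) do
    send_notification(run_id)
  end

  {:ok, :done}
end
```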
### Fan-Out/Fan-In with Batches
To run a final job after multiple paginated workflows complete, use Batch callbacks:
```elixir
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"

pages
|> Enum.each(fn page ->
  PageWorkflow.workflow(page)
  |> Batch.from_workflow(batch_id: batch_id)
  |> Oban.insert_all()
end)

# Add completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
```

**Tip:** Include pagination workers in the batch to prevent premature completion.
### Testing Workflows
Don't use inline testing mode - workflows need database interaction.
```elixir
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
  Workflow.new()
  |> Workflow.add(:a, WorkerA.new(%{}))
  |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
  |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
  |> run_workflow()
```
For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.
---

## Red Flags - STOP and Reconsider
Non-Pro:

- Pattern matching on atom keys in `perform/1`
- Catching all errors and returning `{:ok, _}`
- Wrapping job logic in `try/rescue`
- Creating one job per item when processing millions of records

Pro:

- Using `add_workflow` when you need to wait for completion
- Coupling notifications/emails to domain workflows
- Not using `recorded: true` when you need output from grafted workflows
- Using Workflows for simple linear job chains
- Testing workflows with inline mode

Any of these? Re-read the serialization rules.