Oban Thinking

Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.

Part 1: Oban (Non-Pro)

The Iron Law: JSON Serialization

JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
This single fact causes most Oban debugging headaches.

```elixir
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})

# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
  ...
end
```

Error Handling: Let It Crash

Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.

Why?

  1. Automatic logging: Oban logs the full error with stacktrace
  2. Automatic retries: Jobs retry with exponential backoff (tunable - see the sketch below)
  3. Visibility: Failed jobs appear in the Oban Web dashboard
  4. Consistency: Error states are tracked in the database
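If the default retry spacing doesn't fit, tune it declaratively rather than catching errors. A minimal sketch: `max_attempts` and the `backoff/1` callback are part of `Oban.Worker`; `do_work!/1` is a hypothetical helper that raises on failure:

```elixir
defmodule MyWorker do
  use Oban.Worker, queue: :default, max_attempts: 5

  # Override backoff/1 to control the delay (in seconds) before each retry;
  # the default is exponential with jitter.
  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    # Linear backoff: 30s, 60s, 90s, ...
    attempt * 30
  end

  @impl Oban.Worker
  def perform(%Oban.Job{} = job) do
    # do_work!/1 (hypothetical) raises on failure, letting Oban
    # log, retry, and track the error.
    {:ok, do_work!(job.args)}
  end
end
```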

Anti-Pattern

```elixir
# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} ->
      {:ok, result}

    {:error, reason} ->
      Logger.error("Failed: #{reason}")
      {:ok, :failed}  # Silently marks the job as complete!
  end
end
```

Correct Pattern

```elixir
# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
  result = do_work!(job.args)  # Raises on failure
  {:ok, result}
end

# Or return an error tuple - Oban treats it as a failure
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} -> {:error, reason}  # Oban will retry
  end
end
```

When to Catch Errors

Only catch errors when you need custom retry logic or want to mark a job as permanently failed:

```elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found}  # Don't retry
    {:error, :rate_limited} -> {:snooze, 60}  # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry}  # Normal retry
  end
end
```

Snoozing for Polling

Use `{:snooze, seconds}` for polling external state instead of manual retry logic:

```elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5}  # Check again in 5 seconds
  end
end
```

Simple Job Chaining

For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:

```elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)

  # Enqueue the next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()

  {:ok, result}
end
```

Don't reach for Oban Pro Workflows for linear chains.

Unique Jobs

Prevent duplicate jobs with the `unique` option:

```elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60]  # Only one job with the same args per 60 seconds

# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]
```

**Gotcha:** Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
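When it matters whether an insert was deduplicated, inspect the returned job. A minimal sketch, reusing the hypothetical MyWorker from earlier; the `conflict?` field on `Oban.Job` is set when a unique check matched an existing job:

```elixir
case Oban.insert(MyWorker.new(%{user_id: 123})) do
  # An existing job matched the unique options; nothing new was inserted
  {:ok, %Oban.Job{conflict?: true}} -> :duplicate_skipped
  {:ok, _job} -> :enqueued
end
```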

High Throughput: Chunking

For millions of records, chunk work into batches rather than one job per item:

```elixir
# Bad: One job per contact (millions of jobs = database strain)
Enum.each(contacts, &(ContactWorker.new(%{id: &1.id}) |> Oban.insert()))

# Good: Chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(fn chunk ->
  BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)})
  |> Oban.insert()
end)
```

Use bulk inserts without uniqueness constraints for maximum throughput, as sketched below.
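A bulk-insert sketch of the same chunking, reusing BatchWorker from above. `Oban.insert_all/1` enqueues all job changesets in one query; note that without the Pro Smart engine, bulk inserts skip unique checks:

```elixir
jobs =
  contacts
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)})
  end)

# One query instead of thousands of individual inserts
Oban.insert_all(jobs)
```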

---

Part 2: Oban Pro

Cascade Context: Erlang Term Serialization

Unlike regular job args, cascade context preserves atoms:

```elixir
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})

# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
  ...
end

# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end
```

Serialization Summary

|                 | Creating | Processing   |
|-----------------|----------|--------------|
| Regular jobs    | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok     |

When to Use Workflows

Reserve Workflows for:
  • Complex dependency graphs (not just linear chains)
  • Fan-out/fan-in patterns (see the sketch below)
  • When you need recorded values across steps
  • Conditional branching based on runtime state
Don't use Workflows for simple A → B → C chains.
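A minimal fan-out/fan-in sketch, assuming the same Workflow module used throughout this doc and hypothetical workers - the kind of shape that justifies a Workflow, since :merge must wait on two parallel branches:

```elixir
Workflow.new()
|> Workflow.add(:fetch, FetchWorker.new(%{}))
|> Workflow.add(:score_text, TextScorer.new(%{}), deps: [:fetch])
|> Workflow.add(:score_images, ImageScorer.new(%{}), deps: [:fetch])
|> Workflow.add(:merge, MergeWorker.new(%{}), deps: [:score_text, :score_images])
|> Oban.insert_all()
```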

Workflow Composition with Graft

When you need a parent workflow to wait for a sub-workflow to complete before continuing, use `add_graft` instead of `add_workflow`.

Key Differences

| Method         | Sub-workflow completes before deps run? | Output accessible?       |
|----------------|-----------------------------------------|--------------------------|
| `add_workflow` | No - just inserts jobs                  | No                       |
| `add_graft`    | Yes - waits for all jobs                | Yes, via recorded values |

Pattern: Composing Independent Concerns

Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:

```elixir
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
  def workflow(score_run_id) do
    Workflow.new()
    |> Workflow.add(:aggregate, AggregateJob.new(...))
    |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate)  # Wrong place!
  end
end

# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
  def workflow(site_url, opts) do
    notification_opts = build_notification_opts(opts)

    Workflow.new()
    |> Workflow.put_context(%{notification_opts: notification_opts})
    |> Workflow.add_graft(:scoring, &graft_full_run/1)
    |> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
  end

  defp graft_full_run(context) do
    # Sub-workflow doesn't know about notifications
    FullRun.workflow(context.site_url, context.opts)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
```

Recording Values for Dependent Steps

For a grafted workflow's output to be available to dependent steps, the final job must use `recorded: true`:

```elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    %{"score_run_id" => score_run_id} = job.args
    score = compute_composite_score(score_run_id)  # illustrative helper

    # The recorded return value becomes available in context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
```
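A sketch of a downstream cascade step consuming that recorded value. The `previous_result` access follows the dot-notation example in the cascade-context section above; whether the graft's output lands there depends on your workflow shape, and `Notifier.deliver/3` is hypothetical:

```elixir
def send_notification(context) do
  # previous_result carries the recorded return of the preceding step
  %{score_run_id: id, composite_score: score} = context.previous_result
  Notifier.deliver(id, score, context.notification_opts)
end
```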

Dynamic Workflow Appending

Add jobs to a running workflow with `Workflow.append/2`:

```elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end

  {:ok, :done}
end
```

Caveat: Cannot override context or add dependencies to already-running jobs. For complex dynamic scenarios, check external state in the job itself (see the sketch below).
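A sketch of that fallback: instead of restructuring the running workflow, the job decides at execution time (`needs_optional_work?/1` and `do_optional_work/1` are hypothetical):

```elixir
def perform(%Oban.Job{} = job) do
  # Consult external state here rather than mutating the workflow
  if needs_optional_work?(job.args) do
    do_optional_work(job.args)
  end

  {:ok, :done}
end
```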

Fan-Out/Fan-In with Batches

To run a final job after multiple paginated workflows complete, use Batch callbacks:

```elixir
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"

pages
|> Enum.each(fn page ->
  PageWorkflow.workflow(page)
  |> Batch.from_workflow(batch_id: batch_id)
  |> Oban.insert_all()
end)

# Add a completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
```

**Tip:** Include pagination workers in the batch to prevent premature completion.

Testing Workflows

Don't use inline testing mode - workflows need database interaction.

```elixir
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
         Workflow.new()
         |> Workflow.add(:a, WorkerA.new(%{}))
         |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
         |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
         |> run_workflow()
```

For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.

---

Red Flags - STOP and Reconsider

Non-Pro:
  • Pattern matching on atom keys in `perform/1`
  • Catching all errors and returning `{:ok, _}`
  • Wrapping job logic in try/rescue
  • Creating one job per item when processing millions of records
Pro:
  • Using `add_workflow` when you need to wait for completion
  • Coupling notifications/emails to domain workflows
  • Not using `recorded: true` when you need output from grafted workflows
  • Using Workflows for simple linear job chains
  • Testing workflows with inline mode
Any of these? Stop and re-read the relevant section above.