oban-thinking

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Oban Thinking

Oban 思考

Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.

Oban任务处理的范式转变。这些洞察可避免常见错误,指导正确的实践模式。

Part 1: Oban (Non-Pro)

第一部分:Oban(非Pro版)

The Iron Law: JSON Serialization

铁律:JSON序列化

JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
This single fact causes most Oban debugging headaches.
elixir
undefined
JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
这一事实是大多数Oban调试难题的根源。
elixir
undefined

Creating - atom keys are fine

创建任务 - 使用原子键没问题

MyWorker.new(%{user_id: 123})
MyWorker.new(%{user_id: 123})

Processing - must use string keys (JSON converted atoms to strings)

处理任务 - 必须使用字符串键(JSON会将原子转换为字符串)

def perform(%Oban.Job{args: %{"user_id" => user_id}}) do

...

end
undefined
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do

...

end
undefined

Error Handling: Let It Crash

错误处理:让它崩溃

Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.
不要在Oban任务中捕获错误。 让错误向上冒泡到Oban进行正确处理。

Why?

原因:

  1. Automatic logging: Oban logs the full error with stacktrace
  2. Automatic retries: Jobs retry with exponential backoff
  3. Visibility: Failed jobs appear in Oban Web dashboard
  4. Consistency: Error states are tracked in the database
  1. 自动日志记录:Oban会记录包含堆栈跟踪的完整错误信息
  2. 自动重试:任务会以指数退避策略重试
  3. 可见性:失败的任务会显示在Oban Web控制台中
  4. 一致性:错误状态会在数据库中被追踪

Anti-Pattern

反模式

elixir
undefined
elixir
undefined

Bad: Swallowing errors

错误示例:吞掉错误

def perform(%Oban.Job{} = job) do case do_work(job.args) do {:ok, result} -> {:ok, result} {:error, reason} -> Logger.error("Failed: #{reason}") {:ok, :failed} # Silently marks as complete! end end
undefined
def perform(%Oban.Job{} = job) do case do_work(job.args) do {:ok, result} -> {:ok, result} {:error, reason} -> Logger.error("Failed: #{reason}") {:ok, :failed} # 静默标记为完成! end end
undefined

Correct Pattern

正确模式

elixir
undefined
elixir
undefined

Good: Let errors propagate

正确示例:让错误传播

def perform(%Oban.Job{} = job) do result = do_work!(job.args) # Raises on failure {:ok, result} end
def perform(%Oban.Job{} = job) do result = do_work!(job.args) # 失败时抛出异常 {:ok, result} end

Or return error tuple - Oban treats as failure

或者返回错误元组 - Oban会将其视为失败

def perform(%Oban.Job{} = job) do case do_work(job.args) do {:ok, result} -> {:ok, result} {:error, reason} -> {:error, reason} # Oban will retry end end
undefined
def perform(%Oban.Job{} = job) do case do_work(job.args) do {:ok, result} -> {:ok, result} {:error, reason} -> {:error, reason} # Oban会重试 end end
undefined

When to Catch Errors

何时捕获错误

Only catch errors when you need custom retry logic or want to mark a job as permanently failed:
elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found}  # Don't retry
    {:error, :rate_limited} -> {:snooze, 60}  # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry}  # Normal retry
  end
end
仅当你需要自定义重试逻辑或希望将任务标记为永久失败时才捕获错误:
elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found}  # 不再重试
    {:error, :rate_limited} -> {:snooze, 60}  # 60秒后重试
    {:error, _} -> {:error, :will_retry}  # 正常重试
  end
end

Snoozing for Polling

轮询时使用Snooze

Use
{:snooze, seconds}
for polling external state instead of manual retry logic:
elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5}  # Check again in 5 seconds
  end
end
使用
{:snooze, seconds}
来轮询外部状态,而非手动重试逻辑:
elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5}  # 5秒后再次检查
  end
end

Simple Job Chaining

简单任务链式调用

For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:
elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)
  # Enqueue next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()
  {:ok, result}
end
Don't reach for Oban Pro Workflows for linear chains.
对于简单的顺序链式任务(JobA → JobB → JobC),让每个任务在执行完成后入队下一个任务:
elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)
  # 成功时入队下一个任务
  NextWorker.new(%{data: result}) |> Oban.insert()
  {:ok, result}
end
不要为线性链式任务使用Oban Pro Workflows。

Unique Jobs

唯一任务

Prevent duplicate jobs with the
unique
option:
elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60]  # Only one job with same args per 60 seconds
使用
unique
选项防止重复任务:
elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60]  # 每60秒内,相同参数的任务仅允许一个

Or scope uniqueness to specific fields

或者将唯一性限定在特定字段

unique: [period: 300, keys: [:user_id]]

**Gotcha:** Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
unique: [period: 300, keys: [:user_id]]

**注意:** 唯一性是在插入时检查,而非执行时。如果两个相同的任务在61秒后插入,它们都会被执行。

High Throughput: Chunking

高吞吐量:分块处理

For millions of records, chunk work into batches rather than one job per item:
elixir
undefined
对于数百万条记录,将工作拆分为批次,而非为每个条目创建一个任务:
elixir
undefined

Bad: One job per contact (millions of jobs = database strain)

错误示例:每个联系人对应一个任务(数百万个任务会给数据库带来压力)

Enum.each(contacts, &ContactWorker.new(%{id: &1.id}) |> Oban.insert())
Enum.each(contacts, &ContactWorker.new(%{id: &1.id}) |> Oban.insert())

Good: Chunk into batches

正确示例:拆分为批次

contacts |> Enum.chunk_every(100) |> Enum.each(&BatchWorker.new(%{contact_ids: Enum.map(&1, fn c -> c.id end)}) |> Oban.insert())

Use bulk inserts without uniqueness constraints for maximum throughput.

---
contacts |> Enum.chunk_every(100) |> Enum.each(&BatchWorker.new(%{contact_ids: Enum.map(&1, fn c -> c.id end)}) |> Oban.insert())

在没有唯一性约束的情况下使用批量插入以获得最大吞吐量。

---

Part 2: Oban Pro

第二部分:Oban Pro

Cascade Context: Erlang Term Serialization

级联上下文:Erlang Term序列化

Unlike regular job args, cascade context preserves atoms:
elixir
undefined
与常规任务参数不同,级联上下文会保留原子
elixir
undefined

Creating - atom keys

创建时 - 使用原子键

Workflow.put_context(%{score_run_id: id})
Workflow.put_context(%{score_run_id: id})

Processing - atom keys still work!

处理时 - 原子键仍然可用!

def my_cascade(%{score_run_id: id}) do

...

end
def my_cascade(%{score_run_id: id}) do

...

end

Dot notation works too

点语法也适用

def later_step(context) do context.score_run_id context.previous_result end
undefined
def later_step(context) do context.score_run_id context.previous_result end
undefined

Serialization Summary

序列化总结

CreatingProcessing
Regular jobsatoms okstrings only
Cascade contextatoms okatoms ok
创建时处理时
常规任务允许使用原子仅允许字符串
级联上下文允许使用原子允许使用原子

When to Use Workflows

何时使用Workflows

Reserve Workflows for:
  • Complex dependency graphs (not just linear chains)
  • Fan-out/fan-in patterns
  • When you need recorded values across steps
  • Conditional branching based on runtime state
Don't use Workflows for simple A → B → C chains.
仅在以下场景使用Workflows:
  • 复杂的依赖图(不仅仅是线性链式任务)
  • 扇出/扇入模式
  • 需要跨步骤共享记录值时
  • 根据运行时状态进行条件分支时
不要为简单的A → B → C链式任务使用Workflows。

Workflow Composition with Graft

使用Graft组合工作流

When you need a parent workflow to wait for a sub-workflow to complete before continuing, use
add_graft
instead of
add_workflow
.
当父工作流需要等待子工作流完成后再继续时,使用
add_graft
而非
add_workflow

Key Differences

核心差异

MethodSub-workflow completes before deps run?Output accessible?
add_workflow
No - just inserts jobsNo
add_graft
Yes - waits for all jobsYes, via recorded values
方法子工作流完成后才运行依赖任务?输出是否可访问?
add_workflow
否 - 仅插入任务
add_graft
是 - 等待所有任务完成是,可通过记录值访问

Pattern: Composing Independent Concerns

模式:组合独立关注点

Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:
elixir
undefined
不要将无关的关注点(例如通知)与特定领域的工作流(例如评分)耦合。相反,创建一个更高层级的编排器:
elixir
undefined

Bad: Notification logic buried in AggregateScores

错误示例:通知逻辑嵌入在AggregateScores中

defmodule AggregateScores do def workflow(score_run_id) do Workflow.new() |> Workflow.add(:aggregate, AggregateJob.new(...)) |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place! end end
defmodule AggregateScores do def workflow(score_run_id) do Workflow.new() |> Workflow.add(:aggregate, AggregateJob.new(...)) |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # 位置错误! end end

Good: Higher-level workflow composes scoring + notification

正确示例:高层级工作流组合评分+通知

defmodule FullRunWithNotifications do def workflow(site_url, opts) do notification_opts = build_notification_opts(opts)
Workflow.new()
|> Workflow.put_context(%{notification_opts: notification_opts})
|> Workflow.add_graft(:scoring, &graft_full_run/1)
|> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
end
defp graft_full_run(context) do # Sub-workflow doesn't know about notifications FullRun.workflow(context.site_url, context.opts) |> Workflow.apply_graft() |> Oban.insert_all() end end
undefined
defmodule FullRunWithNotifications do def workflow(site_url, opts) do notification_opts = build_notification_opts(opts)
Workflow.new()
|> Workflow.put_context(%{notification_opts: notification_opts})
|> Workflow.add_graft(:scoring, &graft_full_run/1)
|> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
end
defp graft_full_run(context) do # 子工作流无需知晓通知逻辑 FullRun.workflow(context.site_url, context.opts) |> Workflow.apply_graft() |> Oban.insert_all() end end
undefined

Recording Values for Dependent Steps

为依赖步骤记录值

For a grafted workflow's output to be available to dependent steps, the final job must use
recorded: true
:
elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    # Return value becomes available in context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
要让嫁接工作流的输出对依赖步骤可用,最终任务必须使用
recorded: true
elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    # 返回值会在上下文中可用
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end

Dynamic Workflow Appending

动态追加工作流任务

Add jobs to a running workflow with
Workflow.append/2
:
elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end
  {:ok, :done}
end
Caveat: Cannot override context or add dependencies to already-running jobs. For complex dynamic scenarios, check external state in the job itself.
使用
Workflow.append/2
向运行中的工作流添加任务:
elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end
  {:ok, :done}
end
注意: 无法覆盖上下文或向已在运行的任务添加依赖。对于复杂的动态场景,可在任务本身中检查外部状态。

Fan-Out/Fan-In with Batches

使用批处理实现扇出/扇入

To run a final job after multiple paginated workflows complete, use Batch callbacks:
elixir
undefined
要在多个分页工作流完成后运行最终任务,使用Batch回调:
elixir
undefined

Wrap workflows in a shared batch

将工作流包装在共享批处理中

batch_id = "import-#{import_id}"
pages |> Enum.each(fn page -> PageWorkflow.workflow(page) |> Batch.from_workflow(batch_id: batch_id) |> Oban.insert_all() end)
batch_id = "import-#{import_id}"
pages |> Enum.each(fn page -> PageWorkflow.workflow(page) |> Batch.from_workflow(batch_id: batch_id) |> Oban.insert_all() end)

Add completion callback

添加完成回调

Batch.new(batch_id: batch_id) |> Batch.add_callback(:completed, CompletionWorker) |> Oban.insert()

**Tip:** Include pagination workers in the batch to prevent premature completion.
Batch.new(batch_id: batch_id) |> Batch.add_callback(:completed, CompletionWorker) |> Oban.insert()

**提示:** 将分页Worker包含在批处理中,以避免提前触发完成回调。

Testing Workflows

测试工作流

Don't use inline testing mode - workflows need database interaction.
elixir
undefined
不要使用内联测试模式 - 工作流需要与数据库交互。
elixir
undefined

Use run_workflow/1 for integration tests

在集成测试中使用run_workflow/1

assert %{completed: 3} = Workflow.new() |> Workflow.add(:a, WorkerA.new(%{})) |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a]) |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b]) |> run_workflow()

For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.

---
assert %{completed: 3} = Workflow.new() |> Workflow.add(:a, WorkerA.new(%{})) |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a]) |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b]) |> run_workflow()

要测试Worker之间的记录值,可插入带有预填充元数据的前置任务。

---

Red Flags - STOP and Reconsider

危险信号 - 立即停止并重新考虑

Non-Pro:
  • Pattern matching on atom keys in
    perform/1
  • Catching all errors and returning
    {:ok, _}
  • Wrapping job logic in try/rescue
  • Creating one job per item when processing millions of records
Pro:
  • Using
    add_workflow
    when you need to wait for completion
  • Coupling notifications/emails to domain workflows
  • Not using
    recorded: true
    when you need output from grafted workflows
  • Using Workflows for simple linear job chains
  • Testing workflows with inline mode
Any of these? Re-read the serialization rules.
非Pro版:
  • perform/1
    中对原子键进行模式匹配
  • 捕获所有错误并返回
    {:ok, _}
  • 将任务逻辑包裹在try/rescue中
  • 处理数百万条记录时为每个条目创建一个任务
Pro版:
  • 需要等待完成时使用
    add_workflow
  • 将通知/邮件与领域工作流耦合
  • 需要嫁接工作流的输出时未使用
    recorded: true
  • 为简单的线性任务链使用Workflows
  • 使用内联模式测试工作流
出现以上任何情况?请重新阅读序列化规则。