# Oban and Oban Pro: background jobs
This skill should be used when the user asks to "add a background job", "process async", "schedule a task", "retry failed jobs", "add email sending", "run this later", "add a cron job", "unique jobs", "batch process", or mentions Oban, Oban Pro, workflows, job queues, cascades, grafting, recorded values, job args, or troubleshooting job failures.
npx skill4agent add georgeguimaraes/claude-code-elixir oban

## Thinking

JOB ARGS ARE JSON. ATOMS BECOME STRINGS.

# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})
# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
# ...
end

# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
case do_work(job.args) do
{:ok, result} -> {:ok, result}
{:error, reason} ->
Logger.error("Failed: #{reason}")
{:ok, :failed} # Silently marks as complete!
end
end

# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
result = do_work!(job.args) # Raises on failure
{:ok, result}
end
# Or return error tuple - Oban treats as failure
def perform(%Oban.Job{} = job) do
case do_work(job.args) do
{:ok, result} -> {:ok, result}
{:error, reason} -> {:error, reason} # Oban will retry
end
end

def perform(%Oban.Job{} = job) do
case external_api_call(job.args) do
{:ok, result} -> {:ok, result}
{:error, :not_found} -> {:cancel, :resource_not_found} # Don't retry
{:error, :rate_limited} -> {:snooze, 60} # Retry in 60 seconds
{:error, _} -> {:error, :will_retry} # Normal retry
end
end

## Polling with `{:snooze, seconds}`

def perform(%Oban.Job{} = job) do
if external_thing_finished?(job.args) do
{:ok, :done}
else
{:snooze, 5} # Check again in 5 seconds
end
end

def perform(%Oban.Job{} = job) do
result = do_work(job.args)
# Enqueue next job on success
NextWorker.new(%{data: result}) |> Oban.insert()
{:ok, result}
end

## Unique jobs

use Oban.Worker,
queue: :default,
unique: [period: 60] # Only one job with same args per 60 seconds
# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]

## Batch large workloads

# Bad: One job per contact (millions of jobs = database strain)
Enum.each(contacts, &ContactWorker.new(%{id: &1.id}) |> Oban.insert())
# Good: Chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(&BatchWorker.new(%{contact_ids: Enum.map(&1, fn c -> c.id end)}) |> Oban.insert())

## Cascade context keeps atom keys

# Creating - atom keys
Workflow.put_context(%{score_run_id: id})
# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
# ...
end
# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end

|  | Creating | Processing |
|---|---|---|
| Regular jobs | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok |
## `add_graft` vs `add_workflow`

| Method | Sub-workflow completes before deps run? | Output accessible? |
|---|---|---|
| `add_graft` | No - just inserts jobs | No |
| `add_workflow` | Yes - waits for all jobs | Yes, via recorded values |
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
def workflow(score_run_id) do
Workflow.new()
|> Workflow.add(:aggregate, AggregateJob.new(...))
|> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place!
end
end
# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
def workflow(site_url, opts) do
notification_opts = build_notification_opts(opts)
Workflow.new()
|> Workflow.put_context(%{notification_opts: notification_opts})
|> Workflow.add_graft(:scoring, &graft_full_run/1)
|> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
end
defp graft_full_run(context) do
# Sub-workflow doesn't know about notifications
FullRun.workflow(context.site_url, context.opts)
|> Workflow.apply_graft()
|> Oban.insert_all()
end
end

## Recorded values (`recorded: true`)

defmodule FinalJob do
use Oban.Pro.Worker, queue: :default, recorded: true
def perform(%Oban.Job{} = job) do
# Return value becomes available in context
{:ok, %{score_run_id: score_run_id, composite_score: score}}
end
end

## Appending to a running workflow (`Workflow.append/2`)

def perform(%Oban.Job{} = job) do
if needs_extra_step?(job.args) do
job
|> Workflow.append()
|> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
|> Oban.insert_all()
end
{:ok, :done}
end

## Batches

# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"
pages
|> Enum.each(fn page ->
PageWorkflow.workflow(page)
|> Batch.from_workflow(batch_id: batch_id)
|> Oban.insert_all()
end)
# Add completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()

## Testing workflows

# Use run_workflow/1 for integration tests
assert %{completed: 3} =
Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{}))
|> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
|> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
|> run_workflow()

Related terms: `perform/1`, `{:ok, _}`, `add_workflow`, `recorded: true`