# Distributed Job Safety
Concurrency safety patterns for distributed pueue + mise + systemd-run job pipelines. TRIGGERS - queue pueue jobs, deploy to remote host, concurrent job collisions, checkpoint races, resource guards, cgroup memory limits, systemd-run, autoscale, batch processing safety, job parameter isolation.
npx skill4agent add terrylica/cc-skills distributed-job-safety

Related skills: devops-tools:pueue-job-orchestration, itp:mise-tasks, itp:mise-configuration

## Checkpoint filename isolation

WRONG: {symbol}_{start}_{end}.json              # Two thresholds collide
RIGHT: {symbol}_{threshold}_{start}_{end}.json  # Each job gets its own file

# WRONG: Blind queue
for item in "${ITEMS[@]}"; do
pueue add --group mygroup -- run_job "$item" "$param"
done
# RIGHT: Check first
running=$(pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
echo "SKIP: ${item}@${param} already running"
continue
fi

# WRONG: TOCTOU race
if path.exists():
path.unlink() # Crashes if another job deleted between check and unlink
# RIGHT: Idempotent
path.unlink(missing_ok=True)

# Atomic checkpoint write: temp file in the same directory, fsync, then rename.
fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
f.write(json.dumps(data))
f.flush()
os.fsync(f.fileno())
os.replace(temp_path, path)  # POSIX atomic rename

# Atomic multi-line append via flock + temp file
TMPOUT=$(mktemp)
# ... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"

Keep environment defaults in .mise.toml under [env] — the single source of
truth (SSoT) that every job reads.

# WRONG: Per-job override bypasses mise SSoT
pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py
# RIGHT: Set the correct value in .mise.toml, no per-job override needed
pueue add -- uv run python script.py

To patch a queued task's environment, `pueue env set <id> KEY VALUE` exists,
but prefer fixing [env] in .mise.toml; see devops-tools:pueue-job-orchestration.

# Probe host resources
ssh host 'nproc && free -h && uptime'
# Sizing formula (leave 20% margin for OS + DB + overhead)
# max_jobs = min(
# (available_memory_gb * 0.8) / per_job_memory_gb,
# (total_cores * 0.8) / per_job_cpu_cores
# )

Also respect database-side limits: concurrent_threads_soft_limit / max_threads
(per-query: --max_threads). Monitor load with uptime and vmstat 1 5, and check
for query failures:

SELECT count() FROM system.query_log
WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing'

See devops-tools:pueue-job-orchestration. Cap per-job memory with systemd-run:

systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
uv run python scripts/process.py --symbol BTCUSDT --threshold 250

MemorySwapMax=0 makes the job hit MemoryMax and be OOM-killed instead of
silently spilling to swap.

# WRONG: Hardcoded job IDs
if pueue status --json | jq -e ".tasks.\"14\"" >/dev/null; then ...
# RIGHT: Query by group/label
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'

# Count running tasks before destructive operations:
pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running")] | length'
# If > 0, decide: wait, kill gracefully, or abort

A checkpoint FileNotFoundError after adding a parameter usually means the
filename scheme changed ({item}_{start}_{end}.json ->
{item}_{config}_{start}_{end}.json) and old checkpoints no longer match.
Inspect with `pueue log <id>`, then `pueue restart` — or, more reliable than
restart, re-queue the task:

pueue add --group mygroup --label "BTCUSDT@750-retry" -- <same command>

For stale packages, `uv pip install --refresh --index-url https://pypi.org/simple/ mypkg==<version>`
forces a re-fetch (plain `uv pip install pkg==X.Y.Z` may serve a cached build).
Note `uv run` honors uv.lock — with source = { editable = "." } a `git pull`
updates code, but pinned dependencies only change when the lock does.

Use --after <id> to chain dependent phases:

# Queue Phase 1, wait for completion, then Phase 2
pueue add --label "phase1" -- run_phase_1
# ... wait and verify ...
pueue add --label "phase2" -- run_phase_2

# WRONG
postprocess_all() {
queue_batch_jobs
echo "Run 'pueue wait' then manually run optimize and validate" # NO!
}--after# RIGHT
postprocess_all() {
JOB_IDS=()
for param in 250 500 750 1000; do
job_id=$(pueue add --print-task-id --group mygroup \
--label "ITEM@${param}" -- uv run python process.py --param "$param")
JOB_IDS+=("$job_id")
done
# Chain optimize after ALL batch jobs
optimize_id=$(pueue add --print-task-id --after "${JOB_IDS[@]}" \
-- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")
# Chain validation after optimize
pueue add --after "$optimize_id" -- uv run python scripts/validate.py
}devops-tools:pueue-job-orchestration--aftergrep "^14|"group_all_done()devops-tools:pueue-job-orchestrationgroup_all_done()# WRONG: Single monolithic job, wastes idle cores
pueue add -- process --start 2019-01-01 --end 2026-12-31 # 1,700 hours single-threaded
# RIGHT: Per-year splits, 5x+ speedup on multi-core
for year in 2019 2020 2021 2022 2023 2024 2025 2026; do
pueue add --group item-yearly --label "ITEM@250:${year}" \
-- process --start "${year}-01-01" --end "${year}-12-31"
donedevops-tools:pueue-job-orchestrationstate.jsonpueue addstate.json# Before bulk submission: always clean
pueue clean -g mygroup 2>/dev/null || true
# During long sweeps: clean between batches
# (See pueue-job-orchestration skill for full batch pattern)
# Monitor state size as part of health checks
STATE_FILE="$HOME/.local/share/pueue/state.json"
ls -lh "$STATE_FILE"  # Should be <10MB for healthy operation

A bloated state.json slows every pueue add; see
devops-tools:pueue-job-orchestration.

## Remote working directory

Symptom: can't open file 'scripts/populate.py': [Errno 2] No such file or directory

ssh host "pueue add -- uv run python scripts/process.py" runs with cwd=$HOME,
so relative paths resolve to ~/scripts/process.py instead of
~/project/scripts/process.py. Fix with -w or with cd &&:

# WRONG: pueue inherits SSH cwd ($HOME)
ssh host "pueue add --group mygroup -- uv run python scripts/process.py"
# RIGHT (preferred): -w flag sets working directory explicitly
ssh host "pueue add -w ~/project --group mygroup -- uv run python scripts/process.py"
# RIGHT (alternative): cd first, then pueue add inherits project cwd
ssh host "cd ~/project && pueue add --group mygroup -- uv run python scripts/process.py"

Prefer -w (--working-directory) over cd && — it records the directory in the
task itself. Note: -w /tmp may display as /private/tmp in pueue status on
macOS. The default cwd over SSH is $HOME, so a bare ssh host "pueue add ..."
inherits it. For bulk submission, avoid one SSH round-trip per pueue add —
batch commands and fan out with xargs -P:

# Step 1 (local): Generate commands file
bash gen_commands.sh > /tmp/commands.txt
# Step 2 (local): Transfer to remote
rsync /tmp/commands.txt host:/tmp/commands.txt
# Step 3 (remote): Feed via xargs -P (no SSH per-job)
ssh host "xargs -P16 -I{} bash -c '{}' < /tmp/commands.txt"

## SIGPIPE under pipefail

With set -o pipefail, `ls *.sql | head -10` exits 141: head closes the pipe
after 10 lines and ls dies of SIGPIPE. Use a temp file instead.

# WRONG (exit 141)
ls /tmp/sql/*.sql | head -10
# RIGHT (temp file)
ls /tmp/sql/*.sql > /tmp/filelist.txt
head -10 /tmp/filelist.txt

Validate output sizes with wc -l against the expectation:
expected = N_normal * barriers_per_query + N_skipped * 1 + N_error * 1

## Architecture

mise (environment + task discovery)
|-- .mise.toml [env] -> SSoT for defaults
|-- .mise/tasks/jobs.toml -> task definitions
| |-- mise run jobs:submit-all
| | |-- submit-all.sh (orchestrator)
| | |-- pueue add (per-unit, NOT per-query)
| | |-- submit_unit.sh (per unit)
| | |-- xargs -P16 (parallel queries)
| | |-- wrapper.sh (per query)
| | |-- clickhouse-client < sql_file
| | |-- flock + append NDJSON
| |
| |-- mise run jobs:process-all (Python pipeline variant)
| | |-- job-runner.sh (orchestrator)
| | |-- pueue add (per-job)
| | |-- systemd-run --scope -p MemoryMax=XG -p MemorySwapMax=0
| | |-- uv run python scripts/process.py
| | |-- run_resumable_job()
| | |-- get_checkpoint_path() -> param-aware
| | |-- checkpoint.save() -> atomic write
| | |-- checkpoint.unlink() -> missing_ok=True
| |
| |-- mise run jobs:autoscale-loop
| |-- autoscaler.sh --loop (60s interval)
| |-- reads: free -m, uptime, pueue status --json
| | |-- adjusts: pueue parallel N --group <group>

| Layer | Responsibility |
|---|---|
| mise | Environment variables, tool versions, task discovery |
| pueue | Daemon persistence, parallelism limits, restart, |
| systemd-run | Per-job cgroup memory caps (Linux only, no-op on macOS) |
| autoscaler | Dynamic parallelism tuning based on host resources |
| Python/app | Domain logic, checkpoint management, data integrity |
1. AUDIT: ssh host 'pueue status --json' -> count running/queued/failed
2. DECIDE: Wait for running jobs? Kill? Let them finish with old code?
3. PULL: ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'
4. VERIFY: ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'
5. UPGRADE: ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'
6. RESTART: ssh host 'pueue restart <failed_id>' OR add fresh jobs
7. MONITOR: ssh host 'pueue status --group mygroup'

Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
| |-- YES -> Add to checkpoint filename
| | Add to pueue job label
| | Add to remote checkpoint key
| |-- NO -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
| |-- YES -> Use missing_ok=True
| | Use atomic write for creates
| |-- NO -> Standard operation
|
|-- Does the function write to shared storage?
|-- YES -> Force deduplication after write
| Use UPSERT semantics where possible
|-- NO -> Standard operation

Autoscaler policy (applied via pueue parallel N):

CPU < 40% AND MEM < 60% -> SCALE UP (+1 per group)
CPU > 80% OR MEM > 80% -> SCALE DOWN (-1 per group)
Otherwise -> HOLD

Manual ramp-up procedure:

Step 1: Start with conservative defaults (e.g., group1=2, group2=3)
Step 2: After jobs stabilize (~5 min), probe: uptime + free -h + ps aux
Step 3: If load < 40% cores AND memory < 60% available:
Bump by +1-2 jobs per group
Step 4: Wait ~5 min for new jobs to reach peak memory
Step 5: Probe again. If still within 80% margin, bump again
Step 6: Repeat until load ~50% cores OR memory ~70% available

# Scale up when resources are available
pueue parallel 4 --group group1
pueue parallel 5 --group group2
# Scale down if memory pressure detected
pueue parallel 2 --group group1

# Example: high-volume symbols need fewer concurrent jobs (5 GB each)
pueue group add highvol-yearly --parallel 2
# Low-volume symbols can run more concurrently (1 GB each)
pueue group add lowvol-yearly --parallel 6

Project-specific knowledge belongs in a local extension skill (e.g.
myproject-job-safety) under .claude/skills/:

| Local Extension Provides | Example |
|---|---|
| Concrete function names | |
| Application-specific env vars | |
| Memory profiles per job type | "250 dbps peaks at 5 GB, use MemoryMax=8G" |
| Database-specific audit queries | |
| Issue provenance tracking | "Checkpoint race: GH-84" |
| Host-specific configuration | "bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4" |
Name local extension skills *-job-safety and install them under .claude/skills/:

devops-tools:distributed-job-safety (universal patterns - this skill)
+ .claude/skills/myproject-job-safety (project-specific config)
= Complete operational knowledge

See also: devops-tools:pueue-job-orchestration