pueue-job-orchestration

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Pueue Job Orchestration

Pueue任务编排

Manage long-running tasks on BigBlack/LittleBlack GPU workstations using Pueue job queue.
使用Pueue任务队列管理BigBlack/LittleBlack GPU工作站上的长时间运行任务。

Overview

概述

Pueue is a Rust CLI tool for managing shell command queues. It provides:
  • Daemon persistence - Survives SSH disconnects, crashes, reboots
  • Disk-backed queue - Auto-resumes after any failure
  • Group-based parallelism - Control concurrent jobs per group
  • Easy failure recovery - Restart failed jobs with one command
Pueue 是一款基于Rust的CLI工具,用于管理Shell命令队列。它提供以下功能:
  • 守护进程持久化 - 即使SSH断开连接、程序崩溃或系统重启,任务仍能继续运行
  • 磁盘存储队列 - 发生任何故障后可自动恢复任务
  • 基于分组的并行执行 - 控制每个分组的并发任务数
  • 便捷的故障恢复 - 一条命令即可重启失败的任务

When to Use This Skill

何时使用该工具

Use this skill when the user mentions:
TriggerExample
Running tasks on BigBlack/LittleBlack"Run this on bigblack"
Long-running data processing"Populate the cache for all symbols"
Batch/parallel operations"Process these 70 jobs"
SSH remote execution"Execute this overnight on the GPU server"
Cache population"Fill the ClickHouse cache"
当用户提及以下场景时,可使用该工具:
触发场景示例
在BigBlack/LittleBlack上运行任务"在bigblack上运行这个任务"
长时间数据处理"为所有交易对填充缓存"
批处理/并行操作"处理这70个任务"
SSH远程执行"在GPU服务器上通宵执行这个任务"
缓存填充"填充ClickHouse缓存"

Quick Reference

快速参考

Check Status

查看状态

bash
undefined
bash
undefined

Local

本地环境

pueue status
pueue status

Remote (BigBlack)

远程(BigBlack)

ssh bigblack "~/.local/bin/pueue status"
undefined
ssh bigblack "~/.local/bin/pueue status"
undefined

Queue a Job

任务入队

bash
undefined
bash
undefined

Local

本地环境

pueue add -- python long_running_script.py
pueue add -- python long_running_script.py

Remote (BigBlack)

远程(BigBlack)

ssh bigblack "~/.local/bin/pueue add -- cd ~/project && uv run python script.py"
ssh bigblack "~/.local/bin/pueue add -- cd ~/project && uv run python script.py"

With group (for parallelism control)

指定分组(用于控制并行数)

pueue add --group p1 --label "BTCUSDT@1000" -- python populate.py --symbol BTCUSDT
undefined
pueue add --group p1 --label "BTCUSDT@1000" -- python populate.py --symbol BTCUSDT
undefined

Monitor Jobs

监控任务

bash
pueue follow <id>         # Watch job output in real-time
pueue log <id>            # View completed job output
pueue log <id> --full     # Full output (not truncated)
bash
pueue follow <id>         # 实时查看任务输出
pueue log <id>            # 查看已完成任务的输出
pueue log <id> --full     # 查看完整输出(不截断)

Manage Jobs

管理任务

bash
pueue restart <id>        # Restart failed job
pueue restart --all-failed # Restart ALL failed jobs
pueue kill <id>           # Kill running job
pueue clean               # Remove completed jobs from list
pueue reset               # Clear all jobs (use with caution)
bash
pueue restart <id>        # 重启指定失败任务
pueue restart --all-failed # 重启所有失败任务
pueue kill <id>           # 终止运行中的任务
pueue clean               # 从列表中移除已完成的任务
pueue reset               # 清空所有任务(谨慎使用)

Host Configuration

主机配置

HostLocationParallelism Groups
BigBlack
~/.local/bin/pueue
p1 (4), p2 (2), p3 (3), p4 (1)
LittleBlack
~/.local/bin/pueue
default (2)
Local (macOS)
/opt/homebrew/bin/pueue
default
主机安装路径并行执行分组
BigBlack
~/.local/bin/pueue
p1 (4), p2 (2), p3 (3), p4 (1)
LittleBlack
~/.local/bin/pueue
default (2)
本地(macOS)
/opt/homebrew/bin/pueue
default

Workflows

工作流程

1. Queue Single Remote Job

1. 单个远程任务入队

bash
undefined
bash
undefined

Step 1: Verify daemon is running

步骤1:验证守护进程是否运行

ssh bigblack "~/.local/bin/pueue status"
ssh bigblack "~/.local/bin/pueue status"

Step 2: Queue the job

步骤2:将任务加入队列

ssh bigblack "~/.local/bin/pueue add --label 'my-job' -- cd ~/project && uv run python script.py"
ssh bigblack "~/.local/bin/pueue add --label 'my-job' -- cd ~/project && uv run python script.py"

Step 3: Monitor progress

步骤3:监控进度

ssh bigblack "~/.local/bin/pueue follow <id>"
undefined
ssh bigblack "~/.local/bin/pueue follow <id>"
undefined

2. Batch Job Submission (Multiple Symbols)

2. 批量任务提交(多交易对)

For rangebar cache population or similar batch operations:
bash
undefined
适用于rangebar缓存填充或类似的批处理操作:
bash
undefined

Use the pueue-populate.sh script

使用pueue-populate.sh脚本

ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh setup" # One-time ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh phase1" # Queue Phase 1 ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh status" # Check progress
undefined
ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh setup" # 一次性初始化 ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh phase1" # 入队第一阶段任务 ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh status" # 查看进度
undefined

3. Configure Parallelism Groups

3. 配置并行执行分组

bash
undefined
bash
undefined

Create groups with different parallelism limits

创建不同并行限制的分组

pueue group add fast # Create 'fast' group pueue parallel 4 --group fast # Allow 4 parallel jobs
pueue group add slow pueue parallel 1 --group slow # Sequential execution
pueue group add fast # 创建'fast'分组 pueue parallel 4 --group fast # 允许4个并发任务
pueue group add slow pueue parallel 1 --group slow # 串行执行

Queue jobs to specific groups

将任务加入指定分组

pueue add --group fast -- echo "fast job" pueue add --group slow -- echo "slow job"
undefined
pueue add --group fast -- echo "fast job" pueue add --group slow -- echo "slow job"
undefined

4. Handle Failed Jobs

4. 处理失败任务

bash
undefined
bash
undefined

Check what failed

查看失败任务

pueue status | grep Failed
pueue status | grep Failed

View error output

查看错误输出

pueue log <id>
pueue log <id>

Restart specific job

重启指定任务

pueue restart <id>
pueue restart <id>

Restart all failed jobs

重启所有失败任务

pueue restart --all-failed
undefined
pueue restart --all-failed
undefined

Installation

安装

macOS (Local)

macOS(本地)

bash
brew install pueue
pueued -d  # Start daemon
bash
brew install pueue
pueued -d  # 启动守护进程

Linux (BigBlack/LittleBlack)

Linux(BigBlack/LittleBlack)

bash
undefined
bash
undefined

Download from GitHub releases (see https://github.com/Nukesor/pueue/releases for latest)

Or manually:

或手动安装:

SSoT-OK: Version from GitHub releases page

注意:版本号请以GitHub Releases页面为准

PUEUE_VERSION="v4.0.2" curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueue-x86_64-unknown-linux-musl" -o ~/.local/bin/pueue curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueued-x86_64-unknown-linux-musl" -o ~/.local/bin/pueued chmod +x ~/.local/bin/pueue ~/.local/bin/pueued
PUEUE_VERSION="v4.0.2" curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueue-x86_64-unknown-linux-musl" -o ~/.local/bin/pueue curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueued-x86_64-unknown-linux-musl" -o ~/.local/bin/pueued chmod +x ~/.local/bin/pueue ~/.local/bin/pueued

Start daemon

启动守护进程

~/.local/bin/pueued -d
undefined
~/.local/bin/pueued -d
undefined

Systemd Auto-Start (Linux)

Systemd自动启动(Linux)

bash
mkdir -p ~/.config/systemd/user
cat > ~/.config/systemd/user/pueued.service << 'EOF'
[Unit]
Description=Pueue Daemon
After=network.target

[Service]
ExecStart=%h/.local/bin/pueued -v
Restart=on-failure

[Install]
WantedBy=default.target
EOF

systemctl --user daemon-reload
systemctl --user enable --now pueued
bash
mkdir -p ~/.config/systemd/user
cat > ~/.config/systemd/user/pueued.service << 'EOF'
[Unit]
Description=Pueue Daemon
After=network.target

[Service]
ExecStart=%h/.local/bin/pueued -v
Restart=on-failure

[Install]
WantedBy=default.target
EOF

systemctl --user daemon-reload
systemctl --user enable --now pueued

Integration with rangebar-py

与rangebar-py的集成

The rangebar-py project has Pueue integration scripts:
ScriptPurpose
scripts/pueue-populate.sh
Queue cache population jobs with group-based parallelism
scripts/setup-pueue-linux.sh
Install Pueue on Linux servers
scripts/populate_full_cache.py
Python script for individual symbol/threshold jobs
rangebar-py项目提供了Pueue集成脚本:
脚本用途
scripts/pueue-populate.sh
将缓存填充任务加入队列,并支持基于分组的并行执行
scripts/setup-pueue-linux.sh
在Linux服务器上安装Pueue
scripts/populate_full_cache.py
用于单个交易对/阈值任务的Python脚本

Phase-Based Execution

分阶段执行

bash
undefined
bash
undefined

Phase 1: 1000 dbps (fast, 4 parallel)

第一阶段:1000 dbps(快速,4个并行任务)

./scripts/pueue-populate.sh phase1
./scripts/pueue-populate.sh phase1

Phase 2: 250 dbps (moderate, 2 parallel)

第二阶段:250 dbps(中等速度,2个并行任务)

./scripts/pueue-populate.sh phase2
./scripts/pueue-populate.sh phase2

Phase 3: 500, 750 dbps (3 parallel)

第三阶段:500、750 dbps(3个并行任务)

./scripts/pueue-populate.sh phase3
./scripts/pueue-populate.sh phase3

Phase 4: 100 dbps (resource intensive, 1 at a time)

第四阶段:100 dbps(资源密集型,串行执行)

./scripts/pueue-populate.sh phase4
undefined
./scripts/pueue-populate.sh phase4
undefined

Troubleshooting

故障排查

IssueCauseSolution
pueue: command not found
Not in PATHUse full path:
~/.local/bin/pueue
Connection refused
Daemon not runningStart with
pueued -d
Jobs stuck in QueuedGroup paused or at limitCheck
pueue status
,
pueue start
SSH disconnect kills jobsNot using PueueQueue via Pueue instead of direct SSH
Job fails immediatelyWrong working directoryUse
cd /path && command
pattern
问题原因解决方案
pueue: command not found
未加入PATH环境变量使用完整路径:
~/.local/bin/pueue
Connection refused
守护进程未运行使用
pueued -d
启动
任务卡在Queued状态分组已暂停或达到并行数上限查看
pueue status
,使用
pueue start
恢复
SSH断开导致任务终止未使用Pueue执行通过Pueue将任务入队,而非直接使用SSH执行
任务立即失败工作目录错误使用
cd /path && command
的格式

Production Lessons (Issue #88)

生产环境经验总结(Issue #88)

Battle-tested patterns from real production deployments.
来自实际生产环境部署的经过验证的实践方案。

Dependency Chaining with
--after

使用
--after
实现依赖链式执行

Pueue supports automatic job dependency resolution via
--after
. This is critical for post-processing pipelines where steps must run sequentially after batch jobs complete.
Key flags:
  • --after <id>...
    -- Start job only after ALL specified jobs succeed. If any dependency fails, this job fails too.
  • --print-task-id
    (or
    -p
    ) -- Return only the numeric job ID (for scripting).
Pattern: Capturing job IDs for dependency wiring
bash
undefined
Pueue支持通过
--after
参数自动解析任务依赖关系。这对于后处理流水线至关重要,因为这类流水线中的步骤必须在批处理任务全部完成后才能按顺序执行。
关键参数:
  • --after <id>...
    -- 仅在所有指定任务成功完成后才启动当前任务。如果任何依赖任务失败,当前任务也会失败。
  • --print-task-id
    (或
    -p
    ) -- 仅返回数字任务ID(用于脚本编写)。
实践方案:捕获任务ID以配置依赖关系
bash
undefined

Capture job IDs during batch submission

批量提交时捕获任务ID

JOB_IDS=() for symbol in BTCUSDT ETHUSDT; do job_id=$(pueue add --print-task-id --group mygroup
--label "${symbol}@250"
--working-directory /path/to/project
-- uv run python scripts/process.py --symbol "$symbol") JOB_IDS+=("$job_id") done
JOB_IDS=() for symbol in BTCUSDT ETHUSDT; do job_id=$(pueue add --print-task-id --group mygroup
--label "${symbol}@250"
--working-directory /path/to/project
-- uv run python scripts/process.py --symbol "$symbol") JOB_IDS+=("$job_id") done

Chain post-processing after ALL batch jobs

在所有批处理任务完成后执行后处理

optimize_id=$(pueue add --print-task-id --group mygroup
--label "optimize-table"
--after "${JOB_IDS[@]}"
-- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")
optimize_id=$(pueue add --print-task-id --group mygroup
--label "optimize-table"
--after "${JOB_IDS[@]}"
-- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")

Chain validation after optimize

在优化完成后执行验证

pueue add --group mygroup
--label "validate"
--after "$optimize_id"
-- uv run python scripts/validate.py

**Result in pueue status:**
Job 0 BTCUSDT@250 Running Job 1 ETHUSDT@250 Running Job 2 optimize-table Queued Deps: 0, 1 Job 3 validate Queued Deps: 2

**When to use `--after`:**

- Post-processing steps (OPTIMIZE TABLE, validation scripts, cleanup)
- Multi-stage pipelines where Stage N depends on Stage N-1
- Verification jobs that should only run after data is fully written

**Anti-pattern: Manual waiting**

```bash
pueue add --group mygroup
--label "validate"
--after "$optimize_id"
-- uv run python scripts/validate.py

**pueue status中的结果:**
Job 0 BTCUSDT@250 Running Job 1 ETHUSDT@250 Running Job 2 optimize-table Queued Deps: 0, 1 Job 3 validate Queued Deps: 2

**何时使用`--after`:**

- 后处理步骤(OPTIMIZE TABLE、验证脚本、清理操作)
- 多阶段流水线,其中第N阶段依赖第N-1阶段的完成
- 仅在数据完全写入后才运行的验证任务

**反模式:手动等待**

```bash

BAD: Manual polling or instructions to "run this after that finishes"

错误做法:手动轮询或提示“等那个完成后再运行这个”

postprocess_all() { queue_repopulation_jobs echo "Run 'pueue wait --group postfix' then run optimize manually" # NO! }
postprocess_all() { queue_repopulation_jobs echo "运行'pueue wait --group postfix'后手动执行优化操作" # 不推荐! }

GOOD: Automatic dependency chain

正确做法:自动依赖链式执行

postprocess_all() { queue_repopulation_jobs # captures JOB_IDS pueue add --after "${JOB_IDS[@]}" -- optimize_command pueue add --after "$optimize_id" -- validate_command }
undefined
postprocess_all() { queue_repopulation_jobs # 捕获JOB_IDS pueue add --after "${JOB_IDS[@]}" -- optimize_command pueue add --after "$optimize_id" -- validate_command }
undefined

Mise Task to Pueue Pipeline Integration

Mise任务与Pueue流水线集成

Pattern for
mise run
commands that build pueue DAGs:
toml
undefined
mise run
命令与Pueue DAG结合的实践方案:
toml
undefined

.mise/tasks/cache.toml

.mise/tasks/cache.toml

["cache:postprocess-all"] description = "Full post-fix pipeline via pueue: repopulate -> optimize -> detect (auto-chained)" run = "./scripts/pueue-populate.sh postprocess-all"

The shell script captures pueue job IDs and chains them with `--after`. Mise provides the entry point; pueue provides the execution engine with dependency resolution.
["cache:postprocess-all"] description = "通过Pueue实现完整的后处理流水线:重新填充缓存 -> 优化 -> 检测(自动链式执行)" run = "./scripts/pueue-populate.sh postprocess-all"

Shell脚本负责捕获Pueue任务ID并通过`--after`建立依赖关系。Mise提供入口点,Pueue提供具备依赖解析能力的执行引擎。

Forensic Audit Before Deployment

部署前的 forensic 审计

ALWAYS audit the remote host before mutating anything:
bash
undefined
在对远程主机进行任何变更前,务必执行审计:
bash
undefined

1. Pueue job state

1. Pueue任务状态

ssh host 'pueue status' ssh host 'pueue status --json | python3 -c "import json,sys; d=json.load(sys.stdin); print(sum(1 for t in d["tasks"].values() if "Running" in str(t["status"])))"'
ssh host 'pueue status' ssh host 'pueue status --json | python3 -c "import json,sys; d=json.load(sys.stdin); print(sum(1 for t in d["tasks"].values() if "Running" in str(t["status"])))"'

2. Database state (ClickHouse example)

2. 数据库状态(ClickHouse示例)

ssh host 'clickhouse-client --query "SELECT symbol, threshold, count(), countIf(volume < 0) FROM mytable GROUP BY ALL"'
ssh host 'clickhouse-client --query "SELECT symbol, threshold, count(), countIf(volume < 0) FROM mytable GROUP BY ALL"'

3. Checkpoint state

3. 检查点状态

ssh host 'ls -la ~/.cache/myapp/checkpoints/' ssh host 'cat ~/.cache/myapp/checkpoints/latest.json'
ssh host 'ls -la ~/.cache/myapp/checkpoints/' ssh host 'cat ~/.cache/myapp/checkpoints/latest.json'

4. System resources

4. 系统资源

ssh host 'uptime && free -h && df -h /home'
ssh host 'uptime && free -h && df -h /home'

5. Installed version

5. 已安装版本

ssh host 'cd ~/project && git log --oneline -1'
undefined
ssh host 'cd ~/project && git log --oneline -1'
undefined

Force-Refresh vs Checkpoint Resume

强制刷新 vs 从检查点恢复

Decision matrix for restarting killed/failed jobs:
ScenarioActionFlag
Job killed mid-run, data is cleanResume from checkpoint(no --force-refresh)
Data is corrupt (overflow, schema bug)Wipe and restart--force-refresh
Code fix changes output formatWipe and restart--force-refresh
Code fix is internal-only (no output change)Resume from checkpoint(no --force-refresh)
重启被终止/失败任务的决策矩阵:
场景操作参数
任务中途被终止,数据未损坏从检查点恢复(无需--force-refresh)
数据损坏(溢出、Schema错误)清理后重启--force-refresh
代码修复改变了输出格式清理后重启--force-refresh
代码修复仅涉及内部逻辑(不改变输出)从检查点恢复(无需--force-refresh)

PATH Gotcha: Rust Not in PATH via
uv run

PATH陷阱:
uv run
无法找到Rust

On remote hosts,
uv run maturin develop
may fail because
~/.cargo/bin
is not in
uv run
's PATH:
bash
undefined
在远程主机上,
uv run maturin develop
可能会失败,因为
~/.cargo/bin
不在
uv run
的PATH环境变量中:
bash
undefined

FAILS: rustc not found

失败:找不到rustc

ssh host 'cd ~/project && uv run maturin develop --uv'
ssh host 'cd ~/project && uv run maturin develop --uv'

WORKS: Prepend cargo bin to PATH

成功:将cargo bin目录加入PATH开头

ssh host 'cd ~/project && PATH="$HOME/.cargo/bin:$PATH" uv run maturin develop --uv'

For pueue jobs that need Rust compilation:

```bash
pueue add -- env PATH="/home/user/.cargo/bin:$PATH" uv run maturin develop
ssh host 'cd ~/project && PATH="$HOME/.cargo/bin:$PATH" uv run maturin develop --uv'

对于需要Rust编译的Pueue任务:

```bash
pueue add -- env PATH="/home/user/.cargo/bin:$PATH" uv run maturin develop

Per-Year (Epoch) Parallelization

按年份(纪元)并行执行

When a processing pipeline has natural reset boundaries (yearly, monthly, etc.) where processor state resets, each epoch becomes an independent processing unit. This enables massive speedup by splitting a multi-year sequential job into concurrent per-year pueue jobs.
Why it's safe (three isolation layers):
LayerWhy No Conflicts
Checkpoint filesFilename includes
{start}_{end}
— each year gets unique file
Database writesINSERT is append-only;
OPTIMIZE TABLE FINAL
deduplicates after
Source dataRead-only files (Parquet, CSV, etc.) — no write contention
Pattern: Per-symbol pueue groups
Give each symbol (or job family) its own pueue group for independent parallelism control:
bash
undefined
当处理流水线存在天然的重置边界(每年、每月等),且处理器状态会在边界处重置时,每个纪元可作为独立的处理单元。通过将多年的串行任务拆分为按年份的并行Pueue任务,可大幅提升处理速度。
安全性保障(三层隔离):
隔离层无冲突原因
检查点文件文件名包含
{start}_{end}
— 每个年份对应唯一的文件
数据库写入INSERT为追加模式;
OPTIMIZE TABLE FINAL
会在后续去重
源数据只读文件(Parquet、CSV等) — 无写入冲突
实践方案:按交易对划分Pueue分组
为每个交易对(或任务类型)创建独立的Pueue分组,以便独立控制并行数:
bash
undefined

Create per-symbol groups

创建按交易对划分的分组

pueue group add btc-yearly --parallel 4 pueue group add eth-yearly --parallel 4 pueue group add shib-yearly --parallel 4
pueue group add btc-yearly --parallel 4 pueue group add eth-yearly --parallel 4 pueue group add shib-yearly --parallel 4

Queue per-year jobs

按年份入队任务

for year in 2019 2020 2021 2022 2023 2024 2025 2026; do pueue add --group btc-yearly
--label "BTC@250:${year}"
-- uv run python scripts/process.py
--symbol BTCUSDT --threshold 250
--start-date "${year}-01-01" --end-date "${year}-12-31" done
for year in 2019 2020 2021 2022 2023 2024 2025 2026; do pueue add --group btc-yearly
--label "BTC@250:${year}"
-- uv run python scripts/process.py
--symbol BTCUSDT --threshold 250
--start-date "${year}-01-01" --end-date "${year}-12-31" done

Chain post-processing after ALL groups complete

在所有年份任务完成后执行链式后处理

ALL_JOB_IDS=($(pueue status --json | jq -r
'.tasks | to_entries[] | select(.value.group | test("-yearly$")) | .value.id')) pueue add --after "${ALL_JOB_IDS[@]}"
--label "optimize-table:final"
-- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL"

**When to use per-year vs sequential:**

| Scenario                                | Approach                 |
| --------------------------------------- | ------------------------ |
| High-volume symbol (many output items)  | Per-year (5+ cores idle) |
| Low-volume symbol (fast enough already) | Sequential (simpler)     |
| Single parameter, long backfill         | Per-year                 |
| Multiple parameters, same symbol        | Sequential per parameter |

**Critical rules:**

1. First year uses domain-specific effective start date, not `01-01`
2. Last year uses actual latest available date as end
3. Chain `OPTIMIZE TABLE FINAL` after ALL year-jobs via `--after`
4. Memory budget: each job peaks independently — with 61 GB total, 4-5 concurrent jobs at 5 GB each are safe
ALL_JOB_IDS=($(pueue status --json | jq -r
'.tasks | to_entries[] | select(.value.group | test("-yearly$")) | .value.id')) pueue add --after "${ALL_JOB_IDS[@]}"
--label "optimize-table:final"
-- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL"

**何时选择按年份并行 vs 串行执行:**

| 场景                                | 处理方式                 |
| --------------------------------------- | ------------------------ |
| 高交易量交易对(输出项多)  | 按年份并行(5+核心空闲时) |
| 低交易量交易对(速度已足够) | 串行执行(更简单)     |
| 单一参数、长期回填任务         | 按年份并行                 |
| 多参数、同一交易对        | 按参数串行执行 |

**关键规则:**

1. 第一年使用特定领域的有效起始日期,而非`01-01`
2. 最后一年使用实际可用的最新日期作为结束日期
3. 通过`--after`在所有年份任务完成后执行`OPTIMIZE TABLE FINAL`
4. 内存预算:每个任务独立达到峰值 — 总内存61 GB时,同时运行4-5个占用5 GB内存的任务是安全的

Pipeline Monitoring (Group-Based Phase Detection)

流水线监控(基于分组的阶段检测)

For multi-group pipelines, monitor job phases by group completion, not hardcoded job IDs. Job IDs change when jobs are removed, re-queued, or split into per-year jobs.
Anti-pattern: Hardcoded job IDs in monitors
bash
undefined
对于多分组流水线,应通过分组完成状态监控任务阶段,而非硬编码任务ID。当任务被移除、重新入队或拆分为按年份的任务时,任务ID会发生变化。
反模式:监控中硬编码任务ID
bash
undefined

WRONG: Breaks when jobs are removed/re-queued

错误做法:当任务被移除/重新入队时会失效

job14=$(echo "$JOBS" | grep "^14|") if [ "$(echo "$job14" | cut -d'|' -f2)" = "Done" ]; then echo "Phase 1 complete" fi

**Correct pattern: Dynamic group detection**

```bash
get_job_status() {
    ssh host "pueue status --json 2>/dev/null" | jq -r \
        '.tasks | to_entries[] |
         "\(.value.id)|\(.value.status | if type == "object" then keys[0] else . end)|\(.value.label // "-")|\(.value.group)"'
}

group_all_done() {
    local group="$1"
    local group_jobs
    group_jobs=$(echo "$JOBS" | grep "|${group}$" || true)
    [ -z "$group_jobs" ] && return 1
    echo "$group_jobs" | grep -qE "\|(Running|Queued)\|" && return 1
    return 0
}
job14=$(echo "$JOBS" | grep "^14|") if [ "$(echo "$job14" | cut -d'|' -f2)" = "Done" ]; then echo "第一阶段完成" fi

**正确实践:动态分组检测**

```bash
get_job_status() {
    ssh host "pueue status --json 2>/dev/null" | jq -r \
        '.tasks | to_entries[] |
         "\(.value.id)|\(.value.status | if type == "object" then keys[0] else . end)|\(.value.label // "-")|\(.value.group)"'
}

group_all_done() {
    local group="$1"
    local group_jobs
    group_jobs=$(echo "$JOBS" | grep "|${group}$" || true)
    [ -z "$group_jobs" ] && return 1
    echo "$group_jobs" | grep -qE "\|(Running|Queued)\|" && return 1
    return 0
}

Detect phase transitions by group name

通过分组名称检测阶段转换

SEEN_GROUPS="" for group in $(echo "$JOBS" | cut -d'|' -f4 | sort -u); do if group_all_done "$group" && [[ "$SEEN_GROUPS" != "|${group}|" ]]; then echo "GROUP COMPLETE: $group" run_integrity_checks "$group" SEEN_GROUPS="${SEEN_GROUPS}|${group}|" fi done

**Integrity checks at phase boundaries:**

Run automated validation when a group finishes, before starting the next phase:

```bash
run_integrity_checks() {
    local phase="$1"
    # Check 1: Data corruption (negative values, out-of-bounds)
    ssh host 'clickhouse-client --query "SELECT ... countIf(value < 0) ... HAVING count > 0"'
    # Check 2: Duplicate rows
    ssh host 'clickhouse-client --query "SELECT ... count(*) - uniqExact(key) as dupes HAVING dupes > 0"'
    # Check 3: Coverage gaps (NULL required fields)
    ssh host 'clickhouse-client --query "SELECT ... countIf(field IS NULL) ... HAVING missing > 0"'
    # Check 4: System resources (load, memory)
    ssh host 'uptime && free -h'
}
Monitoring as a background loop:
bash
POLL_INTERVAL=300  # 5 minutes
while true; do
    JOBS=$(get_job_status)
    # Count statuses, detect failures, detect group completions
    # Run integrity checks at phase boundaries
    # Exit when all jobs complete
    sleep "$POLL_INTERVAL"
done
SEEN_GROUPS="" for group in $(echo "$JOBS" | cut -d'|' -f4 | sort -u); do if group_all_done "$group" && [[ "$SEEN_GROUPS" != "|${group}|" ]]; then echo "分组完成: $group" run_integrity_checks "$group" SEEN_GROUPS="${SEEN_GROUPS}|${group}|" fi done

**阶段边界处的完整性检查:**

当一个分组完成时,在启动下一阶段前执行自动验证:

```bash
run_integrity_checks() {
    local phase="$1"
    # 检查1:数据损坏(负值、超出范围)
    ssh host 'clickhouse-client --query "SELECT ... countIf(value < 0) ... HAVING count > 0"'
    # 检查2:重复行
    ssh host 'clickhouse-client --query "SELECT ... count(*) - uniqExact(key) as dupes HAVING dupes > 0"'
    # 检查3:覆盖缺口(必填字段为NULL)
    ssh host 'clickhouse-client --query "SELECT ... countIf(field IS NULL) ... HAVING missing > 0"'
    # 检查4:系统资源(负载、内存)
    ssh host 'uptime && free -h'
}
后台循环监控:
bash
POLL_INTERVAL=300  # 5分钟
while true; do
    JOBS=$(get_job_status)
    # 统计状态、检测失败任务、检测分组完成情况
    # 在阶段边界执行完整性检查
    # 所有任务完成后退出
    sleep "$POLL_INTERVAL"
done

Related

相关资源

  • Hook:
    itp-hooks/posttooluse-reminder.ts
    - Reminds to use Pueue for detected long-running commands
  • Reference: Pueue GitHub
  • Issue: rangebar-py#77 - Original implementation
  • Issue: rangebar-py#88 - Production deployment lessons
  • Hook:
    itp-hooks/posttooluse-reminder.ts
    - 当检测到长时间运行的命令时,提醒用户使用Pueue
  • 参考文档: Pueue GitHub
  • Issue: rangebar-py#77 - 最初实现方案
  • Issue: rangebar-py#88 - 生产环境部署经验总结