# Pueue Job Orchestration

Manage long-running tasks on BigBlack/LittleBlack GPU workstations using the Pueue job queue.
## Overview

Pueue is a Rust CLI tool for managing shell command queues. It provides:

- Daemon persistence - Survives SSH disconnects, crashes, reboots
- Disk-backed queue - Auto-resumes after any failure
- Group-based parallelism - Control concurrent jobs per group
- Easy failure recovery - Restart failed jobs with one command
## When to Use This Skill

Use this skill when the user mentions:

| Trigger | Example |
|---|---|
| Running tasks on BigBlack/LittleBlack | "Run this on bigblack" |
| Long-running data processing | "Populate the cache for all symbols" |
| Batch/parallel operations | "Process these 70 jobs" |
| SSH remote execution | "Execute this overnight on the GPU server" |
| Cache population | "Fill the ClickHouse cache" |
## Quick Reference

### Check Status

```bash
# Local
pueue status

# Remote (BigBlack)
ssh bigblack "~/.local/bin/pueue status"
```
### Queue a Job

```bash
# Local
pueue add -- python long_running_script.py

# Remote (BigBlack) -- use --working-directory so the command runs inside the project
# (a bare "cd ~/project &&" would be split by the remote shell before pueue sees it)
ssh bigblack "~/.local/bin/pueue add --working-directory ~/project -- uv run python script.py"

# With group (for parallelism control)
pueue add --group p1 --label "BTCUSDT@1000" -- python populate.py --symbol BTCUSDT
```
### Monitor Jobs

```bash
pueue follow <id>       # Watch job output in real-time
pueue log <id>          # View completed job output
pueue log <id> --full   # Full output (not truncated)
```

### Manage Jobs
```bash
pueue restart <id>           # Restart failed job
pueue restart --all-failed   # Restart ALL failed jobs
pueue kill <id>              # Kill running job
pueue clean                  # Remove completed jobs from list
pueue reset                  # Clear all jobs (use with caution)
```

## Host Configuration
| Host | Location | Parallelism Groups |
|---|---|---|
| BigBlack | | p1 (4), p2 (2), p3 (3), p4 (1) |
| LittleBlack | | default (2) |
| Local (macOS) | | default |
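The BigBlack group layout in the table above can be bootstrapped with a short setup sketch. Assumptions: `pueue` is on PATH on the target host; the `DRY_RUN` guard is an illustrative convention of this sketch, not a pueue feature.

```shell
#!/usr/bin/env bash
# Sketch: recreate BigBlack's layout -- p1 (4), p2 (2), p3 (3), p4 (1).
# With DRY_RUN=1 the commands are only printed, so the sketch can be
# reviewed before touching a live daemon.
setup_groups() {
  local spec group limit
  for spec in p1:4 p2:2 p3:3 p4:1; do
    group="${spec%:*}"   # text before the colon
    limit="${spec#*:}"   # text after the colon
    if [ "${DRY_RUN:-0}" = "1" ]; then
      echo "pueue group add $group; pueue parallel $limit --group $group"
    else
      pueue group add "$group"
      pueue parallel "$limit" --group "$group"
    fi
  done
}

DRY_RUN=1 setup_groups
```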
## Workflows

### 1. Queue Single Remote Job

```bash
# Step 1: Verify daemon is running
ssh bigblack "~/.local/bin/pueue status"

# Step 2: Queue the job (use --working-directory so it runs inside the project)
ssh bigblack "~/.local/bin/pueue add --label 'my-job' --working-directory ~/project -- uv run python script.py"

# Step 3: Monitor progress
ssh bigblack "~/.local/bin/pueue follow <id>"
```
### 2. Batch Job Submission (Multiple Symbols)

For rangebar cache population or similar batch operations:

```bash
# Use the pueue-populate.sh script
ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh setup"   # One-time
ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh phase1"  # Queue Phase 1
ssh bigblack "cd ~/rangebar-py && ./scripts/pueue-populate.sh status"  # Check progress
```
### 3. Configure Parallelism Groups

```bash
# Create groups with different parallelism limits
pueue group add fast           # Create 'fast' group
pueue parallel 4 --group fast  # Allow 4 parallel jobs

pueue group add slow
pueue parallel 1 --group slow  # Sequential execution

# Queue jobs to specific groups
pueue add --group fast -- echo "fast job"
pueue add --group slow -- echo "slow job"
```
### 4. Handle Failed Jobs

```bash
# Check what failed
pueue status | grep Failed

# View error output
pueue log <id>

# Restart specific job
pueue restart <id>

# Restart all failed jobs
pueue restart --all-failed
```
## Installation

### macOS (Local)

```bash
brew install pueue
pueued -d   # Start daemon
```

### Linux (BigBlack/LittleBlack)
```bash
# Download from GitHub releases (see https://github.com/Nukesor/pueue/releases for latest)
# Or manually:
# SSoT-OK: Version from GitHub releases page
PUEUE_VERSION="v4.0.2"
curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueue-x86_64-unknown-linux-musl" -o ~/.local/bin/pueue
curl -sSL "https://github.com/Nukesor/pueue/releases/download/${PUEUE_VERSION}/pueued-x86_64-unknown-linux-musl" -o ~/.local/bin/pueued
chmod +x ~/.local/bin/pueue ~/.local/bin/pueued

# Start daemon
~/.local/bin/pueued -d
```
### Systemd Auto-Start (Linux)

```bash
mkdir -p ~/.config/systemd/user
cat > ~/.config/systemd/user/pueued.service << 'EOF'
[Unit]
Description=Pueue Daemon
After=network.target

[Service]
ExecStart=%h/.local/bin/pueued -v
Restart=on-failure

[Install]
WantedBy=default.target
EOF

systemctl --user daemon-reload
systemctl --user enable --now pueued
```

## Integration with rangebar-py
The rangebar-py project has Pueue integration scripts:

| Script | Purpose |
|---|---|
| scripts/pueue-populate.sh | Queue cache population jobs with group-based parallelism |
| | Install Pueue on Linux servers |
| | Python script for individual symbol/threshold jobs |
### Phase-Based Execution

```bash
# Phase 1: 1000 dbps (fast, 4 parallel)
./scripts/pueue-populate.sh phase1

# Phase 2: 250 dbps (moderate, 2 parallel)
./scripts/pueue-populate.sh phase2

# Phase 3: 500, 750 dbps (3 parallel)
./scripts/pueue-populate.sh phase3

# Phase 4: 100 dbps (resource intensive, 1 at a time)
./scripts/pueue-populate.sh phase4
```
## Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| | Not in PATH | Use full path: `~/.local/bin/pueue` |
| | Daemon not running | Start with `pueued -d` |
| Jobs stuck in Queued | Group paused or at limit | Check `pueue group` |
| SSH disconnect kills jobs | Not using Pueue | Queue via Pueue instead of direct SSH |
| Job fails immediately | Wrong working directory | Use `--working-directory` |
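The first two rows of the table reduce to a quick health probe. A minimal sketch, assuming the default install path from this document (`PUEUE_BIN` and the message wording are illustrative):

```shell
#!/usr/bin/env bash
# Probe the two most common failure modes: binary not reachable,
# daemon not answering. Prints one line per check.
pueue_health() {
  local bin="${PUEUE_BIN:-$HOME/.local/bin/pueue}"
  if [ -x "$bin" ]; then
    echo "binary: ok ($bin)"
  else
    echo "binary: missing -- use the full path or install to ~/.local/bin"
  fi
  if "$bin" status >/dev/null 2>&1; then
    echo "daemon: ok"
  else
    echo "daemon: not running -- start with 'pueued -d'"
  fi
}

pueue_health
```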
## Production Lessons (Issue #88)

Battle-tested patterns from real production deployments.

### Dependency Chaining with `--after`

Pueue supports automatic job dependency resolution via `--after`. This is critical for post-processing pipelines where steps must run sequentially after batch jobs complete.

Key flags:

- `--after <id>...` -- Start the job only after ALL specified jobs succeed. If any dependency fails, this job fails too.
- `--print-task-id` (or `-p`) -- Return only the numeric job ID (for scripting).

Pattern: Capturing job IDs for dependency wiring
```bash
# Capture job IDs during batch submission
JOB_IDS=()
for symbol in BTCUSDT ETHUSDT; do
  job_id=$(pueue add --print-task-id --group mygroup \
    --label "${symbol}@250" \
    --working-directory /path/to/project \
    -- uv run python scripts/process.py --symbol "$symbol")
  JOB_IDS+=("$job_id")
done

# Chain post-processing after ALL batch jobs
optimize_id=$(pueue add --print-task-id --group mygroup \
  --label "optimize-table" \
  --after "${JOB_IDS[@]}" \
  -- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL")

# Chain validation after optimize
pueue add --group mygroup \
  --label "validate" \
  --after "$optimize_id" \
  -- uv run python scripts/validate.py
```
**Result in `pueue status`:**

```
Job 0  BTCUSDT@250     Running
Job 1  ETHUSDT@250     Running
Job 2  optimize-table  Queued   Deps: 0, 1
Job 3  validate        Queued   Deps: 2
```

**When to use `--after`:**

- Post-processing steps (OPTIMIZE TABLE, validation scripts, cleanup)
- Multi-stage pipelines where Stage N depends on Stage N-1
- Verification jobs that should only run after data is fully written

**Anti-pattern: Manual waiting**

```bash
# BAD: Manual polling or instructions to "run this after that finishes"
postprocess_all() {
  queue_repopulation_jobs
  echo "Run 'pueue wait --group postfix' then run optimize manually"  # NO!
}

# GOOD: Automatic dependency chain
postprocess_all() {
  queue_repopulation_jobs   # captures JOB_IDS
  pueue add --after "${JOB_IDS[@]}" -- optimize_command
  pueue add --after "$optimize_id" -- validate_command
}
```
### Mise Task to Pueue Pipeline Integration

Pattern for `mise run` commands that build pueue DAGs:

```toml
# .mise/tasks/cache.toml
["cache:postprocess-all"]
description = "Full post-fix pipeline via pueue: repopulate -> optimize -> detect (auto-chained)"
run = "./scripts/pueue-populate.sh postprocess-all"
```

The shell script captures pueue job IDs and chains them with `--after`. Mise provides the entry point; pueue provides the execution engine with dependency resolution.

### Forensic Audit Before Deployment
ALWAYS audit the remote host before mutating anything:

```bash
# 1. Pueue job state
ssh host 'pueue status'
ssh host 'pueue status --json | python3 -c "import json,sys; d=json.load(sys.stdin); print(sum(1 for t in d[\"tasks\"].values() if \"Running\" in str(t[\"status\"])))"'

# 2. Database state (ClickHouse example)
ssh host 'clickhouse-client --query "SELECT symbol, threshold, count(), countIf(volume < 0) FROM mytable GROUP BY ALL"'

# 3. Checkpoint state
ssh host 'ls -la ~/.cache/myapp/checkpoints/'
ssh host 'cat ~/.cache/myapp/checkpoints/latest.json'

# 4. System resources
ssh host 'uptime && free -h && df -h /home'

# 5. Installed version
ssh host 'cd ~/project && git log --oneline -1'
```
### Force-Refresh vs Checkpoint Resume

Decision matrix for restarting killed/failed jobs:

| Scenario | Action | Flag |
|---|---|---|
| Job killed mid-run, data is clean | Resume from checkpoint | (no --force-refresh) |
| Data is corrupt (overflow, schema bug) | Wipe and restart | --force-refresh |
| Code fix changes output format | Wipe and restart | --force-refresh |
| Code fix is internal-only (no output change) | Resume from checkpoint | (no --force-refresh) |
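The matrix reduces to a single predicate: wipe only when existing output is invalid or stale. A minimal sketch (function name and return strings are illustrative, not part of pueue or the populate script):

```python
def restart_action(data_corrupt: bool, output_format_changed: bool) -> str:
    """Encode the decision matrix: wipe-and-restart only when cached
    output is corrupt or the code fix changed the output format;
    otherwise resume from the checkpoint as-is."""
    if data_corrupt or output_format_changed:
        return "requeue with --force-refresh"   # wipe and restart
    return "pueue restart <id>"                 # resume from checkpoint
```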
### PATH Gotcha: Rust Not in PATH via `uv run`

On remote hosts, `uv run maturin develop` may fail because `~/.cargo/bin` is not in `uv run`'s PATH:

```bash
# FAILS: rustc not found
ssh host 'cd ~/project && uv run maturin develop --uv'

# WORKS: Prepend cargo bin to PATH
ssh host 'cd ~/project && PATH="$HOME/.cargo/bin:$PATH" uv run maturin develop --uv'
```

For pueue jobs that need Rust compilation:

```bash
pueue add -- env PATH="/home/user/.cargo/bin:$PATH" uv run maturin develop
```

### Per-Year (Epoch) Parallelization
When a processing pipeline has natural reset boundaries (yearly, monthly, etc.) where processor state resets, each epoch becomes an independent processing unit. This enables massive speedup by splitting a multi-year sequential job into concurrent per-year pueue jobs.

Why it's safe (three isolation layers):

| Layer | Why No Conflicts |
|---|---|
| Checkpoint files | Filename includes |
| Database writes | INSERT is append-only; |
| Source data | Read-only files (Parquet, CSV, etc.) -- no write contention |

Pattern: Per-symbol pueue groups

Give each symbol (or job family) its own pueue group for independent parallelism control:
```bash
# Create per-symbol groups
pueue group add btc-yearly --parallel 4
pueue group add eth-yearly --parallel 4
pueue group add shib-yearly --parallel 4

# Queue per-year jobs
for year in 2019 2020 2021 2022 2023 2024 2025 2026; do
  pueue add --group btc-yearly \
    --label "BTC@250:${year}" \
    -- uv run python scripts/process.py \
       --symbol BTCUSDT --threshold 250 \
       --start-date "${year}-01-01" --end-date "${year}-12-31"
done

# Chain post-processing after ALL groups complete
ALL_JOB_IDS=($(pueue status --json | jq -r \
  '.tasks | to_entries[] | select(.value.group | test("-yearly$")) | .value.id'))
pueue add --after "${ALL_JOB_IDS[@]}" \
  --label "optimize-table:final" \
  -- clickhouse-client --query "OPTIMIZE TABLE mydb.mytable FINAL"
```
**When to use per-year vs sequential:**

| Scenario | Approach |
|---|---|
| High-volume symbol (many output items) | Per-year (5+ cores idle) |
| Low-volume symbol (fast enough already) | Sequential (simpler) |
| Single parameter, long backfill | Per-year |
| Multiple parameters, same symbol | Sequential per parameter |

**Critical rules:**

1. First year uses the domain-specific effective start date, not `01-01`
2. Last year uses the actual latest available date as the end
3. Chain `OPTIMIZE TABLE FINAL` after ALL year-jobs via `--after`
4. Memory budget: each job peaks independently -- with 61 GB total, 4-5 concurrent jobs at 5 GB each are safe
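Rules 1 and 2 can be sketched as a small helper that expands a backfill window into per-year (start, end) pairs, each of which becomes one pueue job's `--start-date`/`--end-date`. The function name is illustrative:

```python
from datetime import date

def year_ranges(effective_start: date, latest_available: date) -> list[tuple[str, str]]:
    """Split a backfill window into per-year (start, end) date pairs.

    Rule 1: the first year starts at the effective start date, not Jan 1.
    Rule 2: the last year ends at the latest available date, not Dec 31.
    """
    ranges = []
    for year in range(effective_start.year, latest_available.year + 1):
        start = effective_start if year == effective_start.year else date(year, 1, 1)
        end = latest_available if year == latest_available.year else date(year, 12, 31)
        ranges.append((start.isoformat(), end.isoformat()))
    return ranges
```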
### Pipeline Monitoring (Group-Based Phase Detection)

For multi-group pipelines, monitor job phases by group completion, not hardcoded job IDs. Job IDs change when jobs are removed, re-queued, or split into per-year jobs.

Anti-pattern: Hardcoded job IDs in monitors

```bash
# WRONG: Breaks when jobs are removed/re-queued
job14=$(echo "$JOBS" | grep "^14|")
if [ "$(echo "$job14" | cut -d'|' -f2)" = "Done" ]; then
  echo "Phase 1 complete"
fi
```
**Correct pattern: Dynamic group detection**

```bash
get_job_status() {
  ssh host "pueue status --json 2>/dev/null" | jq -r \
    '.tasks | to_entries[] |
     "\(.value.id)|\(.value.status | if type == "object" then keys[0] else . end)|\(.value.label // "-")|\(.value.group)"'
}

group_all_done() {
  local group="$1"
  local group_jobs
  group_jobs=$(echo "$JOBS" | grep "|${group}$" || true)
  [ -z "$group_jobs" ] && return 1
  echo "$group_jobs" | grep -qE "\|(Running|Queued)\|" && return 1
  return 0
}

# Detect phase transitions by group name
SEEN_GROUPS=""
for group in $(echo "$JOBS" | cut -d'|' -f4 | sort -u); do
  if group_all_done "$group" && [[ "$SEEN_GROUPS" != *"|${group}|"* ]]; then
    echo "GROUP COMPLETE: $group"
    run_integrity_checks "$group"
    SEEN_GROUPS="${SEEN_GROUPS}|${group}|"
  fi
done
```
**Integrity checks at phase boundaries:**

Run automated validation when a group finishes, before starting the next phase:

```bash
run_integrity_checks() {
  local phase="$1"
  # Check 1: Data corruption (negative values, out-of-bounds)
  ssh host 'clickhouse-client --query "SELECT ... countIf(value < 0) ... HAVING count > 0"'
  # Check 2: Duplicate rows
  ssh host 'clickhouse-client --query "SELECT ... count(*) - uniqExact(key) as dupes HAVING dupes > 0"'
  # Check 3: Coverage gaps (NULL required fields)
  ssh host 'clickhouse-client --query "SELECT ... countIf(field IS NULL) ... HAVING missing > 0"'
  # Check 4: System resources (load, memory)
  ssh host 'uptime && free -h'
}
```
**Monitoring as a background loop:**

```bash
POLL_INTERVAL=300  # 5 minutes
while true; do
  JOBS=$(get_job_status)
  # Count statuses, detect failures, detect group completions
  # Run integrity checks at phase boundaries
  # Exit when all jobs complete
  sleep "$POLL_INTERVAL"
done
```

## Related
- Hook: itp-hooks/posttooluse-reminder.ts - Reminds to use Pueue for detected long-running commands
- Reference: Pueue GitHub
- Issue: rangebar-py#77 - Original implementation
- Issue: rangebar-py#88 - Production deployment lessons