csv-wave-pipeline
Auto Mode
When `--yes` or `-y`: Auto-confirm task decomposition, skip interactive validation, use defaults.

CSV Wave Pipeline
Usage
```bash
$csv-wave-pipeline "Implement user authentication with OAuth, JWT, and 2FA"
$csv-wave-pipeline -c 4 "Refactor payment module with Stripe and PayPal"
$csv-wave-pipeline -y "Build notification system with email and SMS"
$csv-wave-pipeline --continue "auth-20260228"
```

Flags:
- `-y, --yes`: Skip all confirmations (auto mode)
- `-c, --concurrency N`: Max concurrent agents within each wave (default: 4)
- `--continue`: Resume existing session
Output Directory:
Core Output: `.workflow/.csv-wave/{session-id}/` with `tasks.csv` (master state) + `results.csv` (final) + `discoveries.ndjson` (shared exploration) + `context.md` (human-readable report)

Overview
Wave-based batch execution using `spawn_agents_on_csv` with cross-wave context propagation. Tasks are grouped into dependency waves; each wave executes concurrently, and its results feed into the next wave.
Core workflow: Decompose → Compute Waves → Execute Wave-by-Wave → Aggregate
┌─────────────────────────────────────────────────────────────────────────┐
│ CSV BATCH EXECUTION WORKFLOW │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Phase 1: Requirement → CSV │
│ ├─ Parse requirement into subtasks (3-10 tasks) │
│ ├─ Identify dependencies (deps column) │
│ ├─ Compute dependency waves (topological sort → depth grouping) │
│ ├─ Generate tasks.csv with wave column │
│ └─ User validates task breakdown (skip if -y) │
│ │
│ Phase 2: Wave Execution Engine │
│ ├─ For each wave (1..N): │
│ │ ├─ Build wave CSV (filter rows for this wave) │
│ │ ├─ Inject previous wave findings into prev_context column │
│ │ ├─ spawn_agents_on_csv(wave CSV) │
│ │ ├─ Collect results, merge into master tasks.csv │
│ │ └─ Check: any failed? → skip dependents or retry │
│ └─ discoveries.ndjson shared across all waves (append-only) │
│ │
│ Phase 3: Results Aggregation │
│ ├─ Export final results.csv │
│ ├─ Generate context.md with all findings │
│ ├─ Display summary: completed/failed/skipped per wave │
│ └─ Offer: view results | retry failed | done │
│ │
└─────────────────────────────────────────────────────────────────────────┘

CSV Schema
tasks.csv (Master State)
```csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error
"1","Setup auth module","Create auth directory structure and base files","Verify directory exists and base files export expected interfaces","auth/ dir created; index.ts and types.ts export AuthProvider interface","src/auth/**","Follow monorepo module pattern || package.json;src/shared/types.ts","","","","1","","","","","",""
"2","Implement OAuth","Add OAuth provider integration with Google and GitHub","Unit test: mock OAuth callback returns valid token; Integration test: verify redirect URL generation","OAuth login redirects to provider; callback returns JWT; supports Google and GitHub","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth before completion","1","1","2","","","","","",""
"3","Add JWT tokens","Implement JWT generation and validation","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","","","","","",""
"4","Setup 2FA","Add TOTP-based 2FA with QR code generation","Unit test: TOTP verify with correct code; Test: QR data URL is valid","QR code generates scannable image; TOTP verification succeeds within time window","src/auth/2fa/**","Use speakeasy + qrcode libraries || src/auth/oauth/strategy.ts;src/auth/jwt/token.ts","Run full test suite: npm test","2;3","1;2;3","3","","","","","",""Columns:
| Column | Phase | Description |
|---|---|---|
| `id` | Input | Unique task identifier (string) |
| `title` | Input | Short task title |
| `description` | Input | Detailed task description — what to implement |
| `test` | Input | Test cases: what tests to write and how to verify (unit/integration/edge) |
| `acceptance_criteria` | Input | Acceptance criteria: measurable conditions that define "done" |
| `scope` | Input | Target file/directory glob — constrains agent work area, prevents cross-task file conflicts |
| `hints` | Input | Implementation tips + reference files. Format: `<tips> || <ref_file1>;<ref_file2>` |
| `execution_directives` | Input | Execution constraints: commands to run for verification, tool restrictions, environment requirements |
| `deps` | Input | Semicolon-separated dependency task IDs (empty = no deps) |
| `context_from` | Input | Semicolon-separated task IDs whose findings this task needs |
| `wave` | Computed | Wave number (computed by topological sort, 1-based) |
| `status` | Output | `pending` / `completed` / `failed` / `skipped` |
| `findings` | Output | Key discoveries or implementation notes (max 500 chars) |
| `files_modified` | Output | Semicolon-separated file paths |
| `tests_passed` | Output | Whether all defined test cases passed (true/false) |
| `acceptance_met` | Output | Summary of which acceptance criteria were met/unmet |
| `error` | Output | Error message if failed (empty if success) |
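The semicolon-separated columns (`deps`, `context_from`, `files_modified`) share one convention. A minimal sketch of the round-trip; the helper names are illustrative, not part of the pipeline:

```javascript
// Illustrative helpers for the semicolon-separated list columns.
// filter(Boolean) makes the empty string mean "no deps", matching the schema.
const parseIdList = cell => cell.split(';').filter(Boolean)
const joinIdList = ids => ids.join(';')
```

For example, `parseIdList("2;3")` yields `["2", "3"]`, and `parseIdList("")` yields `[]`.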
Per-Wave CSV (Temporary)
Each wave generates a temporary `wave-{N}.csv` with an extra `prev_context` column:
```csv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context
"2","Implement OAuth","Add OAuth integration","Unit test: mock OAuth callback returns valid token","OAuth login redirects to provider; callback returns JWT","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
"3","Add JWT tokens","Implement JWT","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"The column is built from by looking up completed tasks' in the master CSV.
prev_contextcontext_fromfindings每个波次会生成一个临时的文件,包含额外的列:
wave-{N}.csvprev_contextcsv
id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context
"2","Implement OAuth","Add OAuth integration","Unit test: mock OAuth callback returns valid token","OAuth login redirects to provider; callback returns JWT","src/auth/oauth/**","Use passport.js strategy pattern || src/auth/index.ts;docs/oauth-flow.md","Run npm test -- --grep oauth","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"
"3","Add JWT tokens","Implement JWT","Unit test: sign/verify round-trip; Edge test: expired token returns 401","generateToken() returns valid JWT; verifyToken() rejects expired/tampered tokens","src/auth/jwt/**","Use jsonwebtoken library; Set default expiry 1h || src/config/auth.ts","Ensure tsc --noEmit passes","1","1","2","[Task 1] Created auth/ with index.ts and types.ts"prev_contextfindingscontext_fromOutput Artifacts
| File | Purpose | Lifecycle |
|---|---|---|
| `tasks.csv` | Master state — all tasks with status/findings | Updated after each wave |
| `wave-{N}.csv` | Per-wave input (temporary) | Created before wave, deleted after |
| `results.csv` | Final export of all task results | Created in Phase 3 |
| `discoveries.ndjson` | Shared exploration board across all agents | Append-only, carries across waves |
| `context.md` | Human-readable execution report | Created in Phase 3 |
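discoveries.ndjson holds one JSON object per line. A sketch of the convention, modeled on strings for illustration (the real pipeline appends to the file via a shell redirect):

```javascript
// Append-only NDJSON: serialize one object per line; readers split on
// newlines, skip blanks, and parse each line independently.
const appendDiscovery = (log, entry) => log + JSON.stringify(entry) + '\n'
const readDiscoveries = log => log.split('\n').filter(Boolean).map(line => JSON.parse(line))
```

Because each line is independent, concurrent agents can append without coordinating beyond the file-level append.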
Session Structure
.workflow/.csv-wave/{session-id}/
├── tasks.csv # Master state (updated per wave)
├── results.csv # Final results export
├── discoveries.ndjson # Shared discovery board (all agents)
├── context.md # Human-readable report
└── wave-{N}.csv       # Temporary per-wave input (cleaned up)

Implementation
Session Initialization
```javascript
const getUtc8ISOString = () => new Date(Date.now() + 8 * 60 * 60 * 1000).toISOString()
// Parse flags
const AUTO_YES = $ARGUMENTS.includes('--yes') || $ARGUMENTS.includes('-y')
const continueMode = $ARGUMENTS.includes('--continue')
const concurrencyMatch = $ARGUMENTS.match(/(?:--concurrency|-c)\s+(\d+)/)
const maxConcurrency = concurrencyMatch ? parseInt(concurrencyMatch[1]) : 4
// Clean requirement text (remove flags)
const requirement = $ARGUMENTS
  .replace(/--yes|-y|--continue|--concurrency\s+\d+|-c\s+\d+/g, '')
  .trim()
const slug = requirement.toLowerCase()
  .replace(/[^a-z0-9\u4e00-\u9fa5]+/g, '-')
  .substring(0, 40)
const dateStr = getUtc8ISOString().substring(0, 10).replace(/-/g, '')
let sessionId = `cwp-${slug}-${dateStr}`
let sessionFolder = `.workflow/.csv-wave/${sessionId}`
// Continue mode: find existing session
if (continueMode) {
  const existing = Bash(`ls -t .workflow/.csv-wave/ 2>/dev/null | head -1`).trim()
  if (existing) {
    sessionId = existing
    sessionFolder = `.workflow/.csv-wave/${sessionId}`
    // Read existing tasks.csv, find incomplete waves, resume from there
    const existingCsv = Read(`${sessionFolder}/tasks.csv`)
    // → jump to Phase 2 with remaining waves
  }
}
Bash(`mkdir -p ${sessionFolder}`)
```
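The resume path above is only sketched in comments. One possible policy (a hypothetical helper, not defined by the pipeline): resume from the lowest wave that still has a task which is neither completed nor skipped, which also retries failed tasks in that wave:

```javascript
// rows: parsed tasks.csv records ({ wave, status, ... }).
// Returns the wave number to resume from, or null if nothing is left to run.
const findResumeWave = rows => {
  const open = rows.filter(r => r.status !== 'completed' && r.status !== 'skipped')
  return open.length ? Math.min(...open.map(r => parseInt(r.wave, 10))) : null
}
```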
Phase 1: Requirement → CSV
Objective: Decompose requirement into tasks, compute dependency waves, generate tasks.csv.
Steps:
1. **Decompose Requirement**
```javascript
// Use ccw cli to decompose requirement into subtasks
Bash({
  command: `ccw cli -p "PURPOSE: Decompose requirement into 3-10 atomic tasks for batch agent execution. Each task must include implementation description, test cases, and acceptance criteria.
TASK:
• Parse requirement into independent subtasks
• Identify dependencies between tasks (which must complete before others)
• Identify context flow (which tasks need previous tasks' findings)
• For each task, define concrete test cases (unit/integration/edge)
• For each task, define measurable acceptance criteria (what defines 'done')
• Each task must be executable by a single agent with file read/write access
MODE: analysis
CONTEXT: @**/*
EXPECTED: JSON object with tasks array. Each task: {id: string, title: string, description: string, test: string, acceptance_criteria: string, scope: string, hints: string, execution_directives: string, deps: string[], context_from: string[]}.
- description: what to implement (specific enough for an agent to execute independently)
- test: what tests to write and how to verify (e.g. 'Unit test: X returns Y; Edge test: handles Z')
- acceptance_criteria: measurable conditions that define done (e.g. 'API returns 200; token expires after 1h')
- scope: target file/directory glob (e.g. 'src/auth/**') — tasks in same wave MUST have non-overlapping scopes
- hints: implementation tips + reference files, format '<tips> || <ref_file1>;<ref_file2>' (e.g. 'Use strategy pattern || src/base/Strategy.ts;docs/design.md')
- execution_directives: commands to run for verification or tool constraints (e.g. 'Run npm test --bail; Ensure tsc passes')
- deps: task IDs that must complete first
- context_from: task IDs whose findings are needed
CONSTRAINTS: 3-10 tasks | Each task is atomic | No circular deps | test and acceptance_criteria must be concrete and verifiable | Same-wave tasks must have non-overlapping scopes
REQUIREMENT: ${requirement}" --tool gemini --mode analysis --rule planning-breakdown-task-steps`,
run_in_background: true
})
// Wait for CLI completion via hook callback
// Parse JSON from CLI output → decomposedTasks[]
```
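The "Parse JSON from CLI output" step is left abstract above. A minimal sketch, assuming the CLI's stdout contains exactly one top-level JSON object, possibly surrounded by prose (`extractTasksJson` is an illustrative name, not part of the pipeline):

```javascript
// Grab the outermost {...} span from mixed stdout and parse it.
// Throws if no JSON object is present so the pipeline can surface the failure.
function extractTasksJson(stdout) {
  const start = stdout.indexOf('{')
  const end = stdout.lastIndexOf('}')
  if (start === -1 || end <= start) throw new Error('No JSON object in CLI output')
  return JSON.parse(stdout.slice(start, end + 1))
}
```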
2. **Compute Waves** (Topological Sort → Depth Grouping)
```javascript
function computeWaves(tasks) {
  // Build adjacency: task.deps → predecessors
  const taskMap = new Map(tasks.map(t => [t.id, t]))
  const inDegree = new Map(tasks.map(t => [t.id, 0]))
  const adjList = new Map(tasks.map(t => [t.id, []]))
  for (const task of tasks) {
    for (const dep of task.deps) {
      if (taskMap.has(dep)) {
        adjList.get(dep).push(task.id)
        inDegree.set(task.id, inDegree.get(task.id) + 1)
      }
    }
  }
  // BFS-based topological sort with depth tracking
  const queue = [] // [taskId, depth]
  const waveAssignment = new Map()
  for (const [id, deg] of inDegree) {
    if (deg === 0) {
      queue.push([id, 1])
      waveAssignment.set(id, 1)
    }
  }
  let maxWave = 1
  let idx = 0
  while (idx < queue.length) {
    const [current, depth] = queue[idx++]
    for (const next of adjList.get(current)) {
      const newDeg = inDegree.get(next) - 1
      inDegree.set(next, newDeg)
      const nextDepth = Math.max(waveAssignment.get(next) || 0, depth + 1)
      waveAssignment.set(next, nextDepth)
      if (newDeg === 0) {
        queue.push([next, nextDepth])
        maxWave = Math.max(maxWave, nextDepth)
      }
    }
  }
  // Detect cycles: any task without wave assignment
  for (const task of tasks) {
    if (!waveAssignment.has(task.id)) {
      throw new Error(`Circular dependency detected involving task ${task.id}`)
    }
  }
  return { waveAssignment, maxWave }
}
const { waveAssignment, maxWave } = computeWaves(decomposedTasks)
```
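As a check on `computeWaves`, the four sample tasks from the schema section (2 and 3 depend on 1; 4 depends on 2 and 3) can be assigned waves by hand. A compact longest-chain formulation, equivalent for acyclic inputs (illustration only, not the pipeline's implementation):

```javascript
// wave(t) = 1 + length of the longest dependency chain below t.
const tasks = [
  { id: '1', deps: [] },
  { id: '2', deps: ['1'] },
  { id: '3', deps: ['1'] },
  { id: '4', deps: ['2', '3'] }
]
const byId = new Map(tasks.map(t => [t.id, t]))
const wave = new Map()
const waveOf = id => {
  if (!wave.has(id)) {
    wave.set(id, 1 + Math.max(0, ...byId.get(id).deps.map(waveOf)))
  }
  return wave.get(id)
}
tasks.forEach(t => waveOf(t.id))
// → wave 1: task 1; wave 2: tasks 2 and 3; wave 3: task 4
```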
3. **Generate tasks.csv**
```javascript
const header = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,status,findings,files_modified,tests_passed,acceptance_met,error'
const rows = decomposedTasks.map(task => {
  const wave = waveAssignment.get(task.id)
  return [
    task.id,
    csvEscape(task.title),
    csvEscape(task.description),
    csvEscape(task.test),
    csvEscape(task.acceptance_criteria),
    csvEscape(task.scope),
    csvEscape(task.hints),
    csvEscape(task.execution_directives),
    task.deps.join(';'),
    task.context_from.join(';'),
    wave,
    'pending', // status
    '',        // findings
    '',        // files_modified
    '',        // tests_passed
    '',        // acceptance_met
    ''         // error
  ].map(cell => `"${String(cell).replace(/"/g, '""')}"`).join(',')
})
Write(`${sessionFolder}/tasks.csv`, [header, ...rows].join('\n'))
```
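`csvEscape` is called above but never defined in this document. Since the row builder already wraps each cell in quotes and doubles embedded quotes, a plausible minimal version (an assumption, not the pipeline's actual helper) just keeps cells single-line so each record stays on one CSV row:

```javascript
// Assumed behavior: flatten newlines and trim, so multi-line descriptions or
// hints cannot break the one-record-per-line layout; quoting happens later.
const csvEscape = cell => String(cell ?? '').replace(/\r?\n/g, ' ').trim()
```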
4. **User Validation** (skip if AUTO_YES)
```javascript
if (!AUTO_YES) {
  // Display task breakdown with wave assignment
  console.log(`\n## Task Breakdown (${decomposedTasks.length} tasks, ${maxWave} waves)\n`)
  for (let w = 1; w <= maxWave; w++) {
    const waveTasks = decomposedTasks.filter(t => waveAssignment.get(t.id) === w)
    console.log(`### Wave ${w} (${waveTasks.length} tasks, concurrent)`)
    waveTasks.forEach(t => console.log(`  - [${t.id}] ${t.title}`))
  }
  const answer = AskUserQuestion({
    questions: [{
      question: "Approve task breakdown?",
      header: "Validation",
      multiSelect: false,
      options: [
        { label: "Approve", description: "Proceed with wave execution" },
        { label: "Modify", description: `Edit ${sessionFolder}/tasks.csv manually, then --continue` },
        { label: "Cancel", description: "Abort" }
      ]
    }]
  }) // BLOCKS
  if (answer.Validation === "Modify") {
    console.log(`Edit: ${sessionFolder}/tasks.csv\nResume: $csv-wave-pipeline --continue`)
    return
  } else if (answer.Validation === "Cancel") {
    return
  }
}
```
Success Criteria:
- tasks.csv created with valid schema and wave assignments
- No circular dependencies
- User approved (or AUTO_YES)
Phase 2: Wave Execution Engine
Objective: Execute tasks wave-by-wave via `spawn_agents_on_csv`. Each wave sees previous waves' results.
Steps:
1. **Wave Loop**
```javascript
const failedIds = new Set()
const skippedIds = new Set()
for (let wave = 1; wave <= maxWave; wave++) {
  console.log(`\n## Wave ${wave}/${maxWave}\n`)

  // 1. Read current master CSV
  const masterCsv = parseCsv(Read(`${sessionFolder}/tasks.csv`))

  // 2. Filter tasks for this wave
  const waveTasks = masterCsv.filter(row => parseInt(row.wave) === wave)

  // 3. Skip tasks whose deps failed
  const executableTasks = []
  for (const task of waveTasks) {
    const deps = task.deps.split(';').filter(Boolean)
    if (deps.some(d => failedIds.has(d) || skippedIds.has(d))) {
      skippedIds.add(task.id)
      // Update master CSV: mark as skipped
      updateMasterCsvRow(sessionFolder, task.id, { status: 'skipped', error: 'Dependency failed or skipped' })
      console.log(`  [${task.id}] ${task.title} → SKIPPED (dependency failed)`)
      continue
    }
    executableTasks.push(task)
  }
  if (executableTasks.length === 0) {
    console.log(`  No executable tasks in wave ${wave}`)
    continue
  }

  // 4. Build prev_context for each task
  for (const task of executableTasks) {
    const contextIds = task.context_from.split(';').filter(Boolean)
    const prevFindings = contextIds
      .map(id => {
        const prevRow = masterCsv.find(r => r.id === id)
        if (prevRow && prevRow.status === 'completed' && prevRow.findings) {
          return `[Task ${id}: ${prevRow.title}] ${prevRow.findings}`
        }
        return null
      })
      .filter(Boolean)
      .join('\n')
    task.prev_context = prevFindings || 'No previous context available'
  }

  // 5. Write wave CSV
  const waveHeader = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context'
  const waveRows = executableTasks.map(t =>
    [t.id, t.title, t.description, t.test, t.acceptance_criteria, t.scope, t.hints, t.execution_directives, t.deps, t.context_from, t.wave, t.prev_context]
      .map(cell => `"${String(cell).replace(/"/g, '""')}"`)
      .join(',')
  )
  Write(`${sessionFolder}/wave-${wave}.csv`, [waveHeader, ...waveRows].join('\n'))

  // 6. Execute wave
  console.log(`  Executing ${executableTasks.length} tasks (concurrency: ${maxConcurrency})...`)
  const waveResult = spawn_agents_on_csv({
    csv_path: `${sessionFolder}/wave-${wave}.csv`,
    id_column: "id",
    instruction: buildInstructionTemplate(sessionFolder, wave),
    max_concurrency: maxConcurrency,
    max_runtime_seconds: 600,
    output_csv_path: `${sessionFolder}/wave-${wave}-results.csv`,
    output_schema: {
      type: "object",
      properties: {
        id: { type: "string" },
        status: { type: "string", enum: ["completed", "failed"] },
        findings: { type: "string" },
        files_modified: { type: "array", items: { type: "string" } },
        tests_passed: { type: "boolean" },
        acceptance_met: { type: "string" },
        error: { type: "string" }
      },
      required: ["id", "status", "findings", "tests_passed"]
    }
  })
  // ↑ Blocks until all agents in this wave complete

  // 7. Merge results into master CSV
  const waveResults = parseCsv(Read(`${sessionFolder}/wave-${wave}-results.csv`))
  for (const result of waveResults) {
    updateMasterCsvRow(sessionFolder, result.id, {
      status: result.status,
      findings: result.findings || '',
      files_modified: (result.files_modified || []).join(';'),
      tests_passed: String(result.tests_passed ?? ''),
      acceptance_met: result.acceptance_met || '',
      error: result.error || ''
    })
    if (result.status === 'failed') {
      failedIds.add(result.id)
      console.log(`  [${result.id}] ${result.title} → FAILED: ${result.error}`)
    } else {
      console.log(`  [${result.id}] ${result.title} → COMPLETED`)
    }
  }

  // 8. Cleanup temporary wave CSV
  Bash(`rm -f "${sessionFolder}/wave-${wave}.csv"`)
  console.log(`  Wave ${wave} done: ${waveResults.filter(r => r.status === 'completed').length} completed, ${waveResults.filter(r => r.status === 'failed').length} failed`)
}
```
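`parseCsv` and `updateMasterCsvRow` are assumed helpers that this document never defines. A minimal `parseCsv` sketch sufficient for the CSV this pipeline writes (unquoted header, every data cell quoted with embedded quotes doubled, no embedded newlines); it is not a general RFC 4180 parser:

```javascript
function parseCsv(text) {
  const [headerLine, ...lines] = text.split('\n').filter(Boolean)
  const headers = headerLine.split(',')
  // Each data cell looks like "..." with internal quotes doubled ("").
  const splitRow = line =>
    (line.match(/"((?:[^"]|"")*)"/g) || []).map(c => c.slice(1, -1).replace(/""/g, '"'))
  return lines.map(line => {
    const cells = splitRow(line)
    return Object.fromEntries(headers.map((h, i) => [h, cells[i] ?? '']))
  })
}
```

`updateMasterCsvRow` would be the inverse: parse tasks.csv, patch the matching row's output columns, and rewrite the file with the same quoting.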
2. **Instruction Template Builder**
```javascript
function buildInstructionTemplate(sessionFolder, wave) { return `
目标: 通过按波次执行任务。每个波次可获取上一波次的执行结果。
spawn_agents_on_csv步骤:
-
波次循环执行javascript
const failedIds = new Set() const skippedIds = new Set() for (let wave = 1; wave <= maxWave; wave++) { console.log(`\n## 执行波次 ${wave}/${maxWave}\n`) // 1. 读取当前主CSV文件 const masterCsv = parseCsv(Read(`${sessionFolder}/tasks.csv`)) // 2. 筛选当前波次的任务 const waveTasks = masterCsv.filter(row => parseInt(row.wave) === wave) // 3. 跳过依赖任务已失败的任务 const executableTasks = [] for (const task of waveTasks) { const deps = task.deps.split(';').filter(Boolean) if (deps.some(d => failedIds.has(d) || skippedIds.has(d))) { skippedIds.add(task.id) // 更新主CSV文件:标记为已跳过 updateMasterCsvRow(sessionFolder, task.id, { status: 'skipped', error: '依赖任务失败或已跳过' }) console.log(` [${task.id}] ${task.title} → 已跳过(依赖任务失败)`) continue } executableTasks.push(task) } if (executableTasks.length === 0) { console.log(` 当前波次${wave}无可执行任务`) continue } // 4. 为每个任务构建prev_context for (const task of executableTasks) { const contextIds = task.context_from.split(';').filter(Boolean) const prevFindings = contextIds .map(id => { const prevRow = masterCsv.find(r => r.id === id) if (prevRow && prevRow.status === 'completed' && prevRow.findings) { return `[任务 ${id}: ${prevRow.title}] ${prevRow.findings}` } return null }) .filter(Boolean) .join('\n') task.prev_context = prevFindings || '无前置上下文可用' } // 5. 写入当前波次的CSV文件 const waveHeader = 'id,title,description,test,acceptance_criteria,scope,hints,execution_directives,deps,context_from,wave,prev_context' const waveRows = executableTasks.map(t => [t.id, t.title, t.description, t.test, t.acceptance_criteria, t.scope, t.hints, t.execution_directives, t.deps, t.context_from, t.wave, t.prev_context] .map(cell => `"${String(cell).replace(/"/g, '""')}"`) .join(',') ) Write(`${sessionFolder}/wave-${wave}.csv`, [waveHeader, ...waveRows].join('\n')) // 6. 
执行当前波次 console.log(` 执行${executableTasks.length}个任务(并发数: ${maxConcurrency})...`) const waveResult = spawn_agents_on_csv({ csv_path: `${sessionFolder}/wave-${wave}.csv`, id_column: "id", instruction: buildInstructionTemplate(sessionFolder, wave), max_concurrency: maxConcurrency, max_runtime_seconds: 600, output_csv_path: `${sessionFolder}/wave-${wave}-results.csv`, output_schema: { type: "object", properties: { id: { type: "string" }, status: { type: "string", enum: ["completed", "failed"] }, findings: { type: "string" }, files_modified: { type: "array", items: { type: "string" } }, tests_passed: { type: "boolean" }, acceptance_met: { type: "string" }, error: { type: "string" } }, required: ["id", "status", "findings", "tests_passed"] } }) // ↑ 阻塞直到当前波次所有Agent执行完成 // 7. 将执行结果合并到主CSV文件 const waveResults = parseCsv(Read(`${sessionFolder}/wave-${wave}-results.csv`)) for (const result of waveResults) { updateMasterCsvRow(sessionFolder, result.id, { status: result.status, findings: result.findings || '', files_modified: (result.files_modified || []).join(';'), tests_passed: String(result.tests_passed ?? ''), acceptance_met: result.acceptance_met || '', error: result.error || '' }) if (result.status === 'failed') { failedIds.add(result.id) console.log(` [${result.id}] ${result.title} → 执行失败: ${result.error}`) } else { console.log(` [${result.id}] ${result.title} → 执行完成`) } } // 8. 清理临时波次CSV文件 Bash(`rm -f "${sessionFolder}/wave-${wave}.csv"`) console.log(` 波次${wave}执行完成: ${waveResults.filter(r => r.status === 'completed').length}个任务完成,${waveResults.filter(r => r.status === 'failed').length}个任务失败`) } -
- **Instruction Template Builder**

```javascript
function buildInstructionTemplate(sessionFolder, wave) {
  return `
TASK ASSIGNMENT

MANDATORY FIRST STEPS
- Read shared discoveries: ${sessionFolder}/discoveries.ndjson (if exists, skip if not)
- Read project context: .workflow/project-tech.json (if exists)

Your Task
Task ID: {id}
Title: {title}
Description: {description}
Scope: {scope}

Implementation Hints & Reference Files
{hints}
Format: \`<tips> || <ref_file1>;<ref_file2>\`. Read ALL reference files (after ||) before starting implementation. Apply tips (before ||) as implementation guidance.

Execution Directives
{execution_directives}
Commands to run for verification, tool restrictions, or environment requirements. Follow these constraints during and after implementation.

Test Cases
{test}

Acceptance Criteria
{acceptance_criteria}

Previous Tasks' Findings (Context)
{prev_context}

Execution Protocol
- Read references: Parse {hints} — read all files listed after \`||\` to understand existing patterns
- Read discoveries: Load ${sessionFolder}/discoveries.ndjson for shared exploration findings
- Use context: Apply previous tasks' findings from prev_context above
- Stay in scope: ONLY create/modify files within {scope} — do NOT touch files outside this boundary
- Apply hints: Follow implementation tips from {hints} (before \`||\`)
- Execute: Implement the task as described
- Write tests: Implement the test cases defined above
- Run directives: Execute commands from {execution_directives} to verify your work
- Verify acceptance: Ensure all acceptance criteria are met before reporting completion
- Share discoveries: Append exploration findings to the shared board:
  echo '{"ts":"<ISO8601>","worker":"{id}","type":"<type>","data":{...}}' >> ${sessionFolder}/discoveries.ndjson
- Report result: Return JSON via report_agent_job_result

Discovery Types to Share
- \`code_pattern\`: {name, file, description} — reusable patterns found
- \`integration_point\`: {file, description, exports[]} — module connection points
- \`convention\`: {naming, imports, formatting} — code style conventions
- \`blocker\`: {issue, severity, impact} — blocking issues encountered

Output (report_agent_job_result)
Return JSON:
{
  "id": "{id}",
  "status": "completed" | "failed",
  "findings": "Key discoveries and implementation notes (max 500 chars)",
  "files_modified": ["path1", "path2"],
  "tests_passed": true | false,
  "acceptance_met": "Summary of which acceptance criteria were met/unmet",
  "error": ""
}

IMPORTANT: Set status to "completed" ONLY if:
- All test cases pass
- All acceptance criteria are met
Otherwise set status to "failed" with details in error field.
`
}
```

- **Master CSV Update Helper**

```javascript
function updateMasterCsvRow(sessionFolder, taskId, updates) {
  const csvPath = `${sessionFolder}/tasks.csv`
  const content = Read(csvPath)
  const lines = content.split('\n')
  const header = lines[0].split(',')
  for (let i = 1; i < lines.length; i++) {
    const cells = parseCsvLine(lines[i])
    if (cells[0] === taskId || cells[0] === `"${taskId}"`) {
      // Update the specified columns
      for (const [col, val] of Object.entries(updates)) {
        const colIdx = header.indexOf(col)
        if (colIdx >= 0) {
          cells[colIdx] = `"${String(val).replace(/"/g, '""')}"`
        }
      }
      lines[i] = cells.join(',')
      break
    }
  }
  Write(csvPath, lines.join('\n'))
}
```
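The helper above (and the wave loop) calls `parseCsvLine` / `parseCsv`, which this document never defines. A minimal sketch of a quote-aware line parser, consistent with the doubled-quote escaping used when writing cells; the pipeline's actual parser may differ:

```javascript
// Split one CSV line into cells, honoring RFC-4180-style quoting:
// cells may be wrapped in double quotes, and "" inside a quoted cell is a literal quote.
function parseCsvLine(line) {
  const cells = []
  let cur = ''
  let inQuotes = false
  for (let i = 0; i < line.length; i++) {
    const ch = line[i]
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') { cur += '"'; i++ } // escaped quote
      else if (ch === '"') inQuotes = false                      // closing quote
      else cur += ch
    } else if (ch === '"') {
      inQuotes = true
    } else if (ch === ',') {
      cells.push(cur)
      cur = ''
    } else {
      cur += ch
    }
  }
  cells.push(cur)
  return cells
}
```

Note this returns unquoted cell values, which is why `updateMasterCsvRow` tolerates both `taskId` and `"taskId"` when matching the id column.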
Success Criteria:
- All waves executed in order
- Each wave's results merged into master CSV before next wave starts
- Dependent tasks skipped when predecessor failed
- discoveries.ndjson accumulated across all waves
Phase 3: Results Aggregation
Objective: Generate final results and human-readable report.
Steps:
1. **Export results.csv**

```javascript
const masterCsv = Read(`${sessionFolder}/tasks.csv`)
// results.csv = master CSV (already has all results populated)
Write(`${sessionFolder}/results.csv`, masterCsv)
```

2. **Generate context.md**

```javascript
const tasks = parseCsv(masterCsv)
const completed = tasks.filter(t => t.status === 'completed')
const failed = tasks.filter(t => t.status === 'failed')
const skipped = tasks.filter(t => t.status === 'skipped')

const contextContent = `# CSV Batch Execution Report

Session: ${sessionId}
Requirement: ${requirement}
Completed: ${getUtc8ISOString()}
Waves: ${maxWave} | Concurrency: ${maxConcurrency}

## Summary

| Metric | Count |
|---|---|
| Total Tasks | ${tasks.length} |
| Completed | ${completed.length} |
| Failed | ${failed.length} |
| Skipped | ${skipped.length} |
| Waves | ${maxWave} |

## Wave Execution

${Array.from({ length: maxWave }, (_, i) => i + 1).map(w => {
  const waveTasks = tasks.filter(t => parseInt(t.wave) === w)
  return `### Wave ${w}
${waveTasks.map(t => `- [${t.id}] ${t.title}: ${t.status}${t.tests_passed ? ' ✓tests' : ''}${t.error ? ' — ' + t.error : ''}
  ${t.findings ? 'Findings: ' + t.findings : ''}`).join('\n')}`
}).join('\n\n')}

## Task Details

${tasks.map(t => `### ${t.id}: ${t.title}

| Field | Value |
|---|---|
| Status | ${t.status} |
| Wave | ${t.wave} |
| Scope | ${t.scope} |
| Dependencies | ${t.deps} |
| Context From | ${t.context_from} |
| Tests Passed | ${t.tests_passed} |
| Acceptance Met | ${t.acceptance_met} |
| Error | ${t.error} |

Description: ${t.description}
Test Cases: ${t.test || 'N/A'}
Acceptance Criteria: ${t.acceptance_criteria || 'N/A'}
Hints: ${t.hints || 'N/A'}
Execution Directives: ${t.execution_directives || 'N/A'}
Findings: ${t.findings || 'N/A'}
Files Modified: ${t.files_modified || 'none'}
`).join('\n---\n')}

## All Modified Files

${[...new Set(tasks.flatMap(t => (t.files_modified || '').split(';')).filter(Boolean))].map(f => '- ' + f).join('\n') || 'None'}
`

Write(`${sessionFolder}/context.md`, contextContent)
```
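The report header stamps completion time via `getUtc8ISOString()`, which is referenced but never defined in this document. A minimal sketch, under the assumption (from the `+08:00` timestamps elsewhere in this spec) that it means an ISO-8601 string pinned to UTC+8:

```javascript
// Sketch (assumed behavior): ISO-8601 timestamp at UTC+8 wall time,
// e.g. "2026-02-28T10:00:00+08:00" as seen in the discoveries.ndjson examples.
function getUtc8ISOString() {
  const shifted = new Date(Date.now() + 8 * 60 * 60 * 1000)
  // toISOString() always renders UTC ("...Z"); swap the suffix for the fixed offset
  return shifted.toISOString().replace(/\.\d{3}Z$/, '+08:00')
}
```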
3. **Display Summary**
```javascript
console.log(`
Execution Complete
- Session: ${sessionId}
- Waves: ${maxWave}
- Completed: ${completed.length}/${tasks.length}
- Failed: ${failed.length}
- Skipped: ${skipped.length}
Results: ${sessionFolder}/results.csv
Report: ${sessionFolder}/context.md
Discoveries: ${sessionFolder}/discoveries.ndjson
`)
```
4. **Offer Next Steps** (skip if AUTO_YES)
```javascript
if (!AUTO_YES && failed.length > 0) {
const answer = AskUserQuestion({
questions: [{
question: `${failed.length} tasks failed. Next action?`,
header: "Next Step",
multiSelect: false,
options: [
{ label: "Retry Failed", description: `Re-execute ${failed.length} failed tasks with updated context` },
{ label: "View Report", description: "Display context.md" },
{ label: "Done", description: "Complete session" }
]
}]
}) // BLOCKS
if (answer['Next Step'] === "Retry Failed") {
// Reset failed tasks to pending, re-run Phase 2 for their waves
for (const task of failed) {
updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
}
// Also reset skipped tasks whose deps are now retrying
for (const task of skipped) {
updateMasterCsvRow(sessionFolder, task.id, { status: 'pending', error: '' })
}
// Re-execute Phase 2 (loop will skip already-completed tasks)
// → goto Phase 2
} else if (answer['Next Step'] === "View Report") {
console.log(Read(`${sessionFolder}/context.md`))
}
}
```

Success Criteria:
- results.csv exported
- context.md generated
- Summary displayed to user
Shared Discovery Board Protocol
All agents across all waves share `discoveries.ndjson`. This eliminates redundant codebase exploration.

Lifecycle:
- Created by the first agent to write a discovery
- Carries over across waves — never cleared
- Agents append via `echo '...' >> discoveries.ndjson`
Format: NDJSON, each line is a self-contained JSON object:

```jsonl
{"ts":"2026-02-28T10:00:00+08:00","worker":"1","type":"code_pattern","data":{"name":"repository-pattern","file":"src/repos/Base.ts","description":"Abstract CRUD repository"}}
{"ts":"2026-02-28T10:01:00+08:00","worker":"2","type":"integration_point","data":{"file":"src/auth/index.ts","description":"Auth module entry","exports":["authenticate","authorize"]}}
```

Discovery Types:
| type | Dedup Key | Description |
|---|---|---|
| `code_pattern` | | Reusable code pattern found |
| `integration_point` | | Module connection point |
| `convention` | singleton | Code style conventions |
| `blocker` | | Blocking issue encountered |
| | singleton | Project technology stack |
| | singleton | Test commands discovered |
Protocol Rules:
- Read board before own exploration → skip covered areas
- Write discoveries immediately via `echo >>` → don't batch
- Deduplicate — check existing entries; skip if same type + dedup key exists
- Append-only — never modify or delete existing lines
Wave Computation Details
Algorithm
Kahn's BFS topological sort with depth tracking:
Input: tasks[] with deps[]
Output: waveAssignment (taskId → wave number)
1. Build in-degree map and adjacency list from deps
2. Enqueue all tasks with in-degree 0 at wave 1
3. BFS: for each dequeued task at wave W:
- For each dependent task D:
- Decrement D's in-degree
- D.wave = max(D.wave, W + 1)
- If D's in-degree reaches 0, enqueue D
4. Any task without wave assignment → circular dependency error
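The steps above can be sketched as a runnable function; `computeWaves` is an illustrative name, and tasks are assumed to be plain objects with `id` and a `deps` array:

```javascript
// Kahn's BFS topological sort with depth tracking: returns Map(taskId -> wave number)
function computeWaves(tasks) {
  const inDegree = new Map(tasks.map(t => [t.id, t.deps.length]))
  const dependents = new Map(tasks.map(t => [t.id, []])) // adjacency: dep -> tasks needing it
  for (const t of tasks) for (const d of t.deps) dependents.get(d).push(t.id)

  const wave = new Map()
  const queue = tasks.filter(t => t.deps.length === 0).map(t => t.id)
  queue.forEach(id => wave.set(id, 1)) // in-degree 0 → wave 1

  let processed = 0
  while (queue.length > 0) {
    const id = queue.shift()
    processed++
    for (const depId of dependents.get(id)) {
      // a task's wave is one past the deepest wave among its dependencies
      wave.set(depId, Math.max(wave.get(depId) || 0, wave.get(id) + 1))
      inDegree.set(depId, inDegree.get(depId) - 1)
      if (inDegree.get(depId) === 0) queue.push(depId)
    }
  }
  // any task never dequeued sits on (or behind) a cycle
  if (processed !== tasks.length) throw new Error('Circular dependency detected')
  return wave
}
```

On the A–E example later in this section, this assigns A, B → wave 1, C, D → wave 2, E → wave 3.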
Wave Properties
- Wave 1: No dependencies — all tasks in wave 1 are fully independent
- Wave N: All dependencies are in waves 1..(N-1) — guaranteed completed before wave N starts
- Within a wave: Tasks are independent of each other → safe for concurrent execution
Example
Task A (no deps) → Wave 1
Task B (no deps) → Wave 1
Task C (deps: A) → Wave 2
Task D (deps: A, B) → Wave 2
Task E (deps: C, D) → Wave 3
Execution:
Wave 1: [A, B] ← concurrent
Wave 2: [C, D] ← concurrent, sees A+B findings
Wave 3: [E] ← sees A+B+C+D findings

Context Propagation Flow
Wave 1 agents:
├─ Execute tasks (no prev_context)
├─ Write findings to report_agent_job_result
└─ Append discoveries to discoveries.ndjson
↓ merge results into master CSV
Wave 2 agents:
├─ Read discoveries.ndjson (exploration sharing)
├─ Read prev_context column (wave 1 findings from context_from)
├─ Execute tasks with full upstream context
├─ Write findings to report_agent_job_result
└─ Append new discoveries to discoveries.ndjson
↓ merge results into master CSV
Wave 3 agents:
├─ Read discoveries.ndjson (accumulated from waves 1+2)
├─ Read prev_context column (wave 1+2 findings from context_from)
├─ Execute tasks
└─ ...

Two context channels:
- CSV findings (structured): `context_from` column → `prev_context` injection — task-specific directed context
- NDJSON discoveries (broadcast): `discoveries.ndjson` — general exploration findings available to all
Error Handling
| Error | Resolution |
|---|---|
| Circular dependency | Detect in wave computation, abort with error message |
| Agent timeout | Mark as failed in results, continue with wave |
| Agent failed | Mark as failed, skip dependent tasks in later waves |
| All agents in wave failed | Log error, offer retry or abort |
| CSV parse error | Validate CSV format before execution, show line number |
| discoveries.ndjson corrupt | Ignore malformed lines, continue with valid entries |
| Continue mode: no session found | List available sessions, prompt user to select |
Core Rules
- Start Immediately: First action is session initialization, then Phase 1
- Wave Order is Sacred: Never execute wave N before wave N-1 completes and results are merged
- CSV is Source of Truth: Master tasks.csv holds all state — always read before wave, always write after
- Context Propagation: prev_context built from master CSV, not from memory
- Discovery Board is Append-Only: Never clear, modify, or recreate discoveries.ndjson
- Skip on Failure: If a dependency failed, skip the dependent task (don't attempt)
- Cleanup Temp Files: Remove wave-{N}.csv after results are merged
- DO NOT STOP: Continuous execution until all waves complete or all remaining tasks are skipped
Best Practices
- Task Granularity: 3-10 tasks optimal; too many = overhead, too few = no parallelism benefit
- Minimize Cross-Wave Deps: More tasks in wave 1 = more parallelism
- Specific Descriptions: Agent sees only its CSV row + prev_context — make description self-contained
- Context From ≠ Deps: `deps` = execution order constraint; `context_from` = information flow. A task can have `context_from` without `deps` (it just reads previous findings but doesn't require them to be done first in its wave)
- Concurrency Tuning: `-c 1` for serial execution (maximum context sharing); `-c 8` for I/O-bound tasks
Usage Recommendations
| Scenario | Recommended Approach |
|---|---|
| Independent parallel tasks (no deps) | |
| Linear pipeline (A→B→C) | |
| Diamond dependency (A→B,C→D) | |
| Complex requirement, unclear tasks | Use |
| Single complex task | Use |