Loading...
Loading...
Wave-based parallel scheduling for DAG execution. Manages execution order, resource allocation, and parallelism constraints. Activate on 'schedule dag', 'execution waves', 'parallel scheduling', 'task queue', 'resource allocation'. NOT for building DAGs (use dag-graph-builder) or actual execution (use dag-parallel-executor).
npx skill4agent add erichowens/some_claude_skills dag-task-schedulerinterface ScheduledWave {
waveNumber: number;
tasks: ScheduledTask[];
estimatedStart: Date;
estimatedEnd: Date;
resourceAllocation: ResourceAllocation;
}
interface ScheduledTask {
nodeId: NodeId;
priority: number;
resourceRequirements: ResourceRequirements;
estimatedDuration: number;
deadline?: Date;
}
function scheduleDAG(
waves: NodeId[][],
dag: DAG,
config: SchedulerConfig
): ScheduledWave[] {
const schedule: ScheduledWave[] = [];
let currentTime = new Date();
for (let i = 0; i < waves.length; i++) {
const wave = waves[i];
const tasks = wave.map(nodeId => {
const node = dag.nodes.get(nodeId);
return {
nodeId,
priority: node.config.priority || 0,
resourceRequirements: estimateResources(node),
estimatedDuration: node.config.timeoutMs || 30000,
deadline: node.config.deadline,
};
});
// Sort by priority (higher first)
tasks.sort((a, b) => b.priority - a.priority);
// Apply parallelism constraints
const constrainedTasks = applyConstraints(tasks, config);
// Allocate resources
const allocation = allocateResources(constrainedTasks, config);
// Calculate timing
const maxDuration = Math.max(...tasks.map(t => t.estimatedDuration));
const waveEnd = new Date(currentTime.getTime() + maxDuration);
schedule.push({
waveNumber: i,
tasks: constrainedTasks,
estimatedStart: currentTime,
estimatedEnd: waveEnd,
resourceAllocation: allocation,
});
currentTime = waveEnd;
}
return schedule;
}interface TokenBudget {
totalTokens: number;
usedTokens: number;
perWaveBudget: number;
perTaskBudget: number;
}
function allocateTokenBudget(
schedule: ScheduledWave[],
totalBudget: number
): TokenBudget[] {
const waveCount = schedule.length;
const perWaveBudget = Math.floor(totalBudget / waveCount);
return schedule.map(wave => ({
totalTokens: perWaveBudget,
usedTokens: 0,
perWaveBudget,
perTaskBudget: Math.floor(perWaveBudget / wave.tasks.length),
}));
}function applyConstraints(
tasks: ScheduledTask[],
config: SchedulerConfig
): ScheduledTask[] {
const maxParallelism = config.maxParallelism || 3;
if (tasks.length <= maxParallelism) {
return tasks;
}
// Group tasks into sub-waves respecting parallelism limit
const subWaves: ScheduledTask[][] = [];
for (let i = 0; i < tasks.length; i += maxParallelism) {
subWaves.push(tasks.slice(i, i + maxParallelism));
}
return subWaves.flat();
}schedule:
dagId: research-pipeline
totalWaves: 4
estimatedDuration: 120000ms
maxParallelism: 3
waves:
- wave: 0
status: pending
estimatedStart: "2024-01-15T10:00:00Z"
estimatedEnd: "2024-01-15T10:00:30Z"
tasks:
- nodeId: gather-sources
priority: 1
estimatedDuration: 30000
resources:
maxTokens: 5000
timeoutMs: 30000
- wave: 1
status: pending
estimatedStart: "2024-01-15T10:00:30Z"
estimatedEnd: "2024-01-15T10:01:00Z"
tasks:
- nodeId: validate-sources
priority: 1
estimatedDuration: 15000
- nodeId: extract-metadata
priority: 0
estimatedDuration: 20000
resourceSummary:
totalTokenBudget: 50000
perWaveBudget: 12500
estimatedCost: 0.25
criticalPath:
- gather-sources → validate-sources → analyze → report
- bottleneck: analyze (30000ms)Pros: Simple, low overhead
Cons: May not be optimal
Best for: Homogeneous task sizesPros: Minimizes average completion time
Cons: May starve long tasks
Best for: Mixed task sizesPros: Respects business requirements
Cons: Requires priority specification
Best for: Deadline-sensitive workloadsPros: Prevents starvation
Cons: May not optimize throughput
Best for: Multi-tenant scenariosfunction handleEarlyCompletion(
completedTask: NodeId,
schedule: ScheduledWave[]
): ScheduledWave[] {
// Check if dependent tasks can start early
const dependentWaves = schedule.filter(wave =>
wave.tasks.some(task =>
dag.nodes.get(task.nodeId).dependencies.includes(completedTask)
)
);
// Update timing estimates
for (const wave of dependentWaves) {
wave.estimatedStart = new Date(); // Can start now if all deps complete
}
return schedule;
}function handleTaskFailure(
failedTask: NodeId,
schedule: ScheduledWave[],
errorHandling: ErrorHandlingStrategy
): ScheduledWave[] {
switch (errorHandling) {
case 'stop-on-failure':
// Mark all dependent tasks as skipped
return markDependentsSkipped(failedTask, schedule);
case 'continue-on-failure':
// Continue with tasks that don't depend on failed task
return schedule;
case 'retry-then-skip':
// Retry the task, then skip if still failing
return addRetryToSchedule(failedTask, schedule);
}
}dag-dependency-resolverdag-parallel-executordag-execution-tracerdag-dynamic-replannermetrics:
schedulingLatency: 5ms
averageWaveUtilization: 0.85
parallelizationEfficiency: 2.3x
resourceWaste: 15%
perWaveMetrics:
- wave: 0
tasksScheduled: 3
resourceUtilization: 0.9
actualDuration: 28000ms
estimatedDuration: 30000ms
variance: -7%