Loading...
Loading...
Combines and synthesizes outputs from parallel DAG branches. Handles merge strategies, conflict resolution, and result formatting. Activate on 'aggregate results', 'combine outputs', 'merge branches', 'synthesize results', 'fan-in'. NOT for execution (use dag-parallel-executor) or scheduling (use dag-task-scheduler).
npx skill4agent add erichowens/some_claude_skills dag-result-aggregator

/**
 * Union merge: concatenates every branch's item list into one array.
 *
 * Branches are visited in the map's iteration order and duplicates are kept.
 *
 * @param results - Item lists keyed by the node that produced them.
 * @returns A new array containing all items from all branches.
 */
function unionMerge<T>(
  results: Map<NodeId, T[]>
): T[] {
  const merged: T[] = [];
  for (const items of results.values()) {
    // Append element by element: `push(...items)` passes every element as a
    // call argument and throws RangeError for very large branch outputs.
    for (const item of items) {
      merged.push(item);
    }
  }
  return merged;
}

/**
 * Intersection merge: keeps only the values present in every branch's set.
 *
 * @param results - Value sets keyed by the node that produced them.
 * @returns A new set (never one of the input sets) with the common values,
 *   in the iteration order of the first branch. Empty when there are no
 *   branches.
 */
function intersectionMerge<T>(
  results: Map<NodeId, Set<T>>
): Set<T> {
  const [first, ...rest] = Array.from(results.values());
  if (first === undefined) return new Set();

  // Copy the first set so a single-branch input is not returned by reference.
  const common = new Set(first);
  for (const other of rest) {
    for (const item of common) {
      // Deleting the entry currently being visited is safe for a Set.
      if (!other.has(item)) common.delete(item);
    }
  }
  return common;
}

/**
 * Priority merge: returns the result of the branch with the highest priority.
 *
 * A node without an entry in `priorities` is treated as priority 0. When
 * several nodes share the highest priority, the one that comes first in
 * `results` wins.
 *
 * @param results - One result per node.
 * @param priorities - Numeric priority per node; higher wins.
 * @returns The winning result, or `undefined` when `results` is empty.
 */
function priorityMerge<T>(
  results: Map<NodeId, T>,
  priorities: Map<NodeId, number>
): T | undefined {
  let best: T | undefined;
  let bestPriority = 0;
  let found = false;
  // A single pass is enough to find the maximum; no need to sort everything.
  for (const [nodeId, value] of results) {
    const priority = priorities.get(nodeId) ?? 0;
    // Strict `>` keeps the earliest entry on ties.
    if (!found || priority > bestPriority) {
      best = value;
      bestPriority = priority;
      found = true;
    }
  }
  return best;
}

/**
 * Weighted average of the numeric results.
 *
 * A node without an entry in `weights` gets weight 1.
 *
 * @param results - One numeric result per node.
 * @param weights - Weight per node.
 * @returns `sum(value * weight) / sum(weight)`, or 0 when the total weight
 *   is not positive (which includes an empty `results` map).
 */
function weightedAverage(
  results: Map<NodeId, number>,
  weights: Map<NodeId, number>
): number {
  let sum = 0;
  let totalWeight = 0;
  for (const [nodeId, value] of results) {
    const weight = weights.get(nodeId) ?? 1;
    sum += value * weight;
    totalWeight += weight;
  }
  // Guard against dividing by zero (or by a negative total).
  return totalWeight > 0 ? sum / totalWeight : 0;
}

/**
 * Merges the top-level properties of every branch's object into one object.
 *
 * Only own enumerable string keys are considered, and only one level deep:
 * when two branches provide the same key, the two values are handed to
 * `resolveConflict` as-is rather than being merged recursively.
 *
 * @param results - One object per node.
 * @param conflictStrategy - How to pick a value when a key occurs more than once.
 * @returns A new plain object with the merged properties.
 */
function deepMerge(
  results: Map<NodeId, object>,
  conflictStrategy: ConflictStrategy
): object {
  // Accumulate in a Map so that keys such as "toString" or "constructor" are
  // not mistaken for existing entries (as `key in {}` would do) and so that a
  // "__proto__" key cannot replace the accumulator's prototype.
  const merged = new Map<string, unknown>();
  for (const obj of results.values()) {
    for (const [key, value] of Object.entries(obj)) {
      merged.set(
        key,
        merged.has(key)
          ? resolveConflict(merged.get(key), value, conflictStrategy)
          : value
      );
    }
  }
  // fromEntries defines own data properties, so every key stays a plain field.
  return Object.fromEntries(merged);
}

/** Ways of choosing a value when two branches provide the same key. */
type ConflictStrategy =
  | 'first-wins' // Keep first value encountered
  | 'last-wins' // Use most recent value
  | 'highest-wins' // For numeric: keep highest
  | 'lowest-wins' // For numeric: keep lowest
  | 'concatenate' // For strings/arrays: combine
  | 'error' // Throw on conflict
  | 'custom'; // Use custom resolver
/**
 * Picks the value to keep when two branches supply the same key.
 *
 * - `first-wins` / `last-wins`: return `existing` / `incoming` unchanged.
 * - `highest-wins` / `lowest-wins`: both values are converted with `Number()`
 *   and compared; the result is NaN if either value is not numeric.
 * - `concatenate`: if either value is an array the result is an array holding
 *   the elements of `existing` followed by those of `incoming` (a non-array
 *   side contributes itself as a single element); otherwise both values are
 *   stringified and joined with a newline.
 * - `error`: throws.
 * - `custom` (and any unrecognised value): this function receives no resolver
 *   callback, so it falls back to returning `incoming`.
 *
 * @param existing - Value already present in the merged output.
 * @param incoming - Value supplied by the branch currently being merged.
 * @param strategy - Resolution rule to apply.
 * @returns The resolved value.
 * @throws ConflictError when `strategy` is `'error'`.
 */
function resolveConflict(
  existing: unknown,
  incoming: unknown,
  strategy: ConflictStrategy
): unknown {
  switch (strategy) {
    case 'first-wins':
      return existing;
    case 'last-wins':
      return incoming;
    case 'highest-wins':
      return Math.max(Number(existing), Number(incoming));
    case 'lowest-wins':
      return Math.min(Number(existing), Number(incoming));
    case 'concatenate': {
      if (Array.isArray(existing) || Array.isArray(incoming)) {
        // Wrap a non-array side instead of spreading it: spreading a string
        // would split it into characters and spreading a number or plain
        // object throws "not iterable".
        const head: unknown[] = Array.isArray(existing) ? existing : [existing];
        const tail: unknown[] = Array.isArray(incoming) ? incoming : [incoming];
        return [...head, ...tail];
      }
      return `${existing}\n${incoming}`;
    }
    case 'error':
      throw new ConflictError(
        `Conflict detected: ${existing} vs ${incoming}`
      );
    case 'custom':
    default:
      // No resolver is available here, so behave like 'last-wins'.
      return incoming;
  }
}
aggregation:
nodeId: aggregate-results
inputs:
- sourceNode: branch-a
field: findings
- sourceNode: branch-b
field: findings
- sourceNode: branch-c
field: findings
strategy:
type: deep-merge
conflictResolution: last-wins
transformations:
- deduplicate:
field: items
key: id
- sort:
field: items
by: relevance
order: desc
- limit:
field: items
max: 100
output:
schema:
type: object
properties:
combinedFindings:
type: array
metadata:
type: object

/**
 * Result of an aggregation: metadata about the run, the aggregated data,
 * conflict information and statistics about the inputs.
 */
interface AggregatedResult {
  // Aggregation metadata
  aggregationId: string;
  aggregatedAt: Date;
  /** Nodes whose outputs were aggregated. */
  sourceNodes: NodeId[];
  /** Name of the merge strategy (free-form string, not a ConflictStrategy). */
  strategy: string;

  // Aggregated data
  /** Merged payload; callers must narrow it before use. */
  data: unknown;

  // Conflict information
  conflicts: ConflictRecord[];
  resolutions: ResolutionRecord[];

  // Statistics
  stats: {
    totalInputs: number;
    successfulInputs: number;
    failedInputs: number;
    conflictsResolved: number;
  };
}
/** Describes one conflict: the competing values and the value that was kept. */
interface ConflictRecord {
  /** Identifies the conflicting field (presumably its key; confirm with the producer). */
  field: string;
  /** Every competing value together with the node that supplied it. */
  values: Array<{
    nodeId: NodeId;
    value: unknown;
  }>;
  /** The value chosen for the field. */
  resolution: unknown;
  /** The strategy recorded for this conflict. */
  strategy: ConflictStrategy;
}
aggregationReport:
nodeId: combine-analysis
completedAt: "2024-01-15T10:01:30Z"
inputs:
- nodeId: analyze-code
status: completed
outputSize: 2500
- nodeId: analyze-tests
status: completed
outputSize: 1800
- nodeId: analyze-docs
status: failed
error: "Timeout exceeded"
aggregation:
strategy: union-merge
totalItems: 45
uniqueItems: 38
duplicatesRemoved: 7
conflicts:
- field: severity
count: 3
resolution: highest-wins
output:
type: array
itemCount: 38
schema: Finding[]

/**
 * Aggregates whatever branches completed, provided enough of them did.
 *
 * A node counts as successful only when `results` holds an entry for it whose
 * status is `'completed'`; a missing entry or any other status counts as a
 * failure. Each expected node is counted once, even if it is listed twice.
 *
 * @param expected - Nodes whose results were supposed to arrive.
 * @param results - Results received so far, keyed by node.
 * @param config - Aggregation settings; `minimumSuccessRate` is compared
 *   against the fraction (0..1) of expected nodes that completed.
 * @returns The aggregation of the completed nodes' outputs.
 * @throws InsufficientResultsError when the success rate is below
 *   `config.minimumSuccessRate`.
 */
function aggregateWithPartialResults(
  expected: NodeId[],
  results: Map<NodeId, TaskResult>,
  config: AggregationConfig
): AggregatedResult {
  const successful = new Map<NodeId, unknown>();
  const failed: NodeId[] = [];

  // De-duplicate: `successful` is keyed by node, so a repeated id in
  // `expected` would otherwise inflate the denominator and lower the rate.
  const expectedNodes = new Set(expected);
  for (const nodeId of expectedNodes) {
    const result = results.get(nodeId);
    if (result?.status === 'completed') {
      successful.set(nodeId, result.output);
    } else {
      failed.push(nodeId);
    }
  }

  // Check if we have enough results. With nothing expected, nothing can be
  // missing; this also avoids computing 0 / 0 = NaN.
  const successRate =
    expectedNodes.size === 0 ? 1 : successful.size / expectedNodes.size;
  if (successRate < config.minimumSuccessRate) {
    // Round to one decimal so the message does not show float noise such as
    // 33.33333333333333.
    const percent = Number((successRate * 100).toFixed(1));
    throw new InsufficientResultsError(
      `Only ${percent}% of branches succeeded (failed or missing: ${failed.join(', ')})`
    );
  }

  // Aggregate available results
  return aggregate(successful, config);
}
dag-parallel-executordag-output-validatordag-context-bridgerdag-failure-analyzer