langgraph-parallel


LangGraph Parallel Execution


Run independent nodes concurrently for performance.

Fan-Out/Fan-In Pattern


```python
from langgraph.graph import StateGraph

def fan_out(state):
    """Split work into parallel tasks."""
    state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
    return state

def worker(state):
    """Process one task."""
    task = state["current_task"]
    result = process(task)
    return {"results": [result]}

def fan_in(state):
    """Combine parallel results."""
    combined = aggregate(state["results"])
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)

workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in")  # Waits for all workers
```
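The `State`, `process`, and `aggregate` names above are assumed. One detail the snippet glosses over: when parallel branches all write to `results`, that key needs a reducer so concurrent updates are merged rather than overwriting each other. A minimal sketch of what `State` could look like, using `operator.add` (plain list concatenation) as the reducer:

```python
from operator import add
from typing import Annotated, TypedDict

class State(TypedDict):
    tasks: list[dict]
    current_task: dict
    results: Annotated[list, add]  # concurrent writes are concatenated, not replaced
    final: dict

# The reducer itself is ordinary list concatenation:
merged = add([{"id": 1}], [{"id": 2}])
print(merged)  # [{'id': 1}, {'id': 2}]
```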
python
from langgraph.graph import StateGraph

def fan_out(state):
    """将工作拆分到并行任务中。"""
    state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
    return state

def worker(state):
    """处理单个任务。"""
    task = state["current_task"]
    result = process(task)
    return {"results": [result]}

def fan_in(state):
    """合并并行任务结果。"""
    combined = aggregate(state["results"])
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)

workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in")  # 等待所有worker完成

Using Send API


```python
from langgraph.constants import Send

def router(state):
    """Route to multiple workers in parallel."""
    return [
        Send("worker", {"task": task})
        for task in state["tasks"]
    ]

workflow.add_conditional_edges("router", router)
```

Complete Send API Example (2026 Pattern)


```python
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
from operator import add

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], add]  # Accumulates from parallel branches

class JokeState(TypedDict):
    subject: str

def generate_topics(state: OverallState) -> dict:
    """Initial node: create list of subjects."""
    return {"subjects": ["cats", "dogs", "programming", "coffee"]}

def continue_to_jokes(state: OverallState) -> list[Send]:
    """Fan-out: create parallel branch for each subject."""
    return [
        Send("generate_joke", {"subject": s})
        for s in state["subjects"]
    ]

def generate_joke(state: JokeState) -> dict:
    """Worker: process one subject, return to accumulator."""
    joke = llm.invoke(f"Tell a short joke about {state['subject']}")
    return {"jokes": [f"{state['subject']}: {joke.content}"]}
```

Build graph

```python
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)

builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes)
builder.add_edge("generate_joke", END)  # All branches converge automatically

graph = builder.compile()
```

Invoke

```python
result = graph.invoke({"subjects": [], "jokes": []})
# result["jokes"] contains all 4 jokes
```

Parallel Agent Analysis


```python
import asyncio
from typing import Annotated, TypedDict
from operator import add

class AnalysisState(TypedDict):
    content: str
    findings: Annotated[list[dict], add]  # Accumulates

async def run_parallel_agents(state: AnalysisState):
    """Run multiple agents in parallel."""
    agents = [security_agent, tech_agent, quality_agent]

    # Run all concurrently
    tasks = [agent.analyze(state["content"]) for agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter successful results
    findings = [r for r in results if not isinstance(r, Exception)]

    return {"findings": findings}
```

Map-Reduce Pattern (True Parallel)


```python
import asyncio

async def parallel_map(items: list, process_fn) -> list:
    """Map: Process all items concurrently."""
    tasks = [asyncio.create_task(process_fn(item)) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

def reduce_results(results: list) -> dict:
    """Reduce: Combine all results."""
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]

    return {
        "total": len(results),
        "passed": len(successes),
        "failed": len(failures),
        "results": successes,
        "errors": [str(e) for e in failures]
    }

async def map_reduce_node(state: State) -> dict:
    """Combined map-reduce in single node."""
    results = await parallel_map(state["items"], process_item_async)
    summary = reduce_results(results)
    return {"summary": summary}
```
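To see the map step in action without a graph, here is a self-contained sketch using a stub `process_item_async` (hypothetical) that fails on odd items, exercising both the success and error paths:

```python
import asyncio

async def parallel_map(items, process_fn):
    """Map: process all items concurrently, capturing exceptions."""
    tasks = [asyncio.create_task(process_fn(item)) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

async def process_item_async(item):
    # Stub worker: reject odd items to exercise the error path.
    if item % 2:
        raise ValueError(f"odd item: {item}")
    return item * 10

async def main():
    results = await parallel_map([1, 2, 3, 4], process_item_async)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return {"passed": len(successes), "failed": len(failures), "results": successes}

summary = asyncio.run(main())
print(summary)  # {'passed': 2, 'failed': 2, 'results': [20, 40]}
```

Note that `asyncio.gather` preserves input order, so results can be paired back to their items by index.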

Alternative: Using Send API for true parallelism in the graph

```python
def fan_out_to_mappers(state: State) -> list[Send]:
    """Fan-out pattern for parallel map."""
    return [
        Send("mapper", {"item": item, "index": i})
        for i, item in enumerate(state["items"])
    ]
```

All mappers write to an accumulating state key; the reducer runs after all mappers complete (automatic fan-in).

Error Isolation


```python
import asyncio

async def parallel_with_isolation(tasks: list):
    """Run parallel tasks, isolate failures."""
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    failures = []

    for task, result in zip(tasks, results):
        if isinstance(result, Exception):
            failures.append({"task": task, "error": str(result)})
        else:
            successes.append(result)

    return {"successes": successes, "failures": failures}
```
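A quick self-contained check of the isolation behavior: with `return_exceptions=True`, one failing coroutine does not cancel its siblings, and the exception comes back as an ordinary result value.

```python
import asyncio

async def ok(n):
    await asyncio.sleep(0)  # yield to the event loop
    return n

async def boom():
    raise RuntimeError("worker failed")

async def main():
    # The failing task sits between two healthy ones; all three complete.
    results = await asyncio.gather(ok(1), boom(), ok(2), return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

successes, failures = asyncio.run(main())
print(successes)                    # [1, 2]
print(type(failures[0]).__name__)   # RuntimeError
```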

Timeout per Branch


```python
import asyncio

async def parallel_with_timeout(agents: list, content: str, timeout: int = 30):
    """Run agents with per-agent timeout."""
    async def run_with_timeout(agent):
        try:
            return await asyncio.wait_for(
                agent.analyze(content),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return {"agent": agent.name, "error": "timeout"}

    tasks = [run_with_timeout(a) for a in agents]
    return await asyncio.gather(*tasks)
```
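The same pattern stripped down to stdlib primitives, with a stub `analyze` coroutine and tiny delays so it runs instantly; the `slow` branch deliberately exceeds its budget while `fast` completes:

```python
import asyncio

async def analyze(name, delay):
    await asyncio.sleep(delay)
    return {"agent": name, "result": "done"}

async def run_with_timeout(name, delay, timeout=0.05):
    try:
        return await asyncio.wait_for(analyze(name, delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Timeout becomes a regular result instead of propagating.
        return {"agent": name, "error": "timeout"}

async def main():
    return await asyncio.gather(
        run_with_timeout("fast", 0.0),
        run_with_timeout("slow", 0.5),  # exceeds the 0.05s budget
    )

results = asyncio.run(main())
print(results)
# [{'agent': 'fast', 'result': 'done'}, {'agent': 'slow', 'error': 'timeout'}]
```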

Key Decisions


| Decision | Recommendation |
| --- | --- |
| Max parallel | 5-10 concurrent (avoid overwhelming APIs) |
| Error handling | `return_exceptions=True` (don't fail all) |
| Timeout | 30-60s per branch |
| Accumulator | Use `Annotated[list, add]` for results |
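To stay within the 5-10 concurrent recommendation, a semaphore can cap the number of in-flight tasks while the rest queue. A minimal sketch (the `bounded_gather` helper is an illustration, not a LangGraph API):

```python
import asyncio

async def bounded_gather(coros, limit=5):
    """Run coroutines concurrently with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:  # blocks when `limit` tasks are already running
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros), return_exceptions=True)

async def work(n):
    await asyncio.sleep(0)
    return n * n

# 8 tasks, never more than 3 running at a time; order is preserved.
results = asyncio.run(bounded_gather([work(n) for n in range(8)], limit=3))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```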

Common Mistakes

常见错误

  • No error isolation (one failure kills all)
  • No timeout (one slow branch blocks)
  • Sequential where parallel possible
  • Forgetting to wait for all branches

Evaluations


See references/evaluations.md for test cases.

Related Skills

  • langgraph-state - Accumulating state with `Annotated[list, add]` reducer
  • langgraph-supervisor - Supervisor dispatching to parallel workers
  • langgraph-subgraphs - Parallel subgraph execution
  • langgraph-streaming - Stream progress from parallel branches
  • langgraph-checkpoints - Checkpoint parallel execution for recovery
  • multi-agent-orchestration - Higher-level coordination patterns

Capability Details


fanout-pattern

Keywords: fanout, parallel, concurrent, scatter
Solves:
  • Run agents in parallel
  • Implement fan-out pattern
  • Distribute work across workers

fanin-pattern

Keywords: fanin, gather, aggregate, collect
Solves:
  • Aggregate parallel results
  • Implement fan-in pattern
  • Collect worker outputs

parallel-template

Keywords: template, implementation, parallel, agent
Solves:
  • Parallel agent fanout template
  • Production-ready code
  • Copy-paste implementation