langgraph-parallel
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseLangGraph Parallel Execution
LangGraph并行执行
Run independent nodes concurrently for performance.
为提升性能,并发运行独立节点。
Fan-Out/Fan-In Pattern
扇出/扇入模式
python
from langgraph.graph import StateGraph
def fan_out(state):
"""Split work into parallel tasks."""
state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
return state
def worker(state):
"""Process one task."""
task = state["current_task"]
result = process(task)
return {"results": [result]}
def fan_in(state):
"""Combine parallel results."""
combined = aggregate(state["results"])
return {"final": combined}
workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)
workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in") # Waits for all workerspython
from langgraph.graph import StateGraph
def fan_out(state):
"""将工作拆分到并行任务中。"""
state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
return state
def worker(state):
"""处理单个任务。"""
task = state["current_task"]
result = process(task)
return {"results": [result]}
def fan_in(state):
"""合并并行任务结果。"""
combined = aggregate(state["results"])
return {"final": combined}
workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)
workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in") # 等待所有worker完成Using Send API
使用Send API
python
from langgraph.constants import Send
def router(state):
"""Route to multiple workers in parallel."""
return [
Send("worker", {"task": task})
for task in state["tasks"]
]
workflow.add_conditional_edges("router", router)python
from langgraph.constants import Send
def router(state):
"""并行路由到多个worker。"""
return [
Send("worker", {"task": task})
for task in state["tasks"]
]
workflow.add_conditional_edges("router", router)Complete Send API Example (2026 Pattern)
完整Send API示例(2026模式)
python
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
from operator import add
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], add] # Accumulates from parallel branches
class JokeState(TypedDict):
subject: str
def generate_topics(state: OverallState) -> dict:
"""Initial node: create list of subjects."""
return {"subjects": ["cats", "dogs", "programming", "coffee"]}
def continue_to_jokes(state: OverallState) -> list[Send]:
"""Fan-out: create parallel branch for each subject."""
return [
Send("generate_joke", {"subject": s})
for s in state["subjects"]
]
def generate_joke(state: JokeState) -> dict:
"""Worker: process one subject, return to accumulator."""
joke = llm.invoke(f"Tell a short joke about {state['subject']}")
return {"jokes": [f"{state['subject']}: {joke.content}"]}python
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
from typing import TypedDict, Annotated
from operator import add
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], add] # 从并行分支累积结果
class JokeState(TypedDict):
subject: str
def generate_topics(state: OverallState) -> dict:
"""初始节点:创建主题列表。"""
return {"subjects": ["cats", "dogs", "programming", "coffee"]}
def continue_to_jokes(state: OverallState) -> list[Send]:
"""扇出:为每个主题创建并行分支。"""
return [
Send("generate_joke", {"subject": s})
for s in state["subjects"]
]
def generate_joke(state: JokeState) -> dict:
"""Worker:处理单个主题,返回结果至累加器。"""
joke = llm.invoke(f"Tell a short joke about {state['subject']}")
return {"jokes": [f"{state['subject']}: {joke.content}"]}Build graph
构建图
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes)
builder.add_edge("generate_joke", END) # All branches converge automatically
graph = builder.compile()
builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes)
builder.add_edge("generate_joke", END) # 所有分支自动收敛
graph = builder.compile()
Invoke
调用
result = graph.invoke({"subjects": [], "jokes": []})
result = graph.invoke({"subjects": [], "jokes": []})
result["jokes"] contains all 4 jokes
result["jokes"] 包含所有4个笑话
undefinedundefinedParallel Agent Analysis
并行Agent分析
python
from typing import Annotated
from operator import add
class AnalysisState(TypedDict):
content: str
findings: Annotated[list[dict], add] # Accumulates
async def run_parallel_agents(state: AnalysisState):
"""Run multiple agents in parallel."""
agents = [security_agent, tech_agent, quality_agent]
# Run all concurrently
tasks = [agent.analyze(state["content"]) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
findings = [r for r in results if not isinstance(r, Exception)]
return {"findings": findings}python
from typing import Annotated
from operator import add
class AnalysisState(TypedDict):
content: str
findings: Annotated[list[dict], add] # 累积结果
async def run_parallel_agents(state: AnalysisState):
"""并行运行多个Agent。"""
agents = [security_agent, tech_agent, quality_agent]
# 并发运行所有Agent
tasks = [agent.analyze(state["content"]) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤成功结果
findings = [r for r in results if not isinstance(r, Exception)]
return {"findings": findings}Map-Reduce Pattern (True Parallel)
Map-Reduce模式(真正并行)
python
import asyncio
async def parallel_map(items: list, process_fn) -> list:
"""Map: Process all items concurrently."""
tasks = [asyncio.create_task(process_fn(item)) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
def reduce_results(results: list) -> dict:
"""Reduce: Combine all results."""
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return {
"total": len(results),
"passed": len(successes),
"failed": len(failures),
"results": successes,
"errors": [str(e) for e in failures]
}
async def map_reduce_node(state: State) -> dict:
"""Combined map-reduce in single node."""
results = await parallel_map(state["items"], process_item_async)
summary = reduce_results(results)
return {"summary": summary}python
import asyncio
async def parallel_map(items: list, process_fn) -> list:
"""Map:并发处理所有项。"""
tasks = [asyncio.create_task(process_fn(item)) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
def reduce_results(results: list) -> dict:
"""Reduce:合并所有结果。"""
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return {
"total": len(results),
"passed": len(successes),
"failed": len(failures),
"results": successes,
"errors": [str(e) for e in failures]
}
async def map_reduce_node(state: State) -> dict:
"""在单个节点中组合Map-Reduce。"""
results = await parallel_map(state["items"], process_item_async)
summary = reduce_results(results)
return {"summary": summary}Alternative: Using Send API for true parallelism in graph
替代方案:在图中使用Send API实现真正的并行
def fan_out_to_mappers(state: State) -> list[Send]:
"""Fan-out pattern for parallel map."""
return [
Send("mapper", {"item": item, "index": i})
for i, item in enumerate(state["items"])
]
def fan_out_to_mappers(state: State) -> list[Send]:
"""并行Map的扇出模式。"""
return [
Send("mapper", {"item": item, "index": i})
for i, item in enumerate(state["items"])
]
All mappers write to accumulating state key
所有mapper写入累积状态键
Reducer runs after all mappers complete (automatic fan-in)
Reducer在所有mapper完成后自动运行(自动扇入)
undefinedundefinedError Isolation
错误隔离
python
async def parallel_with_isolation(tasks: list):
"""Run parallel tasks, isolate failures."""
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for task, result in zip(tasks, results):
if isinstance(result, Exception):
failures.append({"task": task, "error": str(result)})
else:
successes.append(result)
return {"successes": successes, "failures": failures}python
async def parallel_with_isolation(tasks: list):
"""运行并行任务,隔离失败项。"""
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for task, result in zip(tasks, results):
if isinstance(result, Exception):
failures.append({"task": task, "error": str(result)})
else:
successes.append(result)
return {"successes": successes, "failures": failures}Timeout per Branch
分支超时设置
python
import asyncio
async def parallel_with_timeout(agents: list, content: str, timeout: int = 30):
"""Run agents with per-agent timeout."""
async def run_with_timeout(agent):
try:
return await asyncio.wait_for(
agent.analyze(content),
timeout=timeout
)
except asyncio.TimeoutError:
return {"agent": agent.name, "error": "timeout"}
tasks = [run_with_timeout(a) for a in agents]
return await asyncio.gather(*tasks)python
import asyncio
async def parallel_with_timeout(agents: list, content: str, timeout: int = 30):
"""为每个Agent设置超时并运行。"""
async def run_with_timeout(agent):
try:
return await asyncio.wait_for(
agent.analyze(content),
timeout=timeout
)
except asyncio.TimeoutError:
return {"agent": agent.name, "error": "timeout"}
tasks = [run_with_timeout(a) for a in agents]
return await asyncio.gather(*tasks)Key Decisions
关键决策
| Decision | Recommendation |
|---|---|
| Max parallel | 5-10 concurrent (avoid overwhelming APIs) |
| Error handling | return_exceptions=True (don't fail all) |
| Timeout | 30-60s per branch |
| Accumulator | Use |
| 决策项 | 推荐方案 |
|---|---|
| 最大并行数 | 5-10个并发任务(避免压垮API) |
| 错误处理 | 使用return_exceptions=True(不全部失败) |
| 超时时间 | 每个分支30-60秒 |
| 累加器 | 使用 |
Common Mistakes
常见错误
- No error isolation (one failure kills all)
- No timeout (one slow branch blocks)
- Sequential where parallel possible
- Forgetting to wait for all branches
- 未做错误隔离(一个失败导致全部失败)
- 未设置超时(一个慢分支阻塞所有流程)
- 可并行场景却用了串行执行
- 忘记等待所有分支完成
Evaluations
评估测试
See references/evaluations.md for test cases.
查看references/evaluations.md获取测试用例。
Related Skills
相关技能
- - Accumulating state with
langgraph-statereducerAnnotated[list, add] - - Supervisor dispatching to parallel workers
langgraph-supervisor - - Parallel subgraph execution
langgraph-subgraphs - - Stream progress from parallel branches
langgraph-streaming - - Checkpoint parallel execution for recovery
langgraph-checkpoints - - Higher-level coordination patterns
multi-agent-orchestration
- - 使用
langgraph-statereducer实现状态累积Annotated[list, add] - - 调度器分配任务给并行worker
langgraph-supervisor - - 并行子图执行
langgraph-subgraphs - - 从并行分支流式输出进度
langgraph-streaming - - 为并行执行设置检查点以实现恢复
langgraph-checkpoints - - 更高层级的协调模式
multi-agent-orchestration
Capability Details
能力细节
fanout-pattern
fanout-pattern
Keywords: fanout, parallel, concurrent, scatter
Solves:
- Run agents in parallel
- Implement fan-out pattern
- Distribute work across workers
关键词: fanout, parallel, concurrent, scatter
解决场景:
- 并行运行Agent
- 实现扇出模式
- 在多个worker间分配工作
fanin-pattern
fanin-pattern
Keywords: fanin, gather, aggregate, collect
Solves:
- Aggregate parallel results
- Implement fan-in pattern
- Collect worker outputs
关键词: fanin, gather, aggregate, collect
解决场景:
- 聚合并行结果
- 实现扇入模式
- 收集worker输出
parallel-template
parallel-template
Keywords: template, implementation, parallel, agent
Solves:
- Parallel agent fanout template
- Production-ready code
- Copy-paste implementation
关键词: template, implementation, parallel, agent
解决场景:
- 并行Agent扇出模板
- 生产就绪代码
- 可直接复制使用的实现