Loading...
Loading...
Compare original and translation side by side
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operatorfrom langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
---
---from langgraph.graph import StateGraph, END
from typing import TypedDict
class AgentState(TypedDict):
"""State schema shared across all nodes."""
messages: list
user_input: str
final_output: str
metadata: dictfrom langgraph.graph import StateGraph, END
from typing import TypedDict
class AgentState(TypedDict):
"""所有节点共享的状态 schema。"""
messages: list
user_input: str
final_output: str
metadata: dict
**Key Properties**:
- **Nodes**: Agent functions that transform state
- **Edges**: Transitions between nodes (static or conditional)
- **State**: Shared data structure passed between nodes
- **Entry Point**: Starting node of execution
- **END**: Terminal node signaling completion
**核心属性**:
- **节点**:用于转换状态的Agent函数
- **边**:节点之间的流转规则(静态或条件)
- **状态**:在节点之间传递的共享数据结构
- **入口点**:执行的起始节点
- **END**:标识执行完成的终端节点def research_agent(state: AgentState) -> dict:
"""Node function: receives state, returns updates."""
query = state["user_input"]
# Perform research (simplified)
results = search_web(query)
# Return state updates (partial state)
return {
"messages": state["messages"] + [f"Research: {results}"],
"metadata": {"research_complete": True}
}def research_agent(state: AgentState) -> dict:
"""节点函数:接收状态,返回更新。"""
query = state["user_input"]
# 执行调研(简化版)
results = search_web(query)
# 返回状态更新(部分状态)
return {
"messages": state["messages"] + [f"Research: {results}"],
"metadata": {"research_complete": True}
}
**Node Behavior**:
- Receives **full state** as input
- Returns **partial state** (only fields to update)
- Can be sync or async functions
- Can invoke LLMs, call APIs, run computations
**节点行为**:
- 接收**完整状态**作为输入
- 返回**部分状态**(仅需要更新的字段)
- 可以是同步或异步函数
- 可以调用LLM、API、执行计算undefinedundefinedundefinedundefineddef should_continue(state: AgentState) -> str:
"""Routing function: decides next node based on state."""
last_message = state["messages"][-1]
if "APPROVED" in last_message:
return END
elif "NEEDS_REVISION" in last_message:
return "writer"
else:
return "reviewer"
workflow.add_conditional_edges(
"reviewer", # Source node
should_continue, # Routing function
{
END: END,
"writer": "writer",
"reviewer": "reviewer"
}
)def should_continue(state: AgentState) -> str:
"""路由函数:根据状态决定下一个节点。"""
last_message = state["messages"][-1]
if "APPROVED" in last_message:
return END
elif "NEEDS_REVISION" in last_message:
return "writer"
else:
return "reviewer"
workflow.add_conditional_edges(
"reviewer", # 源节点
should_continue, # 路由函数
{
END: END,
"writer": "writer",
"reviewer": "reviewer"
}
)from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropicfrom langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropicprompt = f"Research key points about: {topic}"
response = llm.invoke(prompt)
return {
"research_notes": [response.content]
}prompt = f"Research key points about: {topic}"
response = llm.invoke(prompt)
return {
"research_notes": [response.content]
}prompt = f"Write article based on:\n{notes}"
response = llm.invoke(prompt)
return {
"draft": response.content,
"revision_count": state.get("revision_count", 0)
}prompt = f"Write article based on:\n{notes}"
response = llm.invoke(prompt)
return {
"draft": response.content,
"revision_count": state.get("revision_count", 0)
}prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
response = llm.invoke(prompt)
approved = "APPROVED" in response.content
return {
"approved": approved,
"revision_count": state["revision_count"] + 1,
"research_notes": [f"Review feedback: {response.content}"]
}prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
response = llm.invoke(prompt)
approved = "APPROVED" in response.content
return {
"approved": approved,
"revision_count": state["revision_count"] + 1,
"research_notes": [f"Review feedback: {response.content}"]
}
---
---from typing import TypedDict, Annotated, Literal
import operator
class WorkflowState(TypedDict):
# Simple fields (replaced on update)
user_id: str
request_id: str
status: Literal["pending", "processing", "complete", "failed"]
# List with reducer (appends instead of replacing)
messages: Annotated[list, operator.add]
# Dict with custom reducer
metadata: Annotated[dict, lambda x, y: {**x, **y}]
# Optional fields
error: str | None
result: dict | Nonefrom typing import TypedDict, Annotated, Literal
import operator
class WorkflowState(TypedDict):
# 简单字段(更新时直接替换)
user_id: str
request_id: str
status: Literal["pending", "processing", "complete", "failed"]
# 带 reducer 的列表(追加而非替换)
messages: Annotated[list, operator.add]
# 带自定义 reducer 的字典
metadata: Annotated[dict, lambda x, y: {**x, **y}]
# 可选字段
error: str | None
result: dict | Noneimport operator
from typing import Annotatedimport operator
from typing import Annotatedundefinedundefineddef node_function(state: State) -> dict:
"""Return only fields that should be updated."""
return {
"messages": ["New message"], # Will be appended
"status": "processing" # Will be replaced
}
# Other fields remain unchangeddef node_function(state: State) -> dict:
"""仅返回需要更新的字段。"""
return {
"messages": ["New message"], # 会被追加
"status": "processing" # 会被替换
}
# 其他字段保持不变def route_based_on_score(state: State) -> str:
"""Route to different nodes based on state."""
score = state["quality_score"]
if score >= 0.9:
return "publish"
elif score >= 0.6:
return "review"
else:
return "revise"
workflow.add_conditional_edges(
"evaluator",
route_based_on_score,
{
"publish": "publisher",
"review": "reviewer",
"revise": "reviser"
}
)def route_based_on_score(state: State) -> str:
"""根据状态路由到不同节点。"""
score = state["quality_score"]
if score >= 0.9:
return "publish"
elif score >= 0.6:
return "review"
else:
return "revise"
workflow.add_conditional_edges(
"evaluator",
route_based_on_score,
{
"publish": "publisher",
"review": "reviewer",
"revise": "reviser"
}
)from langchain_anthropic import ChatAnthropic
def llm_router(state: State) -> str:
"""Use LLM to decide next step."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Current state: {state['current_step']}
User request: {state['user_input']}
Which agent should handle this next?
Options: researcher, coder, writer, FINISH
Return only one word.
"""
response = llm.invoke(prompt)
next_agent = response.content.strip().lower()
if next_agent == "finish":
return END
else:
return next_agent
workflow.add_conditional_edges(
"supervisor",
llm_router,
{
"researcher": "researcher",
"coder": "coder",
"writer": "writer",
END: END
}
)from langchain_anthropic import ChatAnthropic
def llm_router(state: State) -> str:
"""使用 LLM 决定下一步操作。"""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Current state: {state['current_step']}
User request: {state['user_input']}
Which agent should handle this next?
Options: researcher, coder, writer, FINISH
Return only one word.
"""
response = llm.invoke(prompt)
next_agent = response.content.strip().lower()
if next_agent == "finish":
return END
else:
return next_agent
workflow.add_conditional_edges(
"supervisor",
llm_router,
{
"researcher": "researcher",
"coder": "coder",
"writer": "writer",
END: END
}
)def complex_router(state: State) -> str:
"""Route based on multiple conditions."""
# Check multiple conditions
has_errors = bool(state.get("errors"))
is_approved = state.get("approved", False)
iteration_count = state.get("iterations", 0)
# Priority-based routing
if has_errors:
return "error_handler"
elif is_approved:
return END
elif iteration_count >= 5:
return "escalation"
else:
return "processor"
workflow.add_conditional_edges(
"validator",
complex_router,
{
"error_handler": "error_handler",
"processor": "processor",
"escalation": "escalation",
END: END
}
)def complex_router(state: State) -> str:
"""基于多个条件进行路由。"""
# 检查多个条件
has_errors = bool(state.get("errors"))
is_approved = state.get("approved", False)
iteration_count = state.get("iterations", 0)
# 基于优先级的路由
if has_errors:
return "error_handler"
elif is_approved:
return END
elif iteration_count >= 5:
return "escalation"
else:
return "processor"
workflow.add_conditional_edges(
"validator",
complex_router,
{
"error_handler": "error_handler",
"processor": "processor",
"escalation": "escalation",
END: END
}
)from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal
class SupervisorState(TypedDict):
task: str
agent_history: list
result: dict
next_agent: strfrom langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal
class SupervisorState(TypedDict):
task: str
agent_history: list
result: dict
next_agent: strdef run(self, state: SupervisorState) -> dict:
response = self.llm.invoke(f"Research: {state['task']}")
return {
"agent_history": [f"Research: {response.content}"],
"result": {"research": response.content}
}def run(self, state: SupervisorState) -> dict:
response = self.llm.invoke(f"Code: {state['task']}")
return {
"agent_history": [f"Code: {response.content}"],
"result": {"code": response.content}
}def route(self, state: SupervisorState) -> dict:
"""Decide which agent to use next."""
history = "\n".join(state.get("agent_history", []))
prompt = f"""
Task: {state['task']}
Progress: {history}
Which agent should handle the next step?
Options: researcher, coder, FINISH
Return only one word.
"""
response = self.llm.invoke(prompt)
next_agent = response.content.strip().lower()
return {"next_agent": next_agent}def run(self, state: SupervisorState) -> dict:
response = self.llm.invoke(f"Research: {state['task']}")
return {
"agent_history": [f"Research: {response.content}"],
"result": {"research": response.content}
}def run(self, state: SupervisorState) -> dict:
response = self.llm.invoke(f"Code: {state['task']}")
return {
"agent_history": [f"Code: {response.content}"],
"result": {"code": response.content}
}def route(self, state: SupervisorState) -> dict:
"""决定下一步使用哪个Agent。"""
history = "\n".join(state.get("agent_history", []))
prompt = f"""
Task: {state['task']}
Progress: {history}
Which agent should handle the next step?
Options: researcher, coder, FINISH
Return only one word.
"""
response = self.llm.invoke(prompt)
next_agent = response.content.strip().lower()
return {"next_agent": next_agent}# Initialize agents
research_agent = ResearchAgent()
coding_agent = CodingAgent()
supervisor = SupervisorAgent()
# Add nodes
workflow.add_node("supervisor", supervisor.route)
workflow.add_node("researcher", research_agent.run)
workflow.add_node("coder", coding_agent.run)
# Conditional routing from supervisor
def route_from_supervisor(state: SupervisorState) -> str:
next_agent = state.get("next_agent", "FINISH")
if next_agent == "finish":
return END
return next_agent
workflow.add_conditional_edges(
"supervisor",
route_from_supervisor,
{
"researcher": "researcher",
"coder": "coder",
END: END
}
)
# Loop back to supervisor after each agent
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")
workflow.set_entry_point("supervisor")
return workflow.compile()# 初始化Agent
research_agent = ResearchAgent()
coding_agent = CodingAgent()
supervisor = SupervisorAgent()
# 添加节点
workflow.add_node("supervisor", supervisor.route)
workflow.add_node("researcher", research_agent.run)
workflow.add_node("coder", coding_agent.run)
# 来自主管的条件路由
def route_from_supervisor(state: SupervisorState) -> str:
next_agent = state.get("next_agent", "FINISH")
if next_agent == "finish":
return END
return next_agent
workflow.add_conditional_edges(
"supervisor",
route_from_supervisor,
{
"researcher": "researcher",
"coder": "coder",
END: END
}
)
# 每个Agent执行完成后回到主管
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")
workflow.set_entry_point("supervisor")
return workflow.compile()undefinedundefinedclass TeamState(TypedDict):
task: str
team_results: dict
def create_backend_team():
"""Sub-graph for backend development."""
workflow = StateGraph(TeamState)
workflow.add_node("api_designer", design_api)
workflow.add_node("database_designer", design_db)
workflow.add_node("implementer", implement_backend)
workflow.add_edge("api_designer", "database_designer")
workflow.add_edge("database_designer", "implementer")
workflow.add_edge("implementer", END)
workflow.set_entry_point("api_designer")
return workflow.compile()
def create_frontend_team():
"""Sub-graph for frontend development."""
workflow = StateGraph(TeamState)
workflow.add_node("ui_designer", design_ui)
workflow.add_node("component_builder", build_components)
workflow.add_node("integrator", integrate_frontend)
workflow.add_edge("ui_designer", "component_builder")
workflow.add_edge("component_builder", "integrator")
workflow.add_edge("integrator", END)
workflow.set_entry_point("ui_designer")
return workflow.compile()class TeamState(TypedDict):
task: str
team_results: dict
def create_backend_team():
"""后端开发子图。"""
workflow = StateGraph(TeamState)
workflow.add_node("api_designer", design_api)
workflow.add_node("database_designer", design_db)
workflow.add_node("implementer", implement_backend)
workflow.add_edge("api_designer", "database_designer")
workflow.add_edge("database_designer", "implementer")
workflow.add_edge("implementer", END)
workflow.set_entry_point("api_designer")
return workflow.compile()
def create_frontend_team():
"""前端开发子图。"""
workflow = StateGraph(TeamState)
workflow.add_node("ui_designer", design_ui)
workflow.add_node("component_builder", build_components)
workflow.add_node("integrator", integrate_frontend)
workflow.add_edge("ui_designer", "component_builder")
workflow.add_edge("component_builder", "integrator")
workflow.add_edge("integrator", END)
workflow.set_entry_point("ui_designer")
return workflow.compile()# Add team sub-graphs as nodes
backend_team = create_backend_team()
frontend_team = create_frontend_team()
workflow.add_node("backend_team", backend_team)
workflow.add_node("frontend_team", frontend_team)
workflow.add_node("integrator", integrate_teams)
# Parallel execution of teams
workflow.add_edge("backend_team", "integrator")
workflow.add_edge("frontend_team", "integrator")
workflow.add_edge("integrator", END)
workflow.set_entry_point("backend_team")
return workflow.compile()undefined# 将团队子图作为节点添加
backend_team = create_backend_team()
frontend_team = create_frontend_team()
workflow.add_node("backend_team", backend_team)
workflow.add_node("frontend_team", frontend_team)
workflow.add_node("integrator", integrate_teams)
# 团队并行执行
workflow.add_edge("backend_team", "integrator")
workflow.add_edge("frontend_team", "integrator")
workflow.add_edge("integrator", END)
workflow.set_entry_point("backend_team")
return workflow.compile()undefinedfrom typing import Literal
class SwarmState(TypedDict):
task: str
messages: list
current_agent: str
handoff_reason: str
def create_swarm():
"""Agents can dynamically hand off to each other."""
def research_agent(state: SwarmState) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
# Perform research
response = llm.invoke(f"Research: {state['task']}")
# Decide if handoff needed
needs_code = "implementation" in response.content.lower()
if needs_code:
return {
"messages": [response.content],
"current_agent": "coder",
"handoff_reason": "Need implementation"
}
else:
return {
"messages": [response.content],
"current_agent": "writer",
"handoff_reason": "Ready for documentation"
}
def coding_agent(state: SwarmState) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Implement: {state['task']}")
return {
"messages": [response.content],
"current_agent": "tester",
"handoff_reason": "Code complete, needs testing"
}
def tester_agent(state: SwarmState) -> dict:
# Test the code
tests_pass = True # Simplified
if tests_pass:
return {
"messages": ["Tests passed"],
"current_agent": "DONE",
"handoff_reason": "All tests passing"
}
else:
return {
"messages": ["Tests failed"],
"current_agent": "coder",
"handoff_reason": "Fix failing tests"
}
# Build graph
workflow = StateGraph(SwarmState)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", coding_agent)
workflow.add_node("tester", tester_agent)
# Dynamic routing based on current_agent
def route_swarm(state: SwarmState) -> str:
next_agent = state.get("current_agent", "DONE")
if next_agent == "DONE":
return END
return next_agent
workflow.add_conditional_edges(
"researcher",
route_swarm,
{"coder": "coder", "writer": "writer"}
)
workflow.add_conditional_edges(
"coder",
route_swarm,
{"tester": "tester"}
)
workflow.add_conditional_edges(
"tester",
route_swarm,
{"coder": "coder", END: END}
)
workflow.set_entry_point("researcher")
return workflow.compile()from typing import Literal
class SwarmState(TypedDict):
task: str
messages: list
current_agent: str
handoff_reason: str
def create_swarm():
"""Agent之间可以动态交接。"""
def research_agent(state: SwarmState) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
# 执行调研
response = llm.invoke(f"Research: {state['task']}")
# 决定是否需要交接
needs_code = "implementation" in response.content.lower()
if needs_code:
return {
"messages": [response.content],
"current_agent": "coder",
"handoff_reason": "Need implementation"
}
else:
return {
"messages": [response.content],
"current_agent": "writer",
"handoff_reason": "Ready for documentation"
}
def coding_agent(state: SwarmState) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Implement: {state['task']}")
return {
"messages": [response.content],
"current_agent": "tester",
"handoff_reason": "Code complete, needs testing"
}
def tester_agent(state: SwarmState) -> dict:
# 测试代码
tests_pass = True # 简化版
if tests_pass:
return {
"messages": ["Tests passed"],
"current_agent": "DONE",
"handoff_reason": "All tests passing"
}
else:
return {
"messages": ["Tests failed"],
"current_agent": "coder",
"handoff_reason": "Fix failing tests"
}
# 构建图
workflow = StateGraph(SwarmState)
workflow.add_node("researcher", research_agent)
workflow.add_node("coder", coding_agent)
workflow.add_node("tester", tester_agent)
# 基于 current_agent 动态路由
def route_swarm(state: SwarmState) -> str:
next_agent = state.get("current_agent", "DONE")
if next_agent == "DONE":
return END
return next_agent
workflow.add_conditional_edges(
"researcher",
route_swarm,
{"coder": "coder", "writer": "writer"}
)
workflow.add_conditional_edges(
"coder",
route_swarm,
{"tester": "tester"}
)
workflow.add_conditional_edges(
"tester",
route_swarm,
{"coder": "coder", END: END}
)
workflow.set_entry_point("researcher")
return workflow.compile()from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, ENDfrom langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, ENDworkflow.add_node("draft", create_draft)
workflow.add_node("approve", human_approval) # Interrupt point
workflow.add_node("publish", publish_content)
workflow.add_edge("draft", "approve")
workflow.add_edge("approve", "publish")
workflow.add_edge("publish", END)
workflow.set_entry_point("draft")
# Compile with interrupt before "approve"
return workflow.compile(
checkpointer=memory,
interrupt_before=["approve"] # Pause here
)workflow.add_node("draft", create_draft)
workflow.add_node("approve", human_approval) # 中断点
workflow.add_node("publish", publish_content)
workflow.add_edge("draft", "approve")
workflow.add_edge("approve", "publish")
workflow.add_edge("publish", END)
workflow.set_entry_point("draft")
# 编译时配置在"approve"节点前中断
return workflow.compile(
checkpointer=memory,
interrupt_before=["approve"] # 在此处暂停
)undefinedundefinedclass ApprovalState(TypedDict):
draft: str
approved: bool
feedback: str
def approval_node(state: ApprovalState) -> dict:
"""This node requires human interaction."""
# This is where workflow pauses
# Human provides input through update_state
return state # No automatic changes
def create_interactive_workflow():
workflow = StateGraph(ApprovalState)
workflow.add_node("writer", write_draft)
workflow.add_node("approval_gate", approval_node)
workflow.add_node("reviser", revise_draft)
workflow.add_node("publisher", publish_final)
workflow.add_edge("writer", "approval_gate")
# Route based on approval
def route_after_approval(state: ApprovalState) -> str:
if state.get("approved"):
return "publisher"
else:
return "reviser"
workflow.add_conditional_edges(
"approval_gate",
route_after_approval,
{"publisher": "publisher", "reviser": "reviser"}
)
workflow.add_edge("reviser", "approval_gate") # Loop back
workflow.add_edge("publisher", END)
workflow.set_entry_point("writer")
memory = SqliteSaver.from_conn_string("approvals.db")
return workflow.compile(
checkpointer=memory,
interrupt_before=["approval_gate"]
)class ApprovalState(TypedDict):
draft: str
approved: bool
feedback: str
def approval_node(state: ApprovalState) -> dict:
"""该节点需要人工交互。"""
# 工作流在此处暂停
# 人工通过 update_state 提供输入
return state # 无自动变更
def create_interactive_workflow():
workflow = StateGraph(ApprovalState)
workflow.add_node("writer", write_draft)
workflow.add_node("approval_gate", approval_node)
workflow.add_node("reviser", revise_draft)
workflow.add_node("publisher", publish_final)
workflow.add_edge("writer", "approval_gate")
# 基于审批结果路由
def route_after_approval(state: ApprovalState) -> str:
if state.get("approved"):
return "publisher"
else:
return "reviser"
workflow.add_conditional_edges(
"approval_gate",
route_after_approval,
{"publisher": "publisher", "reviser": "reviser"}
)
workflow.add_edge("reviser", "approval_gate") # 循环回到审批关口
workflow.add_edge("publisher", END)
workflow.set_entry_point("writer")
memory = SqliteSaver.from_conn_string("approvals.db")
return workflow.compile(
checkpointer=memory,
interrupt_before=["approval_gate"]
)
---
---from langgraph.checkpoint.memory import MemorySaverfrom langgraph.checkpoint.memory import MemorySaverundefinedundefinedfrom langgraph.checkpoint.sqlite import SqliteSaverfrom langgraph.checkpoint.sqlite import SqliteSaverundefinedundefinedfrom langgraph.checkpoint.postgres import PostgresSaverfrom langgraph.checkpoint.postgres import PostgresSaverundefinedundefinedfrom langgraph.checkpoint.redis import RedisSaverfrom langgraph.checkpoint.redis import RedisSaver
---
---from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug_session"}}from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug_session"}}undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefineddef debug_workflow():
"""Debug workflow step-by-step."""
checkpointer = SqliteSaver.from_conn_string("debug.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug"}}
# Execute step-by-step
state = initial_state
for step in range(10): # Max 10 steps
print(f"\n=== Step {step} ===")
# Get current state
current = app.get_state(config)
print(f"Current state: {current.values}")
print(f"Next node: {current.next}")
if not current.next:
print("Workflow complete")
break
# Execute one step
for event in app.stream(None, config):
print(f"Event: {event}")
# Pause for inspection
input("Press Enter to continue...")
# View full history
print("\n=== Full History ===")
for i, snapshot in enumerate(app.get_state_history(config)):
print(f"Step {i}: {snapshot.next} -> {snapshot.values}")
debug_workflow()def debug_workflow():
"""逐步调试工作流。"""
checkpointer = SqliteSaver.from_conn_string("debug.db")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "debug"}}
# 逐步执行
state = initial_state
for step in range(10): # 最多10步
print(f"\n=== Step {step} ===")
# 获取当前状态
current = app.get_state(config)
print(f"Current state: {current.values}")
print(f"Next node: {current.next}")
if not current.next:
print("Workflow complete")
break
# 执行一步
for event in app.stream(None, config):
print(f"Event: {event}")
# 暂停以便检查
input("Press Enter to continue...")
# 查看完整历史
print("\n=== Full History ===")
for i, snapshot in enumerate(app.get_state_history(config)):
print(f"Step {i}: {snapshot.next} -> {snapshot.values}")
debug_workflow()from langchain_anthropic import ChatAnthropic
def streaming_node(state: dict) -> dict:
"""Node that streams LLM output."""
llm = ChatAnthropic(model="claude-sonnet-4")
# Use streaming
response = ""
for chunk in llm.stream(state["prompt"]):
content = chunk.content
print(content, end="", flush=True)
response += content
return {"response": response}from langchain_anthropic import ChatAnthropic
def streaming_node(state: dict) -> dict:
"""流式输出LLM结果的节点。"""
llm = ChatAnthropic(model="claude-sonnet-4")
# 使用流式输出
response = ""
for chunk in llm.stream(state["prompt"]):
content = chunk.content
print(content, end="", flush=True)
response += content
return {"response": response}undefinedundefinedundefinedundefinedundefinedundefinedfrom typing import AsyncGenerator
async def streaming_workflow(input_data: dict) -> AsyncGenerator:
"""Stream workflow progress in real-time."""
config = {"configurable": {"thread_id": "stream"}}
async for event in app.astream(input_data, config):
# Stream each state update
yield {
"type": "state_update",
"data": event
}
# Stream final result
final_state = app.get_state(config)
yield {
"type": "complete",
"data": final_state.values
}from typing import AsyncGenerator
async def streaming_workflow(input_data: dict) -> AsyncGenerator:
"""实时流式输出工作流进度。"""
config = {"configurable": {"thread_id": "stream"}}
async for event in app.astream(input_data, config):
# 流式输出每个状态更新
yield {
"type": "state_update",
"data": event
}
# 流式输出最终结果
final_state = app.get_state(config)
yield {
"type": "complete",
"data": final_state.values
}
---
---from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agentfrom langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agentllm = ChatAnthropic(model="claude-sonnet-4")
# Create agent with tools
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# Execute with tools
result = agent_executor.invoke({"input": state["query"]})
return {"result": result["output"]}llm = ChatAnthropic(model="claude-sonnet-4")
# 创建带工具的Agent
agent = create_react_agent(llm, tools, prompt_template)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# 使用工具执行
result = agent_executor.invoke({"input": state["query"]})
return {"result": result["output"]}undefinedundefinedfrom langchain.tools import StructuredTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression to evaluate")
def calculate(expression: str) -> str:
"""Evaluate mathematical expression."""
try:
result = eval(expression) # Simplified
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
calculator_tool = StructuredTool.from_function(
func=calculate,
name="Calculator",
description="Evaluate mathematical expressions",
args_schema=CalculatorInput
)
tools = [calculator_tool]
def tool_using_node(state: dict) -> dict:
"""Use custom tools."""
result = calculator_tool.invoke({"expression": state["math_problem"]})
return {"solution": result}from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
expression: str = Field(description="Mathematical expression to evaluate")
def calculate(expression: str) -> str:
"""计算数学表达式。"""
try:
result = eval(expression) # 简化版
return f"Result: {result}"
except Exception as e:
return f"Error: {e}"
calculator_tool = StructuredTool.from_function(
func=calculate,
name="Calculator",
description="Evaluate mathematical expressions",
args_schema=CalculatorInput
)
tools = [calculator_tool]
def tool_using_node(state: dict) -> dict:
"""使用自定义工具。"""
result = calculator_tool.invoke({"expression": state["math_problem"]})
return {"solution": result}from anthropic import Anthropic
import json
def node_with_anthropic_tools(state: dict) -> dict:
"""Use Anthropic's tool use feature."""
client = Anthropic()
# Define tools
tools = [
{
"name": "get_weather",
"description": "Get weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
# First call: Request tool use
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": state["query"]}]
)
# Execute tool if requested
if response.stop_reason == "tool_use":
tool_use = next(
block for block in response.content
if block.type == "tool_use"
)
# Execute tool
tool_result = execute_tool(tool_use.name, tool_use.input)
# Second call: Provide tool result
final_response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": state["query"]},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(tool_result)
}
]
}
]
)
return {"response": final_response.content[0].text}
return {"response": response.content[0].text}from anthropic import Anthropic
import json
def node_with_anthropic_tools(state: dict) -> dict:
"""使用Anthropic的工具调用特性。"""
client = Anthropic()
# 定义工具
tools = [
{
"name": "get_weather",
"description": "Get weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
# 第一次调用:请求工具调用
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": state["query"]}]
)
# 如果请求工具调用则执行
if response.stop_reason == "tool_use":
tool_use = next(
block for block in response.content
if block.type == "tool_use"
)
# 执行工具
tool_result = execute_tool(tool_use.name, tool_use.input)
# 第二次调用:提供工具结果
final_response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": state["query"]},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(tool_result)
}
]
}
]
)
return {"response": final_response.content[0].text}
return {"response": response.content[0].text}def robust_node(state: dict) -> dict:
"""Node with error handling."""
try:
# Main logic
result = risky_operation(state["input"])
return {
"result": result,
"error": None,
"status": "success"
}
except ValueError as e:
# Handle specific error
return {
"error": f"Validation error: {e}",
"status": "validation_failed"
}
except Exception as e:
# Handle unexpected errors
return {
"error": f"Unexpected error: {e}",
"status": "failed"
}def robust_node(state: dict) -> dict:
"""带错误处理的节点。"""
try:
# 核心逻辑
result = risky_operation(state["input"])
return {
"result": result,
"error": None,
"status": "success"
}
except ValueError as e:
# 处理特定错误
return {
"error": f"Validation error: {e}",
"status": "validation_failed"
}
except Exception as e:
# 处理非预期错误
return {
"error": f"Unexpected error: {e}",
"status": "failed"
}if status == "success":
return "next_step"
elif status == "validation_failed":
return "validator"
else:
return "error_handler"undefinedif status == "success":
return "next_step"
elif status == "validation_failed":
return "validator"
else:
return "error_handler"undefinedfrom typing import TypedDict
class RetryState(TypedDict):
input: str
result: str
error: str | None
retry_count: int
max_retries: int
def node_with_retry(state: RetryState) -> dict:
"""Node that can be retried on failure."""
try:
result = unstable_operation(state["input"])
return {
"result": result,
"error": None,
"retry_count": 0
}
except Exception as e:
return {
"error": str(e),
"retry_count": state.get("retry_count", 0) + 1
}
def should_retry(state: RetryState) -> str:
"""Decide whether to retry or fail."""
if state.get("error") is None:
return "success"
retry_count = state.get("retry_count", 0)
max_retries = state.get("max_retries", 3)
if retry_count < max_retries:
return "retry"
else:
return "failed"from typing import TypedDict
class RetryState(TypedDict):
input: str
result: str
error: str | None
retry_count: int
max_retries: int
def node_with_retry(state: RetryState) -> dict:
"""失败时可重试的节点。"""
try:
result = unstable_operation(state["input"])
return {
"result": result,
"error": None,
"retry_count": 0
}
except Exception as e:
return {
"error": str(e),
"retry_count": state.get("retry_count", 0) + 1
}
def should_retry(state: RetryState) -> str:
"""决定是重试还是失败。"""
if state.get("error") is None:
return "success"
retry_count = state.get("retry_count", 0)
max_retries = state.get("max_retries", 3)
if retry_count < max_retries:
return "retry"
else:
return "failed"undefinedundefineddef node_with_fallback(state: dict) -> dict:
"""Try primary method, fall back to secondary."""
# Try primary LLM
try:
result = primary_llm(state["prompt"])
return {
"result": result,
"method": "primary"
}
except Exception as e_primary:
# Fallback to secondary LLM
try:
result = secondary_llm(state["prompt"])
return {
"result": result,
"method": "fallback",
"primary_error": str(e_primary)
}
except Exception as e_secondary:
# Both failed
return {
"error": f"Both methods failed: {e_primary}, {e_secondary}",
"method": "failed"
}def node_with_fallback(state: dict) -> dict:
"""尝试主方法,失败则降级到备用方法。"""
# 尝试主LLM
try:
result = primary_llm(state["prompt"])
return {
"result": result,
"method": "primary"
}
except Exception as e_primary:
# 降级到备用LLM
try:
result = secondary_llm(state["prompt"])
return {
"result": result,
"method": "fallback",
"primary_error": str(e_primary)
}
except Exception as e_secondary:
# 都失败了
return {
"error": f"Both methods failed: {e_primary}, {e_secondary}",
"method": "failed"
}def create_subgraph():
"""Create reusable subgraph."""
subgraph = StateGraph(dict)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.add_edge("step2", END)
subgraph.set_entry_point("step1")
return subgraph.compile()def create_subgraph():
"""创建可复用的子图。"""
subgraph = StateGraph(dict)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.add_edge("step2", END)
subgraph.set_entry_point("step1")
return subgraph.compile()# Add subgraph as a node
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)
workflow.add_node("before", before_func)
workflow.add_node("after", after_func)
workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)
workflow.set_entry_point("before")
return workflow.compile()undefined# 将子图作为节点添加
subgraph_compiled = create_subgraph()
workflow.add_node("subgraph", subgraph_compiled)
workflow.add_node("before", before_func)
workflow.add_node("after", after_func)
workflow.add_edge("before", "subgraph")
workflow.add_edge("subgraph", "after")
workflow.add_edge("after", END)
workflow.set_entry_point("before")
return workflow.compile()undefinedfrom langgraph.graph import StateGraph, END
def create_parallel_workflow():
"""Execute multiple subgraphs in parallel."""
# Create subgraphs
backend_graph = create_backend_subgraph()
frontend_graph = create_frontend_subgraph()
database_graph = create_database_subgraph()
# Main graph
workflow = StateGraph(dict)
# Add subgraphs as parallel nodes
workflow.add_node("backend", backend_graph)
workflow.add_node("frontend", frontend_graph)
workflow.add_node("database", database_graph)
# Merge results
workflow.add_node("merge", merge_results)
# All subgraphs lead to merge
workflow.add_edge("backend", "merge")
workflow.add_edge("frontend", "merge")
workflow.add_edge("database", "merge")
workflow.add_edge("merge", END)
# Set all as entry points (parallel execution)
workflow.set_entry_point("backend")
workflow.set_entry_point("frontend")
workflow.set_entry_point("database")
return workflow.compile()from langgraph.graph import StateGraph, END
def create_parallel_workflow():
"""并行执行多个子图。"""
# 创建子图
backend_graph = create_backend_subgraph()
frontend_graph = create_frontend_subgraph()
database_graph = create_database_subgraph()
# 主图
workflow = StateGraph(dict)
# 将子图作为并行节点添加
workflow.add_node("backend", backend_graph)
workflow.add_node("frontend", frontend_graph)
workflow.add_node("database", database_graph)
# 合并结果
workflow.add_node("merge", merge_results)
# 所有子图执行完成后进入合并节点
workflow.add_edge("backend", "merge")
workflow.add_edge("frontend", "merge")
workflow.add_edge("database", "merge")
workflow.add_edge("merge", END)
# 将所有子图设为入口点(并行执行)
workflow.set_entry_point("backend")
workflow.set_entry_point("frontend")
workflow.set_entry_point("database")
return workflow.compile()from typing import TypedDict, List
class MapReduceState(TypedDict):
documents: List[str]
summaries: List[str]
final_summary: str
def map_node(state: MapReduceState) -> dict:
"""Map: Summarize each document."""
llm = ChatAnthropic(model="claude-sonnet-4")
summaries = []
for doc in state["documents"]:
summary = llm.invoke(f"Summarize: {doc}")
summaries.append(summary.content)
return {"summaries": summaries}
def reduce_node(state: MapReduceState) -> dict:
"""Reduce: Combine summaries into final summary."""
llm = ChatAnthropic(model="claude-sonnet-4")
combined = "\n".join(state["summaries"])
final = llm.invoke(f"Combine these summaries:\n{combined}")
return {"final_summary": final.content}from typing import TypedDict, List
class MapReduceState(TypedDict):
documents: List[str]
summaries: List[str]
final_summary: str
def map_node(state: MapReduceState) -> dict:
"""Map:为每个文档生成摘要。"""
llm = ChatAnthropic(model="claude-sonnet-4")
summaries = []
for doc in state["documents"]:
summary = llm.invoke(f"Summarize: {doc}")
summaries.append(summary.content)
return {"summaries": summaries}
def reduce_node(state: MapReduceState) -> dict:
"""Reduce:将摘要合并为最终摘要。"""
llm = ChatAnthropic(model="claude-sonnet-4")
combined = "\n".join(state["summaries"])
final = llm.invoke(f"Combine these summaries:\n{combined}")
return {"final_summary": final.content}undefinedundefinedimport asyncio
from typing import List
async def parallel_map_node(state: dict) -> dict:
"""Map: Process documents in parallel."""
llm = ChatAnthropic(model="claude-sonnet-4")
async def summarize(doc: str) -> str:
response = await llm.ainvoke(f"Summarize: {doc}")
return response.content
# Process all documents in parallel
summaries = await asyncio.gather(
*[summarize(doc) for doc in state["documents"]]
)
return {"summaries": list(summaries)}import asyncio
from typing import List
async def parallel_map_node(state: dict) -> dict:
"""Map:并行处理文档。"""
llm = ChatAnthropic(model="claude-sonnet-4")
async def summarize(doc: str) -> str:
response = await llm.ainvoke(f"Summarize: {doc}")
return response.content
# 并行处理所有文档
summaries = await asyncio.gather(
*[summarize(doc) for doc in state["documents"]]
)
return {"summaries": list(summaries)}
---
---undefinedundefinedundefinedundefinedfrom fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicornfrom fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import uvicorn return WorkflowResponse(
output=result,
thread_id=request.thread_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))config = {"configurable": {"thread_id": request.thread_id}}
async def generate():
async for event in workflow_app.astream(request.input, config):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream") return WorkflowResponse(
output=result,
thread_id=request.thread_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))config = {"configurable": {"thread_id": request.thread_id}}
async def generate():
async for event in workflow_app.astream(request.input, config):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")undefinedundefinedundefinedundefined
```yaml
```yaml
---
---import os
from langchain_anthropic import ChatAnthropicimport os
from langchain_anthropic import ChatAnthropicundefinedundefinedfrom langsmith import traceable
@traceable(
run_type="chain",
name="research_agent",
metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
"""Node with custom LangSmith metadata."""
# Function execution automatically traced
return {"research": "results"}
workflow.add_node("researcher", research_agent)from langsmith import traceable
@traceable(
run_type="chain",
name="research_agent",
metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
"""带自定义 LangSmith 元数据的节点。"""
# 函数执行会自动被追踪
return {"research": "results"}
workflow.add_node("researcher", research_agent)from langsmith import Client
from langsmith.evaluation import evaluate
client = Client()from langsmith import Client
from langsmith.evaluation import evaluate
client = Client()
---
---from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic
class SupportState(TypedDict):
customer_query: str
category: Literal["billing", "technical", "general"]
priority: Literal["low", "medium", "high"]
resolution: str
escalated: bool
def categorize(state: SupportState) -> dict:
"""Categorize customer query."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Categorize this support query:
"{state['customer_query']}"
Categories: billing, technical, general
Priority: low, medium, high
Return format: category|priority
"""
response = llm.invoke(prompt)
category, priority = response.content.strip().split("|")
return {
"category": category,
"priority": priority
}
def handle_billing(state: SupportState) -> dict:
"""Handle billing issues."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")
return {"resolution": response.content}
def handle_technical(state: SupportState) -> dict:
"""Handle technical issues."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")
return {"resolution": response.content}
def escalate(state: SupportState) -> dict:
"""Escalate to human agent."""
return {
"escalated": True,
"resolution": "Escalated to senior support team"
}
def route_by_category(state: SupportState) -> str:
"""Route based on category and priority."""
if state["priority"] == "high":
return "escalate"
category = state["category"]
if category == "billing":
return "billing"
elif category == "technical":
return "technical"
else:
return "general"from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic
class SupportState(TypedDict):
customer_query: str
category: Literal["billing", "technical", "general"]
priority: Literal["low", "medium", "high"]
resolution: str
escalated: bool
def categorize(state: SupportState) -> dict:
"""对客户查询进行分类。"""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Categorize this support query:
"{state['customer_query']}"
Categories: billing, technical, general
Priority: low, medium, high
Return format: category|priority
"""
response = llm.invoke(prompt)
category, priority = response.content.strip().split("|")
return {
"category": category,
"priority": priority
}
def handle_billing(state: SupportState) -> dict:
"""处理账单问题。"""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")
return {"resolution": response.content}
def handle_technical(state: SupportState) -> dict:
"""处理技术问题。"""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")
return {"resolution": response.content}
def escalate(state: SupportState) -> dict:
"""升级到人工Agent。"""
return {
"escalated": True,
"resolution": "Escalated to senior support team"
}
def route_by_category(state: SupportState) -> str:
"""基于分类和优先级路由。"""
if state["priority"] == "high":
return "escalate"
category = state["category"]
if category == "billing":
return "billing"
elif category == "technical":
return "technical"
else:
return "general"undefinedundefinedclass ResearchState(TypedDict):
topic: str
research_notes: list
outline: str
draft: str
final_article: str
revision_count: int
def research(state: ResearchState) -> dict:
"""Gather information on topic."""
# Use search tools
results = search_web(state["topic"])
return {
"research_notes": [f"Research: {results}"]
}
def create_outline(state: ResearchState) -> dict:
"""Create article outline."""
llm = ChatAnthropic(model="claude-sonnet-4")
notes = "\n".join(state["research_notes"])
response = llm.invoke(f"Create outline for: {notes}")
return {"outline": response.content}
def write_draft(state: ResearchState) -> dict:
"""Write article draft."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Topic: {state['topic']}
Outline: {state['outline']}
Research: {state['research_notes']}
Write comprehensive article.
"""
response = llm.invoke(prompt)
return {"draft": response.content}
def review_and_revise(state: ResearchState) -> dict:
"""Review draft quality."""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Review and improve:\n{state['draft']}")
return {
"final_article": response.content,
"revision_count": state.get("revision_count", 0) + 1
}class ResearchState(TypedDict):
topic: str
research_notes: list
outline: str
draft: str
final_article: str
revision_count: int
def research(state: ResearchState) -> dict:
"""收集主题相关信息。"""
# 使用搜索工具
results = search_web(state["topic"])
return {
"research_notes": [f"Research: {results}"]
}
def create_outline(state: ResearchState) -> dict:
"""生成文章大纲。"""
llm = ChatAnthropic(model="claude-sonnet-4")
notes = "\n".join(state["research_notes"])
response = llm.invoke(f"Create outline for: {notes}")
return {"outline": response.content}
def write_draft(state: ResearchState) -> dict:
"""撰写文章草稿。"""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Topic: {state['topic']}
Outline: {state['outline']}
Research: {state['research_notes']}
Write comprehensive article.
"""
response = llm.invoke(prompt)
return {"draft": response.content}
def review_and_revise(state: ResearchState) -> dict:
"""审核草稿质量。"""
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(f"Review and improve:\n{state['draft']}")
return {
"final_article": response.content,
"revision_count": state.get("revision_count", 0) + 1
}# Simple quality check
if len(state.get("final_article", "")) > 1000:
return END
else:
return "write" # Reviseundefined# 简单质量检查
if len(state.get("final_article", "")) > 1000:
return END
else:
return "write" # 修订undefinedclass CodeReviewState(TypedDict):
code: str
language: str
lint_results: list
test_results: dict
security_scan: dict
review_comments: list
approved: bool
def lint_code(state: CodeReviewState) -> dict:
"""Run linters on code."""
# Run appropriate linter
issues = run_linter(state["code"], state["language"])
return {"lint_results": issues}
def run_tests(state: CodeReviewState) -> dict:
"""Execute test suite."""
results = execute_tests(state["code"])
return {"test_results": results}
def security_scan(state: CodeReviewState) -> dict:
"""Scan for security vulnerabilities."""
scan_results = scan_for_vulnerabilities(state["code"])
return {"security_scan": scan_results}
def ai_review(state: CodeReviewState) -> dict:
"""AI-powered code review."""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Review this {state['language']} code:
{state['code']}
Lint issues: {state['lint_results']}
Test results: {state['test_results']}
Security: {state['security_scan']}
Provide review comments.
"""
response = llm.invoke(prompt)
return {"review_comments": [response.content]}
def approve_or_reject(state: CodeReviewState) -> dict:
"""Final approval decision."""
# Auto-approve if all checks pass
lint_ok = len(state["lint_results"]) == 0
tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0
approved = lint_ok and tests_ok and security_ok
return {"approved": approved}class CodeReviewState(TypedDict):
code: str
language: str
lint_results: list
test_results: dict
security_scan: dict
review_comments: list
approved: bool
def lint_code(state: CodeReviewState) -> dict:
"""对代码运行静态检查。"""
# 运行对应的 linter
issues = run_linter(state["code"], state["language"])
return {"lint_results": issues}
def run_tests(state: CodeReviewState) -> dict:
"""执行测试套件。"""
results = execute_tests(state["code"])
return {"test_results": results}
def security_scan(state: CodeReviewState) -> dict:
"""扫描安全漏洞。"""
scan_results = scan_for_vulnerabilities(state["code"])
return {"security_scan": scan_results}
def ai_review(state: CodeReviewState) -> dict:
"""AI驱动的代码评审。"""
llm = ChatAnthropic(model="claude-sonnet-4")
prompt = f"""
Review this {state['language']} code:
{state['code']}
Lint issues: {state['lint_results']}
Test results: {state['test_results']}
Security: {state['security_scan']}
Provide review comments.
"""
response = llm.invoke(prompt)
return {"review_comments": [response.content]}
def approve_or_reject(state: CodeReviewState) -> dict:
"""最终审批决策。"""
# 如果所有检查通过则自动审批
lint_ok = len(state["lint_results"]) == 0
tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0
approved = lint_ok and tests_ok and security_ok
return {"approved": approved}
---
---undefinedundefinedundefinedundefinedfrom langgraph.checkpoint.sqlite import SqliteSaverfrom langgraph.checkpoint.sqlite import SqliteSaverundefinedundefineddef batch_processing_node(state: dict) -> dict:
"""Process items in batches."""
items = state["items"]
batch_size = 10
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
batch_result = process_batch(batch)
results.extend(batch_result)
return {"results": results}def batch_processing_node(state: dict) -> dict:
"""分批处理条目。"""
items = state["items"]
batch_size = 10
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
batch_result = process_batch(batch)
results.extend(batch_result)
return {"results": results}from langgraph.graph import StateGraph
import asyncio
async def async_node(state: dict) -> dict:
"""Async node for I/O-bound operations."""
results = await asyncio.gather(
fetch_api_1(state["input"]),
fetch_api_2(state["input"]),
fetch_api_3(state["input"])
)
return {"results": results}from langgraph.graph import StateGraph
import asyncio
async def async_node(state: dict) -> dict:
"""适用于IO密集型操作的异步节点。"""
results = await asyncio.gather(
fetch_api_1(state["input"]),
fetch_api_2(state["input"]),
fetch_api_3(state["input"])
)
return {"results": results}
---
---from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
**Limitations**:
- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging
**局限性**:
- ❌ 无状态持久化
- ❌ 无条件分支
- ❌ 无循环/回路
- ❌ 无人在回路支持
- ❌ 调试能力有限from langgraph.graph import StateGraph, END
workflow = StateGraph(dict)
workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)from langgraph.graph import StateGraph, END
workflow = StateGraph(dict)
workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)
**Advantages**:
- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready
---
**优势**:
- ✅ 跨步骤持久化状态
- ✅ 条件分支
- ✅ 循环和回路
- ✅ 人在回路支持
- ✅ 时间旅行调试
- ✅ 生产就绪
---from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
chain = (
{"input": RunnablePassthrough()}
| llm
| {"output": lambda x: x.content}
)
result = chain.invoke("Question")from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4")
chain = (
{"input": RunnablePassthrough()}
| llm
| {"output": lambda x: x.content}
)
result = chain.invoke("Question")from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
input: str
output: str
def llm_node(state: State) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(state["input"])
return {"output": response.content}
workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")
app = workflow.compile()
result = app.invoke({"input": "Question", "output": ""})from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
input: str
output: str
def llm_node(state: State) -> dict:
llm = ChatAnthropic(model="claude-sonnet-4")
response = llm.invoke(state["input"])
return {"output": response.content}
workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")
app = workflow.compile()
result = app.invoke({"input": "Question", "output": ""})undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedimport logging
logger = logging.getLogger(__name__)
def monitored_node(state: dict) -> dict:
"""Node with logging."""
logger.info(f"Executing node with state: {state.keys()}")
try:
result = operation()
logger.info("Node succeeded")
return {"result": result}
except Exception as e:
logger.error(f"Node failed: {e}")
raiseimport logging
logger = logging.getLogger(__name__)
def monitored_node(state: dict) -> dict:
"""带日志的节点。"""
logger.info(f"Executing node with state: {state.keys()}")
try:
result = operation()
logger.info("Node succeeded")
return {"result": result}
except Exception as e:
logger.error(f"Node failed: {e}")
raise