# agent-workflow

(NOTE(review): the original title/frontmatter failed to render — name reconstructed from the install command below; confirm against the repository.)
Guides the agent through building LLM-powered applications with LangChain and stateful agent workflows with LangGraph. Triggered when the user asks to "create an AI agent", "build a LangChain chain", "create a LangGraph workflow", "implement tool calling", "build RAG pipeline", "create a multi-agent system", "define agent state", "add human-in-the-loop", "implement streaming", or mentions LangChain, LangGraph, chains, agents, tools, retrieval augmented generation, state graphs, or LLM orchestration.
npx skill4agent add ingpdw/pdw-python-dev-tool agent-workflow

Install the provider package (`langchain_<provider>`); chat models are also reachable via `langchain.chat_models`.

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# OpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Anthropic
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929", temperature=0)

from langchain_core.messages import (
HumanMessage,
AIMessage,
SystemMessage,
ToolMessage,
)
# Conversation as a list of typed messages: system message sets behavior,
# human message carries the user's question.
messages = [
SystemMessage(content="Answer concisely."),
HumanMessage(content="What is LangGraph?"),
]
response = llm.invoke(messages)  # returns AIMessage

The returned `AIMessage` carries `content`, `tool_calls`, and `response_metadata`. Build reusable prompts with `ChatPromptTemplate`:

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# Template with variable slots ({topic}, {question}) and an optional
# placeholder where prior chat history is spliced in.
prompt = ChatPromptTemplate.from_messages([
("system", "Answer questions about {topic}. Be concise."),
MessagesPlaceholder("chat_history", optional=True),
("human", "{question}"),
])
# Invoke with variables
messages = prompt.invoke({
"topic": "Python",
"question": "What is asyncio?",
"chat_history": [],
})

`MessagesPlaceholder` with `optional=True` lets the history variable be omitted entirely.

from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
# Simple string output
parser = StrOutputParser()
# Pydantic structured output
class MovieReview(BaseModel):
# Field descriptions tell the model what to fill in; ge/le constrain
# the rating to 1-10 at validation time.
title: str = Field(description="Movie title")
rating: int = Field(description="Rating from 1 to 10", ge=1, le=10)
summary: str = Field(description="Brief summary")
pydantic_parser = PydanticOutputParser(pydantic_object=MovieReview)
# JSON output
json_parser = JsonOutputParser(pydantic_object=MovieReview)

Prefer `model.with_structured_output(MovieReview)` when the provider supports it:

structured_llm = llm.with_structured_output(MovieReview)
result = structured_llm.invoke("Review the movie Inception")
# result is a MovieReview instance (a `pydantic` model)

Chains compose `Runnable`s with the `|` operator:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_messages([
("system", "Translate the following to {language}."),
("human", "{text}"),
])
# LCEL pipeline: prompt -> model -> string parser, composed with `|`.
chain = prompt | ChatOpenAI(model="gpt-4o") | StrOutputParser()
result = chain.invoke({"language": "French", "text": "Hello, world!"})

from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
# Runnable primitives for composing chains:
# Pass input through unchanged (useful for forwarding data)
RunnablePassthrough()
# Wrap any function as a Runnable
RunnableLambda(lambda x: x["text"].upper())
# Run multiple chains in parallel, merge results
RunnableParallel(
summary=summary_chain,
keywords=keywords_chain,
)

from langchain_core.runnables import RunnablePassthrough
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)

Every runnable exposes `.invoke()` / `.ainvoke()`, `.stream()` / `.astream()`, and `.batch()` / `.abatch()`.

from langchain_core.tools import tool
@tool
def search_database(query: str, limit: int = 10) -> list[dict]:
"""Search the product database by query string.
Args:
query: The search query to match against product names and descriptions.
limit: Maximum number of results to return.
"""
return db.search(query, limit=limit)

For an explicit argument schema, pass `args_schema`:

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
# Explicit tool-argument schema: descriptions guide the model; the
# default/ge/le constraints are enforced by pydantic validation.
class SearchArgs(BaseModel):
query: str = Field(description="Search query string")
category: str = Field(description="Product category to filter")
limit: int = Field(default=10, ge=1, le=100)
search_tool = StructuredTool.from_function(
func=search_database,
name="search_database",
description="Search the product database with filters",
args_schema=SearchArgs,
)

tools = [search_database, get_weather, calculate]
# Bind tools to a model
llm_with_tools = llm.bind_tools(tools)
# Force a specific tool
llm_with_tools = llm.bind_tools(tools, tool_choice="search_database")
# Parse tool calls from the response
response = llm_with_tools.invoke("Find laptops under $1000")
for tool_call in response.tool_calls:
print(tool_call["name"], tool_call["args"])

See `references/tools.md` for more detail. LangGraph state is a `TypedDict` whose fields use `Annotated` reducers:

import operator
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages] # append messages
context: str # overwrite on each update
iteration_count: Annotated[int, operator.add]  # sum values

`add_messages` appends (and also handles `RemoveMessage`); `operator.add` sums numeric updates.

from langchain_core.messages import AIMessage
# Nodes are async functions that take the state and return a *partial* update.
async def agent_node(state: AgentState) -> dict:
"""Call the LLM with the current messages."""
response = await llm_with_tools.ainvoke(state["messages"])
# The returned dict is merged into state via the reducers on AgentState.
return {"messages": [response]}
async def process_node(state: AgentState) -> dict:
"""Post-process the agent output."""
last_message = state["messages"][-1]
return {"context": last_message.content, "iteration_count": 1}

from langgraph.graph import StateGraph, END
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.add_node("process", process_node)
# Static edge
graph.add_edge("tools", "agent")
# Conditional edge -- route based on state
def should_continue(state: AgentState) -> str:
"""Route to 'tools' while the model keeps requesting tool calls, else 'process'."""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return "process"
graph.add_conditional_edges("agent", should_continue)
# Terminal edge
graph.add_edge("process", END)
# Set entry point
graph.set_entry_point("agent")

from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "user-123"}}
result = await app.ainvoke(
{"messages": [HumanMessage(content="Find laptops under $1000")]},
config=config,
)

Conversation memory is keyed by `thread_id`.

from langgraph.prebuilt import ToolNode
tools = [search_database, get_weather]
# Prebuilt node that executes the tool calls requested by the model.
tool_node = ToolNode(tools)
llm_with_tools = llm.bind_tools(tools)
async def call_model(state: AgentState) -> dict:
"""Invoke the tool-bound model on the running message history."""
response = await llm_with_tools.ainvoke(state["messages"])
return {"messages": [response]}
def should_continue(state: AgentState) -> str:
"""Loop agent -> tools until the model stops emitting tool calls."""
if state["messages"][-1].tool_calls:
return "tools"
return END
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", tool_node)
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue)
graph.add_edge("tools", "agent")
agent = graph.compile(checkpointer=MemorySaver())

See `assets/graph-template.py` for a complete template.

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
# Checkpointer options -- pick one per deployment tier:
# In-memory (development)
checkpointer = MemorySaver()
# SQLite (single-server persistence)
# NOTE(review): in recent langgraph versions the async savers'
# from_conn_string returns an async context manager -- confirm usage.
checkpointer = AsyncSqliteSaver.from_conn_string("checkpoints.db")
# PostgreSQL (production -- requires langgraph-checkpoint-postgres)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
checkpointer = AsyncPostgresSaver.from_conn_string(database_url)

Persistence is scoped per conversation with `thread_id`:

config = {"configurable": {"thread_id": "session-abc123"}}

# Interrupt before the tool node executes
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["tools"],  # pause for approval before any tool runs
)
# Run until interrupted
result = await app.ainvoke(
{"messages": [HumanMessage(content="Delete all records")]},
config=config,
)
# Inspect pending tool calls, get human approval, then resume
result = await app.ainvoke(None, config=config)

`interrupt_after` works the same way, pausing after a node runs. Stream per-node state updates:

async for event in app.astream(
{"messages": [HumanMessage(content="Summarize this document")]},
config=config,
):
for node_name, output in event.items():
print(f"Node '{node_name}': {output}")

Stream individual LLM tokens with `astream_events`:

async for event in app.astream_events(
{"messages": [HumanMessage(content="Write a poem")]},
config=config,
version="v2",
):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)

`astream_events()` integrates cleanly with FastAPI server-sent events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
app = FastAPI()
@app.post("/chat")
async def chat(request: ChatRequest):
"""Non-streaming chat: run the agent to completion, return the final answer."""
# thread_id scopes checkpointed conversation state to this client session.
config = {"configurable": {"thread_id": request.thread_id}}
result = await agent.ainvoke(
{"messages": [HumanMessage(content=request.message)]},
config=config,
)
return {"response": result["messages"][-1].content}
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
config = {"configurable": {"thread_id": request.thread_id}}
async def event_generator():
async for event in agent.astream_events(
{"messages": [HumanMessage(content=request.message)]},
config=config,
version="v2",
):
if event["event"] == "on_chat_model_stream":
token = event["data"]["chunk"].content
if token:
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")

See also: the `fastapi` and `async-patterns` skills, `references/langgraph-workflows.md`, `references/tools.md`, and `assets/graph-template.py`.