PocketFlow: a framework for building LLM applications with graph-based abstractions, design patterns, and agentic coding workflows.
Install the skill:

npx skill4agent add nickth3man/claude_market pocketflow

# Shared Store: Central data storage
# Shared store: one plain dict that every node reads from and writes to,
# instead of nodes passing data to each other directly.
shared = {"data": {}, "summary": {}, "config": {...}}
# Graph: nodes connected by transitions; `>>` chains the default action.
node_a >> node_b >> node_c
# A Flow only needs its start node; it follows transitions from there.
flow = Flow(start=node_a)
flow.run(shared)

# Node lifecycle: prep() -> exec() -> post()
class SummarizeFile(Node):
    """Read text from the shared store, summarize it via the LLM, store the result."""

    def prep(self, shared):
        # Get data from shared store.
        return shared["data"]

    def exec(self, prep_res):
        # Process with LLM (retries are built into Node).
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # Write results back to shared store.
        shared["summary"] = exec_res
        return "default"  # Action string used for flow control

# Simple sequence
# Default transitions chain nodes left to right.
load_data >> summarize >> save_result
flow = Flow(start=load_data)
flow.run(shared)
# Branching with actions: `node - "action" >> target` routes on the
# action string returned by the source node's post().
review - "approved" >> payment
review - "needs_revision" >> revise
review - "rejected" >> finish
revise >> review # Loop back for another review round
flow = Flow(start=review)

class LoadData(Node):
    """Seed the shared store with the text to summarize."""

    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    """Summarize shared["data"] with the LLM and store the result."""

    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"

# Connect and run
load_data >> summarize
flow = Flow(start=load_data)
flow.run(shared)

class MapSummaries(BatchNode):
    """Map-reduce over a big file: chunk it, summarize each chunk, join the results."""

    def prep(self, shared):
        # Chunk the big file into fixed-size pieces.
        content = shared["data"]
        chunk_size = 10000
        return [content[i:i+chunk_size]
                for i in range(0, len(content), chunk_size)]

    def exec(self, chunk):
        # Called once per chunk.
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        # Combine all per-chunk results.
        shared["summary"] = "\n".join(exec_res_list)
        return "default"

class SummarizeAllFiles(BatchFlow):
    """Run the subflow once per file, passing each filename as a param dict."""

    def prep(self, shared):
        filenames = list(shared["data"].keys())
        # Return list of parameter dicts, one per subflow run.
        return [{"filename": fn} for fn in filenames]

class LoadFile(Node):
    """Pick the file to load from this node's params."""

    def prep(self, shared):
        # Access filename from params (injected by the parent BatchFlow).
        filename = self.params["filename"]
        return filename

class DecideAction(Node):
    """Agent step: ask the LLM whether to search the web or answer now."""

    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = call_llm(prompt)
        # Extract the fenced YAML payload from the reply.
        yaml_str = resp.split("```yaml")[1].split("```")[0]
        action_data = yaml.safe_load(yaml_str)
        return action_data

# Build agent graph
decide >> search_web
decide - "answer" >> provide_answer
search_web >> decide  # Loop back for more searches
agent_flow = Flow(start=decide)

class ChunkDocs(BatchNode):
    """Offline RAG stage: read each file and split it into chunks."""

    def prep(self, shared):
        return shared["files"]

    def exec(self, filepath):
        # One call per file path.
        with open(filepath, "r") as f:
            text = f.read()
        # Chunk by 100 chars.
        size = 100
        return [text[i:i+size] for i in range(0, len(text), size)]

    def post(self, shared, prep_res, exec_res_list):
        # Flatten the per-file chunk lists into one list.
        shared["all_chunks"] = [c for chunks in exec_res_list
                                for c in chunks]

chunk_docs >> embed_docs >> build_index
offline_flow = Flow(start=chunk_docs)

class RetrieveDocs(Node):
    """Online RAG stage: return the chunk nearest to the query embedding."""

    def exec(self, inputs):
        q_emb, index, chunks = inputs
        # NOTE(review): assumes search_index returns (ids, distances) in this
        # order -- confirm against its definition.
        ids, dists = search_index(index, q_emb, top_k=1)
        return chunks[ids[0][0]]

embed_query >> retrieve_docs >> generate_answer
online_flow = Flow(start=embed_query)

class SummarizeThenVerify(AsyncNode):
    """Async node: summarize a document, then ask the user to approve it."""

    async def prep_async(self, shared):
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
            return "default"
        # Any other decision falls through and returns None.

# AsyncNode must be wrapped in an AsyncFlow.
node = SummarizeThenVerify()
flow = AsyncFlow(start=node)
await flow.run_async(shared)class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
return shared["texts"] # List of texts
async def exec_async(self, text):
# Runs in parallel for each text
return await call_llm_async(f"Summarize: {text}")
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"class GenerateOutline(Node):
def prep(self, shared):
return shared["topic"]
def exec(self, topic):
return call_llm(f"Create outline for: {topic}")
def post(self, shared, prep_res, exec_res):
shared["outline"] = exec_res
class WriteSection(Node):
    """Second workflow step: draft content from the outline."""

    def exec(self, outline):
        return call_llm(f"Write content: {outline}")

    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res

class ReviewAndRefine(Node):
    """Final workflow step: review the draft and improve it."""

    def exec(self, draft):
        return call_llm(f"Review and improve: {draft}")

# Chain the workflow
outline >> write >> review
workflow = Flow(start=outline)

class SummarizeNode(Node):
    """Structured output: ask for YAML, parse it, and validate the shape."""

    def exec(self, prep_res):
        prompt = f"""
Summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
        response = call_llm(prompt)
        # Pull the fenced YAML block out of the reply.
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        import yaml
        structured_result = yaml.safe_load(yaml_str)
        # Validate (note: asserts are stripped under `python -O`).
        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)
        return structured_result

# Examples live in assets/examples/, starter project in assets/template/.
# assets/examples/01_chat.py
# Key pattern: Self-looping node
chat_node = ChatNode()
chat_node - "continue" >> chat_node  # Loop back for continuous chat
flow = Flow(start=chat_node)

# Run: python assets/examples/01_chat.py
# Next: assets/examples/02_workflow.py
# Sequential pipeline: outline -> draft -> refine.
outline >> draft >> refine
flow = Flow(start=outline)

# Run: python assets/examples/02_workflow.py "AI Safety"
# Next: assets/examples/03_agent.py
decide - "search" >> search
decide - "answer" >> answer
search - "continue" >> decide # Loop backpython assets/examples/03_agent.py "Nobel Prize 2024"assets/examples/04_rag.py# Stage 1: Offline indexing
# RAG stage 1: build the index offline.
embed_docs >> build_index
offline_flow = Flow(start=embed_docs)
# RAG stage 2: answer queries online.
embed_query >> retrieve >> generate
online_flow = Flow(start=embed_query)

# Run: python assets/examples/04_rag.py -- "How to install PocketFlow?"
# Next: assets/examples/05_structured_output.py
# Parse the fenced YAML block out of the LLM response.
yaml_str = response.split("```yaml")[1].split("```")[0]
structured_result = yaml.safe_load(yaml_str)
# Validate structure before using it.
assert "name" in structured_result
assert "email" in structured_result

# Run: python assets/examples/05_structured_output.py
# Next: assets/examples/06_multi_agent.py
shared = {
"hinter_queue": asyncio.Queue(),
"guesser_queue": asyncio.Queue()
}
# Run concurrently
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)python assets/examples/06_multi_agent.pyassets/template/main.pyflow.pynodes.pyutils.pyrequirements.txtcd assets/template/
pip install -r requirements.txt
# Edit utils.py to add your LLM API key
python main.py

assets/COOKBOOK_GUIDE.md:

| Pattern | Use Case | Key Component |
|---|---|---|
| Agent | Dynamic action selection | Action space + context management |
| Workflow | Multi-step task decomposition | Chained nodes |
| RAG | Context-aware answers | Offline indexing + online retrieval |
| Map Reduce | Large input processing | BatchNode with aggregation |
| Multi-Agent | Collaborative agents | Message queues + AsyncNode |
| Structured Output | Typed LLM responses | YAML prompting + validation |
# Design the shared data structure first.
shared = {
    "user": {
        "id": "user123",
        "context": {
            "weather": {"temp": 72, "condition": "sunny"},
            "location": "San Francisco"
        }
    },
    "results": {}
}

class SummarizeFile(Node):
    """Summarize one file; which file is chosen via this node's params."""

    def prep(self, shared):
        # Access node's params (set by set_params or a parent flow).
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

# Set params
node = SummarizeFile()
node.set_params({"filename": "report.txt"})

# Automatic retries
# Automatic retries: max_retries/wait configure the built-in retry of exec()
# (wait presumably in seconds -- confirm against Node's docs).
my_node = SummarizeFile(max_retries=3, wait=10)

class ResilientNode(Node):
    """Graceful fallback instead of crashing when exec() keeps failing."""

    def exec_fallback(self, prep_res, exc):
        # Return a fallback value instead of re-raising `exc`.
        return "There was an error processing your request."

# Flows can act as nodes
# Nesting: wrap a subgraph in a Flow, then reuse it like a node.
node_a >> node_b
subflow = Flow(start=node_a)
# Connect the whole subflow to other nodes.
subflow >> node_c
# Create parent flow
parent_flow = Flow(start=subflow)

class AgentNode(AsyncNode):
    """Agent that blocks on its message queue and processes each message."""

    async def prep_async(self, _):
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message

# Create self-loop for continuous listening.
agent = AgentNode()
agent >> agent
flow = AsyncFlow(start=agent)

# OpenAI
def call_llm(prompt):
    """Send `prompt` to OpenAI chat completions and return the reply text."""
    from openai import OpenAI  # local import keeps the module importable without the SDK
    client = OpenAI(api_key="YOUR_API_KEY")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
# Anthropic Claude (alternative implementation of call_llm)
def call_llm(prompt):
    """Send `prompt` to the Anthropic Messages API and return the reply text."""
    from anthropic import Anthropic
    client = Anthropic(api_key="YOUR_API_KEY")
    r = client.messages.create(
        model="claude-sonnet-4-0",
        max_tokens=1024,  # required parameter of the Messages API
        messages=[{"role": "user", "content": prompt}]
    )
    return r.content[0].text
# Google Gemini (alternative implementation of call_llm)
def call_llm(prompt):
    """Send `prompt` to Gemini and return the reply text."""
    from google import genai
    client = genai.Client(api_key='GEMINI_API_KEY')
    response = client.models.generate_content(
        model='gemini-2.5-pro',
        contents=prompt
    )
    return response.text

# OpenAI
# OpenAI embeddings: embed `text` (defined elsewhere) into a vector.
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
response = client.embeddings.create(
model="text-embedding-ada-002",
input=text
)
# Embedding vector for the single input.
embedding = response.data[0].embedding
# Fixed-size chunking
def fixed_size_chunk(text, chunk_size=100):
    """Split `text` into consecutive chunks of at most `chunk_size` characters."""
    return [text[i:i+chunk_size]
            for i in range(0, len(text), chunk_size)]

def sentence_based_chunk(text, max_sentences=2):
    """Split `text` into chunks of at most `max_sentences` sentences each."""
    import nltk  # local import: only needed when sentence chunking is used
    sentences = nltk.sent_tokenize(text)
    return [" ".join(sentences[i:i+max_sentences])
            for i in range(0, len(sentences), max_sentences)]

# my_project/  (recommended project layout continues below)
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
    └── design.md

flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]

Utility naming examples: read_database, read_databases, read_csvs

See: references/core_abstraction.md