Loading...
Loading...
Compare original and translation side by side
class SummarizeFile(Node):
    """Three-phase node: read text from the shared store, summarize it
    with the LLM, then write the summary back and return a flow action."""

    def prep(self, shared):
        # Hand the raw text to exec().
        return shared["data"]

    def exec(self, prep_res):
        # LLM call; the Node base class supplies retry behavior around exec().
        prompt = f"Summarize this text in 10 words: {prep_res}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        # Store the result and pick the outgoing transition.
        shared["summary"] = exec_res
        return "default"  # Action for flow control
class SummarizeFile(Node):
    """Summarize shared["data"] with an LLM (prep -> exec -> post)."""

    def prep(self, shared):
        # Fetch the input text for this run.
        text = shared["data"]
        return text

    def exec(self, prep_res):
        # Build the prompt and ask the model; retries are built into Node.
        summary = call_llm(f"Summarize this text in 10 words: {prep_res}")
        return summary

    def post(self, shared, prep_res, exec_res):
        # Publish the summary for downstream nodes.
        shared["summary"] = exec_res
        return "default"  # Action for flow control
class LoadData(Node):
    """Seed the shared store with the text later nodes will consume."""

    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some text content"
        return None  # No explicit action: the flow follows the default edge.
class Summarize(Node):
    """Summarize shared["data"] and store the result under "summary"."""

    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        # Single LLM round-trip over the prepared text.
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"
class LoadData(Node):
    """Write a fixed text payload into the shared store."""

    def post(self, shared, prep_res, exec_res):
        # Downstream nodes read this key as their input.
        shared["data"] = "Some text content"
        return None
class Summarize(Node):
    """LLM summarization step of the two-node demo flow."""

    def prep(self, shared):
        # Input text produced by the upstream LoadData node.
        text = shared["data"]
        return text

    def exec(self, prep_res):
        result = call_llm(f"Summarize: {prep_res}")
        return result

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "default"
class MapSummaries(BatchNode):
    """Map-reduce over one large document: chunk it, summarize each chunk,
    then join the per-chunk summaries."""

    def prep(self, shared):
        # Chunk the big file into fixed-size slices; exec() runs once per slice.
        content = shared["data"]
        chunk_size = 10000
        return [content[i:i + chunk_size]
                for i in range(0, len(content), chunk_size)]

    def exec(self, chunk):
        # Process each chunk independently.
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        # Combine all per-chunk results into one summary.
        shared["summary"] = "\n".join(exec_res_list)
        return "default"
class SummarizeAllFiles(BatchFlow):
    """Run the wrapped flow once per file in shared["data"]."""

    def prep(self, shared):
        filenames = list(shared["data"].keys())
        # Return a list of parameter dicts: one flow run per filename.
        return [{"filename": fn} for fn in filenames]
class LoadFile(Node):
    """Resolve which file to load from the node's params."""

    def prep(self, shared):
        # Access filename from params (set by the enclosing BatchFlow).
        filename = self.params["filename"]
        return filename
class MapSummaries(BatchNode):
    """Split a big input into 10k-character chunks, summarize each,
    and merge the results."""

    def prep(self, shared):
        content = shared["data"]
        chunk_size = 10000
        chunks = []
        for start in range(0, len(content), chunk_size):
            chunks.append(content[start:start + chunk_size])
        return chunks

    def exec(self, chunk):
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        # Per-chunk summaries arrive in order; join them line by line.
        shared["summary"] = "\n".join(exec_res_list)
        return "default"
class SummarizeAllFiles(BatchFlow):
    """Fan out the wrapped flow across every file key in shared["data"]."""

    def prep(self, shared):
        # One params dict per filename; iterating the dict yields its keys.
        return [{"filename": name} for name in shared["data"]]
class LoadFile(Node):
    """Pick the target filename out of self.params."""

    def prep(self, shared):
        # The surrounding BatchFlow injects "filename" into params.
        return self.params["filename"]
class DecideAction(Node):
    """Agent decision step: ask the LLM whether to search the web or
    answer directly, and return the parsed YAML decision dict."""

    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = call_llm(prompt)
        # Extract the fenced YAML block; .strip() guards against stray
        # whitespace around the fence (consistent with SummarizeNode's parsing).
        yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
        action_data = yaml.safe_load(yaml_str)
        return action_data
class DecideAction(Node):
    """Choose the agent's next action (search vs. answer) via an LLM."""

    def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        response = call_llm(prompt)
        # Pull out the ```yaml fenced block and parse it; strip() keeps
        # leading/trailing fence whitespace out of the YAML parser
        # (matches SummarizeNode's extraction style).
        fenced = response.split("```yaml")[1].split("```")[0].strip()
        return yaml.safe_load(fenced)
class ChunkDocs(BatchNode):
    """Offline indexing step 1: read each file and cut it into chunks."""

    def prep(self, shared):
        # One exec() call per file path.
        return shared["files"]

    def exec(self, filepath):
        with open(filepath, "r") as f:
            text = f.read()
        # Chunk by 100 chars.
        size = 100
        return [text[i:i + size] for i in range(0, len(text), size)]

    def post(self, shared, prep_res, exec_res_list):
        # Flatten the per-file chunk lists into one list.
        shared["all_chunks"] = [c for chunks in exec_res_list
                                for c in chunks]

# Offline pipeline: chunk -> embed -> build the vector index.
chunk_docs >> embed_docs >> build_index
offline_flow = Flow(start=chunk_docs)
class RetrieveDocs(Node):
    """Online step: nearest-chunk lookup for the query embedding."""

    def exec(self, inputs):
        q_emb, index, chunks = inputs
        # NOTE(review): assumes search_index returns (indices, distances)
        # in that order — confirm against the helper's contract.
        I, D = search_index(index, q_emb, top_k=1)
        return chunks[I[0][0]]

# Online pipeline: embed the query, retrieve context, generate the answer.
embed_query >> retrieve_docs >> generate_answer
online_flow = Flow(start=embed_query)
class ChunkDocs(BatchNode):
    """Read every input file and slice its text into 100-char chunks."""

    def prep(self, shared):
        return shared["files"]

    def exec(self, filepath):
        with open(filepath, "r") as fh:
            contents = fh.read()
        window = 100
        return [contents[start:start + window]
                for start in range(0, len(contents), window)]

    def post(self, shared, prep_res, exec_res_list):
        # Flatten: exec_res_list is a list of per-file chunk lists.
        flat = []
        for per_file_chunks in exec_res_list:
            flat.extend(per_file_chunks)
        shared["all_chunks"] = flat

# Wire the offline indexing flow.
chunk_docs >> embed_docs >> build_index
offline_flow = Flow(start=chunk_docs)
class RetrieveDocs(Node):
    """Return the single best-matching chunk for a query embedding."""

    def exec(self, inputs):
        q_emb, index, chunks = inputs
        # NOTE(review): first tuple element is treated as the index matrix
        # (faiss-style APIs usually return distances first) — verify.
        indices, _distances = search_index(index, q_emb, top_k=1)
        return chunks[indices[0][0]]

# Query-time flow wiring.
embed_query >> retrieve_docs >> generate_answer
online_flow = Flow(start=embed_query)
class SummarizeThenVerify(AsyncNode):
    """Async summarize-with-human-review: only an approved summary is saved."""

    async def prep_async(self, shared):
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
            return "default"
        # Any other decision falls through and yields a None action.
class SummarizeThenVerify(AsyncNode):
    """Summarize a document asynchronously, then gate on user approval."""

    async def prep_async(self, shared):
        # Non-blocking file read.
        return await read_file_async(shared["doc_path"])

    async def exec_async(self, prep_res):
        return await call_llm_async(f"Summarize: {prep_res}")

    async def post_async(self, shared, prep_res, exec_res):
        decision = await gather_user_feedback(exec_res)
        if decision != "approve":
            # Rejected: store nothing; None action.
            return None
        shared["summary"] = exec_res
        return "default"
**AsyncParallelBatchNode** - Process multiple items concurrently:
```python
class ParallelSummaries(AsyncParallelBatchNode):
    """Summarize many texts concurrently and join the results."""

    async def prep_async(self, shared):
        return shared["texts"]  # List of texts

    async def exec_async(self, text):
        # Runs in parallel for each text.
        return await call_llm_async(f"Summarize: {text}")

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"
**AsyncParallelBatchNode** - 并发处理多个项:
```python
class ParallelSummaries(AsyncParallelBatchNode):
    """Fan out one LLM summarization per input text, in parallel."""

    async def prep_async(self, shared):
        # Each list element becomes a separate exec_async() call.
        return shared["texts"]

    async def exec_async(self, text):
        summary = await call_llm_async(f"Summarize: {text}")
        return summary

    async def post_async(self, shared, prep_res, exec_res_list):
        # Results keep input order; blank line between summaries.
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"
class GenerateOutline(Node):
    """Workflow step 1: turn a topic into an outline."""

    def prep(self, shared):
        return shared["topic"]

    def exec(self, topic):
        return call_llm(f"Create outline for: {topic}")

    def post(self, shared, prep_res, exec_res):
        shared["outline"] = exec_res


class WriteSection(Node):
    """Workflow step 2: expand the outline into draft content."""

    def exec(self, outline):
        return call_llm(f"Write content: {outline}")

    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res


class ReviewAndRefine(Node):
    """Workflow step 3: polish the draft."""

    def exec(self, draft):
        return call_llm(f"Review and improve: {draft}")
class GenerateOutline(Node):
    """Produce an outline for shared["topic"]."""

    def prep(self, shared):
        topic = shared["topic"]
        return topic

    def exec(self, topic):
        outline = call_llm(f"Create outline for: {topic}")
        return outline

    def post(self, shared, prep_res, exec_res):
        # No action returned: the flow proceeds along the default edge.
        shared["outline"] = exec_res


class WriteSection(Node):
    """Draft content from the outline handed in as prep_res."""

    def exec(self, outline):
        draft = call_llm(f"Write content: {outline}")
        return draft

    def post(self, shared, prep_res, exec_res):
        shared["draft"] = exec_res


class ReviewAndRefine(Node):
    """Final pass: ask the LLM to improve the draft."""

    def exec(self, draft):
        return call_llm(f"Review and improve: {draft}")
class SummarizeNode(Node):
    """Structured-output demo: force the LLM to answer in validated YAML."""

    def exec(self, prep_res):
        prompt = f"""
Summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
        response = call_llm(prompt)
        # Grab the fenced YAML block from the response.
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        import yaml
        structured_result = yaml.safe_load(yaml_str)
        # Validate the schema; a failed assert triggers Node's retry logic.
        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)
        return structured_result
class SummarizeNode(Node):
    """Ask for a 3-bullet YAML summary and validate the parsed shape."""

    def exec(self, prep_res):
        prompt = f"""
Summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
        raw = call_llm(prompt)
        fenced = raw.split("```yaml")[1].split("```")[0].strip()
        import yaml
        parsed = yaml.safe_load(fenced)
        # Schema checks — assertion failures feed Node's built-in retries.
        assert "summary" in parsed
        assert isinstance(parsed["summary"], list)
        return parsed
**What it demonstrates:**
- Message history management
- Self-looping nodes
- Graceful exit handling
- User input processing
**Run it:** `python assets/examples/01_chat.py`
---
**演示内容:**
- 对话历史管理
- 自循环节点
- 优雅退出处理
- 用户输入处理
**运行方式:** `python assets/examples/01_chat.py`
---assets/examples/02_workflow.pyundefinedassets/examples/02_workflow.pyundefined
**What it demonstrates:**
- Task decomposition
- Sequential workflows
- Progressive content generation
**Run it:** `python assets/examples/02_workflow.py "AI Safety"`
---
**演示内容:**
- 任务分解
- 顺序工作流
- 渐进式内容生成
**运行方式:** `python assets/examples/02_workflow.py "AI Safety"`
---assets/examples/03_agent.pyundefinedassets/examples/03_agent.pyundefined
**What it demonstrates:**
- Dynamic action selection
- Branching logic
- Agent decision-making
- Iterative research loops
**Run it:** `python assets/examples/03_agent.py "Nobel Prize 2024"`
---
**演示内容:**
- 动态动作选择
- 分支逻辑
- Agent决策机制
- 迭代式研究循环
**运行方式:** `python assets/examples/03_agent.py "Nobel Prize 2024"`
---assets/examples/04_rag.pyundefinedassets/examples/04_rag.pyundefined
**What it demonstrates:**
- Document embedding and indexing
- Similarity search
- Context-based generation
- Multi-stage pipelines
**Run it:** `python assets/examples/04_rag.py "How to install PocketFlow?"`
---
**演示内容:**
- 文档嵌入与索引
- 相似度搜索
- 基于上下文的生成
- 多阶段流水线
**运行方式:** `python assets/examples/04_rag.py "How to install PocketFlow?"`
---assets/examples/05_structured_output.py
**What it demonstrates:**
- Structured LLM responses with YAML
- Schema validation
- Retry logic for parsing
- Data extraction patterns
**Run it:** `python assets/examples/05_structured_output.py`
---
**演示内容:**
- 基于YAML的LLM结构化响应
- Schema验证
- 解析重试逻辑
- 数据提取模式
**运行方式:** `python assets/examples/05_structured_output.py`
---assets/examples/06_multi_agent.pyundefinedassets/examples/06_multi_agent.pyundefined
**What it demonstrates:**
- AsyncNode for concurrent operations
- Message queues for inter-agent communication
- Multi-agent coordination
- Game logic with termination
**Run it:** `python assets/examples/06_multi_agent.py`
---
**演示内容:**
- 用AsyncNode实现并发操作
- 用消息队列实现Agent间通信
- 多Agent协调
- 含终止逻辑的游戏规则
**运行方式:** `python assets/examples/06_multi_agent.py`
---assets/template/main.pyflow.pynodes.pyutils.pyrequirements.txtcd assets/template/
pip install -r requirements.txtassets/template/main.pyflow.pynodes.pyutils.pyrequirements.txtcd assets/template/
pip install -r requirements.txt
**What it demonstrates:**
- Separation of concerns
- Factory pattern for flows
- Clean data flow with shared store
- Configuration best practices
---
**演示内容:**
- 关注点分离
- 流的工厂模式
- 基于共享存储的清晰数据流
- 配置最佳实践
---assets/COOKBOOK_GUIDE.mdassets/COOKBOOK_GUIDE.md| Pattern | Use Case | Key Component |
|---|---|---|
| Agent | Dynamic action selection | Action space + context management |
| Workflow | Multi-step task decomposition | Chained nodes |
| RAG | Context-aware answers | Offline indexing + online retrieval |
| Map Reduce | Large input processing | BatchNode with aggregation |
| Multi-Agent | Collaborative agents | Message queues + AsyncNode |
| Structured Output | Typed LLM responses | YAML prompting + validation |
| 模式 | 适用场景 | 核心组件 |
|---|---|---|
| Agent | 动态动作选择 | 动作空间 + 上下文管理 |
| 工作流 | 多步骤任务分解 | 链式节点 |
| RAG | 基于上下文的问答 | 离线索引 + 在线检索 |
| Map Reduce | 大输入处理 | 带聚合功能的BatchNode |
| 多Agent | 协作式Agent | 消息队列 + AsyncNode |
| 结构化输出 | 类型化LLM响应 | YAML提示词 + 验证 |
undefinedundefined
**Best Practice:** Separate data schema from compute logic using shared store.
**最佳实践:** 用共享存储分离数据Schema与计算逻辑。class SummarizeFile(Node):
class SummarizeFile(Node):
    """Per-file variant: the filename arrives via node params."""

    def prep(self, shared):
        # Access the node's params (set by the enclosing flow).
        filename = self.params["filename"]
        return shared["data"].get(filename, "")
class SummarizeFile(Node):
    """Look up this run's file content, defaulting to an empty string."""

    def prep(self, shared):
        # params carries the per-run filename.
        return shared["data"].get(self.params["filename"], "")
class AgentNode(AsyncNode):
    """Block until a message arrives on this agent's queue."""

    async def prep_async(self, _):
        # The inter-agent queue is passed in via params.
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message
class AgentNode(AsyncNode):
    """Await the next inter-agent message and hand it to exec."""

    async def prep_async(self, _):
        queue = self.params["messages"]
        incoming = await queue.get()
        print(f"Agent received: {incoming}")
        return incoming
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.mdmy_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.mdflowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]