build-your-own-openclaw-agent-tutorial
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseBuild Your Own OpenClaw Agent Tutorial
构建你自己的OpenClaw Agent教程
Skill by ara.so — Hermes Skills collection.
A comprehensive tutorial for building AI agents progressively, from a basic chat loop to a production-ready autonomous agent system. This project walks through 18 steps covering single-agent capabilities, event-driven architecture, multi-agent collaboration, and production features like memory and concurrency control.
由ara.so提供的Skill — Hermes Skills合集。
一份循序渐进构建AI Agent的全面教程,从基础聊天循环到可投入生产的自主智能体系统。本项目包含18个步骤,涵盖单智能体能力、事件驱动架构、多智能体协作,以及记忆、并发控制等生产级特性。
What This Tutorial Teaches
本教程涵盖内容
The tutorial is organized into 4 phases:
- Phase 1 (Steps 0-6): Single agent with tools, skills, persistence, and web access
- Phase 2 (Steps 7-10): Event-driven architecture with multi-platform support
- Phase 3 (Steps 11-15): Autonomous agents with routing and collaboration
- Phase 4 (Steps 16-17): Production features like concurrency and long-term memory
教程分为4个阶段:
- 阶段1(步骤0-6):具备工具、技能、持久化和网络访问能力的单智能体
- 阶段2(步骤7-10):支持多平台的事件驱动架构
- 阶段3(步骤11-15):具备路由与协作能力的自主智能体
- 阶段4(步骤16-17):并发控制、长期记忆等生产级特性
Initial Setup
初始设置
Clone the Repository
克隆仓库
bash
git clone https://github.com/czl9707/build-your-own-openclaw.git
cd build-your-own-openclawbash
git clone https://github.com/czl9707/build-your-own-openclaw.git
cd build-your-own-openclawConfigure API Keys
配置API密钥
bash
undefinedbash
undefinedCopy example config
复制示例配置
cp default_workspace/config.example.yaml default_workspace/config.user.yaml
Edit `default_workspace/config.user.yaml`:
```yaml
llm:
model: "gpt-4" # or anthropic/claude-3-5-sonnet-20241022, etc.
api_key: "${OPENAI_API_KEY}" # Use environment variable
# See https://docs.litellm.ai/docs/providers for all providerscp default_workspace/config.example.yaml default_workspace/config.user.yaml
编辑 `default_workspace/config.user.yaml`:
```yaml
llm:
model: "gpt-4" # 或 anthropic/claude-3-5-sonnet-20241022 等
api_key: "${OPENAI_API_KEY}" # 使用环境变量
# 查看所有支持的提供商:https://docs.litellm.ai/docs/providersOptional: Add additional services
可选:添加额外服务
web:
search_api_key: "${SERPER_API_KEY}"
undefinedweb:
search_api_key: "${SERPER_API_KEY}"
undefinedInstall Dependencies (for each step)
安装依赖(每个步骤单独安装)
bash
cd 00-chat-loop # or any step directory
pip install -r requirements.txtbash
cd 00-chat-loop # 或任意步骤目录
pip install -r requirements.txtPhase 1: Building a Capable Single Agent
阶段1:构建具备核心能力的单智能体
Step 0: Basic Chat Loop
步骤0:基础聊天循环
The foundation - a simple conversation loop with an LLM.
python
undefined基础模块——一个与LLM交互的简单对话循环。
python
undefined00-chat-loop/main.py
00-chat-loop/main.py
from litellm import completion
def chat_loop():
messages = []
while True:
user_input = input("You: ")
if user_input.lower() in ['/exit', '/quit']:
break
messages.append({"role": "user", "content": user_input})
response = completion(
model="gpt-4",
messages=messages,
api_key="${OPENAI_API_KEY}"
)
assistant_message = response.choices[0].message.content
messages.append({"role": "assistant", "content": assistant_message})
print(f"Assistant: {assistant_message}")if name == "main":
chat_loop()
Run it:
```bash
cd 00-chat-loop
python main.pyfrom litellm import completion
def chat_loop():
messages = []
while True:
user_input = input("You: ")
if user_input.lower() in ['/exit', '/quit']:
break
messages.append({"role": "user", "content": user_input})
response = completion(
model="gpt-4",
messages=messages,
api_key="${OPENAI_API_KEY}"
)
assistant_message = response.choices[0].message.content
messages.append({"role": "assistant", "content": assistant_message})
print(f"Assistant: {assistant_message}")if name == "main":
chat_loop()
运行方式:
```bash
cd 00-chat-loop
python main.pyStep 1: Adding Tools
步骤1:添加工具
Give your agent function-calling capabilities.
python
undefined为智能体赋予函数调用能力。
python
undefined01-tools/tools.py
01-tools/tools.py
def get_current_weather(location: str) -> dict:
"""Get the current weather for a location."""
# Tool implementation
return {"location": location, "temperature": 72, "condition": "sunny"}
def get_current_weather(location: str) -> dict:
"""Get the current weather for a location."""
# 工具实现
return {"location": location, "temperature": 72, "condition": "sunny"}
Tool schema for LLM
供LLM使用的工具描述schema
weather_tool = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g. San Francisco"
}
},
"required": ["location"]
}
}
}
Using tools in the chat loop:
```python
import json
from litellm import completion
response = completion(
model="gpt-4",
messages=messages,
tools=[weather_tool],
tool_choice="auto"
)weather_tool = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g. San Francisco"
}
},
"required": ["location"]
}
}
}
在聊天循环中使用工具:
```python
import json
from litellm import completion
response = completion(
model="gpt-4",
messages=messages,
tools=[weather_tool],
tool_choice="auto"
)Handle tool calls
处理工具调用
if response.choices[0].message.tool_calls:
for tool_call in response.choices[0].message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
# Execute tool
result = get_current_weather(**arguments)
# Add tool result to messages
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result)
})undefinedif response.choices[0].message.tool_calls:
for tool_call in response.choices[0].message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
# 执行工具
result = get_current_weather(**arguments)
# 将工具结果添加到对话历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result)
})undefinedStep 2: Skills with SKILL.md
步骤2:基于SKILL.md扩展技能
Extend agent capabilities through markdown skill files.
markdown
<!-- skills/web_search.md -->通过Markdown技能文件扩展智能体能力。
markdown
<!-- skills/web_search.md -->Web Search Skill
Web搜索技能
You can search the internet using the tool.
search_web你可以使用工具进行互联网搜索。
search_webWhen to Use
使用场景
- User asks for current information
- Need to verify facts
- Looking for recent news
- 用户询问时效性信息
- 需要验证事实
- 查询近期新闻
Example
示例
User: "What's the latest news on AI?"
You: Let me search for that. [calls search_web("latest AI news")]
Loading skills:
```python用户: "AI领域的最新消息是什么?"
助手: 让我搜索一下。[调用search_web("latest AI news")]
加载技能:
```python02-skills/skill_loader.py
02-skills/skill_loader.py
import os
def load_skills(skills_dir="skills"):
"""Load all .md files from skills directory."""
skills_content = []
for filename in os.listdir(skills_dir):
if filename.endswith(".md"):
with open(os.path.join(skills_dir, filename), 'r') as f:
skills_content.append(f.read())
return "\n\n".join(skills_content)import os
def load_skills(skills_dir="skills"):
"""加载技能目录下所有.md文件。"""
skills_content = []
for filename in os.listdir(skills_dir):
if filename.endswith(".md"):
with open(os.path.join(skills_dir, filename), 'r') as f:
skills_content.append(f.read())
return "\n\n".join(skills_content)Add to system prompt
添加到系统提示词
system_prompt = f"""You are a helpful assistant.
system_prompt = f"""你是一个乐于助人的助手。
Your Skills
你的技能
{load_skills()}
"""
undefined{load_skills()}
"""
undefinedStep 3: Conversation Persistence
步骤3:对话持久化
Save conversations to resume later.
python
undefined保存对话历史以便后续恢复。
python
undefined03-persistence/session_manager.py
03-persistence/session_manager.py
import json
from datetime import datetime
from pathlib import Path
class SessionManager:
def init(self, sessions_dir="sessions"):
self.sessions_dir = Path(sessions_dir)
self.sessions_dir.mkdir(exist_ok=True)
def save_session(self, session_id: str, messages: list):
"""Save conversation history."""
session_file = self.sessions_dir / f"{session_id}.json"
data = {
"session_id": session_id,
"updated_at": datetime.now().isoformat(),
"messages": messages
}
with open(session_file, 'w') as f:
json.dump(data, f, indent=2)
def load_session(self, session_id: str) -> list:
"""Load conversation history."""
session_file = self.sessions_dir / f"{session_id}.json"
if not session_file.exists():
return []
with open(session_file, 'r') as f:
data = json.load(f)
return data.get("messages", [])
def list_sessions(self) -> list:
"""List all available sessions."""
return [f.stem for f in self.sessions_dir.glob("*.json")]
Usage:
```python
manager = SessionManager()import json
from datetime import datetime
from pathlib import Path
class SessionManager:
def init(self, sessions_dir="sessions"):
self.sessions_dir = Path(sessions_dir)
self.sessions_dir.mkdir(exist_ok=True)
def save_session(self, session_id: str, messages: list):
"""保存对话历史。"""
session_file = self.sessions_dir / f"{session_id}.json"
data = {
"session_id": session_id,
"updated_at": datetime.now().isoformat(),
"messages": messages
}
with open(session_file, 'w') as f:
json.dump(data, f, indent=2)
def load_session(self, session_id: str) -> list:
"""加载对话历史。"""
session_file = self.sessions_dir / f"{session_id}.json"
if not session_file.exists():
return []
with open(session_file, 'r') as f:
data = json.load(f)
return data.get("messages", [])
def list_sessions(self) -> list:
"""列出所有可用会话。"""
return [f.stem for f in self.sessions_dir.glob("*.json")]
使用示例:
```python
manager = SessionManager()Load or create session
加载或创建会话
session_id = "my-conversation"
messages = manager.load_session(session_id)
session_id = "my-conversation"
messages = manager.load_session(session_id)
After each exchange
每次对话交互后保存
manager.save_session(session_id, messages)
undefinedmanager.save_session(session_id, messages)
undefinedStep 4: Slash Commands
步骤4:斜杠命令
Direct user control over agent behavior.
python
undefined让用户可以直接控制智能体行为。
python
undefined04-slash-commands/commands.py
04-slash-commands/commands.py
class CommandHandler:
def init(self, session_manager):
self.session_manager = session_manager
self.commands = {
'/new': self.new_session,
'/load': self.load_session,
'/list': self.list_sessions,
'/save': self.save_session,
'/clear': self.clear_session,
'/help': self.show_help
}
def handle(self, user_input: str, current_session: str, messages: list):
"""Handle slash commands."""
parts = user_input.split()
command = parts[0]
args = parts[1:] if len(parts) > 1 else []
if command in self.commands:
return self.commands[command](args, current_session, messages)
return None # Not a command
def new_session(self, args, current_session, messages):
new_id = args[0] if args else f"session_{int(time.time())}"
return {"action": "new_session", "session_id": new_id}
def load_session(self, args, current_session, messages):
if not args:
print("Usage: /load <session_id>")
return {"action": "none"}
loaded = self.session_manager.load_session(args[0])
return {"action": "load_session", "session_id": args[0], "messages": loaded}undefinedclass CommandHandler:
def init(self, session_manager):
self.session_manager = session_manager
self.commands = {
'/new': self.new_session,
'/load': self.load_session,
'/list': self.list_sessions,
'/save': self.save_session,
'/clear': self.clear_session,
'/help': self.show_help
}
def handle(self, user_input: str, current_session: str, messages: list):
"""处理斜杠命令。"""
parts = user_input.split()
command = parts[0]
args = parts[1:] if len(parts) > 1 else []
if command in self.commands:
return self.commands[command](args, current_session, messages)
return None # 不是命令
def new_session(self, args, current_session, messages):
new_id = args[0] if args else f"session_{int(time.time())}"
return {"action": "new_session", "session_id": new_id}
def load_session(self, args, current_session, messages):
if not args:
print("用法: /load <session_id>")
return {"action": "none"}
loaded = self.session_manager.load_session(args[0])
return {"action": "load_session", "session_id": args[0], "messages": loaded}undefinedStep 5: Context Compaction
步骤5:上下文压缩
Manage token limits by summarizing old messages.
python
undefined通过总结旧消息来管理token限制。
python
undefined05-compaction/compactor.py
05-compaction/compactor.py
from litellm import completion
class MessageCompactor:
def init(self, max_messages=20):
self.max_messages = max_messages
def compact_if_needed(self, messages: list) -> list:
"""Compact messages if they exceed threshold."""
if len(messages) <= self.max_messages:
return messages
# Keep system message and recent messages
system_msgs = [m for m in messages if m["role"] == "system"]
recent_msgs = messages[-(self.max_messages - 2):]
# Summarize older messages
old_msgs = messages[len(system_msgs):-len(recent_msgs)]
summary = self._summarize_messages(old_msgs)
return system_msgs + [
{"role": "system", "content": f"Previous conversation summary:\n{summary}"}
] + recent_msgs
def _summarize_messages(self, messages: list) -> str:
"""Generate summary of message history."""
conversation = "\n".join([
f"{m['role']}: {m['content']}" for m in messages
])
response = completion(
model="gpt-4",
messages=[{
"role": "user",
"content": f"Summarize this conversation concisely:\n\n{conversation}"
}]
)
return response.choices[0].message.contentundefinedfrom litellm import completion
class MessageCompactor:
def init(self, max_messages=20):
self.max_messages = max_messages
def compact_if_needed(self, messages: list) -> list:
"""如果消息数量超过阈值,压缩上下文。"""
if len(messages) <= self.max_messages:
return messages
# 保留系统消息和近期消息
system_msgs = [m for m in messages if m["role"] == "system"]
recent_msgs = messages[-(self.max_messages - 2):]
# 总结旧消息
old_msgs = messages[len(system_msgs):-len(recent_msgs)]
summary = self._summarize_messages(old_msgs)
return system_msgs + [
{"role": "system", "content": f"之前对话总结:\n{summary}"}
] + recent_msgs
def _summarize_messages(self, messages: list) -> str:
"""生成对话历史摘要。"""
conversation = "\n".join([
f"{m['role']}: {m['content']}" for m in messages
])
response = completion(
model="gpt-4",
messages=[{
"role": "user",
"content": f"请简洁总结以下对话:\n\n{conversation}"
}]
)
return response.choices[0].message.contentundefinedStep 6: Web Tools
步骤6:网络工具
Give your agent internet access.
python
undefined为智能体提供互联网访问能力。
python
undefined06-web-tools/web_tools.py
06-web-tools/web_tools.py
import requests
import os
def search_web(query: str, num_results: int = 5) -> list:
"""Search the web using Serper API."""
api_key = os.getenv("SERPER_API_KEY")
response = requests.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": api_key},
json={"q": query, "num": num_results}
)
results = response.json()
return [
{
"title": r.get("title"),
"link": r.get("link"),
"snippet": r.get("snippet")
}
for r in results.get("organic", [])
]def fetch_webpage(url: str) -> str:
"""Fetch and extract text from a webpage."""
from bs4 import BeautifulSoup
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
return soup.get_text(separator="\n", strip=True)undefinedimport requests
import os
def search_web(query: str, num_results: int = 5) -> list:
"""使用Serper API进行网络搜索。"""
api_key = os.getenv("SERPER_API_KEY")
response = requests.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": api_key},
json={"q": query, "num": num_results}
)
results = response.json()
return [
{
"title": r.get("title"),
"link": r.get("link"),
"snippet": r.get("snippet")
}
for r in results.get("organic", [])
]def fetch_webpage(url: str) -> str:
"""获取并提取网页文本内容。"""
from bs4 import BeautifulSoup
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# 移除脚本和样式元素
for script in soup(["script", "style"]):
script.decompose()
return soup.get_text(separator="\n", strip=True)undefinedPhase 2: Event-Driven Architecture
阶段2:事件驱动架构
Step 7: Event-Driven Refactor
步骤7:事件驱动重构
Decouple components with an event bus.
python
undefined通过事件总线解耦组件。
python
undefined07-event-driven/event_bus.py
07-event-driven/event_bus.py
from typing import Callable, Dict, List
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
MESSAGE_RECEIVED = "message_received"
MESSAGE_SENT = "message_sent"
TOOL_CALLED = "tool_called"
SESSION_CREATED = "session_created"
@dataclass
class Event:
type: EventType
data: dict
source: str
class EventBus:
def init(self):
self.listeners: Dict[EventType, List[Callable]] = {}
def subscribe(self, event_type: EventType, handler: Callable):
"""Subscribe to an event type."""
if event_type not in self.listeners:
self.listeners[event_type] = []
self.listeners[event_type].append(handler)
def publish(self, event: Event):
"""Publish an event to all subscribers."""
if event.type in self.listeners:
for handler in self.listeners[event.type]:
handler(event)from typing import Callable, Dict, List
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
MESSAGE_RECEIVED = "message_received"
MESSAGE_SENT = "message_sent"
TOOL_CALLED = "tool_called"
SESSION_CREATED = "session_created"
@dataclass
class Event:
type: EventType
data: dict
source: str
class EventBus:
def init(self):
self.listeners: Dict[EventType, List[Callable]] = {}
def subscribe(self, event_type: EventType, handler: Callable):
"""订阅事件类型。"""
if event_type not in self.listeners:
self.listeners[event_type] = []
self.listeners[event_type].append(handler)
def publish(self, event: Event):
"""向所有订阅者发布事件。"""
if event.type in self.listeners:
for handler in self.listeners[event.type]:
handler(event)Usage
使用示例
bus = EventBus()
def on_message(event: Event):
print(f"Received: {event.data['content']}")
bus.subscribe(EventType.MESSAGE_RECEIVED, on_message)
bus.publish(Event(
type=EventType.MESSAGE_RECEIVED,
data={"content": "Hello!"},
source="user"
))
undefinedbus = EventBus()
def on_message(event: Event):
print(f"收到消息: {event.data['content']}")
bus.subscribe(EventType.MESSAGE_RECEIVED, on_message)
bus.publish(Event(
type=EventType.MESSAGE_RECEIVED,
data={"content": "你好!"},
source="user"
))
undefinedStep 9: Multi-Channel Support
步骤9:多渠道支持
Support Discord, Slack, Telegram, etc.
python
undefined支持Discord、Slack、Telegram等平台。
python
undefined09-channels/discord_channel.py
09-channels/discord_channel.py
import discord
from event_bus import EventBus, Event, EventType
class DiscordChannel:
def init(self, event_bus: EventBus, token: str):
self.event_bus = event_bus
self.client = discord.Client(intents=discord.Intents.default())
self.token = token
@self.client.event
async def on_message(message):
if message.author == self.client.user:
return
# Publish to event bus
self.event_bus.publish(Event(
type=EventType.MESSAGE_RECEIVED,
data={
"content": message.content,
"channel_id": str(message.channel.id),
"user_id": str(message.author.id)
},
source="discord"
))
# Subscribe to outgoing messages
self.event_bus.subscribe(EventType.MESSAGE_SENT, self.send_message)
async def send_message(self, event: Event):
"""Send message to Discord."""
if event.data.get("channel") != "discord":
return
channel = self.client.get_channel(int(event.data["channel_id"]))
await channel.send(event.data["content"])
def start(self):
self.client.run(self.token)undefinedimport discord
from event_bus import EventBus, Event, EventType
class DiscordChannel:
def init(self, event_bus: EventBus, token: str):
self.event_bus = event_bus
self.client = discord.Client(intents=discord.Intents.default())
self.token = token
@self.client.event
async def on_message(message):
if message.author == self.client.user:
return
# 发布到事件总线
self.event_bus.publish(Event(
type=EventType.MESSAGE_RECEIVED,
data={
"content": message.content,
"channel_id": str(message.channel.id),
"user_id": str(message.author.id)
},
source="discord"
))
# 订阅 outgoing 消息事件
self.event_bus.subscribe(EventType.MESSAGE_SENT, self.send_message)
async def send_message(self, event: Event):
"""发送消息到Discord。"""
if event.data.get("channel") != "discord":
return
channel = self.client.get_channel(int(event.data["channel_id"]))
await channel.send(event.data["content"])
def start(self):
self.client.run(self.token)undefinedPhase 3: Multi-Agent Systems
阶段3:多智能体系统
Step 11: Agent Routing
步骤11:智能体路由
Route requests to specialized agents.
python
undefined将请求路由到专业智能体。
python
undefined11-multi-agent-routing/router.py
11-multi-agent-routing/router.py
from litellm import completion
class AgentRouter:
def init(self, agents: dict):
self.agents = agents # {"code": CodeAgent(), "research": ResearchAgent()}
def route(self, user_message: str) -> str:
"""Determine which agent should handle the request."""
agent_descriptions = "\n".join([
f"- {name}: {agent.description}"
for name, agent in self.agents.items()
])
response = completion(
model="gpt-4",
messages=[{
"role": "user",
"content": f"""Which agent should handle this request?Available agents:
{agent_descriptions}
User request: {user_message}
Respond with just the agent name."""
}]
)
agent_name = response.choices[0].message.content.strip()
return agent_name if agent_name in self.agents else "default"undefinedfrom litellm import completion
class AgentRouter:
def init(self, agents: dict):
self.agents = agents # {"code": CodeAgent(), "research": ResearchAgent()}
def route(self, user_message: str) -> str:
"""确定哪个智能体应该处理请求。"""
agent_descriptions = "\n".join([
f"- {name}: {agent.description}"
for name, agent in self.agents.items()
])
response = completion(
model="gpt-4",
messages=[{
"role": "user",
"content": f"""哪个智能体应该处理这个请求?可用智能体:
{agent_descriptions}
用户请求: {user_message}
仅回复智能体名称。"""
}]
)
agent_name = response.choices[0].message.content.strip()
return agent_name if agent_name in self.agents else "default"undefinedStep 12: Scheduled Tasks (Cron Heartbeat)
步骤12:定时任务(Cron心跳)
Run agents on a schedule.
python
undefined按计划运行智能体任务。
python
undefined12-cron-heartbeat/scheduler.py
12-cron-heartbeat/scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
class AgentScheduler:
def init(self, event_bus):
self.scheduler = BackgroundScheduler()
self.event_bus = event_bus
def schedule_task(self, agent_id: str, cron: str, task: callable):
"""Schedule a recurring task."""
self.scheduler.add_job(
func=task,
trigger='cron',
**self._parse_cron(cron),
id=f"{agent_id}_{datetime.now().timestamp()}"
)
def _parse_cron(self, cron: str) -> dict:
"""Parse cron expression to APScheduler format."""
# "0 9 * * *" -> {"hour": 9, "minute": 0}
parts = cron.split()
return {
"minute": parts[0],
"hour": parts[1],
"day": parts[2],
"month": parts[3],
"day_of_week": parts[4]
}
def start(self):
self.scheduler.start()from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
class AgentScheduler:
def init(self, event_bus):
self.scheduler = BackgroundScheduler()
self.event_bus = event_bus
def schedule_task(self, agent_id: str, cron: str, task: callable):
"""调度周期性任务。"""
self.scheduler.add_job(
func=task,
trigger='cron',
**self._parse_cron(cron),
id=f"{agent_id}_{datetime.now().timestamp()}"
)
def _parse_cron(self, cron: str) -> dict:
"""将Cron表达式解析为APScheduler格式。"""
# "0 9 * * *" -> {"hour": 9, "minute": 0}
parts = cron.split()
return {
"minute": parts[0],
"hour": parts[1],
"day": parts[2],
"month": parts[3],
"day_of_week": parts[4]
}
def start(self):
self.scheduler.start()Usage
使用示例
scheduler = AgentScheduler(event_bus)
scheduler.schedule_task(
"morning_brief",
"0 9 * * *", # 9 AM daily
lambda: send_daily_briefing()
)
scheduler.start()
undefinedscheduler = AgentScheduler(event_bus)
scheduler.schedule_task(
"morning_brief",
"0 9 * * *", # 每天上午9点
lambda: send_daily_briefing()
)
scheduler.start()
undefinedStep 15: Agent Dispatch
步骤15:智能体调度
Agents collaborate to complete tasks.
python
undefined智能体协作完成任务。
python
undefined15-agent-dispatch/dispatcher.py
15-agent-dispatch/dispatcher.py
class AgentDispatcher:
def init(self, agents: dict, event_bus):
self.agents = agents
self.event_bus = event_bus
async def dispatch_task(self, task: str, requesting_agent: str):
"""Dispatch a task to another agent."""
# Determine best agent for subtask
target_agent = self._select_agent(task)
# Execute subtask
result = await self.agents[target_agent].execute(task)
# Return result to requesting agent
self.event_bus.publish(Event(
type=EventType.TASK_COMPLETED,
data={
"task": task,
"result": result,
"requester": requesting_agent
},
source=target_agent
))
return resultundefinedclass AgentDispatcher:
def init(self, agents: dict, event_bus):
self.agents = agents
self.event_bus = event_bus
async def dispatch_task(self, task: str, requesting_agent: str):
"""将任务调度给其他智能体。"""
# 确定处理子任务的最佳智能体
target_agent = self._select_agent(task)
# 执行子任务
result = await self.agents[target_agent].execute(task)
# 将结果返回给请求智能体
self.event_bus.publish(Event(
type=EventType.TASK_COMPLETED,
data={
"task": task,
"result": result,
"requester": requesting_agent
},
source=target_agent
))
return resultundefinedPhase 4: Production Features
阶段4:生产级特性
Step 16: Concurrency Control
步骤16:并发控制
Prevent race conditions with multiple agents.
python
undefined防止多智能体场景下的竞态条件。
python
undefined16-concurrency-control/lock_manager.py
16-concurrency-control/lock_manager.py
import asyncio
from contextlib import asynccontextmanager
class LockManager:
def init(self):
self.locks = {}
@asynccontextmanager
async def acquire(self, resource_id: str):
"""Acquire lock for a resource."""
if resource_id not in self.locks:
self.locks[resource_id] = asyncio.Lock()
async with self.locks[resource_id]:
yieldimport asyncio
from contextlib import asynccontextmanager
class LockManager:
def init(self):
self.locks = {}
@asynccontextmanager
async def acquire(self, resource_id: str):
"""获取资源锁。"""
if resource_id not in self.locks:
self.locks[resource_id] = asyncio.Lock()
async with self.locks[resource_id]:
yieldUsage
使用示例
lock_manager = LockManager()
async def process_session(session_id: str):
async with lock_manager.acquire(f"session:{session_id}"):
# Only one agent can access this session at a time
messages = load_session(session_id)
# ... process ...
save_session(session_id, messages)
undefinedlock_manager = LockManager()
async def process_session(session_id: str):
async with lock_manager.acquire(f"session:{session_id}"):
# 同一时间只有一个智能体可以访问该会话
messages = load_session(session_id)
# ... 处理逻辑 ...
save_session(session_id, messages)
undefinedStep 17: Long-Term Memory
步骤17:长期记忆
Store and retrieve relevant memories.
python
undefined存储和检索相关记忆。
python
undefined17-memory/memory_store.py
17-memory/memory_store.py
from chromadb import Client
from chromadb.config import Settings
class MemoryStore:
def init(self, collection_name="agent_memory"):
self.client = Client(Settings(persist_directory="./memory_db"))
self.collection = self.client.get_or_create_collection(collection_name)
def store_memory(self, content: str, metadata: dict):
"""Store a memory with embeddings."""
self.collection.add(
documents=[content],
metadatas=[metadata],
ids=[f"mem_{metadata.get('timestamp', 0)}"]
)
def recall(self, query: str, n_results: int = 5) -> list:
"""Retrieve relevant memories."""
results = self.collection.query(
query_texts=[query],
n_results=n_results
)
return [
{
"content": doc,
"metadata": meta
}
for doc, meta in zip(
results['documents'][0],
results['metadatas'][0]
)
]from chromadb import Client
from chromadb.config import Settings
class MemoryStore:
def init(self, collection_name="agent_memory"):
self.client = Client(Settings(persist_directory="./memory_db"))
self.collection = self.client.get_or_create_collection(collection_name)
def store_memory(self, content: str, metadata: dict):
"""存储带嵌入向量的记忆。"""
self.collection.add(
documents=[content],
metadatas=[metadata],
ids=[f"mem_{metadata.get('timestamp', 0)}"]
)
def recall(self, query: str, n_results: int = 5) -> list:
"""检索相关记忆。"""
results = self.collection.query(
query_texts=[query],
n_results=n_results
)
return [
{
"content": doc,
"metadata": meta
}
for doc, meta in zip(
results['documents'][0],
results['metadatas'][0]
)
]Usage
使用示例
memory = MemoryStore()
memory = MemoryStore()
Store memory
存储记忆
memory.store_memory(
"User prefers Python over JavaScript",
{"user_id": "user123", "timestamp": 1234567890}
)
memory.store_memory(
"用户偏好Python而非JavaScript",
{"user_id": "user123", "timestamp": 1234567890}
)
Recall relevant memories
检索相关记忆
relevant = memory.recall("What languages does the user like?")
undefinedrelevant = memory.recall("用户喜欢什么编程语言?")
undefinedCommon Patterns
通用模式
Full Agent Implementation
完整智能体实现
python
undefinedpython
undefinedComplete agent with all features
集成所有特性的完整智能体
class Agent:
def init(self, config_path="config.user.yaml"):
self.config = self.load_config(config_path)
self.event_bus = EventBus()
self.session_manager = SessionManager()
self.memory = MemoryStore()
self.tools = self.load_tools()
self.skills = self.load_skills()
async def process_message(self, message: str, session_id: str):
# Load session with lock
async with lock_manager.acquire(f"session:{session_id}"):
messages = self.session_manager.load_session(session_id)
# Recall relevant memories
memories = self.memory.recall(message)
context = self.build_context(memories)
# Add user message
messages.append({"role": "user", "content": message})
# Compact if needed
messages = self.compactor.compact_if_needed(messages)
# Get response with tools
response = await completion(
model=self.config['llm']['model'],
messages=[{"role": "system", "content": context}] + messages,
tools=self.tools
)
# Handle tool calls
while response.choices[0].message.tool_calls:
# Execute tools and continue
pass
# Store memory of interaction
self.memory.store_memory(
f"User: {message}\nAssistant: {response_text}",
{"session_id": session_id, "timestamp": time.time()}
)
# Save session
self.session_manager.save_session(session_id, messages)
return response_textundefinedclass Agent:
def init(self, config_path="config.user.yaml"):
self.config = self.load_config(config_path)
self.event_bus = EventBus()
self.session_manager = SessionManager()
self.memory = MemoryStore()
self.tools = self.load_tools()
self.skills = self.load_skills()
async def process_message(self, message: str, session_id: str):
# 加锁加载会话
async with lock_manager.acquire(f"session:{session_id}"):
messages = self.session_manager.load_session(session_id)
# 检索相关记忆
memories = self.memory.recall(message)
context = self.build_context(memories)
# 添加用户消息
messages.append({"role": "user", "content": message})
# 必要时压缩上下文
messages = self.compactor.compact_if_needed(messages)
# 调用LLM并使用工具
response = await completion(
model=self.config['llm']['model'],
messages=[{"role": "system", "content": context}] + messages,
tools=self.tools
)
# 处理工具调用
while response.choices[0].message.tool_calls:
# 执行工具并继续对话
pass
# 存储本次交互的记忆
self.memory.store_memory(
f"用户: {message}\n助手: {response_text}",
{"session_id": session_id, "timestamp": time.time()}
)
# 保存会话
self.session_manager.save_session(session_id, messages)
return response_textundefinedTroubleshooting
故障排查
API Key Issues
API密钥问题
bash
undefinedbash
undefinedCheck environment variables
检查环境变量
echo $OPENAI_API_KEY
echo $OPENAI_API_KEY
Verify config.user.yaml uses correct env var syntax
验证config.user.yaml使用正确的环境变量语法
✓ Correct: api_key: "${OPENAI_API_KEY}"
✓ 正确格式: api_key: "${OPENAI_API_KEY}"
✗ Wrong: api_key: "$OPENAI_API_KEY" or api_key: "sk-..."
✗ 错误格式: api_key: "$OPENAI_API_KEY" 或 api_key: "sk-..."
undefinedundefinedLiteLLM Provider Errors
LiteLLM提供商错误
python
undefinedpython
undefinedTest your LLM config
测试你的LLM配置
from litellm import completion
response = completion(
model="gpt-4", # or your model
messages=[{"role": "user", "content": "test"}],
api_key="${OPENAI_API_KEY}"
)
print(response.choices[0].message.content)
undefinedfrom litellm import completion
response = completion(
model="gpt-4", # 或你的模型
messages=[{"role": "user", "content": "test"}],
api_key="${OPENAI_API_KEY}"
)
print(response.choices[0].message.content)
undefinedSession Persistence Issues
会话持久化问题
python
undefinedpython
undefinedVerify sessions directory exists and is writable
验证会话目录存在且可写
import os
sessions_dir = "sessions"
os.makedirs(sessions_dir, exist_ok=True)
print(f"Sessions dir: {os.path.abspath(sessions_dir)}")
undefinedimport os
sessions_dir = "sessions"
os.makedirs(sessions_dir, exist_ok=True)
print(f"会话目录: {os.path.abspath(sessions_dir)}")
undefinedMemory/ChromaDB Issues
记忆/ChromaDB问题
bash
undefinedbash
undefinedInstall ChromaDB dependencies
安装ChromaDB依赖
pip install chromadb
pip install chromadb
Clear memory database if corrupted
如果数据库损坏,清空记忆数据库
rm -rf ./memory_db
undefinedrm -rf ./memory_db
undefinedNext Steps
后续步骤
- Start with Step 0 and progress sequentially
- Each step builds on previous ones
- Reference the example project (pickle-bot) for a complete implementation
- Customize each step for your specific use case
- Mix and match features based on your needs
- 从步骤0开始,循序渐进学习
- 每个步骤都基于前序步骤构建
- 参考示例项目(pickle-bot)获取完整实现
- 根据你的特定需求定制每个步骤
- 根据需要混合搭配不同特性
Resources
参考资源
- LiteLLM Documentation
- Tutorial Homepage
- OpenClaw Project
- Each step's README.md contains detailed explanations
- LiteLLM文档
- 教程主页
- OpenClaw项目
- 每个步骤的README.md包含详细说明