agent-skills-context-engineering
Agent Skills for Context Engineering
Skill by ara.so — AI Agent Skills collection.
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems. This skill teaches principles for managing LLM context windows, designing effective agent architectures, and building production-grade agent systems.
What This Project Does
Agent Skills for Context Engineering provides battle-tested patterns for:
- Context Management: Managing limited attention budgets, avoiding lost-in-the-middle degradation
- Multi-Agent Systems: Orchestrator, peer-to-peer, and hierarchical architectures
- Memory Systems: Short-term, long-term, and graph-based memory patterns
- Tool Design: Building tools that agents can use effectively
- Evaluation: LLM-as-judge frameworks for measuring agent quality
- Production Systems: Hosted agents with sandboxed VMs and multiplayer support
Unlike prompt engineering (crafting instructions), context engineering addresses holistic curation of all information in the context window: system prompts, tool definitions, retrieved documents, message history, and tool outputs.
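As a minimal sketch of what "curating everything in the window" can look like, the snippet below assembles the five sources named above under a single budget. The class name, field names, priority order, and the 4-characters-per-token estimate are all illustrative assumptions, not part of the skill collection.

```python
from dataclasses import dataclass, field


@dataclass
class ContextParts:
    """The five context sources named above (field names are illustrative)."""
    system_prompt: str
    tool_definitions: list[str] = field(default_factory=list)
    retrieved_documents: list[str] = field(default_factory=list)
    message_history: list[str] = field(default_factory=list)
    tool_outputs: list[str] = field(default_factory=list)


def estimate_tokens(text: str) -> int:
    """Crude assumption: roughly 4 characters per token."""
    return max(1, len(text) // 4)


def build_context(parts: ContextParts, budget: int) -> list[str]:
    """Assemble context in a fixed priority order, skipping items that do not fit."""
    ordered = (
        [parts.system_prompt]
        + parts.tool_definitions
        + parts.tool_outputs
        + parts.retrieved_documents
        + parts.message_history
    )
    selected, used = [], 0
    for item in ordered:
        cost = estimate_tokens(item)
        if used + cost > budget:
            continue  # drop the whole item rather than truncating it mid-text
        selected.append(item)
        used += cost
    return selected
```

The priority order here is only one possible choice; a real system would likely rank items by relevance to the current step rather than by source type.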
Installation
For Claude Code (Recommended)
Step 1: Add the Marketplace
bash
/plugin marketplace add muratcankoylan/Agent-Skills-for-Context-Engineering
Step 2: Install the Plugin
bash
/plugin install context-engineering@context-engineering-marketplace
Or browse and install:
- Select Browse and install plugins
- Select context-engineering-marketplace
- Select context-engineering
- Select Install now
For Cursor (Open Plugins)
Add to your .cursor/plugins.json:
json
{
  "plugins": [
    {
      "name": "context-engineering",
      "repository": "muratcankoylan/Agent-Skills-for-Context-Engineering"
    }
  ]
}
For Individual Skills
Copy specific skills to your project:
bash
# Create skills directory
mkdir -p .claude/skills
# Add a specific skill (example: context-fundamentals)
curl -o .claude/skills/context-fundamentals.md \
  https://raw.githubusercontent.com/muratcankoylan/Agent-Skills-for-Context-Engineering/main/skills/context-fundamentals/SKILL.md
Available skills: `context-fundamentals`, `context-degradation`, `context-compression`, `context-optimization`, `latent-briefing`, `multi-agent-patterns`, `memory-systems`, `tool-design`, `filesystem-context`, `hosted-agents`, `evaluation`, `advanced-evaluation`, `project-development`, `bdi-mental-states`
Core Concepts
Context Window Management
The fundamental challenge: context windows are constrained by attention mechanics, not raw token capacity.
Key degradation patterns:
- Lost-in-the-middle: Models lose track of information in the middle of long contexts
- U-shaped attention: Strong attention at beginning and end, weak in middle
- Attention scarcity: As context grows, attention per token decreases
Solution: Find the smallest possible set of high-signal tokens.
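One way to act on these patterns is to select only the highest-signal chunks and then place the strongest ones at the start and end of the context, where attention is strongest. The sketch below assumes relevance scores are computed elsewhere and uses character count as a stand-in for tokens; the function name is hypothetical.

```python
def pack_for_attention(chunks: list[tuple[str, float]], budget: int) -> list[str]:
    """Pick the highest-scoring chunks within budget, then order them so the
    strongest sit at the start and end and the weakest land in the middle.

    chunks: (text, relevance_score) pairs; scoring is assumed to happen elsewhere.
    budget: maximum total length, measured in characters as a token proxy.
    """
    ranked = sorted(chunks, key=lambda c: c[1], reverse=True)
    selected, used = [], 0
    for text, _score in ranked:
        if used + len(text) <= budget:
            selected.append(text)
            used += len(text)
    # Alternate best-first items between the front and the back of the context
    front, back = [], []
    for i, text in enumerate(selected):
        (front if i % 2 == 0 else back).append(text)
    return front + back[::-1]
```

With five chunks scored 0.9, 0.8, 0.5, 0.3, and 0.1, the 0.9 chunk ends up first, the 0.8 chunk last, and the 0.1 chunk in the middle position.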
Progressive Disclosure
Load information only when needed:
python
# skills/__init__.py - Lazy loading pattern
class SkillRegistry:
    def __init__(self):
        self._skills = {}
        self._loaded = set()

    def get_skill_summary(self, skill_name: str) -> dict:
        """Load only name and description initially."""
        return {
            "name": skill_name,
            "description": self._get_description(skill_name)
        }

    def load_skill(self, skill_name: str) -> dict:
        """Load full skill content only when activated."""
        if skill_name not in self._loaded:
            self._skills[skill_name] = self._read_skill_file(skill_name)
            self._loaded.add(skill_name)
        return self._skills[skill_name]

    # The two helpers below depend on how skills are stored and are left unimplemented here
    def _get_description(self, skill_name: str) -> str:
        raise NotImplementedError

    def _read_skill_file(self, skill_name: str) -> dict:
        raise NotImplementedError
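For a version that runs end to end, here is a minimal file-backed sketch of the same lazy-loading idea. It assumes skills are stored as `<root>/<name>/SKILL.md` (mirroring the repository path in the install command) and that the first non-empty line is a usable description; the class and method names are hypothetical.

```python
from pathlib import Path


class FileSkillRegistry:
    """Lazy skill loader; assumes skills live at <root>/<name>/SKILL.md."""

    def __init__(self, root: str):
        self.root = Path(root)
        self._cache: dict[str, str] = {}

    def _path(self, name: str) -> Path:
        return self.root / name / "SKILL.md"

    def summary(self, name: str) -> dict:
        """Read only up to the first non-empty line as a cheap description."""
        with open(self._path(name), encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    return {"name": name, "description": line.strip().lstrip("# ")}
        return {"name": name, "description": ""}

    def load(self, name: str) -> str:
        """Read the full file once, on first activation, and cache it."""
        if name not in self._cache:
            self._cache[name] = self._path(name).read_text(encoding="utf-8")
        return self._cache[name]
```

Calling `summary` never populates the cache, so listing many skills stays cheap; only `load` pulls the full text into memory and, eventually, into the context window.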
Context Compression Strategies
Sliding Window:
python
def sliding_window_context(messages: list, window_size: int = 10) -> list:
    """Keep system messages plus the most recent non-system messages."""
    if len(messages) <= window_size:
        return messages
    # Always keep system messages; exclude them from the window so none is duplicated
    system_msgs = [m for m in messages if m["role"] == "system"]
    other_msgs = [m for m in messages if m["role"] != "system"]
    return system_msgs + other_msgs[-window_size:]
Summarization:
python
async def compress_with_summary(messages: list, llm_client) -> list:
    """Compress old messages into a summary."""
    if len(messages) < 20:
        return messages
    # Keep recent messages uncompressed
    to_compress = messages[1:-10]  # Skip system message and recent 10
    recent = messages[-10:]
    # Generate summary from a readable transcript instead of the raw list repr
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in to_compress)
    summary_prompt = f"Summarize these messages concisely:\n{transcript}"
    # llm_client.complete is a placeholder for whatever async client interface you use
    summary = await llm_client.complete(summary_prompt)
    return [
        messages[0],  # System message (assumes it is the first message)
        {"role": "assistant", "content": f"[Summary of previous conversation: {summary}]"},
        *recent
    ]
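Both strategies above count messages, but the limit that matters is tokens. As a hedged variation, the sketch below trims by an estimated token budget instead. The 4-characters-per-token estimate and the function name are assumptions; a real tokenizer would give more accurate counts.

```python
def trim_to_token_budget(messages: list[dict], budget: int) -> list[dict]:
    """Keep system messages plus the newest messages that fit in the budget."""

    def cost(message: dict) -> int:
        # Assumption: about 4 characters per token
        return max(1, len(message["content"]) // 4)

    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    remaining = budget - sum(cost(m) for m in system)
    kept = []
    for message in reversed(others):  # walk from newest to oldest
        if cost(message) > remaining:
            break
        kept.append(message)
        remaining -= cost(message)
    return system + kept[::-1]  # restore chronological order
```

Stopping at the first message that does not fit keeps the retained history contiguous, which tends to be easier for a model to follow than a history with gaps.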
Multi-Agent Patterns
Orchestrator Pattern
Single coordinator delegates to specialized workers:
python
from typing import Dict

class OrchestratorAgent:
    # "Agent" stands for any worker type exposing `async execute(task) -> str`;
    # llm is any client exposing `async complete(prompt) -> str`. Neither is defined here.
    def __init__(self, workers: Dict[str, "Agent"], llm):
        self.workers = workers
        self.llm = llm

    async def process_task(self, task: str) -> str:
        # Determine which worker to use
        worker_name = await self._route_task(task)
        worker = self.workers[worker_name]
        # Delegate to worker with minimal context
        return await worker.execute(task)

    async def _route_task(self, task: str) -> str:
        """Use LLM to determine which worker handles task."""
        routing_prompt = f"""Given this task: {task}
Available workers:
- code_writer: Writes and modifies code
- researcher: Gathers information and analyzes data
- reviewer: Reviews code and provides feedback
Which worker should handle this? Respond with just the worker name."""
        return (await self.llm.complete(routing_prompt)).strip()

# Usage (run inside an async function; llm_client and the worker classes are defined elsewhere)
orchestrator = OrchestratorAgent({
    "code_writer": CodeWriterAgent(),
    "researcher": ResearcherAgent(),
    "reviewer": ReviewerAgent()
}, llm=llm_client)
result = await orchestrator.process_task("Add error handling to the API client")
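Routing on raw model output is brittle: a reply such as "Reviewer." or a full sentence would raise a `KeyError` when used directly as a dictionary key. A small normalization step with a fallback, sketched below, is one way to guard against that. The function name and the fallback policy are assumptions, not part of the original pattern.

```python
def resolve_worker(raw_reply: str, workers: dict, default: str) -> str:
    """Map a free-text routing reply onto a registered worker name."""
    cleaned = raw_reply.strip().strip("`'\".").lower()
    if cleaned in workers:
        return cleaned
    # Fall back to the first registered name mentioned anywhere in the reply
    for name in workers:
        if name in cleaned:
            return name
    return default
```

In an orchestrator like the one above, this would wrap the reply from the routing call before the worker lookup.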
Peer-to-Peer Pattern
Agents collaborate directly:
python
import time
from typing import List

class PeerAgent:
    def __init__(self, name: str, peers: List['PeerAgent']):
        self.name = name
        self.peers = peers
        self.messages = []

    async def broadcast(self, message: str):
        """Send message to all peers."""
        for peer in self.peers:
            await peer.receive(self.name, message)

    async def receive(self, sender: str, message: str):
        """Receive message from peer."""
        self.messages.append({
            "from": sender,
            "content": message,
            "timestamp": time.time()
        })
Memory Systems
Short-term Memory (Working Context)
python
class WorkingMemory:
    def __init__(self, max_items: int = 5):
        self.items = []
        self.max_items = max_items

    def add(self, item: dict):
        """Add item, removing oldest if at capacity."""
        self.items.append(item)
        if len(self.items) > self.max_items:
            self.items.pop(0)

    def get_context(self) -> str:
        """Format for inclusion in prompt."""
        return "\n".join([
            f"- {item['key']}: {item['value']}"
            for item in self.items
        ])
Long-term Memory (Retrieval)
python
import hashlib
from typing import Dict, List, Optional

import chromadb

class LongTermMemory:
    def __init__(self):
        self.client = chromadb.Client()
        # get_or_create_collection avoids an error when the collection already exists
        self.collection = self.client.get_or_create_collection("agent_memory")

    def store(self, content: str, metadata: Optional[dict] = None):
        """Store information for later retrieval."""
        # Built-in hash() is salted per process, so derive a stable ID from the content
        doc_id = hashlib.sha256(content.encode("utf-8")).hexdigest()
        self.collection.add(
            documents=[content],
            # Some Chroma versions reject empty metadata dicts, so pass None instead
            metadatas=[metadata] if metadata else None,
            ids=[doc_id]
        )

    def recall(self, query: str, n_results: int = 3) -> List[Dict]:
        """Retrieve relevant memories."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        return [
            {
                "content": doc,
                "metadata": meta or {}
            }
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])
        ]
Graph-based Memory
python
import networkx as nx

class GraphMemory:
    def __init__(self):
        self.graph = nx.DiGraph()

    def add_entity(self, entity: str, properties: dict):
        """Add or update entity node."""
        self.graph.add_node(entity, **properties)

    def add_relation(self, from_entity: str, to_entity: str, relation: str):
        """Add relationship between entities."""
        self.graph.add_edge(from_entity, to_entity, relation=relation)

    def get_neighbors(self, entity: str, max_depth: int = 2) -> dict:
        """Get connected entities within depth."""
        if entity not in self.graph:
            return {}
        # BFS along outgoing edges; the result includes the entity itself (depth 0)
        neighbors = {}
        for node in nx.single_source_shortest_path_length(
            self.graph, entity, cutoff=max_depth
        ):
            neighbors[node] = self.graph.nodes[node]
        return neighbors
Tool Design Principles
Minimal Interface
python
from typing import Any, Dict, List

def search_documentation(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Search documentation with minimal parameters.

    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 5)

    Returns:
        List of matching documentation sections with title and content

    Example:
        results = search_documentation("authentication")
    """
    # Implementation
    pass
Clear Output Format
python
def analyze_code(code: str) -> dict:
    """Analyze code and return structured results.

    Returns:
        {
            "issues": [{"line": int, "severity": str, "message": str}],
            "metrics": {"complexity": int, "lines": int},
            "suggestions": [str]
        }
    """
    return {
        "issues": [
            {"line": 15, "severity": "warning", "message": "Unused variable 'x'"}
        ],
        "metrics": {
            "complexity": 7,
            "lines": 42
        },
        "suggestions": [
            "Consider extracting this logic into a separate function"
        ]
    }
Context Offloading
python
import json
from pathlib import Path

def analyze_large_dataset(data_path: str, output_dir: str = ".agent_context") -> str:
    """Analyze data and write detailed results to file.

    Returns reference to results file instead of full data in context.
    """
    # Create context directory (including any missing parents)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    # Analyze data; perform_analysis is a placeholder for your own analysis function,
    # expected to return a dict with "count" and "top_insights"
    results = perform_analysis(data_path)
    # Write detailed results to file
    results_file = f"{output_dir}/analysis_results.json"
    with open(results_file, 'w') as f:
        json.dump(results, f, indent=2)
    # Return only summary in context
    summary = {
        "total_records": results["count"],
        "key_findings": results["top_insights"][:3],
        "full_results": results_file
    }
    return f"Analysis complete. Summary: {summary}\nFull results in {results_file}"
Filesystem-based Context Management
Dynamic Discovery
python
from pathlib import Path
import yaml

def discover_tools(tools_dir: str = ".agent_tools") -> dict:
    """Dynamically discover available tools from filesystem."""
    tools = {}
    for tool_file in Path(tools_dir).glob("*.yaml"):
        with open(tool_file) as f:
            tool_spec = yaml.safe_load(f)
        tools[tool_spec["name"]] = tool_spec
    return tools

# Tool definition file: .agent_tools/github_search.yaml
"""
name: github_search
description: Search GitHub repositories
parameters:
  - name: query
    type: string
    required: true
  - name: language
    type: string
    required: false
"""
Plan Persistence
python
import json
from datetime import datetime
from pathlib import Path
from typing import Optional

class PlanTracker:
    def __init__(self, plan_file: str = ".agent_context/current_plan.json"):
        self.plan_file = plan_file

    def save_plan(self, steps: list):
        """Persist plan to disk."""
        plan = {
            "created_at": datetime.now().isoformat(),
            "steps": steps,
            "completed": []
        }
        # Make sure the target directory exists before writing
        Path(self.plan_file).parent.mkdir(parents=True, exist_ok=True)
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)

    def mark_complete(self, step_index: int):
        """Mark step as complete."""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        if step_index not in plan["completed"]:  # avoid recording a step twice
            plan["completed"].append(step_index)
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)

    def get_next_step(self) -> Optional[dict]:
        """Get next incomplete step, or None when the plan is finished."""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        for i, step in enumerate(plan["steps"]):
            if i not in plan["completed"]:
                return {"index": i, "step": step}
        return None
LLM-as-Judge Evaluation
Direct Scoring
python
from typing import Any, Dict, List

async def score_response(
    response: str,
    criteria: List[Dict[str, Any]],
    llm_client
) -> dict:
    """Score response against weighted criteria.

    Args:
        response: Response to evaluate
        criteria: List of {name, description, weight, scale}
        llm_client: LLM client for evaluation

    Returns:
        {
            "total_score": float,
            "criteria_scores": [{name, score, reasoning}]
        }
    """
    scores = []
    for criterion in criteria:
        prompt = f"""Evaluate this response on {criterion['name']}:
{criterion['description']}
Response to evaluate:
{response}
Score from 1-{criterion['scale']} and explain your reasoning.
Format: SCORE: X | REASONING: explanation"""
        evaluation = await llm_client.complete(prompt)
        # Parse score and reasoning (raises if the judge ignores the requested format)
        score_line = [l for l in evaluation.split('\n') if 'SCORE:' in l][0]
        score = int(score_line.split('SCORE:')[1].split('|')[0].strip())
        reasoning = evaluation.split('REASONING:')[1].strip()
        scores.append({
            "name": criterion['name'],
            "score": score,
            "max_score": criterion['scale'],
            "weight": criterion['weight'],
            "reasoning": reasoning
        })
    # Calculate weighted total, normalized to the range 0-1
    total = sum(s['score'] / s['max_score'] * s['weight'] for s in scores)
    total_weight = sum(c['weight'] for c in criteria)
    return {
        "total_score": total / total_weight,
        "criteria_scores": scores
    }

# Usage (run inside an async function; client is your LLM client)
result = await score_response(
    response="The API returns JSON with user data...",
    criteria=[
        {
            "name": "accuracy",
            "description": "Is the information factually correct?",
            "weight": 2.0,
            "scale": 5
        },
        {
            "name": "completeness",
            "description": "Does it cover all aspects of the question?",
            "weight": 1.5,
            "scale": 5
        }
    ],
    llm_client=client
)
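The string splitting in `score_response` assumes the judge follows the requested format exactly. Judges often deviate ("Score: 4/5", missing separators, out-of-range values), so a more tolerant parser can help. The sketch below is one possible approach using regular expressions; the function name and the choice to reject rather than clamp out-of-range scores are assumptions.

```python
import re
from typing import Optional

SCORE_PATTERN = re.compile(r"SCORE:\s*(\d+)", re.IGNORECASE)
REASONING_PATTERN = re.compile(r"REASONING:\s*(.+)", re.IGNORECASE | re.DOTALL)


def parse_judgement(text: str, scale: int) -> Optional[dict]:
    """Parse 'SCORE: X | REASONING: ...'; return None if the reply is unusable."""
    score_match = SCORE_PATTERN.search(text)
    if score_match is None:
        return None
    score = int(score_match.group(1))
    if not 1 <= score <= scale:
        return None  # out-of-range scores are rejected rather than clamped
    reasoning_match = REASONING_PATTERN.search(text)
    reasoning = reasoning_match.group(1).strip() if reasoning_match else ""
    return {"score": score, "reasoning": reasoning}
```

Returning `None` lets the caller decide whether to retry the judge call or skip the criterion, instead of failing the whole evaluation on one malformed reply.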
Pairwise Comparison
python
async def compare_responses(
    response_a: str,
    response_b: str,
    criteria: str,
    llm_client,
    mitigate_bias: bool = True
) -> dict:
    """Compare two responses with position bias mitigation.

    Args:
        response_a: First response
        response_b: Second response
        criteria: Evaluation criteria
        llm_client: LLM client
        mitigate_bias: Run comparison both ways and aggregate

    Returns:
        {"winner": "A" | "B" | "tie", "reasoning": str, "confidence": float}
    """
    async def single_comparison(first: str, second: str) -> dict:
        prompt = f"""Compare these two responses based on: {criteria}
Response 1:
{first}
Response 2:
{second}
Which is better? Respond with: WINNER: [1|2|TIE] | REASONING: explanation"""
        result = await llm_client.complete(prompt)
        winner_line = [l for l in result.split('\n') if 'WINNER:' in l][0]
        # Normalize case so "tie" and "Tie" are treated like "TIE"
        winner = winner_line.split('WINNER:')[1].split('|')[0].strip().upper()
        reasoning = result.split('REASONING:')[1].strip()
        return {"winner": winner, "reasoning": reasoning}

    # First comparison (A then B)
    comp1 = await single_comparison(response_a, response_b)
    if not mitigate_bias:
        return {
            "winner": "A" if comp1["winner"] == "1" else "B" if comp1["winner"] == "2" else "tie",
            "reasoning": comp1["reasoning"],
            "confidence": 1.0
        }
    # Second comparison (B then A) to mitigate position bias
    comp2 = await single_comparison(response_b, response_a)
    # Aggregate results: a response wins only if it is preferred in both orders
    if comp1["winner"] == "1" and comp2["winner"] == "2":
        return {"winner": "A", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "2" and comp2["winner"] == "1":
        return {"winner": "B", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "TIE" or comp2["winner"] == "TIE":
        return {"winner": "tie", "reasoning": "Evaluations were mixed", "confidence": 0.5}
    else:
        return {"winner": "tie", "reasoning": "Evaluations disagreed", "confidence": 0.3}
Hosted Agents (Sandboxed Execution)
Modal-based Background Agent
python
import modal

# Create Modal app
app = modal.App("coding-agent")

# Define image with dependencies
image = modal.Image.debian_slim().pip_install(
    "anthropic",
    "requests"
)

@app.function(
    image=image,
    secrets=[modal.Secret.from_name("anthropic-api-key")],
    timeout=3600
)
async def run_coding_task(task: str, files: dict) -> dict:
    """Execute coding task in sandboxed environment.

    Args:
        task: Task description
        files: Initial files as {path: content}

    Returns:
        {
            "status": "success" | "error",
            "files": {path: content},
            "output": str
        }
    """
    import os
    from anthropic import Anthropic
    # Initialize Claude
    client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
    # Write initial files
    for path, content in files.items():
        parent = os.path.dirname(path)
        if parent:  # os.makedirs("") raises for top-level files such as "client.py"
            os.makedirs(parent, exist_ok=True)
        with open(path, 'w') as f:
            f.write(content)
    # Execute task with agent
    messages = [
        {
            "role": "user",
            "content": f"Complete this task: {task}\n\nAvailable files: {list(files.keys())}"
        }
    ]
    # Note: this single call returns text only; nothing here applies the model's
    # suggested changes to the files on disk
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=messages
    )
    # Collect files (unchanged unless the task logic above is extended to edit them)
    output_files = {}
    for path in files.keys():
        if os.path.exists(path):
            with open(path, 'r') as f:
                output_files[path] = f.read()
    return {
        "status": "success",
        "files": output_files,
        "output": response.content[0].text
    }

# Client usage
@app.local_entrypoint()
def main():
    result = run_coding_task.remote(
        task="Add error handling to the API client",
        files={
            "client.py": "def fetch_data():\n    response = requests.get(url)\n    return response.json()"
        }
    )
    print(result["output"])
    print("\nModified files:")
    for path, content in result["files"].items():
        print(f"\n{path}:")
        print(content)
Common Patterns
Task-Model Fit Analysis
python
def analyze_task_model_fit(task_description: str) -> dict:
"""Determine if LLM is appropriate for task.
Returns:
{
"recommended": bool,
"model_size": "small" | "medium" | "large",
"approach": "single_shot" | "chain_of_thought" | "multi_agent",
"reasoning": str
}
"""
# Decision tree
is_structured = "json" in task_description.lower() or "schema" in task_description.lower()
is_creative = any(w in task_description.lower() for w in ["generate", "write", "create"])
is_complex = any(w in task_description.lower() for w in ["analyze", "reason", "solve"])
if is_structured:
return {
"recommended": True,
"model_size": "small",
"approach": "single_shot",
"reasoning": "Structured output tasks work well with smaller models"
}
elif is_complex:
return {
"recommended": True,
"model_size": "large",
"approach": "chain_of_thought",
"reasoning": "Complex reasoning requires larger models with step-by-step thinking"
}
elif is_creative:
return {
"recommended": True,
"model_size": "medium",
"approach": "single_shot",
"reasoning": "Creative tasks benefit from medium-sized models"
}
else:
return {
"recommended": False,
"model_size": None,
"approach": None,
"reasoning": "Task may be better suited for traditional programming"
}python
Append-Only Memory Pattern
python
import json
from datetime import datetime
from pathlib import Path


class AppendOnlyMemory:
    """JSONL-based memory with schema-first line for agent parsing."""

    def __init__(self, file_path: str, schema: dict):
        self.file_path = Path(file_path)
        self.schema = schema
        # Create file with schema if doesn't exist
        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.file_path, 'w') as f:
                f.write(json.dumps({"_schema": schema}) + '\n')

    def append(self, data: dict):
        """Append entry with timestamp."""
        entry = {
            **data,
            "_timestamp": datetime.now().isoformat()
        }
        with open(self.file_path, 'a') as f:
            f.write(json.dumps(entry) + '\n')

    def read_recent(self, n: int = 10) -> list:
        """Read n most recent entries."""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        # Skip schema line, get recent entries
        entries = [json.loads(line) for line in lines[1:]]
        return entries[-n:]

    def query(self, filter_fn) -> list:
        """Query entries with filter function."""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        entries = [json.loads(line) for line in lines[1:]]
        return [e for e in entries if filter_fn(e)]
Usage
python
memory = AppendOnlyMemory(
    ".agent_memory/interactions.jsonl",
    schema={
        "user": "string",
        "action": "string",
        "result": "string"
    }
)
memory.append({
    "user": "developer_123",
    "action": "deployed_feature",
    "result": "success"
})
recent = memory.read_recent(5)
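`read_recent` loads the whole file before slicing. For a long-lived log, a bounded `collections.deque` keeps only the last `n` lines in memory while the file streams past. This is a minimal sketch, assuming the same layout as above (schema on the first line, one JSON object per line after it). `tail_entries` is a hypothetical helper and not part of the class, and the demo file is a stand-in written to a temporary directory.

```python
import json
import tempfile
from collections import deque
from pathlib import Path


def tail_entries(file_path: str, n: int = 10) -> list:
    """Return the last n entries of a schema-first JSONL file."""
    if n <= 0:
        return []
    with open(file_path, "r") as f:
        next(f, None)  # skip the schema line
        # deque(maxlen=n) discards older lines as newer ones arrive
        last_lines = deque((line for line in f if line.strip()), maxlen=n)
    return [json.loads(line) for line in last_lines]


# Demo data: one schema line followed by five entries.
demo_path = Path(tempfile.mkdtemp()) / "interactions.jsonl"
with open(demo_path, "w") as f:
    f.write(json.dumps({"_schema": {"action": "string"}}) + "\n")
    for i in range(5):
        f.write(json.dumps({"action": f"step_{i}"}) + "\n")

recent_entries = tail_entries(str(demo_path), n=2)
print(recent_entries)
```

The file is still read from start to finish, so this bounds memory rather than I/O.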
Troubleshooting
Lost-in-Middle Degradation
Symptom: Agent ignores information from middle of context.
Solution: Use U-shaped context organization:
python
def organize_context(system_msg: str, key_facts: list, messages: list) -> list:
    """Place critical info at beginning and end."""
    return [
        {"role": "system", "content": system_msg},
        # Key facts at the top
        {"role": "system", "content": "Key facts:\n" + "\n".join(key_facts)},
        # Conversation in middle
        *messages[:-3],
        # Recent context at the end
        *messages[-3:],
        # Reminder of key facts at very end
        {"role": "system", "content": "Remember:\n" + "\n".join(key_facts[:3])}
    ]
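The same start-and-end placement can be applied to retrieved documents, not only to key facts. The sketch below assumes the input list is already sorted from most to least relevant. `u_shape_order` is a hypothetical helper, not something the project defines.

```python
def u_shape_order(ranked_items: list) -> list:
    """Reorder items ranked most-relevant-first so the strongest sit at the edges."""
    front = ranked_items[0::2]  # ranks 1, 3, 5, ... fill in from the start
    back = ranked_items[1::2]   # ranks 2, 4, 6, ... fill in from the end
    return front + back[::-1]


docs = ["doc_1", "doc_2", "doc_3", "doc_4", "doc_5"]  # doc_1 is most relevant
ordered = u_shape_order(docs)
print(ordered)
```

The least relevant items end up in the middle of the list, where a missed detail costs the least.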
Context Clash
Symptom: Agent confused by contradictory information.
Solution: Timestamp and prioritize information:
python
from datetime import datetime


def merge_contexts(contexts: list[dict]) -> str:
    """Merge contexts with priority and timestamps."""
    # Sort by priority (higher first) then timestamp (newer first)
    sorted_contexts = sorted(
        contexts,
        key=lambda x: (-x.get('priority', 0), -x.get('timestamp', 0))
    )
    merged = []
    for ctx in sorted_contexts:
        # Timestamps are Unix epoch seconds; a missing one is treated as 0, matching the sort key
        timestamp_str = datetime.fromtimestamp(ctx.get('timestamp', 0)).strftime('%Y-%m-%d %H:%M')
        merged.append(f"[{timestamp_str}, priority={ctx.get('priority', 0)}] {ctx['content']}")
    return "\n\n".join(merged)
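`merge_contexts` orders conflicting entries but still passes every one of them to the model. If each entry carries an identifier for the fact it describes, the losing entries can be dropped before merging. The sketch below assumes a `key` field on each entry; that field is hypothetical and not part of the dictionaries shown above. Higher priority wins, and the newer timestamp breaks ties.

```python
def resolve_conflicts(contexts: list[dict]) -> list[dict]:
    """Keep one entry per key: highest priority wins, newest timestamp breaks ties."""
    winners: dict[str, dict] = {}
    for ctx in contexts:
        current = winners.get(ctx["key"])
        rank = (ctx.get("priority", 0), ctx.get("timestamp", 0))
        if current is None or rank > (current.get("priority", 0), current.get("timestamp", 0)):
            winners[ctx["key"]] = ctx
    return list(winners.values())


contexts = [
    {"key": "deploy_target", "content": "Deploy to staging", "priority": 1, "timestamp": 1700000000},
    {"key": "deploy_target", "content": "Deploy to production", "priority": 1, "timestamp": 1700003600},
    {"key": "language", "content": "Use Python 3.11", "priority": 2, "timestamp": 1699990000},
]
resolved = resolve_conflicts(contexts)
for ctx in resolved:
    print(ctx["key"], "->", ctx["content"])
```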
Attention Scarcity
Symptom: Performance degrades with long contexts.
Solution: Compact the message history to a target token budget:
python
def estimate_tokens(text: str) -> int:
    """Rough estimate: 1 token per 4 characters."""
    return len(text) // 4


def count_tokens(messages: list) -> int:
    """Estimated token total for a list of messages."""
    return sum(estimate_tokens(m["content"]) for m in messages)


def compact_messages(messages: list, target_tokens: int) -> list:
    """Compress messages to target token budget.

    summarize_messages and extract_important_messages are placeholders
    for helpers you supply; they are not defined here.
    """
    if count_tokens(messages) <= target_tokens:
        return messages
    # Keep system messages and the five most recent other messages
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    recent = others[-5:]
    # Compress middle messages
    middle = others[:-5]
    if not middle:
        return system + recent
    budget = target_tokens - count_tokens(system + recent)
    compression_ratio = budget / max(count_tokens(middle), 1)
    if compression_ratio < 0.5:
        # Aggressive summarization needed
        summary = summarize_messages(middle)
        return system + [{"role": "assistant", "content": f"[Earlier: {summary}]"}] + recent
    else:
        # Keep important messages, drop others
        important = extract_important_messages(middle, keep_ratio=compression_ratio)
        return system + important + recent
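`compact_messages` calls `extract_important_messages` without defining it. One possible shape for that helper is sketched below. The scoring heuristic (user turns and messages mentioning an error rank highest) is an assumption made for illustration and is not the project's method; a production version might score with embeddings or an LLM instead.

```python
def extract_important_messages(messages: list, keep_ratio: float) -> list:
    """Keep roughly keep_ratio of the messages, preferring higher scores, in original order."""
    if not messages:
        return []
    keep_ratio = min(max(keep_ratio, 0.0), 1.0)
    keep_count = max(1, int(len(messages) * keep_ratio))

    def score(message: dict) -> int:
        # Hypothetical heuristic: user turns and error reports matter most.
        points = 0
        if message["role"] == "user":
            points += 2
        if "error" in message["content"].lower():
            points += 3
        return points

    # Rank indices by score (ties favor the later message), then restore chronological order.
    ranked = sorted(range(len(messages)), key=lambda i: (score(messages[i]), i), reverse=True)
    kept_indices = sorted(ranked[:keep_count])
    return [messages[i] for i in kept_indices]


history = [
    {"role": "user", "content": "Run the test suite"},
    {"role": "assistant", "content": "Running tests now"},
    {"role": "tool", "content": "Error: 2 tests failed in test_client.py"},
    {"role": "assistant", "content": "Two tests failed, investigating"},
]
kept = extract_important_messages(history, keep_ratio=0.5)
for message in kept:
    print(message["role"], "|", message["content"])
```

Keeping the survivors in their original order matters: reordered turns can read as a different conversation.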
Tool Output Bloat
Symptom: Tool outputs fill context window.
Solution: Offload to filesystem:
python
import json
import time
from pathlib import Path


def execute_tool_with_offloading(
    tool_name: str,
    params: dict,
    context_dir: str = ".agent_context/tool_outputs"
) -> str:
    """Execute tool and offload detailed results."""
    Path(context_dir).mkdir(parents=True, exist_ok=True)
    # Execute tool (execute_tool is your own dispatcher; it is not defined here
    # and is expected to return a JSON-serializable dict)
    result = execute_tool(tool_name, params)
    # Write full result to file
    output_file = f"{context_dir}/{tool_name}_{int(time.time())}.json"
    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)
    # Return summary + file reference
    summary = {
        "status": result.get("status"),
        "key_findings": result.get("summary", "See file for details"),
        "record_count": len(result.get("data", [])),
        "full_output": output_file
    }
    return json.dumps(summary)
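Offloading only helps if the agent can pull back a small part of the file when it needs detail. The sketch below reads one page of records from an offloaded result. It assumes the file holds a JSON object with a `data` list, as the `record_count` line above implies. `read_offloaded` is a hypothetical helper, and the demo file is a stand-in written to a temporary directory.

```python
import json
import tempfile
from pathlib import Path


def read_offloaded(output_file: str, start: int = 0, limit: int = 20) -> list:
    """Return a slice of the "data" records from an offloaded tool result."""
    with open(output_file, "r") as f:
        result = json.load(f)
    records = result.get("data", [])
    return records[start:start + limit]


# Demo with a stand-in result file holding 100 records.
demo_file = Path(tempfile.mkdtemp()) / "search_0.json"
demo_file.write_text(json.dumps({"status": "ok", "data": [{"id": i} for i in range(100)]}))

page = read_offloaded(str(demo_file), start=40, limit=3)
print(page)
```

Exposing `start` and `limit` as tool parameters lets the agent page through results without ever holding the full output in context.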
Best Practices
- Progressive Disclosure: Load information only when needed
- Context Budget: Track token usage, compress proactively
- Tool Minimalism: Design tools with minimal interface, maximum clarity
- Filesystem Offloading: Keep detailed data out of context
- Evaluation First: Build evaluation before building features
- Platform Agnostic: Focus on transferable principles over vendor APIs
- Append-Only Memory: Use JSONL with schema-first for agent-friendly persistence
- U-shaped Organization: Place critical info at start and end of context
Resources
License
MIT