agent-skills-context-engineering

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Agent Skills for Context Engineering

用于上下文工程的Agent技能

Skill by ara.so — AI Agent Skills collection.
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems. This skill teaches principles for managing LLM context windows, designing effective agent architectures, and building production-grade agent systems.
ara.so 提供的技能 — AI Agent技能合集。
这是一套关于上下文工程、多Agent架构和生产级Agent系统的全面Agent技能集合。本技能教授管理LLM上下文窗口、设计高效Agent架构以及构建生产级Agent系统的原则。

What This Project Does

项目功能

Agent Skills for Context Engineering provides battle-tested patterns for:
  • Context Management: Managing limited attention budgets, avoiding lost-in-middle degradation
  • Multi-Agent Systems: Orchestrator, peer-to-peer, and hierarchical architectures
  • Memory Systems: Short-term, long-term, and graph-based memory patterns
  • Tool Design: Building tools that agents can use effectively
  • Evaluation: LLM-as-judge frameworks for measuring agent quality
  • Production Systems: Hosted agents with sandboxed VMs and multiplayer support
Unlike prompt engineering (crafting instructions), context engineering addresses holistic curation of all information in the context window: system prompts, tool definitions, retrieved documents, message history, and tool outputs.
用于上下文工程的Agent技能提供经过实战验证的模式,涵盖:
  • 上下文管理:管理有限的注意力预算,避免"中间信息丢失"性能退化
  • 多Agent系统:编排器、对等网络和分层架构
  • 内存系统:短期、长期和基于图的内存模式
  • 工具设计:构建Agent可有效使用的工具
  • 评估:用于衡量Agent质量的LLM-as-judge框架
  • 生产系统:带有沙箱虚拟机和多人协作支持的托管Agent
与提示工程(编写指令)不同,上下文工程针对上下文窗口中所有信息的整体管理:系统提示、工具定义、检索到的文档、消息历史和工具输出。

Installation

安装方法

For Claude Code (Recommended)

适用于Claude Code(推荐)

Step 1: Add the Marketplace
bash
/plugin marketplace add muratcankoylan/Agent-Skills-for-Context-Engineering
Step 2: Install the Plugin
bash
/plugin install context-engineering@context-engineering-marketplace
Or browse and install:
  1. Select
    Browse and install plugins
  2. Select
    context-engineering-marketplace
  3. Select
    context-engineering
  4. Select
    Install now
步骤1:添加插件市场
bash
/plugin marketplace add muratcankoylan/Agent-Skills-for-Context-Engineering
步骤2:安装插件
bash
/plugin install context-engineering@context-engineering-marketplace
或通过浏览安装:
  1. 选择
    Browse and install plugins
  2. 选择
    context-engineering-marketplace
  3. 选择
    context-engineering
  4. 选择
    Install now

For Cursor (Open Plugins)

适用于Cursor(开放插件)

Add to your
.cursor/plugins.json
:
json
{
  "plugins": [
    {
      "name": "context-engineering",
      "repository": "muratcankoylan/Agent-Skills-for-Context-Engineering"
    }
  ]
}
添加到你的
.cursor/plugins.json
json
{
  "plugins": [
    {
      "name": "context-engineering",
      "repository": "muratcankoylan/Agent-Skills-for-Context-Engineering"
    }
  ]
}

For Individual Skills

安装单个技能

Copy specific skills to your project:
bash
undefined
将特定技能复制到你的项目中:
bash
undefined

Create skills directory

创建技能目录

mkdir -p .claude/skills
mkdir -p .claude/skills

Add a specific skill (example: context-fundamentals)

添加特定技能(示例:context-fundamentals)


Available skills: `context-fundamentals`, `context-degradation`, `context-compression`, `context-optimization`, `latent-briefing`, `multi-agent-patterns`, `memory-systems`, `tool-design`, `filesystem-context`, `hosted-agents`, `evaluation`, `advanced-evaluation`, `project-development`, `bdi-mental-states`

可用技能:`context-fundamentals`, `context-degradation`, `context-compression`, `context-optimization`, `latent-briefing`, `multi-agent-patterns`, `memory-systems`, `tool-design`, `filesystem-context`, `hosted-agents`, `evaluation`, `advanced-evaluation`, `project-development`, `bdi-mental-states`

Core Concepts

核心概念

Context Window Management

上下文窗口管理

The fundamental challenge: context windows are constrained by attention mechanics, not raw token capacity.
Key degradation patterns:
  • Lost-in-the-middle: Models lose track of information in the middle of long contexts
  • U-shaped attention: Strong attention at beginning and end, weak in middle
  • Attention scarcity: As context grows, attention per token decreases
Solution: Find the smallest possible set of high-signal tokens.
核心挑战:上下文窗口受注意力机制限制,而非原始token容量。
关键退化模式:
  • 中间信息丢失:模型在长上下文的中间部分会丢失信息追踪
  • U型注意力:对上下文开头和结尾的注意力强,中间部分弱
  • 注意力稀缺:随着上下文增长,每个token获得的注意力减少
解决方案:找到最小的高信号token集合。

Progressive Disclosure

渐进式披露

Load information only when needed:
python
undefined
仅在需要时加载信息:
python
undefined

skills/init.py - Lazy loading pattern

skills/init.py - 懒加载模式

class SkillRegistry: def init(self): self._skills = {} self._loaded = set()
def get_skill_summary(self, skill_name: str) -> dict:
    """Load only name and description initially."""
    return {
        "name": skill_name,
        "description": self._get_description(skill_name)
    }

def load_skill(self, skill_name: str) -> dict:
    """Load full skill content only when activated."""
    if skill_name not in self._loaded:
        self._skills[skill_name] = self._read_skill_file(skill_name)
        self._loaded.add(skill_name)
    return self._skills[skill_name]
undefined
class SkillRegistry: def init(self): self._skills = {} self._loaded = set()
def get_skill_summary(self, skill_name: str) -> dict:
    """仅初始加载名称和描述。"""
    return {
        "name": skill_name,
        "description": self._get_description(skill_name)
    }

def load_skill(self, skill_name: str) -> dict:
    """仅在激活时加载完整技能内容。"""
    if skill_name not in self._loaded:
        self._skills[skill_name] = self._read_skill_file(skill_name)
        self._loaded.add(skill_name)
    return self._skills[skill_name]
undefined

Context Compression Strategies

上下文压缩策略

Sliding Window:
python
def sliding_window_context(messages: list, window_size: int = 10) -> list:
    """Keep only recent messages."""
    if len(messages) <= window_size:
        return messages
    
    # Always keep system message
    system_msgs = [m for m in messages if m["role"] == "system"]
    recent_msgs = messages[-window_size:]
    
    return system_msgs + recent_msgs
Summarization:
python
async def compress_with_summary(messages: list, llm_client) -> list:
    """Compress old messages into summary."""
    if len(messages) < 20:
        return messages
    
    # Keep recent messages uncompressed
    to_compress = messages[1:-10]  # Skip system message and recent 10
    recent = messages[-10:]
    
    # Generate summary
    summary_prompt = f"Summarize these messages concisely:\n{to_compress}"
    summary = await llm_client.complete(summary_prompt)
    
    return [
        messages[0],  # System message
        {"role": "assistant", "content": f"[Summary of previous conversation: {summary}]"},
        *recent
    ]
滑动窗口:
python
def sliding_window_context(messages: list, window_size: int = 10) -> list:
    """仅保留最近的消息。"""
    if len(messages) <= window_size:
        return messages
    
    # 始终保留系统消息
    system_msgs = [m for m in messages if m["role"] == "system"]
    recent_msgs = messages[-window_size:]
    
    return system_msgs + recent_msgs
摘要压缩:
python
async def compress_with_summary(messages: list, llm_client) -> list:
    """将旧消息压缩为摘要。"""
    if len(messages) < 20:
        return messages
    
    # 保留最近的消息不压缩
    to_compress = messages[1:-10]  # 跳过系统消息和最近10条消息
    recent = messages[-10:]
    
    # 生成摘要
    summary_prompt = f"Summarize these messages concisely:\n{to_compress}"
    summary = await llm_client.complete(summary_prompt)
    
    return [
        messages[0],  # 系统消息
        {"role": "assistant", "content": f"[对话历史摘要: {summary}]"},
        *recent
    ]

Multi-Agent Patterns

多Agent模式

Orchestrator Pattern

编排器模式

Single coordinator delegates to specialized workers:
python
from typing import List, Dict

class OrchestratorAgent:
    def __init__(self, workers: Dict[str, Agent]):
        self.workers = workers
    
    async def process_task(self, task: str) -> str:
        # Determine which worker to use
        worker_name = await self._route_task(task)
        worker = self.workers[worker_name]
        
        # Delegate to worker with minimal context
        result = await worker.execute(task)
        
        return result
    
    async def _route_task(self, task: str) -> str:
        """Use LLM to determine which worker handles task."""
        routing_prompt = f"""Given this task: {task}

Available workers:
- code_writer: Writes and modifies code
- researcher: Gathers information and analyzes data
- reviewer: Reviews code and provides feedback

Which worker should handle this? Respond with just the worker name."""
        
        return await self.llm.complete(routing_prompt)
单个协调者将任务委派给专业的工作Agent:
python
from typing import List, Dict

class OrchestratorAgent:
    def __init__(self, workers: Dict[str, Agent]):
        self.workers = workers
    
    async def process_task(self, task: str) -> str:
        # 确定使用哪个工作Agent
        worker_name = await self._route_task(task)
        worker = self.workers[worker_name]
        
        # 用最少的上下文将任务委派给工作Agent
        result = await worker.execute(task)
        
        return result
    
    async def _route_task(self, task: str) -> str:
        """使用LLM确定哪个工作Agent处理任务。"""
        routing_prompt = f"""Given this task: {task}

Available workers:
- code_writer: Writes and modifies code
- researcher: Gathers information and analyzes data
- reviewer: Reviews code and provides feedback

Which worker should handle this? Respond with just the worker name."""
        
        return await self.llm.complete(routing_prompt)

Usage

使用示例

orchestrator = OrchestratorAgent({ "code_writer": CodeWriterAgent(), "researcher": ResearcherAgent(), "reviewer": ReviewerAgent() })
result = await orchestrator.process_task("Add error handling to the API client")
undefined
orchestrator = OrchestratorAgent({ "code_writer": CodeWriterAgent(), "researcher": ResearcherAgent(), "reviewer": ReviewerAgent() })
result = await orchestrator.process_task("Add error handling to the API client")
undefined

Peer-to-Peer Pattern

对等网络模式

Agents collaborate directly:
python
class PeerAgent:
    def __init__(self, name: str, peers: List['PeerAgent']):
        self.name = name
        self.peers = peers
        self.messages = []
    
    async def broadcast(self, message: str):
        """Send message to all peers."""
        for peer in self.peers:
            await peer.receive(self.name, message)
    
    async def receive(self, sender: str, message: str):
        """Receive message from peer."""
        self.messages.append({
            "from": sender,
            "content": message,
            "timestamp": time.time()
        })
Agent之间直接协作:
python
class PeerAgent:
    def __init__(self, name: str, peers: List['PeerAgent']):
        self.name = name
        self.peers = peers
        self.messages = []
    
    async def broadcast(self, message: str):
        """向所有Peer发送消息。"""
        for peer in self.peers:
            await peer.receive(self.name, message)
    
    async def receive(self, sender: str, message: str):
        """接收来自Peer的消息。"""
        self.messages.append({
            "from": sender,
            "content": message,
            "timestamp": time.time()
        })

Memory Systems

内存系统

Short-term Memory (Working Context)

短期内存(工作上下文)

python
class WorkingMemory:
    def __init__(self, max_items: int = 5):
        self.items = []
        self.max_items = max_items
    
    def add(self, item: dict):
        """Add item, removing oldest if at capacity."""
        self.items.append(item)
        if len(self.items) > self.max_items:
            self.items.pop(0)
    
    def get_context(self) -> str:
        """Format for inclusion in prompt."""
        return "\n".join([
            f"- {item['key']}: {item['value']}"
            for item in self.items
        ])
python
class WorkingMemory:
    def __init__(self, max_items: int = 5):
        self.items = []
        self.max_items = max_items
    
    def add(self, item: dict):
        """添加项目,达到容量时移除最早的项目。"""
        self.items.append(item)
        if len(self.items) > self.max_items:
            self.items.pop(0)
    
    def get_context(self) -> str:
        """格式化后用于提示词。"""
        return "\n".join([
            f"- {item['key']}: {item['value']}"
            for item in self.items
        ])

Long-term Memory (Retrieval)

长期内存(检索式)

python
import chromadb
from typing import List, Dict

class LongTermMemory:
    def __init__(self):
        self.client = chromadb.Client()
        self.collection = self.client.create_collection("agent_memory")
    
    def store(self, content: str, metadata: dict = None):
        """Store information for later retrieval."""
        self.collection.add(
            documents=[content],
            metadatas=[metadata or {}],
            ids=[str(hash(content))]
        )
    
    def recall(self, query: str, n_results: int = 3) -> List[Dict]:
        """Retrieve relevant memories."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        
        return [
            {
                "content": doc,
                "metadata": meta
            }
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])
        ]
python
import chromadb
from typing import List, Dict

class LongTermMemory:
    def __init__(self):
        self.client = chromadb.Client()
        self.collection = self.client.create_collection("agent_memory")
    
    def store(self, content: str, metadata: dict = None):
        """存储信息供后续检索。"""
        self.collection.add(
            documents=[content],
            metadatas=[metadata or {}],
            ids=[str(hash(content))]
        )
    
    def recall(self, query: str, n_results: int = 3) -> List[Dict]:
        """检索相关记忆。"""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        
        return [
            {
                "content": doc,
                "metadata": meta
            }
            for doc, meta in zip(results['documents'][0], results['metadatas'][0])
        ]

Graph-based Memory

基于图的内存

python
import networkx as nx

class GraphMemory:
    def __init__(self):
        self.graph = nx.DiGraph()
    
    def add_entity(self, entity: str, properties: dict):
        """Add or update entity node."""
        self.graph.add_node(entity, **properties)
    
    def add_relation(self, from_entity: str, to_entity: str, relation: str):
        """Add relationship between entities."""
        self.graph.add_edge(from_entity, to_entity, relation=relation)
    
    def get_neighbors(self, entity: str, max_depth: int = 2) -> dict:
        """Get connected entities within depth."""
        if entity not in self.graph:
            return {}
        
        # BFS to find neighbors
        neighbors = {}
        for node in nx.single_source_shortest_path_length(
            self.graph, entity, cutoff=max_depth
        ):
            neighbors[node] = self.graph.nodes[node]
        
        return neighbors
python
import networkx as nx

class GraphMemory:
    def __init__(self):
        self.graph = nx.DiGraph()
    
    def add_entity(self, entity: str, properties: dict):
        """添加或更新实体节点。"""
        self.graph.add_node(entity, **properties)
    
    def add_relation(self, from_entity: str, to_entity: str, relation: str):
        """添加实体间的关系。"""
        self.graph.add_edge(from_entity, to_entity, relation=relation)
    
    def get_neighbors(self, entity: str, max_depth: int = 2) -> dict:
        """获取指定深度内的关联实体。"""
        if entity not in self.graph:
            return {}
        
        # BFS查找邻居
        neighbors = {}
        for node in nx.single_source_shortest_path_length(
            self.graph, entity, cutoff=max_depth
        ):
            neighbors[node] = self.graph.nodes[node]
        
        return neighbors

Tool Design Principles

工具设计原则

Minimal Interface

极简接口

python
from typing import Dict, Any

def search_documentation(query: str, max_results: int = 5) -> list[Dict[str, Any]]:
    """Search documentation with minimal parameters.
    
    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 5)
    
    Returns:
        List of matching documentation sections with title and content
    
    Example:
        results = search_documentation("authentication")
    """
    # Implementation
    pass
python
from typing import Dict, Any

def search_documentation(query: str, max_results: int = 5) -> list[Dict[str, Any]]:
    """使用最少参数搜索文档。
    
    Args:
        query: 搜索查询字符串
        max_results: 返回的最大结果数(默认:5)
    
    Returns:
        包含标题和内容的匹配文档章节列表
    
    Example:
        results = search_documentation("authentication")
    """
    # 实现代码
    pass

Clear Output Format

清晰的输出格式

python
def analyze_code(code: str) -> dict:
    """Analyze code and return structured results.
    
    Returns:
        {
            "issues": [{"line": int, "severity": str, "message": str}],
            "metrics": {"complexity": int, "lines": int},
            "suggestions": [str]
        }
    """
    return {
        "issues": [
            {"line": 15, "severity": "warning", "message": "Unused variable 'x'"}
        ],
        "metrics": {
            "complexity": 7,
            "lines": 42
        },
        "suggestions": [
            "Consider extracting this logic into a separate function"
        ]
    }
python
def analyze_code(code: str) -> dict:
    """分析代码并返回结构化结果。
    
    Returns:
        {
            "issues": [{"line": int, "severity": str, "message": str}],
            "metrics": {"complexity": int, "lines": int},
            "suggestions": [str]
        }
    """
    return {
        "issues": [
            {"line": 15, "severity": "warning", "message": "Unused variable 'x'"}
        ],
        "metrics": {
            "complexity": 7,
            "lines": 42
        },
        "suggestions": [
            "Consider extracting this logic into a separate function"
        ]
    }

Context Offloading

上下文卸载

python
import json
from pathlib import Path

def analyze_large_dataset(data_path: str, output_dir: str = ".agent_context") -> str:
    """Analyze data and write detailed results to file.
    
    Returns reference to results file instead of full data in context.
    """
    # Create context directory
    Path(output_dir).mkdir(exist_ok=True)
    
    # Analyze data
    results = perform_analysis(data_path)
    
    # Write detailed results to file
    results_file = f"{output_dir}/analysis_results.json"
    with open(results_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    # Return only summary in context
    summary = {
        "total_records": results["count"],
        "key_findings": results["top_insights"][:3],
        "full_results": results_file
    }
    
    return f"Analysis complete. Summary: {summary}\nFull results in {results_file}"
python
import json
from pathlib import Path

def analyze_large_dataset(data_path: str, output_dir: str = ".agent_context") -> str:
    """分析数据并将详细结果写入文件。
    
    返回结果文件的引用,而非在上下文中返回完整数据。
    """
    # 创建上下文目录
    Path(output_dir).mkdir(exist_ok=True)
    
    # 分析数据
    results = perform_analysis(data_path)
    
    # 将详细结果写入文件
    results_file = f"{output_dir}/analysis_results.json"
    with open(results_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    # 仅在上下文中返回摘要
    summary = {
        "total_records": results["count"],
        "key_findings": results["top_insights"][:3],
        "full_results": results_file
    }
    
    return f"Analysis complete. Summary: {summary}\nFull results in {results_file}"

Filesystem-based Context Management

基于文件系统的上下文管理

Dynamic Discovery

动态发现

python
from pathlib import Path
import yaml

def discover_tools(tools_dir: str = ".agent_tools") -> dict:
    """Dynamically discover available tools from filesystem."""
    tools = {}
    
    for tool_file in Path(tools_dir).glob("*.yaml"):
        with open(tool_file) as f:
            tool_spec = yaml.safe_load(f)
            tools[tool_spec["name"]] = tool_spec
    
    return tools
python
from pathlib import Path
import yaml

def discover_tools(tools_dir: str = ".agent_tools") -> dict:
    """从文件系统动态发现可用工具。"""
    tools = {}
    
    for tool_file in Path(tools_dir).glob("*.yaml"):
        with open(tool_file) as f:
            tool_spec = yaml.safe_load(f)
            tools[tool_spec["name"]] = tool_spec
    
    return tools

Tool definition file: .agent_tools/github_search.yaml

工具定义文件: .agent_tools/github_search.yaml

""" name: github_search description: Search GitHub repositories parameters:
  • name: query type: string required: true
  • name: language type: string required: false """
undefined
""" name: github_search description: Search GitHub repositories parameters:
  • name: query type: string required: true
  • name: language type: string required: false """
undefined

Plan Persistence

计划持久化

python
import json
from datetime import datetime

class PlanTracker:
    def __init__(self, plan_file: str = ".agent_context/current_plan.json"):
        self.plan_file = plan_file
    
    def save_plan(self, steps: list):
        """Persist plan to disk."""
        plan = {
            "created_at": datetime.now().isoformat(),
            "steps": steps,
            "completed": []
        }
        
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)
    
    def mark_complete(self, step_index: int):
        """Mark step as complete."""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        
        plan["completed"].append(step_index)
        
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)
    
    def get_next_step(self) -> dict:
        """Get next incomplete step."""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        
        for i, step in enumerate(plan["steps"]):
            if i not in plan["completed"]:
                return {"index": i, "step": step}
        
        return None
python
import json
from datetime import datetime

class PlanTracker:
    def __init__(self, plan_file: str = ".agent_context/current_plan.json"):
        self.plan_file = plan_file
    
    def save_plan(self, steps: list):
        """将计划持久化到磁盘。"""
        plan = {
            "created_at": datetime.now().isoformat(),
            "steps": steps,
            "completed": []
        }
        
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)
    
    def mark_complete(self, step_index: int):
        """标记步骤为已完成。"""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        
        plan["completed"].append(step_index)
        
        with open(self.plan_file, 'w') as f:
            json.dump(plan, f, indent=2)
    
    def get_next_step(self) -> dict:
        """获取下一个未完成的步骤。"""
        with open(self.plan_file, 'r') as f:
            plan = json.load(f)
        
        for i, step in enumerate(plan["steps"]):
            if i not in plan["completed"]:
                return {"index": i, "step": step}
        
        return None

LLM-as-Judge Evaluation

LLM-as-Judge评估

Direct Scoring

直接评分

python
from typing import List, Dict

async def score_response(
    response: str,
    criteria: List[Dict[str, any]],
    llm_client
) -> dict:
    """Score response against weighted criteria.
    
    Args:
        response: Response to evaluate
        criteria: List of {name, description, weight, scale}
        llm_client: LLM client for evaluation
    
    Returns:
        {
            "total_score": float,
            "criteria_scores": [{name, score, reasoning}]
        }
    """
    scores = []
    
    for criterion in criteria:
        prompt = f"""Evaluate this response on {criterion['name']}:

{criterion['description']}

Response to evaluate:
{response}

Score from 1-{criterion['scale']} and explain your reasoning.
Format: SCORE: X | REASONING: explanation"""
        
        evaluation = await llm_client.complete(prompt)
        
        # Parse score and reasoning
        score_line = [l for l in evaluation.split('\n') if 'SCORE:' in l][0]
        score = int(score_line.split('SCORE:')[1].split('|')[0].strip())
        reasoning = evaluation.split('REASONING:')[1].strip()
        
        scores.append({
            "name": criterion['name'],
            "score": score,
            "max_score": criterion['scale'],
            "weight": criterion['weight'],
            "reasoning": reasoning
        })
    
    # Calculate weighted total
    total = sum(s['score'] / s['max_score'] * s['weight'] for s in scores)
    total_weight = sum(c['weight'] for c in criteria)
    
    return {
        "total_score": total / total_weight,
        "criteria_scores": scores
    }
python
from typing import List, Dict

async def score_response(
    response: str,
    criteria: List[Dict[str, any]],
    llm_client
) -> dict:
    """根据加权标准对响应进行评分。
    
    Args:
        response: 待评估的响应
        criteria: 包含{name, description, weight, scale}的标准列表
        llm_client: 用于评估的LLM客户端
    
    Returns:
        {
            "total_score": float,
            "criteria_scores": [{name, score, reasoning}]
        }
    """
    scores = []
    
    for criterion in criteria:
        prompt = f"""Evaluate this response on {criterion['name']}:

{criterion['description']}

Response to evaluate:
{response}

Score from 1-{criterion['scale']} and explain your reasoning.
Format: SCORE: X | REASONING: explanation"""
        
        evaluation = await llm_client.complete(prompt)
        
        # 解析评分和理由
        score_line = [l for l in evaluation.split('\n') if 'SCORE:' in l][0]
        score = int(score_line.split('SCORE:')[1].split('|')[0].strip())
        reasoning = evaluation.split('REASONING:')[1].strip()
        
        scores.append({
            "name": criterion['name'],
            "score": score,
            "max_score": criterion['scale'],
            "weight": criterion['weight'],
            "reasoning": reasoning
        })
    
    # 计算加权总分
    total = sum(s['score'] / s['max_score'] * s['weight'] for s in scores)
    total_weight = sum(c['weight'] for c in criteria)
    
    return {
        "total_score": total / total_weight,
        "criteria_scores": scores
    }

Usage

使用示例

result = await score_response( response="The API returns JSON with user data...", criteria=[ { "name": "accuracy", "description": "Is the information factually correct?", "weight": 2.0, "scale": 5 }, { "name": "completeness", "description": "Does it cover all aspects of the question?", "weight": 1.5, "scale": 5 } ], llm_client=client )
undefined
result = await score_response( response="The API returns JSON with user data...", criteria=[ { "name": "accuracy", "description": "Is the information factually correct?", "weight": 2.0, "scale": 5 }, { "name": "completeness", "description": "Does it cover all aspects of the question?", "weight": 1.5, "scale": 5 } ], llm_client=client )
undefined

Pairwise Comparison

两两对比

python
async def compare_responses(
    response_a: str,
    response_b: str,
    criteria: str,
    llm_client,
    mitigate_bias: bool = True
) -> dict:
    """Compare two responses with position bias mitigation.
    
    Args:
        response_a: First response
        response_b: Second response
        criteria: Evaluation criteria
        llm_client: LLM client
        mitigate_bias: Run comparison both ways and aggregate
    
    Returns:
        {"winner": "A" | "B" | "tie", "reasoning": str, "confidence": float}
    """
    async def single_comparison(first: str, second: str) -> dict:
        prompt = f"""Compare these two responses based on: {criteria}

Response 1:
{first}

Response 2:
{second}

Which is better? Respond with: WINNER: [1|2|TIE] | REASONING: explanation"""
        
        result = await llm_client.complete(prompt)
        winner_line = [l for l in result.split('\n') if 'WINNER:' in l][0]
        winner = winner_line.split('WINNER:')[1].split('|')[0].strip()
        reasoning = result.split('REASONING:')[1].strip()
        
        return {"winner": winner, "reasoning": reasoning}
    
    # First comparison (A then B)
    comp1 = await single_comparison(response_a, response_b)
    
    if not mitigate_bias:
        return {
            "winner": "A" if comp1["winner"] == "1" else "B" if comp1["winner"] == "2" else "tie",
            "reasoning": comp1["reasoning"],
            "confidence": 1.0
        }
    
    # Second comparison (B then A) to mitigate position bias
    comp2 = await single_comparison(response_b, response_a)
    
    # Aggregate results
    if comp1["winner"] == "1" and comp2["winner"] == "2":
        return {"winner": "A", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "2" and comp2["winner"] == "1":
        return {"winner": "B", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "TIE" or comp2["winner"] == "TIE":
        return {"winner": "tie", "reasoning": "Evaluations were mixed", "confidence": 0.5}
    else:
        return {"winner": "tie", "reasoning": "Evaluations disagreed", "confidence": 0.3}
python
async def compare_responses(
    response_a: str,
    response_b: str,
    criteria: str,
    llm_client,
    mitigate_bias: bool = True
) -> dict:
    """对比两个响应,减轻位置偏差。
    
    Args:
        response_a: 第一个响应
        response_b: 第二个响应
        criteria: 评估标准
        llm_client: LLM客户端
        mitigate_bias: 双向对比并汇总结果
    
    Returns:
        {"winner": "A" | "B" | "tie", "reasoning": str, "confidence": float}
    """
    async def single_comparison(first: str, second: str) -> dict:
        prompt = f"""Compare these two responses based on: {criteria}

Response 1:
{first}

Response 2:
{second}

Which is better? Respond with: WINNER: [1|2|TIE] | REASONING: explanation"""
        
        result = await llm_client.complete(prompt)
        winner_line = [l for l in result.split('\n') if 'WINNER:' in l][0]
        winner = winner_line.split('WINNER:')[1].split('|')[0].strip()
        reasoning = result.split('REASONING:')[1].strip()
        
        return {"winner": winner, "reasoning": reasoning}
    
    # 第一次对比(A在前,B在后)
    comp1 = await single_comparison(response_a, response_b)
    
    if not mitigate_bias:
        return {
            "winner": "A" if comp1["winner"] == "1" else "B" if comp1["winner"] == "2" else "tie",
            "reasoning": comp1["reasoning"],
            "confidence": 1.0
        }
    
    # 第二次对比(B在前,A在后)以减轻位置偏差
    comp2 = await single_comparison(response_b, response_a)
    
    # 汇总结果
    if comp1["winner"] == "1" and comp2["winner"] == "2":
        return {"winner": "A", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "2" and comp2["winner"] == "1":
        return {"winner": "B", "reasoning": comp1["reasoning"], "confidence": 1.0}
    elif comp1["winner"] == "TIE" or comp2["winner"] == "TIE":
        return {"winner": "tie", "reasoning": "Evaluations were mixed", "confidence": 0.5}
    else:
        return {"winner": "tie", "reasoning": "Evaluations disagreed", "confidence": 0.3}

Hosted Agents (Sandboxed Execution)

托管Agent(沙箱执行)

Modal-based Background Agent

基于Modal的后台Agent

python
import modal
python
import modal

Create Modal app

创建Modal应用

app = modal.App("coding-agent")
app = modal.App("coding-agent")

Define image with dependencies

定义包含依赖的镜像

image = modal.Image.debian_slim().pip_install( "anthropic", "requests" )
@app.function( image=image, secrets=[modal.Secret.from_name("anthropic-api-key")], timeout=3600 ) async def run_coding_task(task: str, files: dict) -> dict: """Execute coding task in sandboxed environment.
Args:
    task: Task description
    files: Initial files as {path: content}

Returns:
    {
        "status": "success" | "error",
        "files": {path: content},
        "output": str
    }
"""
import os
from anthropic import Anthropic

# Initialize Claude
client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

# Write initial files
for path, content in files.items():
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        f.write(content)

# Execute task with agent
messages = [
    {
        "role": "user",
        "content": f"Complete this task: {task}\n\nAvailable files: {list(files.keys())}"
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=4096,
    messages=messages
)

# Collect modified files
output_files = {}
for path in files.keys():
    if os.path.exists(path):
        with open(path, 'r') as f:
            output_files[path] = f.read()

return {
    "status": "success",
    "files": output_files,
    "output": response.content[0].text
}
image = modal.Image.debian_slim().pip_install( "anthropic", "requests" )
@app.function( image=image, secrets=[modal.Secret.from_name("anthropic-api-key")], timeout=3600 ) async def run_coding_task(task: str, files: dict) -> dict: """在沙箱环境中执行编码任务。
Args:
    task: 任务描述
    files: 初始文件,格式为{path: content}

Returns:
    {
        "status": "success" | "error",
        "files": {path: content},
        "output": str
    }
"""
import os
from anthropic import Anthropic

# 初始化Claude
client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

# 写入初始文件
for path, content in files.items():
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        f.write(content)

# 使用Agent执行任务
messages = [
    {
        "role": "user",
        "content": f"Complete this task: {task}\n\nAvailable files: {list(files.keys())}"
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=4096,
    messages=messages
)

# 收集修改后的文件
output_files = {}
for path in files.keys():
    if os.path.exists(path):
        with open(path, 'r') as f:
            output_files[path] = f.read()

return {
    "status": "success",
    "files": output_files,
    "output": response.content[0].text
}

Client usage

客户端使用

@app.local_entrypoint() def main(): result = run_coding_task.remote( task="Add error handling to the API client", files={ "client.py": "def fetch_data():\n response = requests.get(url)\n return response.json()" } )
print(result["output"])
print("\nModified files:")
for path, content in result["files"].items():
    print(f"\n{path}:")
    print(content)
undefined
@app.local_entrypoint() def main(): result = run_coding_task.remote( task="Add error handling to the API client", files={ "client.py": "def fetch_data():\n response = requests.get(url)\n return response.json()" } )
print(result["output"])
print("\nModified files:")
for path, content in result["files"].items():
    print(f"\n{path}:")
    print(content)
undefined

Common Patterns

通用模式

Task-Model Fit Analysis

任务-模型适配分析

python
def analyze_task_model_fit(task_description: str) -> dict:
    """Determine if LLM is appropriate for task.
    
    Returns:
        {
            "recommended": bool,
            "model_size": "small" | "medium" | "large",
            "approach": "single_shot" | "chain_of_thought" | "multi_agent",
            "reasoning": str
        }
    """
    # Decision tree
    is_structured = "json" in task_description.lower() or "schema" in task_description.lower()
    is_creative = any(w in task_description.lower() for w in ["generate", "write", "create"])
    is_complex = any(w in task_description.lower() for w in ["analyze", "reason", "solve"])
    
    if is_structured:
        return {
            "recommended": True,
            "model_size": "small",
            "approach": "single_shot",
            "reasoning": "Structured output tasks work well with smaller models"
        }
    elif is_complex:
        return {
            "recommended": True,
            "model_size": "large",
            "approach": "chain_of_thought",
            "reasoning": "Complex reasoning requires larger models with step-by-step thinking"
        }
    elif is_creative:
        return {
            "recommended": True,
            "model_size": "medium",
            "approach": "single_shot",
            "reasoning": "Creative tasks benefit from medium-sized models"
        }
    else:
        return {
            "recommended": False,
            "model_size": None,
            "approach": None,
            "reasoning": "Task may be better suited for traditional programming"
        }
python
def analyze_task_model_fit(task_description: str) -> dict:
    """确定LLM是否适合该任务。
    
    Returns:
        {
            "recommended": bool,
            "model_size": "small" | "medium" | "large",
            "approach": "single_shot" | "chain_of_thought" | "multi_agent",
            "reasoning": str
        }
    """
    # 决策树
    is_structured = "json" in task_description.lower() or "schema" in task_description.lower()
    is_creative = any(w in task_description.lower() for w in ["generate", "write", "create"])
    is_complex = any(w in task_description.lower() for w in ["analyze", "reason", "solve"])
    
    if is_structured:
        return {
            "recommended": True,
            "model_size": "small",
            "approach": "single_shot",
            "reasoning": "Structured output tasks work well with smaller models"
        }
    elif is_complex:
        return {
            "recommended": True,
            "model_size": "large",
            "approach": "chain_of_thought",
            "reasoning": "Complex reasoning requires larger models with step-by-step thinking"
        }
    elif is_creative:
        return {
            "recommended": True,
            "model_size": "medium",
            "approach": "single_shot",
            "reasoning": "Creative tasks benefit from medium-sized models"
        }
    else:
        return {
            "recommended": False,
            "model_size": None,
            "approach": None,
            "reasoning": "Task may be better suited for traditional programming"
        }

Append-Only Memory Pattern

仅追加内存模式

python
import json
from datetime import datetime
from pathlib import Path

class AppendOnlyMemory:
    """JSONL-based memory with schema-first line for agent parsing."""
    
    def __init__(self, file_path: str, schema: dict):
        self.file_path = Path(file_path)
        self.schema = schema
        
        # Create file with schema if doesn't exist
        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.file_path, 'w') as f:
                f.write(json.dumps({"_schema": schema}) + '\n')
    
    def append(self, data: dict):
        """Append entry with timestamp."""
        entry = {
            **data,
            "_timestamp": datetime.now().isoformat()
        }
        
        with open(self.file_path, 'a') as f:
            f.write(json.dumps(entry) + '\n')
    
    def read_recent(self, n: int = 10) -> list:
        """Read n most recent entries."""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        
        # Skip schema line, get recent entries
        entries = [json.loads(line) for line in lines[1:]]
        return entries[-n:]
    
    def query(self, filter_fn) -> list:
        """Query entries with filter function."""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        
        entries = [json.loads(line) for line in lines[1:]]
        return [e for e in entries if filter_fn(e)]
python
import json
from datetime import datetime
from pathlib import Path

class AppendOnlyMemory:
    """基于JSONL的内存,首行是Schema供Agent解析。"""
    
    def __init__(self, file_path: str, schema: dict):
        self.file_path = Path(file_path)
        self.schema = schema
        
        # 如果文件不存在,创建并写入Schema
        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.file_path, 'w') as f:
                f.write(json.dumps({"_schema": schema}) + '\n')
    
    def append(self, data: dict):
        """追加带时间戳的条目。"""
        entry = {
            **data,
            "_timestamp": datetime.now().isoformat()
        }
        
        with open(self.file_path, 'a') as f:
            f.write(json.dumps(entry) + '\n')
    
    def read_recent(self, n: int = 10) -> list:
        """读取最近n条条目。"""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        
        # 跳过Schema行,获取最近的条目
        entries = [json.loads(line) for line in lines[1:]]
        return entries[-n:]
    
    def query(self, filter_fn) -> list:
        """使用过滤函数查询条目。"""
        with open(self.file_path, 'r') as f:
            lines = f.readlines()
        
        entries = [json.loads(line) for line in lines[1:]]
        return [e for e in entries if filter_fn(e)]

Usage

使用示例

memory = AppendOnlyMemory( ".agent_memory/interactions.jsonl", schema={ "user": "string", "action": "string", "result": "string" } )
memory.append({ "user": "developer_123", "action": "deployed_feature", "result": "success" })
recent = memory.read_recent(5)
undefined
memory = AppendOnlyMemory( ".agent_memory/interactions.jsonl", schema={ "user": "string", "action": "string", "result": "string" } )
memory.append({ "user": "developer_123", "action": "deployed_feature", "result": "success" })
recent = memory.read_recent(5)
undefined

Troubleshooting

故障排除

Lost-in-Middle Degradation

中间信息丢失退化

Symptom: Agent ignores information from middle of context.
Solution: Use U-shaped context organization:
python
def organize_context(system_msg: str, key_facts: list, messages: list) -> list:
    """Place critical info at beginning and end."""
    return [
        {"role": "system", "content": system_msg},
        # Key facts at the top
        {"role": "system", "content": "Key facts:\n" + "\n".join(key_facts)},
        # Conversation in middle
        *messages[:-3],
        # Recent context at the end
        *messages[-3:],
        # Reminder of key facts at very end
        {"role": "system", "content": "Remember:\n" + "\n".join(key_facts[:3])}
    ]
症状:Agent忽略上下文中间部分的信息。
解决方案:使用U型上下文组织方式:
python
def organize_context(system_msg: str, key_facts: list, messages: list) -> list:
    """将关键信息放在开头和结尾。"""
    return [
        {"role": "system", "content": system_msg},
        # 关键信息放在顶部
        {"role": "system", "content": "关键事实:\n" + "\n".join(key_facts)},
        # 对话内容放在中间
        *messages[:-3],
        # 最近的上下文放在结尾
        *messages[-3:],
        # 在最末尾再次提醒关键事实
        {"role": "system", "content": "请记住:\n" + "\n".join(key_facts[:3])}
    ]

Context Clash

上下文冲突

Symptom: Agent confused by contradictory information.
Solution: Timestamp and prioritize information:
python
def merge_contexts(contexts: list[dict]) -> str:
    """Merge contexts with priority and timestamps."""
    # Sort by priority (higher first) then timestamp (newer first)
    sorted_contexts = sorted(
        contexts,
        key=lambda x: (-x.get('priority', 0), -x.get('timestamp', 0))
    )
    
    merged = []
    for ctx in sorted_contexts:
        timestamp_str = datetime.fromtimestamp(ctx['timestamp']).strftime('%Y-%m-%d %H:%M')
        merged.append(f"[{timestamp_str}, priority={ctx.get('priority', 0)}] {ctx['content']}")
    
    return "\n\n".join(merged)
症状:Agent因矛盾信息而困惑。
解决方案:添加时间戳并设置信息优先级:
python
def merge_contexts(contexts: list[dict]) -> str:
    """合并带优先级和时间戳的上下文。"""
    # 按优先级(高到低)和时间戳(新到旧)排序
    sorted_contexts = sorted(
        contexts,
        key=lambda x: (-x.get('priority', 0), -x.get('timestamp', 0))
    )
    
    merged = []
    for ctx in sorted_contexts:
        timestamp_str = datetime.fromtimestamp(ctx['timestamp']).strftime('%Y-%m-%d %H:%M')
        merged.append(f"[{timestamp_str}, priority={ctx.get('priority', 0)}] {ctx['content']}")
    
    return "\n\n".join(merged)

Attention Scarcity

注意力稀缺

Symptom: Performance degrades with long contexts.
Solution: Implement KV-cache compaction:
python
def compact_messages(messages: list, target_tokens: int) -> list:
    """Compress messages to target token budget."""
    current_tokens = sum(estimate_tokens(m["content"]) for m in messages)
    
    if current_tokens <= target_tokens:
        return messages
    
    # Keep system message and recent messages
    system = [m for m in messages if m["role"] == "system"]
    recent = messages[-5:]
    
    # Compress middle messages
    middle = messages[len(system):-5]
    compression_ratio = (target_tokens - estimate_tokens(system + recent)) / estimate_tokens(middle)
    
    if compression_ratio < 0.5:
        # Aggressive summarization needed
        summary = summarize_messages(middle)
        return system + [{"role": "assistant", "content": f"[Earlier: {summary}]"}] + recent
    else:
        # Keep important messages, drop others
        important = extract_important_messages(middle, keep_ratio=compression_ratio)
        return system + important + recent

def estimate_tokens(text: str) -> int:
    """Rough estimate: 1 token per 4 characters."""
    return len(text) // 4
症状:随着上下文变长,性能下降。
解决方案:实现KV缓存压缩:
python
def compact_messages(messages: list, target_tokens: int) -> list:
    """将消息压缩到目标token预算。"""
    current_tokens = sum(estimate_tokens(m["content"]) for m in messages)
    
    if current_tokens <= target_tokens:
        return messages
    
    # 保留系统消息和最近的消息
    system = [m for m in messages if m["role"] == "system"]
    recent = messages[-5:]
    
    # 压缩中间的消息
    middle = messages[len(system):-5]
    compression_ratio = (target_tokens - estimate_tokens(system + recent)) / estimate_tokens(middle)
    
    if compression_ratio < 0.5:
        # 需要深度摘要
        summary = summarize_messages(middle)
        return system + [{"role": "assistant", "content": f"[对话摘要: {summary}]"}] + recent
    else:
        # 保留重要消息,丢弃其他
        important = extract_important_messages(middle, keep_ratio=compression_ratio)
        return system + important + recent

def estimate_tokens(text: str) -> int:
    """粗略估算:每4个字符对应1个token。"""
    return len(text) // 4

Tool Output Bloat

工具输出膨胀

Symptom: Tool outputs fill context window.
Solution: Offload to filesystem:
python
def execute_tool_with_offloading(
    tool_name: str,
    params: dict,
    context_dir: str = ".agent_context/tool_outputs"
) -> str:
    """Execute tool and offload detailed results."""
    Path(context_dir).mkdir(parents=True, exist_ok=True)
    
    # Execute tool
    result = execute_tool(tool_name, params)
    
    # Write full result to file
    output_file = f"{context_dir}/{tool_name}_{int(time.time())}.json"
    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)
    
    # Return summary + file reference
    summary = {
        "status": result.get("status"),
        "key_findings": result.get("summary", "See file for details"),
        "record_count": len(result.get("data", [])),
        "full_output": output_file
    }
    
    return json.dumps(summary)
症状:工具输出填满上下文窗口。
解决方案:卸载到文件系统:
python
def execute_tool_with_offloading(
    tool_name: str,
    params: dict,
    context_dir: str = ".agent_context/tool_outputs"
) -> str:
    """执行工具并将详细结果卸载到文件。"""
    Path(context_dir).mkdir(parents=True, exist_ok=True)
    
    # 执行工具
    result = execute_tool(tool_name, params)
    
    # 将完整结果写入文件
    output_file = f"{context_dir}/{tool_name}_{int(time.time())}.json"
    with open(output_file, 'w') as f:
        json.dump(result, f, indent=2)
    
    # 返回摘要+文件引用
    summary = {
        "status": result.get("status"),
        "key_findings": result.get("summary", "详见文件"),
        "record_count": len(result.get("data", [])),
        "full_output": output_file
    }
    
    return json.dumps(summary)

Best Practices

最佳实践

  1. Progressive Disclosure: Load information only when needed
  2. Context Budget: Track token usage, compress proactively
  3. Tool Minimalism: Design tools with minimal interface, maximum clarity
  4. Filesystem Offloading: Keep detailed data out of context
  5. Evaluation First: Build evaluation before building features
  6. Platform Agnostic: Focus on transferable principles over vendor APIs
  7. Append-Only Memory: Use JSONL with schema-first for agent-friendly persistence
  8. U-shaped Organization: Place critical info at start and end of context
  1. 渐进式披露:仅在需要时加载信息
  2. 上下文预算:追踪token使用情况,主动压缩
  3. 工具极简主义:设计接口简洁、清晰度高的工具
  4. 文件系统卸载:将详细数据移出上下文窗口
  5. 优先评估:在构建功能前先搭建评估体系
  6. 平台无关:专注于可迁移的原则,而非供应商API
  7. 仅追加内存:使用带Schema首行的JSONL格式,方便Agent解析持久化数据
  8. U型组织:将关键信息放在上下文的开头和结尾

Resources

资源

License

许可证

MIT
MIT