agent-governance

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Agent Governance Patterns

Agent治理模式

Patterns for adding safety, trust, and policy enforcement to AI agent systems.
为AI Agent系统添加安全、信任与策略执行的模式。

Overview

概述

Governance patterns ensure AI agents operate within defined boundaries — controlling which tools they can call, what content they can process, how much they can do, and maintaining accountability through audit trails.
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
                     ↓                      ↓               ↓
              Threat Detection         Allow/Deny      Trust Update
治理模式确保AI Agent在既定边界内运行——控制它们可调用的工具、可处理的内容、可执行的操作量,并通过审计追踪保持可追责性。
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
                     ↓                      ↓               ↓
              Threat Detection         Allow/Deny      Trust Update

When to Use

适用场景

  • Agents with tool access: Any agent that calls external tools (APIs, databases, shell commands)
  • Multi-agent systems: Agents delegating to other agents need trust boundaries
  • Production deployments: Compliance, audit, and safety requirements
  • Sensitive operations: Financial transactions, data access, infrastructure management

  • 具备工具访问权限的Agent:任何调用外部工具(APIs、数据库、Shell命令)的Agent
  • 多Agent系统:委托其他Agent执行任务的Agent需要信任边界
  • 生产环境部署:合规、审计与安全要求
  • 敏感操作:金融交易、数据访问、基础设施管理

Pattern 1: Governance Policy

模式1:治理策略

Define what an agent is allowed to do as a composable, serializable policy object.
python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re

class PolicyAction(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REVIEW = "review"  # flag for human review

@dataclass
class GovernancePolicy:
    """Declarative policy controlling agent behavior."""
    name: str
    allowed_tools: list[str] = field(default_factory=list)       # allowlist
    blocked_tools: list[str] = field(default_factory=list)       # blocklist
    blocked_patterns: list[str] = field(default_factory=list)    # content filters
    max_calls_per_request: int = 100                             # rate limit
    require_human_approval: list[str] = field(default_factory=list)  # tools needing approval

    def check_tool(self, tool_name: str) -> PolicyAction:
        """Check if a tool is allowed by this policy."""
        if tool_name in self.blocked_tools:
            return PolicyAction.DENY
        if tool_name in self.require_human_approval:
            return PolicyAction.REVIEW
        if self.allowed_tools and tool_name not in self.allowed_tools:
            return PolicyAction.DENY
        return PolicyAction.ALLOW

    def check_content(self, content: str) -> Optional[str]:
        """Check content against blocked patterns. Returns matched pattern or None."""
        for pattern in self.blocked_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return pattern
        return None
将Agent的允许操作定义为可组合、可序列化的策略对象。
python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re

class PolicyAction(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REVIEW = "review"  # flag for human review

@dataclass
class GovernancePolicy:
    """Declarative policy controlling agent behavior."""
    name: str
    allowed_tools: list[str] = field(default_factory=list)       # allowlist
    blocked_tools: list[str] = field(default_factory=list)       # blocklist
    blocked_patterns: list[str] = field(default_factory=list)    # content filters
    max_calls_per_request: int = 100                             # rate limit
    require_human_approval: list[str] = field(default_factory=list)  # tools needing approval

    def check_tool(self, tool_name: str) -> PolicyAction:
        """Check if a tool is allowed by this policy."""
        if tool_name in self.blocked_tools:
            return PolicyAction.DENY
        if tool_name in self.require_human_approval:
            return PolicyAction.REVIEW
        if self.allowed_tools and tool_name not in self.allowed_tools:
            return PolicyAction.DENY
        return PolicyAction.ALLOW

    def check_content(self, content: str) -> Optional[str]:
        """Check content against blocked patterns. Returns matched pattern or None."""
        for pattern in self.blocked_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return pattern
        return None

Policy Composition

策略组合

Combine multiple policies (e.g., org-wide + team + agent-specific):
python
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
    """Merge policies with most-restrictive-wins semantics."""
    combined = GovernancePolicy(name="composed")

    for policy in policies:
        combined.blocked_tools.extend(policy.blocked_tools)
        combined.blocked_patterns.extend(policy.blocked_patterns)
        combined.require_human_approval.extend(policy.require_human_approval)
        combined.max_calls_per_request = min(
            combined.max_calls_per_request,
            policy.max_calls_per_request
        )
        if policy.allowed_tools:
            if combined.allowed_tools:
                combined.allowed_tools = [
                    t for t in combined.allowed_tools if t in policy.allowed_tools
                ]
            else:
                combined.allowed_tools = list(policy.allowed_tools)

    return combined
合并多个策略(如:组织级 + 团队级 + Agent专属):
python
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
    """Merge policies with most-restrictive-wins semantics."""
    combined = GovernancePolicy(name="composed")

    for policy in policies:
        combined.blocked_tools.extend(policy.blocked_tools)
        combined.blocked_patterns.extend(policy.blocked_patterns)
        combined.require_human_approval.extend(policy.require_human_approval)
        combined.max_calls_per_request = min(
            combined.max_calls_per_request,
            policy.max_calls_per_request
        )
        if policy.allowed_tools:
            if combined.allowed_tools:
                combined.allowed_tools = [
                    t for t in combined.allowed_tools if t in policy.allowed_tools
                ]
            else:
                combined.allowed_tools = list(policy.allowed_tools)

    return combined

Usage: layer policies from broad to specific

Usage: layer policies from broad to specific

org_policy = GovernancePolicy( name="org-wide", blocked_tools=["shell_exec", "delete_database"], blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"], max_calls_per_request=50 ) team_policy = GovernancePolicy( name="data-team", allowed_tools=["query_db", "read_file", "write_report"], require_human_approval=["write_report"] ) agent_policy = compose_policies(org_policy, team_policy)
undefined
org_policy = GovernancePolicy( name="org-wide", blocked_tools=["shell_exec", "delete_database"], blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"], max_calls_per_request=50 ) team_policy = GovernancePolicy( name="data-team", allowed_tools=["query_db", "read_file", "write_report"], require_human_approval=["write_report"] ) agent_policy = compose_policies(org_policy, team_policy)
undefined

Policy as YAML

策略的YAML存储

Store policies as configuration, not code:
yaml
undefined
将策略存储为配置而非代码:
yaml
undefined

governance-policy.yaml

governance-policy.yaml

name: production-agent allowed_tools:
  • search_documents
  • query_database
  • send_email blocked_tools:
  • shell_exec
  • delete_record blocked_patterns:
  • "(?i)(api[_-]?key|secret|password)\s*[:=]"
  • "(?i)(drop|truncate|delete from)\s+\w+" max_calls_per_request: 25 require_human_approval:
  • send_email

```python
import yaml

def load_policy(path: str) -> GovernancePolicy:
    with open(path) as f:
        data = yaml.safe_load(f)
    return GovernancePolicy(**data)

name: production-agent allowed_tools:
  • search_documents
  • query_database
  • send_email blocked_tools:
  • shell_exec
  • delete_record blocked_patterns:
  • "(?i)(api[_-]?key|secret|password)\s*[:=]"
  • "(?i)(drop|truncate|delete from)\s+\w+" max_calls_per_request: 25 require_human_approval:
  • send_email

```python
import yaml

def load_policy(path: str) -> GovernancePolicy:
    with open(path) as f:
        data = yaml.safe_load(f)
    return GovernancePolicy(**data)

Pattern 2: Semantic Intent Classification

模式2:语义意图分类

Detect dangerous intent in prompts before they reach the agent, using pattern-based signals.
python
from dataclasses import dataclass

@dataclass
class IntentSignal:
    category: str       # e.g., "data_exfiltration", "privilege_escalation"
    confidence: float   # 0.0 to 1.0
    evidence: str       # what triggered the detection
在提示词到达Agent之前检测危险意图,使用基于模式的信号。
python
from dataclasses import dataclass

@dataclass
class IntentSignal:
    category: str       # e.g., "data_exfiltration", "privilege_escalation"
    confidence: float   # 0.0 to 1.0
    evidence: str       # what triggered the detection

Weighted signal patterns for threat detection

Weighted signal patterns for threat detection

THREAT_SIGNALS = [ # Data exfiltration (r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8), (r"(?i)export\s+.\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9), (r"(?i)curl\s+.\s+-d\s+", "data_exfiltration", 0.7),
# Privilege escalation
(r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
(r"(?i)chmod\s+777", "privilege_escalation", 0.9),

# System modification
(r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
(r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),

# Prompt injection
(r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
(r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),
]
def classify_intent(content: str) -> list[IntentSignal]: """Classify content for threat signals.""" signals = [] for pattern, category, weight in THREAT_SIGNALS: match = re.search(pattern, content) if match: signals.append(IntentSignal( category=category, confidence=weight, evidence=match.group() )) return signals
def is_safe(content: str, threshold: float = 0.7) -> bool: """Quick check: is the content safe above the given threshold?""" signals = classify_intent(content) return not any(s.confidence >= threshold for s in signals)

**Key insight**: Intent classification happens *before* tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails which only check *after* generation.

---
THREAT_SIGNALS = [ # Data exfiltration (r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8), (r"(?i)export\s+.\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9), (r"(?i)curl\s+.\s+-d\s+", "data_exfiltration", 0.7),
# Privilege escalation
(r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
(r"(?i)chmod\s+777", "privilege_escalation", 0.9),

# System modification
(r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
(r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),

# Prompt injection
(r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
(r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),
]
def classify_intent(content: str) -> list[IntentSignal]: """Classify content for threat signals.""" signals = [] for pattern, category, weight in THREAT_SIGNALS: match = re.search(pattern, content) if match: signals.append(IntentSignal( category=category, confidence=weight, evidence=match.group() )) return signals
def is_safe(content: str, threshold: float = 0.7) -> bool: """Quick check: is the content safe above the given threshold?""" signals = classify_intent(content) return not any(s.confidence >= threshold for s in signals)

**核心要点**:意图分类在工具执行**之前**进行,作为飞行前的安全检查。这与仅在生成**之后**进行检查的输出防护措施有着本质区别。

---

Pattern 3: Tool-Level Governance Decorator

模式3:工具级治理装饰器

Wrap individual tool functions with governance checks:
python
import functools
import time
from collections import defaultdict

_call_counters: dict[str, int] = defaultdict(int)

def govern(policy: GovernancePolicy, audit_trail=None):
    """Decorator that enforces governance policy on a tool function."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tool_name = func.__name__

            # 1. Check tool allowlist/blocklist
            action = policy.check_tool(tool_name)
            if action == PolicyAction.DENY:
                raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
            if action == PolicyAction.REVIEW:
                raise PermissionError(f"Tool '{tool_name}' requires human approval")

            # 2. Check rate limit
            _call_counters[policy.name] += 1
            if _call_counters[policy.name] > policy.max_calls_per_request:
                raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")

            # 3. Check content in arguments
            for arg in list(args) + list(kwargs.values()):
                if isinstance(arg, str):
                    matched = policy.check_content(arg)
                    if matched:
                        raise PermissionError(f"Blocked pattern detected: {matched}")

            # 4. Execute and audit
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "allowed",
                        "duration_ms": (time.monotonic() - start) * 1000,
                        "timestamp": time.time()
                    })
                return result
            except Exception as e:
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "error",
                        "error": str(e),
                        "timestamp": time.time()
                    })
                raise

        return wrapper
    return decorator
用治理检查包裹单个工具函数:
python
import functools
import time
from collections import defaultdict

_call_counters: dict[str, int] = defaultdict(int)

def govern(policy: GovernancePolicy, audit_trail=None):
    """Decorator that enforces governance policy on a tool function."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tool_name = func.__name__

            # 1. Check tool allowlist/blocklist
            action = policy.check_tool(tool_name)
            if action == PolicyAction.DENY:
                raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
            if action == PolicyAction.REVIEW:
                raise PermissionError(f"Tool '{tool_name}' requires human approval")

            # 2. Check rate limit
            _call_counters[policy.name] += 1
            if _call_counters[policy.name] > policy.max_calls_per_request:
                raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")

            # 3. Check content in arguments
            for arg in list(args) + list(kwargs.values()):
                if isinstance(arg, str):
                    matched = policy.check_content(arg)
                    if matched:
                        raise PermissionError(f"Blocked pattern detected: {matched}")

            # 4. Execute and audit
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "allowed",
                        "duration_ms": (time.monotonic() - start) * 1000,
                        "timestamp": time.time()
                    })
                return result
            except Exception as e:
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "error",
                        "error": str(e),
                        "timestamp": time.time()
                    })
                raise

        return wrapper
    return decorator

Usage with any agent framework

Usage with any agent framework

audit_log = [] policy = GovernancePolicy( name="search-agent", allowed_tools=["search", "summarize"], blocked_patterns=[r"(?i)password"], max_calls_per_request=10 )
@govern(policy, audit_trail=audit_log) async def search(query: str) -> str: """Search documents — governed by policy.""" return f"Results for: {query}"
audit_log = [] policy = GovernancePolicy( name="search-agent", allowed_tools=["search", "summarize"], blocked_patterns=[r"(?i)password"], max_calls_per_request=10 )
@govern(policy, audit_trail=audit_log) async def search(query: str) -> str: """Search documents — governed by policy.""" return f"Results for: {query}"

Passes: search("latest quarterly report")

Passes: search("latest quarterly report")

Blocked: search("show me the admin password")

Blocked: search("show me the admin password")


---

---

Pattern 4: Trust Scoring

模式4:信任评分

Track agent reliability over time with decay-based trust scores:
python
from dataclasses import dataclass, field
import math
import time

@dataclass
class TrustScore:
    """Trust score with temporal decay."""
    score: float = 0.5          # 0.0 (untrusted) to 1.0 (fully trusted)
    successes: int = 0
    failures: int = 0
    last_updated: float = field(default_factory=time.time)

    def record_success(self, reward: float = 0.05):
        self.successes += 1
        self.score = min(1.0, self.score + reward * (1 - self.score))
        self.last_updated = time.time()

    def record_failure(self, penalty: float = 0.15):
        self.failures += 1
        self.score = max(0.0, self.score - penalty * self.score)
        self.last_updated = time.time()

    def current(self, decay_rate: float = 0.001) -> float:
        """Get score with temporal decay — trust erodes without activity."""
        elapsed = time.time() - self.last_updated
        decay = math.exp(-decay_rate * elapsed)
        return self.score * decay

    @property
    def reliability(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total > 0 else 0.0
通过基于衰减的信任分数随时间追踪Agent的可靠性:
python
from dataclasses import dataclass, field
import math
import time

@dataclass
class TrustScore:
    """Trust score with temporal decay."""
    score: float = 0.5          # 0.0 (untrusted) to 1.0 (fully trusted)
    successes: int = 0
    failures: int = 0
    last_updated: float = field(default_factory=time.time)

    def record_success(self, reward: float = 0.05):
        self.successes += 1
        self.score = min(1.0, self.score + reward * (1 - self.score))
        self.last_updated = time.time()

    def record_failure(self, penalty: float = 0.15):
        self.failures += 1
        self.score = max(0.0, self.score - penalty * self.score)
        self.last_updated = time.time()

    def current(self, decay_rate: float = 0.001) -> float:
        """Get score with temporal decay — trust erodes without activity."""
        elapsed = time.time() - self.last_updated
        decay = math.exp(-decay_rate * elapsed)
        return self.score * decay

    @property
    def reliability(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total > 0 else 0.0

Usage in multi-agent systems

Usage in multi-agent systems

trust = TrustScore()
trust = TrustScore()

Agent completes tasks successfully

Agent completes tasks successfully

trust.record_success() # 0.525 trust.record_success() # 0.549
trust.record_success() # 0.525 trust.record_success() # 0.549

Agent makes an error

Agent makes an error

trust.record_failure() # 0.467
trust.record_failure() # 0.467

Gate sensitive operations on trust

Gate sensitive operations on trust

if trust.current() >= 0.7: # Allow autonomous operation pass elif trust.current() >= 0.4: # Allow with human oversight pass else: # Deny or require explicit approval pass

**Multi-agent trust**: In systems where agents delegate to other agents, each agent maintains trust scores for its delegates:

```python
class AgentTrustRegistry:
    def __init__(self):
        self.scores: dict[str, TrustScore] = {}

    def get_trust(self, agent_id: str) -> TrustScore:
        if agent_id not in self.scores:
            self.scores[agent_id] = TrustScore()
        return self.scores[agent_id]

    def most_trusted(self, agents: list[str]) -> str:
        return max(agents, key=lambda a: self.get_trust(a).current())

    def meets_threshold(self, agent_id: str, threshold: float) -> bool:
        return self.get_trust(agent_id).current() >= threshold

if trust.current() >= 0.7: # Allow autonomous operation pass elif trust.current() >= 0.4: # Allow with human oversight pass else: # Deny or require explicit approval pass

**多Agent信任**:在Agent之间相互委托任务的系统中,每个Agent都会为其委托的Agent维护信任分数:

```python
class AgentTrustRegistry:
    def __init__(self):
        self.scores: dict[str, TrustScore] = {}

    def get_trust(self, agent_id: str) -> TrustScore:
        if agent_id not in self.scores:
            self.scores[agent_id] = TrustScore()
        return self.scores[agent_id]

    def most_trusted(self, agents: list[str]) -> str:
        return max(agents, key=lambda a: self.get_trust(a).current())

    def meets_threshold(self, agent_id: str, threshold: float) -> bool:
        return self.get_trust(agent_id).current() >= threshold

Pattern 5: Audit Trail

模式5:审计追踪

Append-only audit log for all agent actions — critical for compliance and debugging:
python
from dataclasses import dataclass, field
import json
import time

@dataclass
class AuditEntry:
    timestamp: float
    agent_id: str
    tool_name: str
    action: str           # "allowed", "denied", "error"
    policy_name: str
    details: dict = field(default_factory=dict)

class AuditTrail:
    """Append-only audit trail for agent governance events."""
    def __init__(self):
        self._entries: list[AuditEntry] = []

    def log(self, agent_id: str, tool_name: str, action: str,
            policy_name: str, **details):
        self._entries.append(AuditEntry(
            timestamp=time.time(),
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            policy_name=policy_name,
            details=details
        ))

    def denied(self) -> list[AuditEntry]:
        """Get all denied actions — useful for security review."""
        return [e for e in self._entries if e.action == "denied"]

    def by_agent(self, agent_id: str) -> list[AuditEntry]:
        return [e for e in self._entries if e.agent_id == agent_id]

    def export_jsonl(self, path: str):
        """Export as JSON Lines for log aggregation systems."""
        with open(path, "w") as f:
            for entry in self._entries:
                f.write(json.dumps({
                    "timestamp": entry.timestamp,
                    "agent_id": entry.agent_id,
                    "tool": entry.tool_name,
                    "action": entry.action,
                    "policy": entry.policy_name,
                    **entry.details
                }) + "\n")

为所有Agent操作提供仅追加的审计日志——这对合规性和调试至关重要:
python
from dataclasses import dataclass, field
import json
import time

@dataclass
class AuditEntry:
    timestamp: float
    agent_id: str
    tool_name: str
    action: str           # "allowed", "denied", "error"
    policy_name: str
    details: dict = field(default_factory=dict)

class AuditTrail:
    """Append-only audit trail for agent governance events."""
    def __init__(self):
        self._entries: list[AuditEntry] = []

    def log(self, agent_id: str, tool_name: str, action: str,
            policy_name: str, **details):
        self._entries.append(AuditEntry(
            timestamp=time.time(),
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            policy_name=policy_name,
            details=details
        ))

    def denied(self) -> list[AuditEntry]:
        """Get all denied actions — useful for security review."""
        return [e for e in self._entries if e.action == "denied"]

    def by_agent(self, agent_id: str) -> list[AuditEntry]:
        return [e for e in self._entries if e.agent_id == agent_id]

    def export_jsonl(self, path: str):
        """Export as JSON Lines for log aggregation systems."""
        with open(path, "w") as f:
            for entry in self._entries:
                f.write(json.dumps({
                    "timestamp": entry.timestamp,
                    "agent_id": entry.agent_id,
                    "tool": entry.tool_name,
                    "action": entry.action,
                    "policy": entry.policy_name,
                    **entry.details
                }) + "\n")

Pattern 6: Framework Integration

模式6:框架集成

PydanticAI

PydanticAI

python
from pydantic_ai import Agent

policy = GovernancePolicy(
    name="support-bot",
    allowed_tools=["search_docs", "create_ticket"],
    blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
    max_calls_per_request=20
)

agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")

@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
    """Search knowledge base — governed."""
    return await kb.search(query)

@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
    """Create support ticket — governed."""
    return await tickets.create(title=title, body=body)
python
from pydantic_ai import Agent

policy = GovernancePolicy(
    name="support-bot",
    allowed_tools=["search_docs", "create_ticket"],
    blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
    max_calls_per_request=20
)

agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")

@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
    """Search knowledge base — governed."""
    return await kb.search(query)

@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
    """Create support ticket — governed."""
    return await tickets.create(title=title, body=body)

CrewAI

CrewAI

python
from crewai import Agent, Task, Crew

policy = GovernancePolicy(
    name="research-crew",
    allowed_tools=["search", "analyze"],
    max_calls_per_request=30
)
python
from crewai import Agent, Task, Crew

policy = GovernancePolicy(
    name="research-crew",
    allowed_tools=["search", "analyze"],
    max_calls_per_request=30
)

Apply governance at the crew level

Apply governance at the crew level

def governed_crew_run(crew: Crew, policy: GovernancePolicy): """Wrap crew execution with governance checks.""" audit = AuditTrail() for agent in crew.agents: for tool in agent.tools: original = tool.func tool.func = govern(policy, audit_trail=audit)(original) result = crew.kickoff() return result, audit
undefined
def governed_crew_run(crew: Crew, policy: GovernancePolicy): """Wrap crew execution with governance checks.""" audit = AuditTrail() for agent in crew.agents: for tool in agent.tools: original = tool.func tool.func = govern(policy, audit_trail=audit)(original) result = crew.kickoff() return result, audit
undefined

OpenAI Agents SDK

OpenAI Agents SDK

python
from agents import Agent, function_tool

policy = GovernancePolicy(
    name="coding-agent",
    allowed_tools=["read_file", "write_file", "run_tests"],
    blocked_tools=["shell_exec"],
    max_calls_per_request=50
)

@function_tool
@govern(policy)
async def read_file(path: str) -> str:
    """Read file contents — governed."""
    import os
    safe_path = os.path.realpath(path)
    if not safe_path.startswith(os.path.realpath(".")):
        raise ValueError("Path traversal blocked by governance")
    with open(safe_path) as f:
        return f.read()

python
from agents import Agent, function_tool

policy = GovernancePolicy(
    name="coding-agent",
    allowed_tools=["read_file", "write_file", "run_tests"],
    blocked_tools=["shell_exec"],
    max_calls_per_request=50
)

@function_tool
@govern(policy)
async def read_file(path: str) -> str:
    """Read file contents — governed."""
    import os
    safe_path = os.path.realpath(path)
    if not safe_path.startswith(os.path.realpath(".")):
        raise ValueError("Path traversal blocked by governance")
    with open(safe_path) as f:
        return f.read()

Governance Levels

治理级别

Match governance strictness to risk level:
LevelControlsUse Case
OpenAudit only, no restrictionsInternal dev/testing
StandardTool allowlist + content filtersGeneral production agents
StrictAll controls + human approval for sensitive opsFinancial, healthcare, legal
LockedAllowlist only, no dynamic tools, full auditCompliance-critical systems

根据风险级别匹配治理严格程度:
级别控制措施适用场景
开放仅审计,无限制内部开发/测试
标准工具允许列表 + 内容过滤通用生产环境Agent
严格所有控制措施 + 敏感操作需人工审批金融、医疗、法律领域
锁定仅允许列表,无动态工具,完整审计合规关键系统

Best Practices

最佳实践

PracticeRationale
Policy as configurationStore policies in YAML/JSON, not hardcoded — enables change without deploys
Most-restrictive-winsWhen composing policies, deny always overrides allow
Pre-flight intent checkClassify intent before tool execution, not after
Trust decayTrust scores should decay over time — require ongoing good behavior
Append-only auditNever modify or delete audit entries — immutability enables compliance
Fail closedIf governance check errors, deny the action rather than allowing it
Separate policy from logicGovernance enforcement should be independent of agent business logic

实践原理
策略即配置将策略存储在YAML/JSON中,而非硬编码——无需部署即可变更
最严格规则优先组合策略时,拒绝操作始终覆盖允许操作
飞行前意图检查在工具执行之前对意图进行分类,而非之后
信任衰减信任分数应随时间衰减——需要持续的良好表现
仅追加审计绝不修改或删除审计条目——不可变性确保合规性
默认拒绝如果治理检查出错,拒绝操作而非允许
策略与逻辑分离治理执行应独立于Agent的业务逻辑

Quick Start Checklist

快速入门检查清单

markdown
undefined
markdown
undefined

Agent Governance Implementation Checklist

Agent治理实施检查清单

Setup

设置

  • Define governance policy (allowed tools, blocked patterns, rate limits)
  • Choose governance level (open/standard/strict/locked)
  • Set up audit trail storage
  • 定义治理策略(允许的工具、阻止的模式、速率限制)
  • 选择治理级别(开放/标准/严格/锁定)
  • 设置审计追踪存储

Implementation

实施

  • Add @govern decorator to all tool functions
  • Add intent classification to user input processing
  • Implement trust scoring for multi-agent interactions
  • Wire up audit trail export
  • 为所有工具函数添加@govern装饰器
  • 为用户输入处理添加意图分类
  • 为多Agent交互实现信任评分
  • 连接审计追踪导出功能

Validation

验证

  • Test that blocked tools are properly denied
  • Test that content filters catch sensitive patterns
  • Test rate limiting behavior
  • Verify audit trail captures all events
  • Test policy composition (most-restrictive-wins)

---
  • 测试被阻止的工具是否被正确拒绝
  • 测试内容过滤器是否能捕获敏感模式
  • 测试速率限制行为
  • 验证审计追踪捕获所有事件
  • 测试策略组合(最严格规则优先)

---

Related Resources

相关资源