esaa-security-audit

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

ESAA-Security Audit Skill

ESAA-Security 审计Skill

Skill by ara.so — Security Skills collection.
ara.so提供的Skill — 安全技能集合。

Overview

概述

ESAA-Security applies the Event Sourcing for Autonomous Agents (ESAA) architecture to automated security auditing. It executes structured security audits across 16 security domains with 95 executable checks, governed by an immutable append-only event log. Every finding, classification, and remediation decision is recorded as a verifiable fact.
Key differentiators:
  • Deterministic audits — same repository state produces same findings via event replay
  • Hallucination prevention — schema-validated outputs with evidence requirements
  • Complete audit trail
    .roadmap/activity.jsonl
    records every check execution
  • Governed agents — PARCER contracts enforce decision hygiene and token budgets
  • Verifiable reports — SHA-256 hash verification from events to final output
ESAA-Security 将自主Agent事件溯源(Event Sourcing for Autonomous Agents,ESAA)架构应用于自动化安全审计。它在16个安全领域执行结构化安全审计,包含95项可执行检查,由不可变的追加式事件日志管控。每一项发现、分类和修复决策都被记录为可验证的事实。
核心差异化特性:
  • 确定性审计 — 相同的仓库状态通过事件重放会产生相同的发现
  • 防止幻觉 — 基于模式验证的输出,要求提供证据
  • 完整审计轨迹
    .roadmap/activity.jsonl
    记录每一项检查的执行情况
  • 管控式Agent — PARCER合约强制决策规范和令牌预算
  • 可验证报告 — 从事件到最终输出的SHA-256哈希验证

Installation

安装

bash
undefined
bash
undefined

Clone the repository

Clone the repository

Install Python dependencies

Install Python dependencies

pip install -r requirements.txt
pip install -r requirements.txt

Set up environment variables

Set up environment variables

export OPENAI_API_KEY=$YOUR_OPENAI_KEY export ANTHROPIC_API_KEY=$YOUR_ANTHROPIC_KEY # if using Claude export AUDIT_TARGET_REPO="/path/to/repo"

**Requirements:**
- Python 3.9+
- LLM API access (OpenAI GPT-4, Anthropic Claude, or compatible)
- Target repository must be readable by the audit agent
export OPENAI_API_KEY=$YOUR_OPENAI_KEY export ANTHROPIC_API_KEY=$YOUR_ANTHROPIC_KEY # if using Claude export AUDIT_TARGET_REPO="/path/to/repo"

**要求:**
- Python 3.9+
- LLM API访问权限(OpenAI GPT-4、Anthropic Claude或兼容模型)
- 审计Agent必须能读取目标仓库

Repository Structure

仓库结构

.roadmap/                              # Event sourcing core
├── activity.jsonl                     # Immutable event store
├── roadmap.json                       # Derived audit progress
├── issues.json                        # Structured findings
├── AGENT_CONTRACT.yaml                # Agent boundaries
├── ORCHESTRATOR_CONTRACT.yaml         # State mutation rules
└── PROJECTION_SPEC.md                 # Event → state mapping

playbooks/
├── playbooks.security.json            # 95 security checks
└── global_input_contract.json         # Input requirements

reports/
├── phase1/                            # Reconnaissance
├── phase2/                            # Domain audits
├── phase3/                            # Risk classification
├── phase4/                            # Recommendations
└── final/                             # Compiled report
.roadmap/                              # 事件溯源核心
├── activity.jsonl                     # 不可变事件存储
├── roadmap.json                       # 衍生审计进度
├── issues.json                        # 结构化发现
├── AGENT_CONTRACT.yaml                # Agent边界
├── ORCHESTRATOR_CONTRACT.yaml         # 状态变更规则
└── PROJECTION_SPEC.md                 # 事件→状态映射

playbooks/
├── playbooks.security.json            # 95项安全检查
└── global_input_contract.json         # 输入要求

reports/
├── phase1/                            # 侦察阶段
├── phase2/                            # 领域审计
├── phase3/                            # 风险分类
├── phase4/                            # 建议阶段
└── final/                             # 编译后的报告

Core Concepts

核心概念

Event Store (
activity.jsonl
)

事件存储(
activity.jsonl

Every audit action is an immutable event:
json
{
  "event_id": "evt_001",
  "timestamp": "2026-05-14T10:30:00Z",
  "event_type": "task.started",
  "task_id": "SEC-010",
  "phase": "phase2",
  "domain": "authentication",
  "agent": "agent-impl"
}
json
{
  "event_id": "evt_002",
  "timestamp": "2026-05-14T10:32:15Z",
  "event_type": "check.completed",
  "task_id": "SEC-010",
  "check_id": "AU-002",
  "status": "fail",
  "severity": "high",
  "finding": "Password stored without bcrypt/argon2",
  "evidence": {
    "file": "auth/user.py",
    "line": 45,
    "code_snippet": "user.password = request.form['password']"
  },
  "hash": "a3f8b2..."
}
每一项审计操作都是一个不可变事件:
json
{
  "event_id": "evt_001",
  "timestamp": "2026-05-14T10:30:00Z",
  "event_type": "task.started",
  "task_id": "SEC-010",
  "phase": "phase2",
  "domain": "authentication",
  "agent": "agent-impl"
}
json
{
  "event_id": "evt_002",
  "timestamp": "2026-05-14T10:32:15Z",
  "event_type": "check.completed",
  "task_id": "SEC-010",
  "check_id": "AU-002",
  "status": "fail",
  "severity": "high",
  "finding": "Password stored without bcrypt/argon2",
  "evidence": {
    "file": "auth/user.py",
    "line": 45,
    "code_snippet": "user.password = request.form['password']"
  },
  "hash": "a3f8b2..."
}

Read Model (
roadmap.json
)

读取模型(
roadmap.json

Projection of audit progress (derived from events):
json
{
  "version": "0.4.0",
  "phases": {
    "phase1": {
      "status": "done",
      "tasks": {
        "SEC-001": {"status": "done", "output": "reports/phase1/tech-stack.md"}
      }
    },
    "phase2": {
      "status": "in_progress",
      "domains": {
        "authentication": {
          "checks_passed": 5,
          "checks_failed": 3,
          "tasks": ["SEC-010", "SEC-011"]
        }
      }
    }
  }
}
审计进度的投影(从事件衍生):
json
{
  "version": "0.4.0",
  "phases": {
    "phase1": {
      "status": "done",
      "tasks": {
        "SEC-001": {"status": "done", "output": "reports/phase1/tech-stack.md"}
      }
    },
    "phase2": {
      "status": "in_progress",
      "domains": {
        "authentication": {
          "checks_passed": 5,
          "checks_failed": 3,
          "tasks": ["SEC-010", "SEC-011"]
        }
      }
    }
  }
}

Running an Audit

执行审计

Phase 1: Reconnaissance

阶段1:侦察

python
undefined
python
undefined

orchestrator.py

orchestrator.py

from esaa_security import Orchestrator, Agent
from esaa_security import Orchestrator, Agent

Initialize orchestrator

Initialize orchestrator

orchestrator = Orchestrator( event_store=".roadmap/activity.jsonl", roadmap_path=".roadmap/roadmap.json", target_repo=os.getenv("AUDIT_TARGET_REPO") )
orchestrator = Orchestrator( event_store=".roadmap/activity.jsonl", roadmap_path=".roadmap/roadmap.json", target_repo=os.getenv("AUDIT_TARGET_REPO") )

Initialize reconnaissance agent

Initialize reconnaissance agent

agent_spec = Agent( role="agent-spec", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-spec.yaml" )
agent_spec = Agent( role="agent-spec", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-spec.yaml" )

Execute reconnaissance phase

Execute reconnaissance phase

recon_tasks = ["SEC-001", "SEC-002", "SEC-003", "SEC-004"] for task_id in recon_tasks: result = agent_spec.execute_task(task_id, orchestrator.get_context()) orchestrator.validate_and_append(task_id, result)

**Task outputs:**
- `SEC-001`: Tech stack inventory (languages, frameworks, dependencies)
- `SEC-002`: Architecture map (components, trust boundaries)
- `SEC-003`: Data flow diagram (inputs, storage, outputs)
- `SEC-004`: Attack surface enumeration (endpoints, file uploads, APIs)
recon_tasks = ["SEC-001", "SEC-002", "SEC-003", "SEC-004"] for task_id in recon_tasks: result = agent_spec.execute_task(task_id, orchestrator.get_context()) orchestrator.validate_and_append(task_id, result)

**任务输出:**
- `SEC-001`: 技术栈清单(语言、框架、依赖)
- `SEC-002`: 架构图(组件、信任边界)
- `SEC-003`: 数据流图(输入、存储、输出)
- `SEC-004`: 攻击面枚举(端点、文件上传、API)

Phase 2: Domain Audit Execution

阶段2:领域审计执行

python
undefined
python
undefined

Load security playbooks

Load security playbooks

with open("playbooks/playbooks.security.json") as f: playbooks = json.load(f)
with open("playbooks/playbooks.security.json") as f: playbooks = json.load(f)

Initialize audit execution agent

Initialize audit execution agent

agent_impl = Agent( role="agent-impl", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-impl.yaml" )
agent_impl = Agent( role="agent-impl", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-impl.yaml" )

Execute checks for a domain (e.g., Authentication)

Execute checks for a domain (e.g., Authentication)

auth_checks = ["AU-001", "AU-002", "AU-003", "AU-004", "AU-005", "AU-006", "AU-007", "AU-008"]
for check_id in auth_checks: playbook = playbooks["checks"][check_id]
result = agent_impl.execute_check(
    check_id=check_id,
    playbook=playbook,
    context=orchestrator.get_context()
)

# Orchestrator validates against schema
orchestrator.validate_and_append(
    task_id=f"SEC-{check_id}",
    result=result
)

**Example check result:**

```python
auth_checks = ["AU-001", "AU-002", "AU-003", "AU-004", "AU-005", "AU-006", "AU-007", "AU-008"]
for check_id in auth_checks: playbook = playbooks["checks"][check_id]
result = agent_impl.execute_check(
    check_id=check_id,
    playbook=playbook,
    context=orchestrator.get_context()
)

# Orchestrator validates against schema
orchestrator.validate_and_append(
    task_id=f"SEC-{check_id}",
    result=result
)

**示例检查结果:**

```python

agent_impl output for AU-002 (Password Storage)

agent_impl output for AU-002 (Password Storage)

{ "check_id": "AU-002", "status": "fail", "severity": "critical", "title": "Weak Password Hashing", "description": "Passwords stored using SHA-256 instead of bcrypt/argon2", "evidence": { "files": ["auth/models.py"], "lines": [67], "code": "hashlib.sha256(password.encode()).hexdigest()" }, "cwe": "CWE-916", "owasp": "A02:2021 Cryptographic Failures", "recommendation": "Replace SHA-256 with bcrypt (cost factor 12+)", "references": [ "https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html" ] }
undefined
{ "check_id": "AU-002", "status": "fail", "severity": "critical", "title": "Weak Password Hashing", "description": "Passwords stored using SHA-256 instead of bcrypt/argon2", "evidence": { "files": ["auth/models.py"], "lines": [67], "code": "hashlib.sha256(password.encode()).hexdigest()" }, "cwe": "CWE-916", "owasp": "A02:2021 Cryptographic Failures", "recommendation": "Replace SHA-256 with bcrypt (cost factor 12+)", "references": [ "https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html" ] }
undefined

Phase 3: Risk Classification

阶段3:风险分类

python
undefined
python
undefined

Initialize QA/risk agent

Initialize QA/risk agent

agent_qa = Agent( role="agent-qa", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-qa.yaml" )
agent_qa = Agent( role="agent-qa", contract_path=".roadmap/AGENT_CONTRACT.yaml", parcer_profile="PARCER_PROFILE.agent-qa.yaml" )

Classify vulnerabilities

Classify vulnerabilities

findings = orchestrator.get_all_findings() risk_matrix = agent_qa.classify_risks(findings)
findings = orchestrator.get_all_findings() risk_matrix = agent_qa.classify_risks(findings)

Write risk classification

Write risk classification

with open("reports/phase3/risk-matrix.json", "w") as f: json.dump(risk_matrix, f, indent=2)

**Risk matrix output:**

```json
{
  "critical": [
    {"id": "AU-002", "cvss": 9.1, "exploitability": "high"}
  ],
  "high": [
    {"id": "IV-003", "cvss": 7.5, "exploitability": "medium"}
  ],
  "medium": [
    {"id": "SH-001", "cvss": 5.3, "exploitability": "low"}
  ],
  "remediation_priority": ["AU-002", "AZ-001", "IV-003"]
}
with open("reports/phase3/risk-matrix.json", "w") as f: json.dump(risk_matrix, f, indent=2)

**风险矩阵输出:**

```json
{
  "critical": [
    {"id": "AU-002", "cvss": 9.1, "exploitability": "high"}
  ],
  "high": [
    {"id": "IV-003", "cvss": 7.5, "exploitability": "medium"}
  ],
  "medium": [
    {"id": "SH-001", "cvss": 5.3, "exploitability": "low"}
  ],
  "remediation_priority": ["AU-002", "AZ-001", "IV-003"]
}

Phase 4: Report Generation

阶段4:报告生成

python
undefined
python
undefined

Generate final report

Generate final report

report = agent_qa.generate_report( findings=orchestrator.get_all_findings(), risk_matrix=risk_matrix, context=orchestrator.get_context() )
report = agent_qa.generate_report( findings=orchestrator.get_all_findings(), risk_matrix=risk_matrix, context=orchestrator.get_context() )

Write final outputs

Write final outputs

with open("reports/final/security-audit-report.md", "w") as f: f.write(report["markdown"])
with open("reports/final/security-audit-report.json", "w") as f: json.dump(report["structured"], f, indent=2)
undefined
with open("reports/final/security-audit-report.md", "w") as f: f.write(report["markdown"])
with open("reports/final/security-audit-report.json", "w") as f: json.dump(report["structured"], f, indent=2)
undefined

Event Replay and Verification

事件重放与验证

python
undefined
python
undefined

Verify audit determinism

Verify audit determinism

from esaa_security import EventReplay, HashVerifier
from esaa_security import EventReplay, HashVerifier

Replay events from scratch

Replay events from scratch

replayer = EventReplay(event_store=".roadmap/activity.jsonl") replayed_roadmap = replayer.project_roadmap()
replayer = EventReplay(event_store=".roadmap/activity.jsonl") replayed_roadmap = replayer.project_roadmap()

Compare hash

Compare hash

original_hash = HashVerifier.compute_hash(".roadmap/roadmap.json") replayed_hash = HashVerifier.compute_hash(replayed_roadmap)
assert original_hash == replayed_hash, "Non-deterministic projection detected"
undefined
original_hash = HashVerifier.compute_hash(".roadmap/roadmap.json") replayed_hash = HashVerifier.compute_hash(replayed_roadmap)
assert original_hash == replayed_hash, "Non-deterministic projection detected"
undefined

Security Domain Coverage

安全领域覆盖

Critical Domains (8 total)

关键领域(共8个)

Secrets & Configuration (SC-001 to SC-008):
python
undefined
密钥与配置(SC-001至SC-008):
python
undefined

Example: Check for hardcoded secrets

Example: Check for hardcoded secrets

playbook = { "check_id": "SC-001", "title": "Hardcoded Secrets Detection", "patterns": [ r'password\s*=\s*["'][^"']+["']', r'api_key\s*=\s*["'][^"']+["']', r'AWS_SECRET_ACCESS_KEY' ], "severity": "critical" }

**Authentication (AU-001 to AU-008):**
- Password hashing strength
- MFA enforcement
- Session token generation
- Credential transmission (HTTPS)

**Authorization (AZ-001 to AZ-006):**
- RBAC implementation
- Privilege escalation checks
- IDOR vulnerabilities
- API authorization

**Input Validation (IV-001 to IV-007):**
- SQL injection (ORM usage, parameterized queries)
- XSS (output encoding)
- Command injection
- Path traversal

**Data Security (DA-001 to DA-005):**
- Encryption at rest
- PII handling
- Data retention policies
playbook = { "check_id": "SC-001", "title": "Hardcoded Secrets Detection", "patterns": [ r'password\s*=\s*["'][^"']+["']', r'api_key\s*=\s*["'][^"']+["']', r'AWS_SECRET_ACCESS_KEY' ], "severity": "critical" }

**认证(AU-001至AU-008):**
- 密码哈希强度
- MFA强制要求
- 会话令牌生成
- 凭证传输(HTTPS)

**授权(AZ-001至AZ-006):**
- RBAC实现
- 权限提升检查
- IDOR漏洞
- API授权

**输入验证(IV-001至IV-007):**
- SQL注入(ORM使用、参数化查询)
- XSS(输出编码)
- 命令注入
- 路径遍历

**数据安全(DA-001至DA-005):**
- 静态数据加密
- PII处理
- 数据保留策略

High Priority Domains (7 total)

高优先级领域(共7个)

AI/LLM Security (AI-001 to AI-005):
python
undefined
AI/LLM安全(AI-001至AI-005):
python
undefined

Example: Check for prompt injection vulnerabilities

Example: Check for prompt injection vulnerabilities

playbook = { "check_id": "AI-001", "title": "Prompt Injection Defense", "checks": [ "user_input_sanitization", "system_prompt_isolation", "output_validation", "context_length_limits" ], "severity": "high" }
undefined
playbook = { "check_id": "AI-001", "title": "Prompt Injection Defense", "checks": [ "user_input_sanitization", "system_prompt_isolation", "output_validation", "context_length_limits" ], "severity": "high" }
undefined

Configuration

配置

Agent Contract (
.roadmap/AGENT_CONTRACT.yaml
)

Agent合约(
.roadmap/AGENT_CONTRACT.yaml

yaml
agent_impl:
  can:
    - read: ["**/*.py", "**/*.js", "**/*.java", "config/**"]
    - write: ["reports/phase2/**"]
    - execute_checks: true
  cannot:
    - write: [".roadmap/activity.jsonl", ".roadmap/roadmap.json"]
    - modify_state: true
    - append_events: true
  output_schema: "agent_result.schema.json"
  token_budget: 8000
yaml
agent_impl:
  can:
    - read: ["**/*.py", "**/*.js", "**/*.java", "config/**"]
    - write: ["reports/phase2/**"]
    - execute_checks: true
  cannot:
    - write: [".roadmap/activity.jsonl", ".roadmap/roadmap.json"]
    - modify_state: true
    - append_events: true
  output_schema: "agent_result.schema.json"
  token_budget: 8000

PARCER Profile (Token Budgets)

PARCER配置文件(令牌预算)

yaml
undefined
yaml
undefined

PARCER_PROFILE.agent-impl.yaml

PARCER_PROFILE.agent-impl.yaml

budget: max_tokens: 8000 per_check: 500 context_window: 4000
fallback: strategy: "map_reduce" chunk_size: 2000
validation: require_evidence: true require_cwe_mapping: true schema: "agent_result.schema.json"
undefined
budget: max_tokens: 8000 per_check: 500 context_window: 4000
fallback: strategy: "map_reduce" chunk_size: 2000
validation: require_evidence: true require_cwe_mapping: true schema: "agent_result.schema.json"
undefined

Programmatic Usage

程序化使用

Custom Audit Pipeline

自定义审计流水线

python
from esaa_security import AuditPipeline, SecurityDomain
python
from esaa_security import AuditPipeline, SecurityDomain

Define custom domain subset

Define custom domain subset

domains = [ SecurityDomain.AUTHENTICATION, SecurityDomain.AUTHORIZATION, SecurityDomain.INPUT_VALIDATION, SecurityDomain.AI_LLM_SECURITY ]
domains = [ SecurityDomain.AUTHENTICATION, SecurityDomain.AUTHORIZATION, SecurityDomain.INPUT_VALIDATION, SecurityDomain.AI_LLM_SECURITY ]

Initialize pipeline

Initialize pipeline

pipeline = AuditPipeline( target_repo=os.getenv("AUDIT_TARGET_REPO"), domains=domains, event_store=".roadmap/activity.jsonl" )
pipeline = AuditPipeline( target_repo=os.getenv("AUDIT_TARGET_REPO"), domains=domains, event_store=".roadmap/activity.jsonl" )

Execute with streaming

Execute with streaming

for event in pipeline.execute_streaming(): if event["event_type"] == "check.completed": print(f"✓ {event['check_id']}: {event['status']}") elif event["event_type"] == "finding.detected": print(f"⚠ {event['severity']}: {event['title']}")
for event in pipeline.execute_streaming(): if event["event_type"] == "check.completed": print(f"✓ {event['check_id']}: {event['status']}") elif event["event_type"] == "finding.detected": print(f"⚠ {event['severity']}: {event['title']}")

Get final report

Get final report

report = pipeline.get_report()
undefined
report = pipeline.get_report()
undefined

Query Event Store

查询事件存储

python
from esaa_security import EventQuery

query = EventQuery(".roadmap/activity.jsonl")
python
from esaa_security import EventQuery

query = EventQuery(".roadmap/activity.jsonl")

Find all critical findings

Find all critical findings

critical = query.filter( event_type="check.completed", status="fail", severity="critical" ).to_list()
critical = query.filter( event_type="check.completed", status="fail", severity="critical" ).to_list()

Get domain coverage

Get domain coverage

coverage = query.aggregate_by("domain")
coverage = query.aggregate_by("domain")

{"authentication": 8, "authorization": 6, ...}

{"authentication": 8, "authorization": 6, ...}

Audit timeline

Audit timeline

timeline = query.timeline(group_by="1h")
undefined
timeline = query.timeline(group_by="1h")
undefined

Common Patterns

常见模式

Incremental Audit (Skip Completed)

增量审计(跳过已完成任务)

python
roadmap = orchestrator.load_roadmap()

for task_id in all_tasks:
    if roadmap.get_task_status(task_id) == "done":
        print(f"Skip {task_id} (already completed)")
        continue
    
    result = agent.execute_task(task_id)
    orchestrator.validate_and_append(task_id, result)
python
roadmap = orchestrator.load_roadmap()

for task_id in all_tasks:
    if roadmap.get_task_status(task_id) == "done":
        print(f"Skip {task_id} (already completed)")
        continue
    
    result = agent.execute_task(task_id)
    orchestrator.validate_and_append(task_id, result)

Parallel Domain Execution

并行领域执行

python
from concurrent.futures import ThreadPoolExecutor

def audit_domain(domain_name, checks):
    agent = Agent(role="agent-impl")
    results = []
    for check_id in checks:
        result = agent.execute_check(check_id)
        results.append(result)
    return domain_name, results

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(audit_domain, domain, checks): domain
        for domain, checks in domain_map.items()
    }
    
    for future in as_completed(futures):
        domain, results = future.result()
        for result in results:
            orchestrator.validate_and_append(result)
python
from concurrent.futures import ThreadPoolExecutor

def audit_domain(domain_name, checks):
    agent = Agent(role="agent-impl")
    results = []
    for check_id in checks:
        result = agent.execute_check(check_id)
        results.append(result)
    return domain_name, results

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(audit_domain, domain, checks): domain
        for domain, checks in domain_map.items()
    }
    
    for future in as_completed(futures):
        domain, results = future.result()
        for result in results:
            orchestrator.validate_and_append(result)

Export to SARIF

导出为SARIF格式

python
from esaa_security import SARIFExporter

exporter = SARIFExporter(event_store=".roadmap/activity.jsonl")
sarif = exporter.to_sarif()

with open("security-audit.sarif", "w") as f:
    json.dump(sarif, f, indent=2)
python
from esaa_security import SARIFExporter

exporter = SARIFExporter(event_store=".roadmap/activity.jsonl")
sarif = exporter.to_sarif()

with open("security-audit.sarif", "w") as f:
    json.dump(sarif, f, indent=2)

Troubleshooting

故障排除

Issue: Schema Validation Failure

问题:模式验证失败

python
undefined
python
undefined

Error: agent_result failed schema validation

Error: agent_result failed schema validation

Fix: Check output structure matches agent_result.schema.json

Fix: Check output structure matches agent_result.schema.json

Validate manually

Validate manually

from jsonschema import validate import json
with open(".roadmap/agent_result.schema.json") as f: schema = json.load(f)
with open("reports/phase2/results/SEC-010.json") as f: result = json.load(f)
validate(instance=result, schema=schema) # Raises ValidationError with details
undefined
from jsonschema import validate import json
with open(".roadmap/agent_result.schema.json") as f: schema = json.load(f)
with open("reports/phase2/results/SEC-010.json") as f: result = json.load(f)
validate(instance=result, schema=schema) # Raises ValidationError with details
undefined

Issue: Event Store Corruption

问题:事件存储损坏

python
undefined
python
undefined

Verify event store integrity

Verify event store integrity

from esaa_security import EventStoreValidator
validator = EventStoreValidator(".roadmap/activity.jsonl") errors = validator.validate()
if errors: print("Corrupt events:") for err in errors: print(f"Line {err['line']}: {err['message']}") else: print("✓ Event store valid")
undefined
from esaa_security import EventStoreValidator
validator = EventStoreValidator(".roadmap/activity.jsonl") errors = validator.validate()
if errors: print("Corrupt events:") for err in errors: print(f"Line {err['line']}: {err['message']}") else: print("✓ Event store valid")
undefined

Issue: Non-Deterministic Replay

问题:非确定性重放

python
undefined
python
undefined

Debug: Find which event causes divergence

Debug: Find which event causes divergence

from esaa_security import ReplayDebugger
debugger = ReplayDebugger( event_store=".roadmap/activity.jsonl", expected_roadmap=".roadmap/roadmap.json" )
divergent_event = debugger.find_divergence() print(f"Divergence at event: {divergent_event['event_id']}") print(f"Expected: {divergent_event['expected_state']}") print(f"Actual: {divergent_event['actual_state']}")
undefined
from esaa_security import ReplayDebugger
debugger = ReplayDebugger( event_store=".roadmap/activity.jsonl", expected_roadmap=".roadmap/roadmap.json" )
divergent_event = debugger.find_divergence() print(f"Divergence at event: {divergent_event['event_id']}") print(f"Expected: {divergent_event['expected_state']}") print(f"Actual: {divergent_event['actual_state']}")
undefined

Issue: Agent Exceeds Token Budget

问题:Agent超出令牌预算

python
undefined
python
undefined

Error: Agent exceeded 8000 token budget

Error: Agent exceeded 8000 token budget

Fix: Enable Map-Reduce fallback in PARCER profile

Fix: Enable Map-Reduce fallback in PARCER profile

PARCER_PROFILE.agent-impl.yaml

PARCER_PROFILE.agent-impl.yaml

fallback: strategy: "map_reduce" chunk_size: 2000 max_chunks: 10
fallback: strategy: "map_reduce" chunk_size: 2000 max_chunks: 10

Or reduce context window

Or reduce context window

budget: context_window: 3000 # from 4000
undefined
budget: context_window: 3000 # from 4000
undefined

Issue: Missing Evidence in Findings

问题:发现中缺少证据

python
undefined
python
undefined

Orchestrator rejects findings without evidence

Orchestrator rejects findings without evidence

Fix: Ensure agent output includes code snippets

Fix: Ensure agent output includes code snippets

Valid finding structure

Valid finding structure

{ "check_id": "IV-001", "status": "fail", "evidence": { "file": "api/routes.py", "line": 23, "code_snippet": "query = f"SELECT * FROM users WHERE id={user_id}"" } }
undefined
{ "check_id": "IV-001", "status": "fail", "evidence": { "file": "api/routes.py", "line": 23, "code_snippet": "query = f"SELECT * FROM users WHERE id={user_id}"" } }
undefined

Integration with CI/CD

与CI/CD集成

GitHub Actions

GitHub Actions

yaml
undefined
yaml
undefined

.github/workflows/security-audit.yml

.github/workflows/security-audit.yml

name: ESAA Security Audit
on: [push, pull_request]
jobs: audit: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3
  - name: Run ESAA-Security Audit
    env:
      OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
      AUDIT_TARGET_REPO: ${{ github.workspace }}
    run: |
      pip install -r requirements.txt
      python orchestrator.py --full-audit
  
  - name: Upload SARIF
    uses: github/codeql-action/upload-sarif@v2
    with:
      sarif_file: security-audit.sarif
  
  - name: Fail on Critical Findings
    run: |
      python -c "import json; \
      report = json.load(open('reports/final/security-audit-report.json')); \
      exit(1 if report['critical_count'] > 0 else 0)"
undefined
name: ESAA Security Audit
on: [push, pull_request]
jobs: audit: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3
  - name: Run ESAA-Security Audit
    env:
      OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
      AUDIT_TARGET_REPO: ${{ github.workspace }}
    run: |
      pip install -r requirements.txt
      python orchestrator.py --full-audit
  
  - name: Upload SARIF
    uses: github/codeql-action/upload-sarif@v2
    with:
      sarif_file: security-audit.sarif
  
  - name: Fail on Critical Findings
    run: |
      python -c "import json; \
      report = json.load(open('reports/final/security-audit-report.json')); \
      exit(1 if report['critical_count'] > 0 else 0)"
undefined

Best Practices

最佳实践

  1. Always verify event store integrity before generating reports
  2. Use deterministic replay to validate audit reproducibility
  3. Configure token budgets per agent role to prevent runaway costs
  4. Enable Map-Reduce fallback for large repositories (>10k LOC)
  5. Review PARCER profiles to adjust validation strictness
  6. Export to SARIF for GitHub Security tab integration
  7. Archive
    .roadmap/
    directory
    for audit forensics
  1. 生成报告前始终验证事件存储完整性
  2. 使用确定性重放验证审计可重复性
  3. 为每个Agent角色配置令牌预算以防止成本失控
  4. 对于大型仓库(>10k行代码)启用Map-Reduce回退
  5. 审查PARCER配置文件以调整验证严格程度
  6. 导出为SARIF格式以集成到GitHub安全选项卡
  7. 归档
    .roadmap/
    目录用于审计取证

References

参考资料