Loading...
Loading...
Execute deterministic, event-sourced security audits using ESAA-Security's LLM-based agent architecture with 95 checks across 16 security domains
npx skill4agent add aradotso/security-skills esaa-security-audit
Skill by ara.so — Security Skills collection.
.roadmap/activity.jsonl
# Clone the repository
git clone https://github.com/elzobrito/ESAA-Security.git
cd ESAA-Security
# Install Python dependencies
pip install -r requirements.txt
# Set up environment variables
export OPENAI_API_KEY=$YOUR_OPENAI_KEY
export ANTHROPIC_API_KEY=$YOUR_ANTHROPIC_KEY # if using Claude
export AUDIT_TARGET_REPO="/path/to/repo"
.roadmap/ # Event sourcing core
├── activity.jsonl # Immutable event store
├── roadmap.json # Derived audit progress
├── issues.json # Structured findings
├── AGENT_CONTRACT.yaml # Agent boundaries
├── ORCHESTRATOR_CONTRACT.yaml # State mutation rules
└── PROJECTION_SPEC.md # Event → state mapping
playbooks/
├── playbooks.security.json # 95 security checks
└── global_input_contract.json # Input requirements
reports/
├── phase1/ # Reconnaissance
├── phase2/ # Domain audits
├── phase3/ # Risk classification
├── phase4/ # Recommendations
└── final/ # Compiled report
activity.jsonl
{
"event_id": "evt_001",
"timestamp": "2026-05-14T10:30:00Z",
"event_type": "task.started",
"task_id": "SEC-010",
"phase": "phase2",
"domain": "authentication",
"agent": "agent-impl"
}
{
"event_id": "evt_002",
"timestamp": "2026-05-14T10:32:15Z",
"event_type": "check.completed",
"task_id": "SEC-010",
"check_id": "AU-002",
"status": "fail",
"severity": "high",
"finding": "Password stored without bcrypt/argon2",
"evidence": {
"file": "auth/user.py",
"line": 45,
"code_snippet": "user.password = request.form['password']"
},
"hash": "a3f8b2..."
}
roadmap.json
{
"version": "0.4.0",
"phases": {
"phase1": {
"status": "done",
"tasks": {
"SEC-001": {"status": "done", "output": "reports/phase1/tech-stack.md"}
}
},
"phase2": {
"status": "in_progress",
"domains": {
"authentication": {
"checks_passed": 5,
"checks_failed": 3,
"tasks": ["SEC-010", "SEC-011"]
}
}
}
}
}
# orchestrator.py
import os

from esaa_security import Orchestrator, Agent

# Initialize orchestrator
orchestrator = Orchestrator(
    event_store=".roadmap/activity.jsonl",
    roadmap_path=".roadmap/roadmap.json",
    target_repo=os.getenv("AUDIT_TARGET_REPO"),
)

# Initialize reconnaissance agent
agent_spec = Agent(
    role="agent-spec",
    contract_path=".roadmap/AGENT_CONTRACT.yaml",
    parcer_profile="PARCER_PROFILE.agent-spec.yaml",
)

# Execute reconnaissance phase
recon_tasks = ["SEC-001", "SEC-002", "SEC-003", "SEC-004"]
for task_id in recon_tasks:
    result = agent_spec.execute_task(task_id, orchestrator.get_context())
    orchestrator.validate_and_append(task_id, result)

# SEC-001, SEC-002, SEC-003, SEC-004

# Load security playbooks
import json

with open("playbooks/playbooks.security.json", encoding="utf-8") as f:
    playbooks = json.load(f)

# Initialize audit execution agent
agent_impl = Agent(
    role="agent-impl",
    contract_path=".roadmap/AGENT_CONTRACT.yaml",
    parcer_profile="PARCER_PROFILE.agent-impl.yaml",
)

# Execute checks for a domain (e.g., Authentication)
auth_checks = ["AU-001", "AU-002", "AU-003", "AU-004", "AU-005", "AU-006", "AU-007", "AU-008"]
for check_id in auth_checks:
    playbook = playbooks["checks"][check_id]
    result = agent_impl.execute_check(
        check_id=check_id,
        playbook=playbook,
        context=orchestrator.get_context(),
    )
    # Orchestrator validates against schema
    # NOTE(review): this yields ids like "SEC-AU-001", while the sample events
    # pair task_id "SEC-010" with check_id "AU-002" - confirm the expected format.
    orchestrator.validate_and_append(
        task_id=f"SEC-{check_id}",
        result=result,
    )

# agent_impl output for AU-002 (Password Storage)
{
"check_id": "AU-002",
"status": "fail",
"severity": "critical",
"title": "Weak Password Hashing",
"description": "Passwords stored using SHA-256 instead of bcrypt/argon2",
"evidence": {
"files": ["auth/models.py"],
"lines": [67],
"code": "hashlib.sha256(password.encode()).hexdigest()"
},
"cwe": "CWE-916",
"owasp": "A02:2021 Cryptographic Failures",
"recommendation": "Replace SHA-256 with bcrypt (cost factor 12+)",
"references": [
"https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html"
]
}
# Initialize QA/risk agent
import json

agent_qa = Agent(
    role="agent-qa",
    contract_path=".roadmap/AGENT_CONTRACT.yaml",
    parcer_profile="PARCER_PROFILE.agent-qa.yaml",
)

# Classify vulnerabilities
findings = orchestrator.get_all_findings()
risk_matrix = agent_qa.classify_risks(findings)

# Write risk classification
with open("reports/phase3/risk-matrix.json", "w", encoding="utf-8") as f:
    json.dump(risk_matrix, f, indent=2)

{
    "critical": [
        {"id": "AU-002", "cvss": 9.1, "exploitability": "high"}
    ],
    "high": [
        {"id": "IV-003", "cvss": 7.5, "exploitability": "medium"}
    ],
    "medium": [
        {"id": "SH-001", "cvss": 5.3, "exploitability": "low"}
    ],
    "remediation_priority": ["AU-002", "AZ-001", "IV-003"]
}

# Generate final report
import json

report = agent_qa.generate_report(
    findings=orchestrator.get_all_findings(),
    risk_matrix=risk_matrix,
    context=orchestrator.get_context(),
)

# Write final outputs
with open("reports/final/security-audit-report.md", "w", encoding="utf-8") as f:
    f.write(report["markdown"])
with open("reports/final/security-audit-report.json", "w", encoding="utf-8") as f:
    json.dump(report["structured"], f, indent=2)

# Verify audit determinism
from esaa_security import EventReplay, HashVerifier

# Replay events from scratch
replayer = EventReplay(event_store=".roadmap/activity.jsonl")
replayed_roadmap = replayer.project_roadmap()

# Compare hash
original_hash = HashVerifier.compute_hash(".roadmap/roadmap.json")
replayed_hash = HashVerifier.compute_hash(replayed_roadmap)
# Raise explicitly instead of using `assert`, which is skipped under `python -O`
if original_hash != replayed_hash:
    raise AssertionError("Non-deterministic projection detected")

# Example: Check for hardcoded secrets
playbook = {
    "check_id": "SC-001",
    "title": "Hardcoded Secrets Detection",
    # Patterns: quoted password / api_key assignments and the AWS secret key name
    "patterns": [
        r'password\s*=\s*["\'][^"\']+["\']',
        r'api_key\s*=\s*["\'][^"\']+["\']',
        r'AWS_SECRET_ACCESS_KEY',
    ],
    "severity": "critical",
}

# Example: Check for prompt injection vulnerabilities
playbook = {
"check_id": "AI-001",
"title": "Prompt Injection Defense",
"checks": [
"user_input_sanitization",
"system_prompt_isolation",
"output_validation",
"context_length_limits"
],
"severity": "high"
}.roadmap/AGENT_CONTRACT.yamlagent_impl:
can:
- read: ["**/*.py", "**/*.js", "**/*.java", "config/**"]
- write: ["reports/phase2/**"]
- execute_checks: true
cannot:
- write: [".roadmap/activity.jsonl", ".roadmap/roadmap.json"]
- modify_state: true
- append_events: true
output_schema: "agent_result.schema.json"
token_budget: 8000
# PARCER_PROFILE.agent-impl.yaml
budget:
  max_tokens: 8000
  per_check: 500
  context_window: 4000
fallback:
  strategy: "map_reduce"
  chunk_size: 2000
validation:
  require_evidence: true
  require_cwe_mapping: true
  schema: "agent_result.schema.json"

from esaa_security import AuditPipeline, SecurityDomain
import os

# Define custom domain subset
domains = [
    SecurityDomain.AUTHENTICATION,
    SecurityDomain.AUTHORIZATION,
    SecurityDomain.INPUT_VALIDATION,
    SecurityDomain.AI_LLM_SECURITY,
]

# Initialize pipeline
pipeline = AuditPipeline(
    target_repo=os.getenv("AUDIT_TARGET_REPO"),
    domains=domains,
    event_store=".roadmap/activity.jsonl",
)

# Execute with streaming
for event in pipeline.execute_streaming():
    if event["event_type"] == "check.completed":
        print(f"✓ {event['check_id']}: {event['status']}")
    elif event["event_type"] == "finding.detected":
        print(f"⚠ {event['severity']}: {event['title']}")

# Get final report
report = pipeline.get_report()

from esaa_security import EventQuery
query = EventQuery(".roadmap/activity.jsonl")

# Find all critical findings
critical = query.filter(
    event_type="check.completed",
    status="fail",
    severity="critical",
).to_list()

# Get domain coverage
coverage = query.aggregate_by("domain")
# {"authentication": 8, "authorization": 6, ...}

# Audit timeline
timeline = query.timeline(group_by="1h")

roadmap = orchestrator.load_roadmap()
for task_id in all_tasks:
    # Tasks the roadmap already marks as done are not re-run
    if roadmap.get_task_status(task_id) == "done":
        print(f"Skip {task_id} (already completed)")
        continue
    result = agent.execute_task(task_id)
    orchestrator.validate_and_append(task_id, result)

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed


def audit_domain(domain_name, checks):
    """Run every check in ``checks`` with a newly created agent-impl Agent.

    Returns ``(domain_name, results)`` where ``results`` holds one
    ``execute_check`` result per entry of ``checks``, in the same order.
    """
    agent = Agent(role="agent-impl")
    results = [agent.execute_check(check_id) for check_id in checks]
    return domain_name, results


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(audit_domain, domain, checks): domain
        for domain, checks in domain_map.items()
    }
    for future in as_completed(futures):
        domain, results = future.result()
        # Results are in the same order as the domain's check list, so pair
        # them up to recover the id that validate_and_append needs.
        # NOTE(review): task id format copied from the sequential domain loop
        # (f"SEC-{check_id}") - confirm against the orchestrator's API.
        for check_id, result in zip(domain_map[domain], results):
            orchestrator.validate_and_append(f"SEC-{check_id}", result)

from esaa_security import SARIFExporter
import json

exporter = SARIFExporter(event_store=".roadmap/activity.jsonl")
sarif = exporter.to_sarif()
with open("security-audit.sarif", "w", encoding="utf-8") as f:
    json.dump(sarif, f, indent=2)

# Error: agent_result failed schema validation
# Fix: Check output structure matches agent_result.schema.json
# Validate manually
import json

from jsonschema import validate

with open(".roadmap/agent_result.schema.json", encoding="utf-8") as f:
    schema = json.load(f)
with open("reports/phase2/results/SEC-010.json", encoding="utf-8") as f:
    result = json.load(f)
validate(instance=result, schema=schema)  # Raises ValidationError with details

# Verify event store integrity
from esaa_security import EventStoreValidator

validator = EventStoreValidator(".roadmap/activity.jsonl")
errors = validator.validate()
if errors:
    print("Corrupt events:")
    for err in errors:
        print(f"Line {err['line']}: {err['message']}")
else:
    print("✓ Event store valid")

# Debug: Find which event causes divergence
from esaa_security import ReplayDebugger

debugger = ReplayDebugger(
    event_store=".roadmap/activity.jsonl",
    expected_roadmap=".roadmap/roadmap.json",
)

# Print the event returned by find_divergence() with its expected and actual state
divergent_event = debugger.find_divergence()
print(f"Divergence at event: {divergent_event['event_id']}")
print(f"Expected: {divergent_event['expected_state']}")
print(f"Actual: {divergent_event['actual_state']}")

# Error: Agent exceeded 8000 token budget
# Fix: Enable Map-Reduce fallback in PARCER profile
# PARCER_PROFILE.agent-impl.yaml
fallback:
  strategy: "map_reduce"
  chunk_size: 2000
  max_chunks: 10
# Or reduce context window
budget:
  context_window: 3000 # from 4000

# Orchestrator rejects findings without evidence
# Fix: Ensure agent output includes code snippets
# Valid finding structure
{
"check_id": "IV-001",
"status": "fail",
"evidence": {
"file": "api/routes.py",
"line": 23,
"code_snippet": "query = f\"SELECT * FROM users WHERE id={user_id}\""
}
}
# .github/workflows/security-audit.yml
name: ESAA Security Audit
on: [push, pull_request]
# upload-sarif needs write access to security events; checkout needs read access
permissions:
  contents: read
  security-events: write
jobs:
  audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run ESAA-Security Audit
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          AUDIT_TARGET_REPO: ${{ github.workspace }}
        run: |
          pip install -r requirements.txt
          python orchestrator.py --full-audit
      - name: Upload SARIF
        # v2 of the CodeQL action is deprecated; v3 is the supported replacement
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: security-audit.sarif
      - name: Fail on Critical Findings
        run: |
          python -c "import json; \
          report = json.load(open('reports/final/security-audit-report.json')); \
          exit(1 if report['critical_count'] > 0 else 0)"
.roadmap/