Security patterns for LLM integrations including prompt injection defense and hallucination prevention. Use when implementing context separation, validating LLM outputs, or protecting against prompt injection attacks.
`npx skill4agent add yonatangross/orchestkit llm-safety-patterns`

Identifiers flow AROUND the LLM, not THROUGH it. The LLM sees only content. Attribution happens deterministically.
┌──────────────────────────────────────────────────────────────────┐
│                SYSTEM CONTEXT (flows around LLM)                 │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │ user_id │ tenant_id │ analysis_id │ trace_id │ permissions │  │
│  └────────────────────────────────────────────────────────────┘  │
│       │                                             │            │
│       ▼                                             ▼            │
│  ┌─────────┐      ┌─────────────────────┐      ┌─────────┐       │
│  │ PRE-LLM │      │         LLM         │      │POST-LLM │       │
│  │ FILTER  │─────▶│                     │─────▶│ATTRIBUTE│       │
│  │         │      │ Sees ONLY:          │      │         │       │
│  │ Returns │      │ - content text      │      │ Adds:   │       │
│  │ CONTENT │      │ - context text      │      │ - IDs   │       │
│  │ (no IDs)│      │   (NO IDs!)         │      │ - refs  │       │
│  └─────────┘      └─────────────────────┘      └─────────┘       │
└──────────────────────────────────────────────────────────────────┘

| Parameter | Type | Why Forbidden |
|---|---|---|
| `user_id` | UUID | Can be hallucinated, enables cross-user access |
| `tenant_id` | UUID | Critical for multi-tenant isolation |
| `analysis_id` | UUID | Job tracking, not for LLM |
| `document_id` | UUID | Source tracking, not for LLM |
| `artifact_id` | UUID | Output tracking, not for LLM |
| `chunk_id` | UUID | RAG reference, not for LLM |
| `session_id` | str | Auth context, not for LLM |
| `trace_id` | str | Observability, not for LLM |
| Any UUID | UUID | Pattern: `[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}` |
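The filtering and attribution code on this page passes a `RequestContext` and a `SourceRefs` object around the LLM without defining them. A minimal sketch of what those carriers might look like; the field names are taken from this page, but the exact shapes are assumptions:

```python
from dataclasses import dataclass, field
from uuid import UUID

# HYPOTHETICAL shapes -- this page references RequestContext and SourceRefs
# without defining them; field names follow the snippets below.

@dataclass(frozen=True)
class RequestContext:
    """System-provided identity; flows around the LLM, never into prompts."""
    user_id: UUID
    tenant_id: UUID
    resource_id: UUID               # surfaced as analysis_id at attribution time
    trace_id: str
    permissions: tuple[str, ...] = ()

@dataclass
class SourceRefs:
    """Deterministic retrieval references captured before the LLM call."""
    document_ids: list[UUID] = field(default_factory=list)
    chunk_ids: list[UUID] = field(default_factory=list)
```

Freezing `RequestContext` keeps system-provided identity immutable for the life of a request, so no downstream step can rewrite attribution.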
import re

FORBIDDEN_PATTERNS = [
    r'user[_-]?id',
    r'tenant[_-]?id',
    r'analysis[_-]?id',
    r'document[_-]?id',
    r'artifact[_-]?id',
    r'chunk[_-]?id',
    r'session[_-]?id',
    r'trace[_-]?id',
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
]

def audit_prompt(prompt: str) -> list[str]:
    """Check for forbidden identifier patterns in a prompt."""
    violations = []
    for pattern in FORBIDDEN_PATTERNS:
        if re.search(pattern, prompt, re.IGNORECASE):
            violations.append(pattern)
    return violations

async def prepare_for_llm(
    query: str,
    ctx: RequestContext,
) -> tuple[str, list[str], SourceRefs]:
    """
    Filter data and extract content for the LLM.

    Returns: (content, context_texts, source_references)
    """
    # 1. Retrieve with tenant filter
    documents = await semantic_search(
        query_embedding=embed(query),
        ctx=ctx,  # filters by tenant_id, user_id
    )

    # 2. Save references for attribution (NOTE: assumes retrieval
    # attaches the matched chunks to each document)
    source_refs = SourceRefs(
        document_ids=[d.id for d in documents],
        chunk_ids=[c.id for d in documents for c in d.chunks],
    )

    # 3. Extract content only (no IDs)
    content_texts = [d.content for d in documents]
    return query, content_texts, source_refs

def build_prompt(content: str, context_texts: list[str]) -> str:
    """
    Build a prompt with ONLY content, no identifiers.
    """
    prompt = f"""
Analyze the following content and provide insights.

CONTENT:
{content}

RELEVANT CONTEXT:
{chr(10).join(f"- {text}" for text in context_texts)}

Provide analysis covering:
1. Key concepts
2. Prerequisites
3. Learning objectives
"""
    # AUDIT: verify no IDs leaked
    violations = audit_prompt(prompt)
    if violations:
        raise SecurityError(f"IDs leaked to prompt: {violations}")
    return prompt

async def call_llm(prompt: str) -> dict:
    """The LLM only sees content, never IDs."""
    response = await llm.generate(prompt)
    return parse_response(response)

async def save_with_attribution(
    llm_output: dict,
    ctx: RequestContext,
    source_refs: SourceRefs,
) -> Analysis:
    """
    Attach context and references to LLM output.
    Attribution is deterministic, not LLM-generated.
    """
    return await Analysis.create(
        # Generated
        id=uuid4(),
        # From RequestContext (system-provided)
        user_id=ctx.user_id,
        tenant_id=ctx.tenant_id,
        analysis_id=ctx.resource_id,
        trace_id=ctx.trace_id,
        # From pre-LLM refs (deterministic)
        source_document_ids=source_refs.document_ids,
        source_chunk_ids=source_refs.chunk_ids,
        # From LLM (content only)
        content=llm_output["analysis"],
        key_concepts=llm_output["key_concepts"],
        difficulty=llm_output["difficulty"],
        # Metadata
        created_at=datetime.now(timezone.utc),
        model_used=MODEL_NAME,
    )

async def validate_output(
    llm_output: dict,
    context_texts: list[str],
) -> ValidationResult:
    """Validate LLM output before use."""
    # 1. Schema validation
    try:
        parsed = AnalysisOutput.model_validate(llm_output)
    except ValidationError as e:
        return ValidationResult(valid=False, reason=f"Schema error: {e}")

    # 2. Guardrails
    if await contains_toxic_content(parsed.content):
        return ValidationResult(valid=False, reason="Toxic content detected")

    # 3. Grounding check
    if not is_grounded(parsed.content, context_texts):
        return ValidationResult(valid=False, reason="Ungrounded claims")

    # 4. No hallucinated IDs
    if contains_uuid_pattern(parsed.content):
        return ValidationResult(valid=False, reason="Hallucinated IDs")

    return ValidationResult(valid=True)

backend/app/workflows/
├── agents/
│   ├── execution.py              # Add context separation
│   └── prompts/                  # Audit all prompts
├── tasks/
│   └── generate_artifact.py      # Add attribution

backend/app/services/
├── embeddings/                   # Pre-LLM filtering
└── analysis/                     # Post-LLM attribution

Related skills: input-validation, rag-retrieval, llm-evaluation, security-scanning, defense-in-depth

| Decision | Choice | Rationale |
|---|---|---|
| ID handling | Flow around LLM, never through | Prevents hallucination, injection, and cross-tenant leakage |
| Output validation | Schema + guardrails + grounding | Defense-in-depth for LLM outputs |
| Attribution approach | Deterministic post-LLM | System context provides IDs, not LLM |
| Prompt auditing | Regex pattern matching | Fast detection of forbidden identifiers |
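`validate_output` leans on helpers such as `contains_uuid_pattern` and `is_grounded` that this page does not define. A naive sketch, assuming a regex UUID check and a crude lexical-overlap grounding heuristic; production grounding checks typically use NLI models or embedding similarity instead:

```python
import re

UUID_RE = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
    re.IGNORECASE,
)

def contains_uuid_pattern(text: str) -> bool:
    """True if the text contains anything shaped like a UUID."""
    return bool(UUID_RE.search(text))

def is_grounded(text: str, context_texts: list[str], threshold: float = 0.3) -> bool:
    """Crude lexical grounding check: what fraction of the output's
    content words (4+ letters) appear somewhere in the retrieved context?"""
    words = set(re.findall(r'[a-z]{4,}', text.lower()))
    if not words:
        return True  # nothing substantive to ground
    context_words: set[str] = set()
    for ctx_text in context_texts:
        context_words |= set(re.findall(r'[a-z]{4,}', ctx_text.lower()))
    overlap = len(words & context_words) / len(words)
    return overlap >= threshold
```

The `threshold` value is an assumption and would need tuning per domain; a lexical check like this catches only gross ungroundedness, not subtle fabrication.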