securityclaw-autonomous-soc-agent
SecurityClaw Autonomous SOC Agent
Skill by ara.so — Security Skills collection.
SecurityClaw is a modular, skill-based autonomous Security Operations Center (SOC) agent that monitors OpenSearch/Elasticsearch data, builds RAG-based behavioral memory, and validates real-time anomalies using LLMs. It orchestrates security workflows through LangGraph, maintains conversation-based investigations, and provides both CLI and web interfaces for threat analysis.
Core Capabilities
- Skill-based architecture: Each capability is an isolated module with Python logic + LLM instruction
- RAG behavioral memory: Vector embeddings of network baselines stored in OpenSearch
- Anomaly detection: Scheduled 1-minute watcher polls findings and escalates threats
- LLM-powered analysis: Threat analyst validates anomalies using retrieval-augmented context
- LangGraph orchestration: DECIDE→EXECUTE→EVALUATE supervisor loop with SQLite checkpointing
- Web + CLI interfaces: React UI for chat investigations, CLI for automation
- Provider agnostic: Swap OpenSearch↔Elasticsearch, Ollama↔other LLM providers
Installation
Prerequisites
```bash
# Python 3.11+ required
python --version

# Install Ollama for LLM provider
curl -fsSL https://ollama.com/install.sh | sh
ollama serve

# Pull recommended models
ollama pull qwen2.5:7b-instruct-q4_K_M
ollama pull nomic-embed-text:latest
```
Setup
```bash
# Clone repository
git clone https://github.com/SecurityClaw/SecurityClaw.git
cd SecurityClaw

# Create virtual environment
python3.11 -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Run interactive onboarding wizard
python main.py onboard
```
The onboarding wizard configures:
- OpenSearch/Elasticsearch connection (host, port, SSL, auth)
- LLM provider (Ollama endpoint, model names)
- Optional external APIs (AbuseIPDB, VirusTotal, MaxMind GeoIP)
- Skill-specific environment variables
It writes `config.yaml` and `.env` with the validated configuration.
Configuration
config.yaml Structure
```yaml
# Database configuration
database:
  provider: opensearch  # or elasticsearch
  host: localhost
  port: 9200
  use_ssl: true
  verify_certs: false
  username: admin
  password_env: OPENSEARCH_PASSWORD  # Reads from .env

# LLM provider
llm:
  provider: ollama
  base_url: http://localhost:11434
  model: qwen2.5:7b-instruct-q4_K_M
  temperature: 0.7
  max_tokens: 16384

# RAG engine
rag:
  index_name: securityclaw_baselines
  embedding_model: nomic-embed-text:latest
  embedding_dimension: 768
  top_k: 5

# API server
api:
  host: 0.0.0.0
  port: 7799
  enable_cors: true
```
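The `password_env` key names an environment variable instead of storing the secret in `config.yaml`. The sketch below shows one way such an indirection could be resolved. The helper `resolve_password` is hypothetical (it is not taken from the SecurityClaw code base), and the dict simply mirrors the `database` section shown above.

```python
import os
from typing import Mapping, Optional


def resolve_password(db_cfg: Mapping[str, object],
                     env: Optional[Mapping[str, str]] = None) -> str:
    """Return the secret stored in the variable named by `password_env`.

    Hypothetical helper for illustration; `env` defaults to os.environ.
    """
    env = os.environ if env is None else env
    var_name = str(db_cfg["password_env"])
    try:
        return env[var_name]
    except KeyError:
        raise RuntimeError(f"environment variable {var_name} is not set") from None


# Mirrors the `database` section of config.yaml.
database = {
    "provider": "opensearch",
    "username": "admin",
    "password_env": "OPENSEARCH_PASSWORD",
}

# Pass an explicit mapping here so the example does not depend on the real environment.
print(resolve_password(database, {"OPENSEARCH_PASSWORD": "example"}))
```

Failing loudly when the variable is missing tends to surface a misconfigured `.env` earlier than a later authentication error would.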
Environment Variables (.env)
```bash
# Database credentials
OPENSEARCH_PASSWORD=your_password_here

# Optional external APIs
ABUSEIPDB_API_KEY=${ABUSEIPDB_API_KEY}
VIRUSTOTAL_API_KEY=${VIRUSTOTAL_API_KEY}
MAXMIND_LICENSE_KEY=${MAXMIND_LICENSE_KEY}

# Skill-specific variables (discovered by onboard command)
ANOMALY_TRIAGE_THRESHOLD=0.7
```
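A threshold such as `ANOMALY_TRIAGE_THRESHOLD` is typically compared against a per-finding score. The following sketch assumes a `confidence` field on each finding and a greater-or-equal comparison. Both are assumptions for illustration, as is the function name `select_for_escalation`; the actual `anomaly_triage` logic may differ.

```python
import os
from typing import Dict, List


def select_for_escalation(findings: List[Dict], threshold: float) -> List[Dict]:
    """Keep findings whose (assumed) `confidence` field meets the threshold."""
    return [f for f in findings if f.get("confidence", 0.0) >= threshold]


# Read the threshold from the environment, falling back to the documented 0.7.
threshold = float(os.getenv("ANOMALY_TRIAGE_THRESHOLD", "0.7"))

sample = [
    {"id": "a", "confidence": 0.91},
    {"id": "b", "confidence": 0.42},
]
print([f["id"] for f in select_for_escalation(sample, threshold)])
```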
CLI Commands
Service Management
```bash
# Start full service (scheduler + web UI + API)
python main.py service
# Access web UI at http://localhost:5173
# API at http://localhost:7799

# Start API only (no background scheduler)
SECURITYCLAW_API_ONLY=1 python main.py service

# Start scheduler loop only (no web interface)
python main.py run

# Web development mode (frontend with hot reload)
python main.py web-dev
```
Skill Operations
```bash
# List all loaded skills and their schedules
python main.py list-skills

# Manually dispatch a skill once
python main.py dispatch network_baseliner
python main.py dispatch threat_analyst

# Interactive chat interface (CLI)
python main.py chat

# View agent memory snapshot
python main.py status
```
Configuration Management
```bash
# Re-run onboarding wizard
python main.py onboard

# Validate current configuration
python main.py validate-config
```
Skill Development
Creating a New Skill
Skills are directories in `skills/` with two required files:
**skills/my_skill/instruction.md** (LLM guidance + metadata):
```markdown
---
skill_id: my_skill
display_name: My Custom Skill
version: 1.0.0
schedule_interval_seconds: 3600  # Optional: for scheduled execution
capabilities:
  - custom_analysis
prerequisites:
  - network_data
required_entities:
  - ip_address
artifacts_produced:
  - analysis_report
---

# System Prompt for My Skill

You are a security analyst performing custom analysis.

## Task

Analyze network data and produce findings.

## Output Format

Return JSON with "findings" array.
```
**skills/my_skill/logic.py** (Python implementation):
```python
from datetime import datetime, timezone
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)


def execute(
    db_connector,
    llm_provider,
    rag_engine,
    config: Dict[str, Any],
    memory: Dict[str, Any],
    **kwargs
) -> Dict[str, Any]:
    """
    Skill entrypoint.

    Args:
        db_connector: OpenSearch/ES client
        llm_provider: LLM client
        rag_engine: RAG context retrieval
        config: Skill-specific config from instruction.md
        memory: Shared agent memory (read/write)
        **kwargs: Additional context (user_query, conversation_id, etc.)

    Returns:
        Dict with success status and results
    """
    logger.info("Executing my_skill")

    # Query database
    query = {
        "size": 100,
        "query": {"match_all": {}},
        "sort": [{"@timestamp": "desc"}]
    }
    results = db_connector.search(index="network-*", body=query)
    hits = results["hits"]["hits"]

    # Retrieve RAG context
    context = rag_engine.retrieve("recent network behavior", top_k=3)

    # Call LLM with context (only the first five events, to keep the prompt small)
    prompt = f"""Analyze these network events:
{hits[:5]}

Baseline context:
{context}

Identify anomalies."""
    response = llm_provider.chat([
        {"role": "system", "content": config.get("system_prompt", "")},
        {"role": "user", "content": prompt}
    ])

    # Update shared memory with the actual run time (UTC, ISO 8601)
    memory.setdefault("my_skill_runs", []).append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "findings_count": len(hits)
    })

    return {
        "success": True,
        "findings": response["content"],
        "context_used": len(context)
    }
```
The skill is auto-discovered on the next run. Set `schedule_interval_seconds` in `instruction.md` to enable automatic execution.
Built-in Skills
network_baseliner (6-hour schedule)
Builds behavioral baselines from network logs:
```bash
# Triggered automatically every 6 hours
# Aggregates normal traffic patterns into RAG vectors
# Used by threat_analyst for context

# Manual dispatch:
python main.py dispatch network_baseliner
```
anomaly_triage (Manual, convertible to scheduled)
Polls OpenSearch Anomaly Detection findings:
```bash
# Currently manual dispatch:
python main.py dispatch anomaly_triage

# To enable 1-minute polling, add to skills/anomaly_triage/instruction.md:
# schedule_interval_seconds: 60

# Escalates high-confidence anomalies to memory queue for analysis.
```
threat_analyst (Manual, convertible to scheduled)
Analyzes escalated findings with RAG context:
```bash
# Manual threat analysis:
python main.py dispatch threat_analyst
```
Returns verdict with LLM reasoning:
```json
{
  "verdict": "malicious",
  "confidence": 0.85,
  "reasoning": "Unusual port scan pattern...",
  "context_sources": ["baseline_2026-05-15", ...]
}
```
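LLM replies that are meant to carry a verdict object like the one above do not always arrive as bare JSON; models often wrap the object in prose or a Markdown code fence. The sketch below is one defensive way to extract it. The helper name `parse_llm_json` is hypothetical, and nothing here is taken from the `threat_analyst` implementation.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # Markdown code-fence marker, built this way to keep the example readable
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_llm_json(text: str) -> Optional[Any]:
    """Parse the JSON object spanning the first '{' to the last '}' in `text`.

    Returns None when no such span exists or it is not valid JSON.
    """
    fenced = _FENCED.search(text)
    candidate = fenced.group(1) if fenced else text
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(candidate[start:end + 1])
    except json.JSONDecodeError:
        return None


reply = 'Here is my assessment: {"verdict": "malicious", "confidence": 0.85}'
verdict = parse_llm_json(reply)
print(verdict["verdict"] if verdict else "unparseable reply")
```

Returning `None` instead of raising lets the caller decide whether to retry the LLM call or skip the finding.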
opensearch_querier (Manual)
Executes raw database queries. Via the chat interface:
"Query OpenSearch for failed logins in the last hour"
The skill constructs and executes `GET /auth-logs-*/_search` with this body:
```json
{
  "query": {
    "bool": {
      "must": [
        {"match": {"event.outcome": "failure"}},
        {"range": {"@timestamp": {"gte": "now-1h"}}}
      ]
    }
  }
}
```
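The same query body can be built programmatically when the time window needs to vary. This is a minimal sketch; the function name `failed_login_query` and its parameters are illustrative only, and the field names are the ones used in the request above.

```python
from typing import Any, Dict


def failed_login_query(window: str = "1h", size: int = 100) -> Dict[str, Any]:
    """Build a search body for failed logins within the last `window`.

    `window` uses date-math units such as "15m", "1h" or "7d".
    """
    return {
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"match": {"event.outcome": "failure"}},
                    {"range": {"@timestamp": {"gte": f"now-{window}"}}},
                ]
            }
        },
    }


body = failed_login_query("24h")
print(body["query"]["bool"]["must"][1])
```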
geoip_lookup (Cron: Tue/Fri 2 AM UTC)
Maintains MaxMind GeoLite2 database:
```bash
# Automatically updates GeoIP databases
# Requires MAXMIND_LICENSE_KEY in .env

# Manual update:
python main.py dispatch geoip_lookup
```
API Usage
Chat Endpoint (SSE Streaming)
```python
import requests
import json

url = "http://localhost:7799/chat"
payload = {
    "message": "Analyze recent anomalies and check if 192.168.1.100 is malicious",
    "conversation_id": "investigation_001"  # Optional: for multi-turn context
}

# Server-Sent Events stream
response = requests.post(url, json=payload, stream=True)
for line in response.iter_lines():
    if line.startswith(b"data: "):
        data = json.loads(line[6:])
        if data["type"] == "reasoning":
            print(f"[THINK] {data['content']}")
        elif data["type"] == "skill_call":
            print(f"[SKILL] {data['skill_name']}: {data['reasoning']}")
        elif data["type"] == "skill_result":
            print(f"[RESULT] {data['summary']}")
        elif data["type"] == "final":
            print(f"[ANSWER] {data['content']}")
```
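The stream-handling loop can be separated from the HTTP call so it is testable without a running server. The sketch below assumes the `data: <json>` line format shown in this section; the generator name `iter_sse_events` is hypothetical.

```python
import json
from typing import Dict, Iterable, Iterator


def iter_sse_events(lines: Iterable[bytes]) -> Iterator[Dict]:
    """Yield decoded JSON payloads from `data: ` lines, skipping everything else."""
    prefix = b"data: "
    for line in lines:
        if not line.startswith(prefix):
            continue  # blank keep-alive lines, comments, other SSE fields
        try:
            yield json.loads(line[len(prefix):])
        except json.JSONDecodeError:
            continue  # ignore payloads that are not valid JSON


# Canned lines standing in for response.iter_lines()
sample = [
    b"",
    b": ping",
    b'data: {"type": "final", "content": "done"}',
    b"data: not-json",
]
for event in iter_sse_events(sample):
    print(event["type"], event["content"])
```

With a live server, passing `response.iter_lines()` in place of `sample` should behave the same way.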
Dispatch Skill
```python
import requests

response = requests.post(
    "http://localhost:7799/dispatch",
    json={"skill_name": "threat_analyst"}
)
result = response.json()
```
Example response:
```json
{
  "success": true,
  "skill": "threat_analyst",
  "result": {...},
  "execution_time": 2.34
}
```
Query Memory
```python
import requests

response = requests.get("http://localhost:7799/memory")
memory = response.json()
```
Example response:
```json
{
  "escalated_findings": [...],
  "last_baseline_run": "2026-05-19T04:00:00Z",
  "anomaly_triage_cursor": "1234567890",
  "conversation_count": 5
}
```
LangGraph Orchestration
SecurityClaw uses LangGraph for chat routing with a supervisor pattern:
```python
# core/chat_router/graph.py structure
from typing import Dict, List, TypedDict

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver


class ChatState(TypedDict):
    messages: List[Dict]
    user_query: str
    plan: str
    skill_results: List[Dict]
    final_answer: str
    retry_count: int


def decide_node(state):
    """Supervisor plans which skills to invoke"""
    # Analyzes query against skill manifests
    # Returns plan with skill sequence
    pass


def execute_node(state):
    """Executes planned skills"""
    # Dispatches skills with context
    # Collects results
    pass


def evaluate_node(state):
    """Checks if answer is complete"""
    # Validates against user query
    # Triggers retry if insufficient
    pass


def should_continue(state):
    """Routing function: returns "continue" to retry or "end" to finish"""
    pass


# Graph construction
workflow = StateGraph(ChatState)
workflow.add_node("decide", decide_node)
workflow.add_node("execute", execute_node)
workflow.add_node("evaluate", evaluate_node)
workflow.set_entry_point("decide")
workflow.add_edge("decide", "execute")
workflow.add_edge("execute", "evaluate")  # evaluate runs after each execute step
workflow.add_conditional_edges(
    "evaluate",
    should_continue,
    {"continue": "decide", "end": END}
)

# Checkpoint to SQLite
# Note: in newer langgraph-checkpoint-sqlite releases, from_conn_string is a
# context manager and must be used as `with SqliteSaver.from_conn_string(...) as memory:`
memory = SqliteSaver.from_conn_string("data/conversations.db")
app = workflow.compile(checkpointer=memory)
```
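The control flow of the DECIDE→EXECUTE→EVALUATE loop can be illustrated without LangGraph. The sketch below is a plain-Python approximation under stated assumptions: the retry cap `MAX_RETRIES`, the toy node functions, and the rule used in `should_continue` are all invented for illustration and are not taken from `core/chat_router/graph.py`.

```python
from typing import Callable, Dict

MAX_RETRIES = 2  # assumed cap; the real limit is not stated in this document


def should_continue(state: Dict) -> str:
    """Route back to 'decide' until an answer exists or retries run out."""
    if state.get("final_answer") or state.get("retry_count", 0) >= MAX_RETRIES:
        return "end"
    return "continue"


def run_loop(state: Dict, decide: Callable, execute: Callable, evaluate: Callable) -> Dict:
    """Run decide -> execute -> evaluate until should_continue says 'end'."""
    while True:
        state = evaluate(execute(decide(state)))
        if should_continue(state) == "end":
            return state


# Toy nodes: each returns an updated copy of the state.
def decide(s: Dict) -> Dict:
    return {**s, "plan": "opensearch_querier"}


def execute(s: Dict) -> Dict:
    return {**s, "skill_results": s["skill_results"] + [{"skill": s["plan"]}]}


def evaluate(s: Dict) -> Dict:
    enough = len(s["skill_results"]) >= 2  # pretend two results answer the query
    return {
        **s,
        "final_answer": "done" if enough else "",
        "retry_count": s["retry_count"] + (0 if enough else 1),
    }


final = run_loop({"skill_results": [], "retry_count": 0, "final_answer": ""},
                 decide, execute, evaluate)
print(final["final_answer"], final["retry_count"])
```

Capping retries in the routing function keeps the loop from cycling indefinitely when the evaluator never accepts an answer.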
Common Patterns
Building Custom Threat Detection
```python
# skills/custom_detector/logic.py
import json


def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
    # 1. Query recent events
    events = db_connector.search(
        index="network-*",
        body={
            "size": 1000,
            "query": {
                "range": {"@timestamp": {"gte": "now-1h"}}
            }
        }
    )

    # 2. Retrieve behavioral baseline
    baseline = rag_engine.retrieve(
        query="normal traffic patterns last 24h",
        top_k=5
    )

    # 3. LLM analysis with context (one LLM call per event)
    threats = []
    for hit in events['hits']['hits']:
        event = hit['_source']
        prompt = f"""Event: {event}
Baseline: {baseline}
Is this anomalous? Respond JSON: {{"anomalous": bool, "reason": str}}"""
        response = llm_provider.chat([
            {"role": "user", "content": prompt}
        ])
        try:
            analysis = json.loads(response['content'])
        except json.JSONDecodeError:
            continue  # skip events whose LLM reply is not valid JSON
        if analysis.get('anomalous'):
            threats.append({
                "event": event,
                "reason": analysis.get('reason', '')
            })

    # 4. Store findings in memory
    memory.setdefault("custom_threats", []).extend(threats)

    return {
        "success": True,
        "threats_found": len(threats),
        "details": threats
    }
```
Enriching with External Threat Intel
```python
# skills/ip_enricher/logic.py
import os
import requests


def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
    suspicious_ips = kwargs.get("ip_addresses", [])
    enriched = []

    for ip in suspicious_ips:
        # AbuseIPDB lookup
        headers = {
            "Key": os.getenv("ABUSEIPDB_API_KEY"),
            "Accept": "application/json"
        }
        response = requests.get(
            "https://api.abuseipdb.com/api/v2/check",
            params={"ipAddress": ip, "maxAgeInDays": 90},
            headers=headers,
            timeout=10  # avoid hanging the skill on a slow upstream
        )
        response.raise_for_status()
        data = response.json()
        enriched.append({
            "ip": ip,
            "abuse_score": data.get("data", {}).get("abuseConfidenceScore", 0),
            "reports": data.get("data", {}).get("totalReports", 0)
        })

    return {
        "success": True,
        "enriched_ips": enriched
    }
```
Multi-Skill Investigation Workflow
```python
# Via chat interface or API:
# User: "Investigate source IP 10.0.0.50 - check logs, enrich with threat intel, analyze behavior"

# LangGraph supervisor plans:
# 1. opensearch_querier: fetch logs for 10.0.0.50
# 2. ip_enricher: check external reputation
# 3. baseline_querier: retrieve normal behavior for this IP
# 4. threat_analyst: final verdict with all context

# Automatic skill chaining based on manifests:
# - opensearch_querier provides "query_results" artifact
# - ip_enricher requires "ip_address" entity (extracted from results)
# - threat_analyst consumes all previous artifacts
```
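Manifest-driven chaining amounts to ordering skills so that each runs after the skills producing what it needs. The sketch below shows one way to derive such an order with the standard-library `graphlib`. It is a simplification: it treats `prerequisites` as a list of artifact names, the artifact names other than `query_results` are invented, and the real supervisor plans with an LLM, not a static sort.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Hypothetical manifests; only "query_results" appears in this document.
manifests: Dict[str, Dict[str, List[str]]] = {
    "threat_analyst": {
        "prerequisites": ["query_results", "reputation"],
        "artifacts_produced": ["verdict"],
    },
    "ip_enricher": {
        "prerequisites": ["query_results"],
        "artifacts_produced": ["reputation"],
    },
    "opensearch_querier": {
        "prerequisites": [],
        "artifacts_produced": ["query_results"],
    },
}


def plan_order(manifests: Dict[str, Dict[str, List[str]]]) -> List[str]:
    """Order skills so every producer runs before its consumers."""
    # Map each artifact to the skill that produces it.
    producers = {
        artifact: skill
        for skill, manifest in manifests.items()
        for artifact in manifest["artifacts_produced"]
    }
    # Each skill depends on the producers of its prerequisites.
    graph = {
        skill: {producers[a] for a in manifest["prerequisites"] if a in producers}
        for skill, manifest in manifests.items()
    }
    return list(TopologicalSorter(graph).static_order())


print(plan_order(manifests))
```

`TopologicalSorter` raises `graphlib.CycleError` if two skills depend on each other, which is a convenient early check for inconsistent manifests.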
Troubleshooting
Connection Issues
```bash
# Test OpenSearch connection
curl -k -u admin:password https://localhost:9200

# Test Ollama
ollama list  # Lists local models when the Ollama server is reachable

# Validate config
python main.py validate-config
```
Skill Not Loading
```bash
# Check skill discovery
python main.py list-skills

# Verify instruction.md has valid YAML frontmatter
# Required fields: skill_id, display_name, version

# Check logic.py has execute() function:
# def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs):
#     pass
```
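The frontmatter check can be automated. The sketch below uses a deliberately small parser that only understands top-level `key: value` lines, as a stand-in for a real YAML library; the function names are hypothetical, and the required fields are the three listed above.

```python
from typing import Dict, List

REQUIRED_FIELDS = ("skill_id", "display_name", "version")


def read_frontmatter(text: str) -> Dict[str, str]:
    """Parse top-level `key: value` pairs between the first two `---` lines."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}
    fields: Dict[str, str] = {}
    for line in lines[1:]:
        if line.strip() == "---":
            break
        if line[:1] in (" ", "-", "#", "") or ":" not in line:
            continue  # skip nested items, list entries, comments, blank lines
        key, _, value = line.partition(":")
        fields[key.strip()] = value.split(" #")[0].strip()  # drop trailing comments
    return fields


def missing_fields(text: str) -> List[str]:
    """Return the required fields that are absent or empty."""
    fields = read_frontmatter(text)
    return [name for name in REQUIRED_FIELDS if not fields.get(name)]


sample = """---
skill_id: my_skill
display_name: My Custom Skill
capabilities:
  - custom_analysis
---
# System Prompt for My Skill
"""
print(missing_fields(sample))
```

For real manifests a full YAML parser is the safer choice, since this sketch ignores quoting, multi-line values, and nested mappings.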
RAG Context Not Used
```python
# Verify embeddings index exists
from core.db_connector import get_db_connector

db = get_db_connector()
indices = db.cat_indices()
# Should show: securityclaw_baselines
```
```bash
# Rebuild baseline if empty
python main.py dispatch network_baseliner

# Check embedding model is running
ollama list  # Should show nomic-embed-text:latest
```
Memory State Issues
```bash
# Reset conversation memory (keeps runtime memory)
rm data/conversations.db

# Reset all memory (caution: loses baselines)
rm data/conversations.db data/runtime_memory.db

# View memory structure
python -c "
from core.memory import AgentMemory
memory = AgentMemory()
print(memory.get_summary())
"
```
LLM Response Truncation
```yaml
# Increase token budget in config.yaml
llm:
  max_tokens: 32768  # Default: 16384

# Reduce context injection in prompts
# Edit core/memory.py max_context_chars (default: 4000)
```
Web UI Not Loading
```bash
# Build frontend if dist/ missing
cd web
npm install
npm run build

# Check API server logs
python main.py service
# Should show: "API server started on http://0.0.0.0:7799"

# Verify CORS enabled in config.yaml
# api:
#   enable_cors: true
```
Testing
```bash
# Run test suite with mock providers
pytest tests/ -v

# Coverage report
pytest tests/ --cov=core --cov=skills --cov-report=html

# Test specific skill
pytest tests/test_threat_analyst.py -v

# Use mock OpenSearch (no real database needed)
# tests/conftest.py provides mock_db_connector fixture
```
Production Considerations
- Resource limits: 8GB+ RAM recommended for production with multiple concurrent investigations
- Checkpoint cleanup: Prune old conversations in `data/conversations.db` periodically
- RAG index maintenance: Archive old baselines, rebuild quarterly for evolving network patterns
- API authentication: Add auth middleware to `web/api/server.py` before exposing publicly
- Secrets management: Rotate API keys in `.env`, use secret managers for production deployments
- Monitoring: Track skill execution times, LLM token usage, and anomaly escalation rates
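For the monitoring item, skill execution times can be captured with a small context manager wrapped around each dispatch. This is a minimal in-process sketch; the names `timed` and `durations` are hypothetical, and a production setup would more likely export these numbers to a metrics system.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, List

# Wall-clock durations in seconds, keyed by skill name.
durations: Dict[str, List[float]] = defaultdict(list)


@contextmanager
def timed(skill_name: str):
    """Record how long the wrapped block takes, even if it raises."""
    start = time.perf_counter()
    try:
        yield
    finally:
        durations[skill_name].append(time.perf_counter() - start)


with timed("threat_analyst"):
    time.sleep(0.01)  # stand-in for a real skill dispatch

for name, samples in durations.items():
    print(f"{name}: runs={len(samples)} avg={sum(samples) / len(samples):.3f}s")
```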