securityclaw-autonomous-soc-agent

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

SecurityClaw Autonomous SOC Agent

SecurityClaw 自主SOC Agent

Skill by ara.so — Security Skills collection.
SecurityClaw is a modular, skill-based autonomous Security Operations Center (SOC) agent that monitors OpenSearch/Elasticsearch data, builds RAG-based behavioral memory, and validates real-time anomalies using LLMs. It orchestrates security workflows through LangGraph, maintains conversation-based investigations, and provides both CLI and web interfaces for threat analysis.
ara.so开发的Skill——安全技能合集。
SecurityClaw是一款模块化、基于技能的自主安全运营中心(SOC)Agent,可监控OpenSearch/Elasticsearch数据,构建基于RAG的行为记忆,并使用LLM验证实时异常。它通过LangGraph编排安全工作流,维护基于对话的调查流程,并提供CLI和Web界面用于威胁分析。

Core Capabilities

核心能力

  • Skill-based architecture: Each capability is an isolated module with Python logic + LLM instruction
  • RAG behavioral memory: Vector embeddings of network baselines stored in OpenSearch
  • Anomaly detection: Scheduled 1-minute watcher polls findings and escalates threats
  • LLM-powered analysis: Threat analyst validates anomalies using retrieval-augmented context
  • LangGraph orchestration: DECIDE→EXECUTE→EVALUATE supervisor loop with SQLite checkpointing
  • Web + CLI interfaces: React UI for chat investigations, CLI for automation
  • Provider agnostic: Swap OpenSearch↔Elasticsearch, Ollama↔other LLM providers
  • 基于技能的架构:每项能力都是独立模块,包含Python逻辑+LLM指令
  • RAG行为记忆:网络基线的向量嵌入存储在OpenSearch中
  • 异常检测:定时1分钟监控器轮询检测结果并升级威胁
  • LLM驱动分析:威胁分析师使用检索增强上下文验证异常
  • LangGraph编排:DECIDE→EXECUTE→EVALUATE监督循环,搭配SQLite检查点
  • Web + CLI界面:用于聊天调查的React UI,用于自动化的CLI
  • 供应商无关:可替换OpenSearch↔Elasticsearch、Ollama↔其他LLM供应商

Installation

安装

Prerequisites

前置条件

bash
undefined
bash
undefined

Python 3.11+ required

需要Python 3.11+

python --version
python --version

Install Ollama for LLM provider

安装Ollama作为LLM供应商

curl -fsSL https://ollama.com/install.sh | sh ollama serve
curl -fsSL https://ollama.com/install.sh | sh ollama serve

Pull recommended models

拉取推荐模型

ollama pull qwen2.5:7b-instruct-q4_K_M ollama pull nomic-embed-text:latest
undefined
ollama pull qwen2.5:7b-instruct-q4_K_M ollama pull nomic-embed-text:latest
undefined

Setup

设置

bash
undefined
bash
undefined

Clone repository

克隆仓库

Create virtual environment

创建虚拟环境

python3.11 -m venv .venv source .venv/bin/activate # On Windows: .venv\Scripts\activate
python3.11 -m venv .venv source .venv/bin/activate # Windows系统使用: .venv\Scripts\activate

Install dependencies

安装依赖

pip install -r requirements.txt
pip install -r requirements.txt

Run interactive onboarding wizard

运行交互式入职向导

python main.py onboard

The onboarding wizard configures:
- OpenSearch/Elasticsearch connection (host, port, SSL, auth)
- LLM provider (Ollama endpoint, model names)
- Optional external APIs (AbuseIPDB, VirusTotal, MaxMind GeoIP)
- Skill-specific environment variables

Outputs `config.yaml` and `.env` with validated configuration.
python main.py onboard

入职向导将配置:
- OpenSearch/Elasticsearch连接(主机、端口、SSL、认证)
- LLM供应商(Ollama端点、模型名称)
- 可选外部API(AbuseIPDB、VirusTotal、MaxMind GeoIP)
- 技能特定环境变量

输出包含验证后配置的`config.yaml`和`.env`文件。

Configuration

配置

config.yaml Structure

config.yaml结构

yaml
undefined
yaml
undefined

Database configuration

数据库配置

database: provider: opensearch # or elasticsearch host: localhost port: 9200 use_ssl: true verify_certs: false username: admin password_env: OPENSEARCH_PASSWORD # Reads from .env
database: provider: opensearch # 或elasticsearch host: localhost port: 9200 use_ssl: true verify_certs: false username: admin password_env: OPENSEARCH_PASSWORD # 从.env读取

LLM provider

LLM供应商

llm: provider: ollama base_url: http://localhost:11434 model: qwen2.5:7b-instruct-q4_K_M temperature: 0.7 max_tokens: 16384
llm: provider: ollama base_url: http://localhost:11434 model: qwen2.5:7b-instruct-q4_K_M temperature: 0.7 max_tokens: 16384

RAG engine

RAG引擎

rag: index_name: securityclaw_baselines embedding_model: nomic-embed-text:latest embedding_dimension: 768 top_k: 5
rag: index_name: securityclaw_baselines embedding_model: nomic-embed-text:latest embedding_dimension: 768 top_k: 5

API server

API服务器

api: host: 0.0.0.0 port: 7799 enable_cors: true
undefined
api: host: 0.0.0.0 port: 7799 enable_cors: true
undefined

Environment Variables (.env)

环境变量(.env)

bash
undefined
bash
undefined

Database credentials

数据库凭证

OPENSEARCH_PASSWORD=your_password_here
OPENSEARCH_PASSWORD=your_password_here

Optional external APIs

可选外部API

ABUSEIPDB_API_KEY=${ABUSEIPDB_API_KEY} VIRUSTOTAL_API_KEY=${VIRUSTOTAL_API_KEY} MAXMIND_LICENSE_KEY=${MAXMIND_LICENSE_KEY}
ABUSEIPDB_API_KEY=${ABUSEIPDB_API_KEY} VIRUSTOTAL_API_KEY=${VIRUSTOTAL_API_KEY} MAXMIND_LICENSE_KEY=${MAXMIND_LICENSE_KEY}

Skill-specific variables (discovered by onboard command)

技能特定变量(由onboard命令自动识别)

ANOMALY_TRIAGE_THRESHOLD=0.7
undefined
ANOMALY_TRIAGE_THRESHOLD=0.7
undefined

CLI Commands

CLI命令

Service Management

服务管理

bash
undefined
bash
undefined

Start full service (scheduler + web UI + API)

启动完整服务(调度器 + Web UI + API)

python main.py service
python main.py service

Access web UI at http://localhost:5173

访问Web UI地址:http://localhost:5173

Start API only (no background scheduler)

仅启动API(无后台调度器)

SECURITYCLAW_API_ONLY=1 python main.py service
SECURITYCLAW_API_ONLY=1 python main.py service

Start scheduler loop only (no web interface)

仅启动调度循环(无Web界面)

python main.py run
python main.py run

Web development mode (frontend with hot reload)

Web开发模式(前端热重载)

python main.py web-dev
undefined
python main.py web-dev
undefined

Skill Operations

技能操作

bash
undefined
bash
undefined

List all loaded skills and their schedules

列出所有已加载技能及其调度计划

python main.py list-skills
python main.py list-skills

Manually dispatch a skill once

手动触发一次技能执行

python main.py dispatch network_baseliner python main.py dispatch threat_analyst
python main.py dispatch network_baseliner python main.py dispatch threat_analyst

Interactive chat interface (CLI)

交互式聊天界面(CLI)

python main.py chat
python main.py chat

View agent memory snapshot

查看Agent内存快照

python main.py status
undefined
python main.py status
undefined

Configuration Management

配置管理

bash
undefined
bash
undefined

Re-run onboarding wizard

重新运行入职向导

python main.py onboard
python main.py onboard

Validate current configuration

验证当前配置

python main.py validate-config
undefined
python main.py validate-config
undefined

Skill Development

技能开发

Creating a New Skill

创建新技能

Skills are directories in
skills/
with two required files:
skills/my_skill/instruction.md (LLM guidance + metadata):
markdown
---
skill_id: my_skill
display_name: My Custom Skill
version: 1.0.0
schedule_interval_seconds: 3600  # Optional: for scheduled execution
capabilities:
  - custom_analysis
prerequisites:
  - network_data
required_entities:
  - ip_address
artifacts_produced:
  - analysis_report
---
技能是
skills/
目录下的文件夹,包含两个必填文件:
skills/my_skill/instruction.md(LLM指引+元数据):
markdown
---
skill_id: my_skill
display_name: My Custom Skill
version: 1.0.0
schedule_interval_seconds: 3600  # 可选:用于定时执行
capabilities:
  - custom_analysis
prerequisites:
  - network_data
required_entities:
  - ip_address
artifacts_produced:
  - analysis_report
---

System Prompt for My Skill

我的技能系统提示词

You are a security analyst performing custom analysis.
你是一名执行自定义分析的安全分析师。

Task

任务

Analyze network data and produce findings.
分析网络数据并生成检测结果。

Output Format

输出格式

Return JSON with "findings" array.

**skills/my_skill/logic.py** (Python implementation):

```python
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

def execute(
    db_connector,
    llm_provider,
    rag_engine,
    config: Dict[str, Any],
    memory: Dict[str, Any],
    **kwargs
) -> Dict[str, Any]:
    """
    Skill entrypoint.
    
    Args:
        db_connector: OpenSearch/ES client
        llm_provider: LLM client
        rag_engine: RAG context retrieval
        config: Skill-specific config from instruction.md
        memory: Shared agent memory (read/write)
        **kwargs: Additional context (user_query, conversation_id, etc.)
    
    Returns:
        Dict with success status and results
    """
    logger.info("Executing my_skill")
    
    # Query database
    query = {
        "size": 100,
        "query": {"match_all": {}},
        "sort": [{"@timestamp": "desc"}]
    }
    results = db_connector.search(index="network-*", body=query)
    
    # Retrieve RAG context
    context = rag_engine.retrieve("recent network behavior", top_k=3)
    
    # Call LLM with context
    prompt = f"""Analyze these network events:
{results['hits']['hits'][:5]}

Baseline context:
{context}

Identify anomalies."""
    
    response = llm_provider.chat([
        {"role": "system", "content": config.get("system_prompt", "")},
        {"role": "user", "content": prompt}
    ])
    
    # Update shared memory
    memory.setdefault("my_skill_runs", []).append({
        "timestamp": "2026-05-19T10:00:00Z",
        "findings_count": len(results['hits']['hits'])
    })
    
    return {
        "success": True,
        "findings": response["content"],
        "context_used": len(context)
    }
The skill is auto-discovered on next run. Set
schedule_interval_seconds
in
instruction.md
to enable automatic execution.
返回包含"findings"数组的JSON。

**skills/my_skill/logic.py**(Python实现):

```python
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

def execute(
    db_connector,
    llm_provider,
    rag_engine,
    config: Dict[str, Any],
    memory: Dict[str, Any],
    **kwargs
) -> Dict[str, Any]:
    """
    技能入口点。
    
    参数:
        db_connector: OpenSearch/ES客户端
        llm_provider: LLM客户端
        rag_engine: RAG上下文检索器
        config: 来自instruction.md的技能特定配置
        memory: 共享Agent内存(可读可写)
        **kwargs: 额外上下文(用户查询、对话ID等)
    
    返回:
        包含成功状态和结果的字典
    """
    logger.info("Executing my_skill")
    
    # 查询数据库
    query = {
        "size": 100,
        "query": {"match_all": {}},
        "sort": [{"@timestamp": "desc"}]
    }
    results = db_connector.search(index="network-*", body=query)
    
    # 检索RAG上下文
    context = rag_engine.retrieve("recent network behavior", top_k=3)
    
    # 携带上下文调用LLM
    prompt = f"""Analyze these network events:
{results['hits']['hits'][:5]}

Baseline context:
{context}

Identify anomalies."""
    
    response = llm_provider.chat([
        {"role": "system", "content": config.get("system_prompt", "")},
        {"role": "user", "content": prompt}
    ])
    
    # 更新共享内存
    memory.setdefault("my_skill_runs", []).append({
        "timestamp": "2026-05-19T10:00:00Z",
        "findings_count": len(results['hits']['hits'])
    })
    
    return {
        "success": True,
        "findings": response["content"],
        "context_used": len(context)
    }
下次运行时技能会被自动发现。在
instruction.md
中设置
schedule_interval_seconds
可启用自动执行。

Built-in Skills

内置技能

network_baseliner (6-hour schedule)

network_baseliner(6小时调度)

Builds behavioral baselines from network logs:
python
undefined
从网络日志构建行为基线:
python
undefined

Triggered automatically every 6 hours

每6小时自动触发

Aggregates normal traffic patterns into RAG vectors

将正常流量模式聚合为RAG向量

Used by threat_analyst for context

供threat_analyst作为上下文使用

Manual dispatch:

手动触发:

python main.py dispatch network_baseliner
undefined
python main.py dispatch network_baseliner
undefined

anomaly_triage (Manual, convertible to scheduled)

anomaly_triage(手动触发,可转为定时)

Polls OpenSearch Anomaly Detection findings:
python
undefined
轮询OpenSearch异常检测结果:
python
undefined

Currently manual dispatch:

当前为手动触发:

python main.py dispatch anomaly_triage
python main.py dispatch anomaly_triage

To enable 1-minute polling, add to skills/anomaly_triage/instruction.md:

要启用1分钟轮询,在skills/anomaly_triage/instruction.md中添加:

schedule_interval_seconds: 60

schedule_interval_seconds: 60


Escalates high-confidence anomalies to memory queue for analysis.

将高置信度异常升级到内存队列等待分析。

threat_analyst (Manual, convertible to scheduled)

threat_analyst(手动触发,可转为定时)

Analyzes escalated findings with RAG context:
python
undefined
结合RAG上下文分析已升级的检测结果:
python
undefined

Manual threat analysis:

手动威胁分析:

python main.py dispatch threat_analyst
python main.py dispatch threat_analyst

Returns verdict with LLM reasoning:

返回包含LLM推理过程的结论:

{

{

"verdict": "malicious",

"verdict": "malicious",

"confidence": 0.85,

"confidence": 0.85,

"reasoning": "Unusual port scan pattern...",

"reasoning": "Unusual port scan pattern...",

"context_sources": ["baseline_2026-05-15", ...]

"context_sources": ["baseline_2026-05-15", ...]

}

}

undefined
undefined

opensearch_querier (Manual)

opensearch_querier(手动触发)

Executes raw database queries:
python
undefined
执行原始数据库查询:
python
undefined

Via chat interface:

通过聊天界面:

"Query OpenSearch for failed logins in the last hour"

"Query OpenSearch for failed logins in the last hour"

Skill constructs and executes:

技能构建并执行查询:

GET /auth-logs-*/_search

GET /auth-logs-*/_search

{

{

"query": {

"query": {

"bool": {

"bool": {

"must": [

"must": [

{"match": {"event.outcome": "failure"}},

{"match": {"event.outcome": "failure"}},

{"range": {"@timestamp": {"gte": "now-1h"}}}

{"range": {"@timestamp": {"gte": "now-1h"}}}

]

]

}

}

}

}

}

}

undefined
undefined

geoip_lookup (Cron: Tue/Fri 2 AM UTC)

geoip_lookup(定时:UTC时间周二/周五凌晨2点)

Maintains MaxMind GeoLite2 database:
python
undefined
维护MaxMind GeoLite2数据库:
python
undefined

Automatically updates GeoIP databases

自动更新GeoIP数据库

Requires MAXMIND_LICENSE_KEY in .env

需要在.env中配置MAXMIND_LICENSE_KEY

Manual update:

手动更新:

python main.py dispatch geoip_lookup
undefined
python main.py dispatch geoip_lookup
undefined

API Usage

API使用

Chat Endpoint (SSE Streaming)

聊天端点(SSE流式传输)

python
import requests
import json

url = "http://localhost:7799/chat"
payload = {
    "message": "Analyze recent anomalies and check if 192.168.1.100 is malicious",
    "conversation_id": "investigation_001"  # Optional: for multi-turn context
}
python
import requests
import json

url = "http://localhost:7799/chat"
payload = {
    "message": "Analyze recent anomalies and check if 192.168.1.100 is malicious",
    "conversation_id": "investigation_001"  # 可选:用于多轮上下文
}

Server-Sent Events stream

服务器发送事件流

response = requests.post(url, json=payload, stream=True) for line in response.iter_lines(): if line.startswith(b"data: "): data = json.loads(line[6:])
    if data["type"] == "reasoning":
        print(f"[THINK] {data['content']}")
    elif data["type"] == "skill_call":
        print(f"[SKILL] {data['skill_name']}: {data['reasoning']}")
    elif data["type"] == "skill_result":
        print(f"[RESULT] {data['summary']}")
    elif data["type"] == "final":
        print(f"[ANSWER] {data['content']}")
undefined
response = requests.post(url, json=payload, stream=True) for line in response.iter_lines(): if line.startswith(b"data: "): data = json.loads(line[6:])
    if data["type"] == "reasoning":
        print(f"[思考] {data['content']}")
    elif data["type"] == "skill_call":
        print(f"[技能调用] {data['skill_name']}: {data['reasoning']}")
    elif data["type"] == "skill_result":
        print(f"[结果] {data['summary']}")
    elif data["type"] == "final":
        print(f"[回答] {data['content']}")
undefined

Dispatch Skill

触发技能

python
import requests

response = requests.post(
    "http://localhost:7799/dispatch",
    json={"skill_name": "threat_analyst"}
)

result = response.json()
python
import requests

response = requests.post(
    "http://localhost:7799/dispatch",
    json={"skill_name": "threat_analyst"}
)

result = response.json()

{

{

"success": true,

"success": true,

"skill": "threat_analyst",

"skill": "threat_analyst",

"result": {...},

"result": {...},

"execution_time": 2.34

"execution_time": 2.34

}

}

undefined
undefined

Query Memory

查询内存

python
response = requests.get("http://localhost:7799/memory")
memory = response.json()
python
response = requests.get("http://localhost:7799/memory")
memory = response.json()

{

{

"escalated_findings": [...],

"escalated_findings": [...],

"last_baseline_run": "2026-05-19T04:00:00Z",

"last_baseline_run": "2026-05-19T04:00:00Z",

"anomaly_triage_cursor": "1234567890",

"anomaly_triage_cursor": "1234567890",

"conversation_count": 5

"conversation_count": 5

}

}

undefined
undefined

LangGraph Orchestration

LangGraph编排

SecurityClaw uses LangGraph for chat routing with a supervisor pattern:
python
undefined
SecurityClaw使用LangGraph实现带监督模式的聊天路由:
python
undefined

core/chat_router/graph.py structure

core/chat_router/graph.py结构

from langgraph.graph import StateGraph from langgraph.checkpoint.sqlite import SqliteSaver
class ChatState(TypedDict): messages: List[Dict] user_query: str plan: str skill_results: List[Dict] final_answer: str retry_count: int
def decide_node(state): """Supervisor plans which skills to invoke""" # Analyzes query against skill manifests # Returns plan with skill sequence pass
def execute_node(state): """Executes planned skills""" # Dispatches skills with context # Collects results pass
def evaluate_node(state): """Checks if answer is complete""" # Validates against user query # Triggers retry if insufficient pass
from langgraph.graph import StateGraph from langgraph.checkpoint.sqlite import SqliteSaver
class ChatState(TypedDict): messages: List[Dict] user_query: str plan: str skill_results: List[Dict] final_answer: str retry_count: int
def decide_node(state): """监督者规划要调用的技能""" # 根据技能清单分析查询 # 返回包含技能执行序列的计划 pass
def execute_node(state): """执行规划的技能""" # 携带上下文触发技能 # 收集结果 pass
def evaluate_node(state): """检查回答是否完整""" # 根据用户查询验证结果 # 如果结果不足则触发重试 pass

Graph construction

构建图

workflow = StateGraph(ChatState) workflow.add_node("decide", decide_node) workflow.add_node("execute", execute_node) workflow.add_node("evaluate", evaluate_node)
workflow.set_entry_point("decide") workflow.add_edge("decide", "execute") workflow.add_conditional_edges( "evaluate", should_continue, {"continue": "decide", "end": END} )
workflow = StateGraph(ChatState) workflow.add_node("decide", decide_node) workflow.add_node("execute", execute_node) workflow.add_node("evaluate", evaluate_node)
workflow.set_entry_point("decide") workflow.add_edge("decide", "execute") workflow.add_conditional_edges( "evaluate", should_continue, {"continue": "decide", "end": END} )

Checkpoint to SQLite

检查点存储到SQLite

memory = SqliteSaver.from_conn_string("data/conversations.db") app = workflow.compile(checkpointer=memory)
undefined
memory = SqliteSaver.from_conn_string("data/conversations.db") app = workflow.compile(checkpointer=memory)
undefined

Common Patterns

常见模式

Building Custom Threat Detection

构建自定义威胁检测

python
undefined
python
undefined

skills/custom_detector/logic.py

skills/custom_detector/logic.py

def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): # 1. Query recent events events = db_connector.search( index="network-*", body={ "size": 1000, "query": { "range": {"@timestamp": {"gte": "now-1h"}} } } )
# 2. Retrieve behavioral baseline
baseline = rag_engine.retrieve(
    query="normal traffic patterns last 24h",
    top_k=5
)

# 3. LLM analysis with context
threats = []
for hit in events['hits']['hits']:
    event = hit['_source']
    
    prompt = f"""Event: {event}
Baseline: {baseline}
Is this anomalous? Respond JSON: {{"anomalous": bool, "reason": str}}"""
    response = llm_provider.chat([
        {"role": "user", "content": prompt}
    ])
    
    analysis = json.loads(response['content'])
    if analysis['anomalous']:
        threats.append({
            "event": event,
            "reason": analysis['reason']
        })

# 4. Store findings in memory
memory.setdefault("custom_threats", []).extend(threats)

return {
    "success": True,
    "threats_found": len(threats),
    "details": threats
}
undefined
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): # 1. 查询近期事件 events = db_connector.search( index="network-*", body={ "size": 1000, "query": { "range": {"@timestamp": {"gte": "now-1h"}} } } )
# 2. 检索行为基线
baseline = rag_engine.retrieve(
    query="normal traffic patterns last 24h",
    top_k=5
)

# 3. 结合上下文的LLM分析
threats = []
for hit in events['hits']['hits']:
    event = hit['_source']
    
    prompt = f"""Event: {event}
Baseline: {baseline}
Is this anomalous? Respond JSON: {{"anomalous": bool, "reason": str}}"""
    response = llm_provider.chat([
        {"role": "user", "content": prompt}
    ])
    
    analysis = json.loads(response['content'])
    if analysis['anomalous']:
        threats.append({
            "event": event,
            "reason": analysis['reason']
        })

# 4. 将检测结果存储到内存
memory.setdefault("custom_threats", []).extend(threats)

return {
    "success": True,
    "threats_found": len(threats),
    "details": threats
}
undefined

Enriching with External Threat Intel

结合外部威胁情报 enrichment

python
undefined
python
undefined

skills/ip_enricher/logic.py

skills/ip_enricher/logic.py

import os import requests
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): suspicious_ips = kwargs.get("ip_addresses", [])
enriched = []
for ip in suspicious_ips:
    # AbuseIPDB lookup
    headers = {"Key": os.getenv("ABUSEIPDB_API_KEY")}
    response = requests.get(
        f"https://api.abuseipdb.com/api/v2/check",
        params={"ipAddress": ip, "maxAgeInDays": 90},
        headers=headers
    )
    
    data = response.json()
    enriched.append({
        "ip": ip,
        "abuse_score": data.get("data", {}).get("abuseConfidenceScore", 0),
        "reports": data.get("data", {}).get("totalReports", 0)
    })

return {
    "success": True,
    "enriched_ips": enriched
}
undefined
import os import requests
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): suspicious_ips = kwargs.get("ip_addresses", [])
enriched = []
for ip in suspicious_ips:
    # AbuseIPDB查询
    headers = {"Key": os.getenv("ABUSEIPDB_API_KEY")}
    response = requests.get(
        f"https://api.abuseipdb.com/api/v2/check",
        params={"ipAddress": ip, "maxAgeInDays": 90},
        headers=headers
    )
    
    data = response.json()
    enriched.append({
        "ip": ip,
        "abuse_score": data.get("data", {}).get("abuseConfidenceScore", 0),
        "reports": data.get("data", {}).get("totalReports", 0)
    })

return {
    "success": True,
    "enriched_ips": enriched
}
undefined

Multi-Skill Investigation Workflow

多技能调查工作流

python
undefined
python
undefined

Via chat interface or API:

通过聊天界面或API:

User: "Investigate source IP 10.0.0.50 - check logs, enrich with threat intel, analyze behavior"

用户: "Investigate source IP 10.0.0.50 - check logs, enrich with threat intel, analyze behavior"

LangGraph supervisor plans:

LangGraph监督者规划:

1. opensearch_querier: fetch logs for 10.0.0.50

1. opensearch_querier: 获取10.0.0.50的日志

2. ip_enricher: check external reputation

2. ip_enricher: 检查外部声誉

3. baseline_querier: retrieve normal behavior for this IP

3. baseline_querier: 检索该IP的正常行为

4. threat_analyst: final verdict with all context

4. threat_analyst: 结合所有上下文给出最终结论

Automatic skill chaining based on manifests:

根据清单自动技能链:

- opensearch_querier provides "query_results" artifact

- opensearch_querier提供"query_results"产物

- ip_enricher requires "ip_address" entity (extracted from results)

- ip_enricher需要"ip_address"实体(从结果中提取)

- threat_analyst consumes all previous artifacts

- threat_analyst消费所有之前的产物

undefined
undefined

Troubleshooting

故障排除

Connection Issues

连接问题

bash
undefined
bash
undefined

Test OpenSearch connection

测试OpenSearch连接

curl -k -u admin:password https://localhost:9200
curl -k -u admin:password https://localhost:9200

Test Ollama

测试Ollama

Validate config

验证配置

python main.py validate-config
undefined
python main.py validate-config
undefined

Skill Not Loading

技能未加载

bash
undefined
bash
undefined

Check skill discovery

检查技能发现情况

python main.py list-skills
python main.py list-skills

Verify instruction.md has valid YAML frontmatter

验证instruction.md有有效的YAML前置元数据

Required fields: skill_id, display_name, version

必填字段: skill_id, display_name, version

Check logic.py has execute() function:

检查logic.py是否有execute()函数:

def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): pass
undefined
def execute(db_connector, llm_provider, rag_engine, config, memory, **kwargs): pass
undefined

RAG Context Not Used

RAG上下文未被使用

python
undefined
python
undefined

Verify embeddings index exists

验证嵌入索引是否存在

from core.db_connector import get_db_connector db = get_db_connector() indices = db.cat_indices()
from core.db_connector import get_db_connector db = get_db_connector() indices = db.cat_indices()

Should show: securityclaw_baselines

应显示: securityclaw_baselines

Rebuild baseline if empty

如果为空则重建基线

python main.py dispatch network_baseliner
python main.py dispatch network_baseliner

Check embedding model is running

检查嵌入模型是否运行

ollama list # Should show nomic-embed-text:latest
undefined
ollama list # 应显示nomic-embed-text:latest
undefined

Memory State Issues

内存状态问题

bash
undefined
bash
undefined

Reset conversation memory (keeps runtime memory)

重置对话内存(保留运行时内存)

rm data/conversations.db
rm data/conversations.db

Reset all memory (caution: loses baselines)

重置所有内存(注意:会丢失基线)

rm data/conversations.db data/runtime_memory.db
rm data/conversations.db data/runtime_memory.db

View memory structure

查看内存结构

python -c " from core.memory import AgentMemory memory = AgentMemory() print(memory.get_summary()) "
undefined
python -c " from core.memory import AgentMemory memory = AgentMemory() print(memory.get_summary()) "
undefined

LLM Response Truncation

LLM响应被截断

yaml
undefined
yaml
undefined

Increase token budget in config.yaml

在config.yaml中增加token预算

llm: max_tokens: 32768 # Default: 16384
llm: max_tokens: 32768 # 默认: 16384

Reduce context injection in prompts

减少提示中的上下文注入

Edit core/memory.py max_context_chars (default: 4000)

编辑core/memory.py中的max_context_chars(默认: 4000)

undefined
undefined

Web UI Not Loading

Web UI无法加载

bash
undefined
bash
undefined

Build frontend if dist/ missing

如果dist/缺失则构建前端

cd web npm install npm run build
cd web npm install npm run build

Check API server logs

检查API服务器日志

python main.py service
python main.py service

Should show: "API server started on http://0.0.0.0:7799"

应显示: "API server started on http://0.0.0.0:7799"

Verify CORS enabled in config.yaml

验证config.yaml中CORS已启用

api: enable_cors: true
undefined
api: enable_cors: true
undefined

Testing

测试

bash
undefined
bash
undefined

Run test suite with mock providers

使用模拟供应商运行测试套件

pytest tests/ -v
pytest tests/ -v

Coverage report

覆盖率报告

pytest tests/ --cov=core --cov=skills --cov-report=html
pytest tests/ --cov=core --cov=skills --cov-report=html

Test specific skill

测试特定技能

pytest tests/test_threat_analyst.py -v
pytest tests/test_threat_analyst.py -v

Use mock OpenSearch (no real database needed)

使用模拟OpenSearch(无需真实数据库)

tests/conftest.py provides mock_db_connector fixture

tests/conftest.py提供mock_db_connector夹具

undefined
undefined

Production Considerations

生产环境注意事项

  • Resource limits: 8GB+ RAM recommended for production with multiple concurrent investigations
  • Checkpoint cleanup: Prune old conversations in
    data/conversations.db
    periodically
  • RAG index maintenance: Archive old baselines, rebuild quarterly for evolving network patterns
  • API authentication: Add auth middleware to
    web/api/server.py
    before exposing publicly
  • Secrets management: Rotate API keys in
    .env
    , use secret managers for production deployments
  • Monitoring: Track skill execution times, LLM token usage, and anomaly escalation rates
  • 资源限制:生产环境建议8GB+内存,以支持多并发调查
  • 检查点清理:定期清理
    data/conversations.db
    中的旧对话
  • RAG索引维护:归档旧基线,每季度重建以适配网络模式变化
  • API认证:公开暴露前,为
    web/api/server.py
    添加认证中间件
  • 密钥管理:定期轮换.env中的API密钥,生产环境使用密钥管理器
  • 监控:跟踪技能执行时间、LLM token使用量和异常升级率