implementing-dragos-platform-for-ot-monitoring
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseImplementing Dragos Platform for OT Monitoring
部署Dragos Platform用于OT监控
When to Use
适用场景
- When deploying an OT-specific network detection and response (NDR) solution for industrial environments
- When needing threat intelligence-driven detection against known ICS threat groups (VOLTZITE, CHERNOVITE, KAMACITE)
- When building an OT SOC capability with purpose-built industrial security tooling
- When requiring asset discovery and vulnerability management alongside threat detection in a single platform
- When integrating OT security monitoring with an enterprise SIEM (Splunk, Sentinel, QRadar)
Do not use for IT-only network monitoring without ICS components, for endpoint detection and response (EDR) on OT workstations, or for environments standardized on Claroty or Nozomi (see respective skills).
- 为工业环境部署特定于OT的网络检测与响应(NDR)解决方案时
- 需要基于威胁情报检测已知ICS威胁组织(VOLTZITE、CHERNOVITE、KAMACITE)时
- 利用专为工业安全打造的工具构建OT SOC能力时
- 需要在单一平台中同时实现资产发现、漏洞管理与威胁检测时
- 将OT安全监控与企业SIEM(Splunk、Sentinel、QRadar)集成时
不适用场景:仅针对无ICS组件的IT网络监控、OT工作站上的终端检测与响应(EDR),或采用Claroty或Nozomi标准的环境(请查看对应技能文档)。
Prerequisites
前置条件
- Dragos Platform license and deployment package
- Network TAP or SPAN port at OT network boundaries (one sensor per monitored segment)
- Dragos sensor hardware (physical appliance) or virtual appliance meeting minimum specifications
- Firewall rules allowing sensor-to-Dragos-SiteStore communication (encrypted, outbound only from OT)
- Dragos Knowledge Pack subscription for threat intelligence updates
- Dragos Platform许可证与部署包
- OT网络边界处的网络TAP或SPAN端口(每个受监控网段一个传感器)
- 符合最低规格的Dragos传感器硬件(物理设备)或虚拟设备
- 允许传感器与Dragos-SiteStore通信的防火墙规则(仅从OT环境发起加密出站通信)
- 用于威胁情报更新的Dragos Knowledge Pack订阅
Workflow
实施流程
Step 1: Deploy Dragos Sensors and Configure Monitoring
步骤1:部署Dragos传感器并配置监控
python
#!/usr/bin/env python3
"""Dragos Platform Deployment Validator and Integration Tool.
Validates Dragos sensor deployment, checks connectivity, and
configures integration with enterprise SIEM for OT alert forwarding.
"""
import json
import sys
import csv
from datetime import datetime
from typing import Optional, List, Dict
try:
import requests
except ImportError:
print("Install requests: pip install requests")
sys.exit(1)
class DragosPlatformManager:
"""Interface with Dragos Platform API for OT monitoring management."""
def __init__(self, base_url: str, api_key: str, api_secret: str, verify_ssl: bool = True):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"API-Key": api_key,
"API-Secret": api_secret,
"Content-Type": "application/json",
})
self.session.verify = verify_ssl
def get_sensors(self) -> List[Dict]:
"""Retrieve all deployed Dragos sensors and their status."""
resp = self.session.get(f"{self.base_url}/api/v1/sensors")
resp.raise_for_status()
return resp.json().get("sensors", [])
def get_assets(self, asset_type: Optional[str] = None) -> List[Dict]:
"""Retrieve OT assets discovered by Dragos."""
params = {}
if asset_type:
params["type"] = asset_type
resp = self.session.get(f"{self.base_url}/api/v1/assets", params=params)
resp.raise_for_status()
return resp.json().get("assets", [])
def get_notifications(self, severity: str = "high", limit: int = 50) -> List[Dict]:
"""Retrieve threat detection notifications."""
params = {"min_severity": severity, "limit": limit}
resp = self.session.get(f"{self.base_url}/api/v1/notifications", params=params)
resp.raise_for_status()
return resp.json().get("notifications", [])
def get_vulnerabilities(self, severity: str = "critical") -> List[Dict]:
"""Retrieve OT vulnerabilities with Dragos-specific context."""
params = {"min_severity": severity}
resp = self.session.get(f"{self.base_url}/api/v1/vulnerabilities", params=params)
resp.raise_for_status()
return resp.json().get("vulnerabilities", [])
def get_threat_groups(self) -> List[Dict]:
"""Retrieve tracked ICS threat group activity relevant to the environment."""
resp = self.session.get(f"{self.base_url}/api/v1/threat-groups")
resp.raise_for_status()
return resp.json().get("threat_groups", [])
def validate_deployment(self):
"""Validate sensor deployment health and coverage."""
sensors = self.get_sensors()
assets = self.get_assets()
print(f"\n{'='*65}")
print("DRAGOS PLATFORM DEPLOYMENT VALIDATION")
print(f"{'='*65}")
print(f"Validation Time: {datetime.now().isoformat()}")
print(f"\n--- SENSOR STATUS ---")
healthy_sensors = 0
for sensor in sensors:
status = sensor.get("status", "unknown")
icon = "[OK]" if status == "connected" else "[!!]"
print(f" {icon} {sensor.get('name', 'Unknown')} | Status: {status}")
print(f" IP: {sensor.get('ip_address')} | Segment: {sensor.get('monitored_segment')}")
print(f" Last Seen: {sensor.get('last_seen')} | Packets/sec: {sensor.get('pps', 0)}")
print(f" Knowledge Pack: {sensor.get('knowledge_pack_version', 'N/A')}")
if status == "connected":
healthy_sensors += 1
print(f"\n Sensor Health: {healthy_sensors}/{len(sensors)} operational")
print(f"\n--- ASSET VISIBILITY ---")
print(f" Total Assets Discovered: {len(assets)}")
asset_types = {}
for asset in assets:
atype = asset.get("type", "Unknown")
asset_types[atype] = asset_types.get(atype, 0) + 1
for atype, count in sorted(asset_types.items(), key=lambda x: -x[1]):
print(f" {atype}: {count}")
protocols = set()
for asset in assets:
protocols.update(asset.get("protocols", []))
print(f" Protocols Observed: {', '.join(sorted(protocols))}")
print(f"\n--- THREAT INTELLIGENCE ---")
groups = self.get_threat_groups()
print(f" Relevant Threat Groups: {len(groups)}")
for group in groups:
print(f" - {group.get('name')}: {group.get('description', '')[:80]}")
print(f" Targets: {', '.join(group.get('target_sectors', []))}")
print(f" Activity Level: {group.get('activity_level', 'Unknown')}")
def generate_siem_integration_config(self, siem_type: str = "splunk"):
"""Generate SIEM integration configuration for Dragos alerts."""
configs = {
"splunk": {
"syslog_format": "CEF",
"syslog_port": 514,
"severity_mapping": {
"critical": 10,
"high": 7,
"medium": 5,
"low": 3,
"info": 1,
},
"index": "ot_security",
"sourcetype": "dragos:notification",
"fields": [
"notification_id", "severity", "category", "source_ip",
"destination_ip", "asset_name", "protocol", "description",
"mitre_ics_technique", "threat_group",
],
},
"sentinel": {
"connector_type": "Syslog-CEF",
"workspace_id": "<workspace-id>",
"log_analytics_table": "DragosOTAlerts_CL",
"severity_mapping": {
"critical": "High",
"high": "High",
"medium": "Medium",
"low": "Low",
"info": "Informational",
},
},
}
config = configs.get(siem_type, configs["splunk"])
print(f"\n--- {siem_type.upper()} INTEGRATION CONFIG ---")
print(json.dumps(config, indent=2))
return config
if __name__ == "__main__":
manager = DragosPlatformManager(
base_url="https://dragos-sitestore.plant.local",
api_key="your-api-key",
api_secret="your-api-secret",
verify_ssl=True,
)
manager.validate_deployment()
manager.generate_siem_integration_config("splunk")
print(f"\n--- RECENT HIGH-SEVERITY NOTIFICATIONS ---")
notifications = manager.get_notifications(severity="high", limit=10)
for n in notifications:
print(f" [{n.get('severity', '').upper()}] {n.get('title', 'No title')}")
print(f" Category: {n.get('category')} | Time: {n.get('timestamp')}")
print(f" Assets: {', '.join(n.get('affected_assets', []))}")
print(f" MITRE ICS: {n.get('mitre_technique', 'N/A')}")python
#!/usr/bin/env python3
"""Dragos Platform Deployment Validator and Integration Tool.
Validates Dragos sensor deployment, checks connectivity, and
configures integration with enterprise SIEM for OT alert forwarding.
"""
import json
import sys
import csv
from datetime import datetime
from typing import Optional, List, Dict
try:
import requests
except ImportError:
print("Install requests: pip install requests")
sys.exit(1)
class DragosPlatformManager:
"""Interface with Dragos Platform API for OT monitoring management."""
def __init__(self, base_url: str, api_key: str, api_secret: str, verify_ssl: bool = True):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"API-Key": api_key,
"API-Secret": api_secret,
"Content-Type": "application/json",
})
self.session.verify = verify_ssl
def get_sensors(self) -> List[Dict]:
"""Retrieve all deployed Dragos sensors and their status."""
resp = self.session.get(f"{self.base_url}/api/v1/sensors")
resp.raise_for_status()
return resp.json().get("sensors", [])
def get_assets(self, asset_type: Optional[str] = None) -> List[Dict]:
"""Retrieve OT assets discovered by Dragos."""
params = {}
if asset_type:
params["type"] = asset_type
resp = self.session.get(f"{self.base_url}/api/v1/assets", params=params)
resp.raise_for_status()
return resp.json().get("assets", [])
def get_notifications(self, severity: str = "high", limit: int = 50) -> List[Dict]:
"""Retrieve threat detection notifications."""
params = {"min_severity": severity, "limit": limit}
resp = self.session.get(f"{self.base_url}/api/v1/notifications", params=params)
resp.raise_for_status()
return resp.json().get("notifications", [])
def get_vulnerabilities(self, severity: str = "critical") -> List[Dict]:
"""Retrieve OT vulnerabilities with Dragos-specific context."""
params = {"min_severity": severity}
resp = self.session.get(f"{self.base_url}/api/v1/vulnerabilities", params=params)
resp.raise_for_status()
return resp.json().get("vulnerabilities", [])
def get_threat_groups(self) -> List[Dict]:
"""Retrieve tracked ICS threat group activity relevant to the environment."""
resp = self.session.get(f"{self.base_url}/api/v1/threat-groups")
resp.raise_for_status()
return resp.json().get("threat_groups", [])
def validate_deployment(self):
"""Validate sensor deployment health and coverage."""
sensors = self.get_sensors()
assets = self.get_assets()
print(f"\n{'='*65}")
print("DRAGOS PLATFORM DEPLOYMENT VALIDATION")
print(f"{'='*65}")
print(f"Validation Time: {datetime.now().isoformat()}")
print(f"\n--- SENSOR STATUS ---")
healthy_sensors = 0
for sensor in sensors:
status = sensor.get("status", "unknown")
icon = "[OK]" if status == "connected" else "[!!]"
print(f" {icon} {sensor.get('name', 'Unknown')} | Status: {status}")
print(f" IP: {sensor.get('ip_address')} | Segment: {sensor.get('monitored_segment')}")
print(f" Last Seen: {sensor.get('last_seen')} | Packets/sec: {sensor.get('pps', 0)}")
print(f" Knowledge Pack: {sensor.get('knowledge_pack_version', 'N/A')}")
if status == "connected":
healthy_sensors += 1
print(f"\n Sensor Health: {healthy_sensors}/{len(sensors)} operational")
print(f"\n--- ASSET VISIBILITY ---")
print(f" Total Assets Discovered: {len(assets)}")
asset_types = {}
for asset in assets:
atype = asset.get("type", "Unknown")
asset_types[atype] = asset_types.get(atype, 0) + 1
for atype, count in sorted(asset_types.items(), key=lambda x: -x[1]):
print(f" {atype}: {count}")
protocols = set()
for asset in assets:
protocols.update(asset.get("protocols", []))
print(f" Protocols Observed: {', '.join(sorted(protocols))}")
print(f"\n--- THREAT INTELLIGENCE ---")
groups = self.get_threat_groups()
print(f" Relevant Threat Groups: {len(groups)}")
for group in groups:
print(f" - {group.get('name')}: {group.get('description', '')[:80]}")
print(f" Targets: {', '.join(group.get('target_sectors', []))}")
print(f" Activity Level: {group.get('activity_level', 'Unknown')}")
def generate_siem_integration_config(self, siem_type: str = "splunk"):
"""Generate SIEM integration configuration for Dragos alerts."""
configs = {
"splunk": {
"syslog_format": "CEF",
"syslog_port": 514,
"severity_mapping": {
"critical": 10,
"high": 7,
"medium": 5,
"low": 3,
"info": 1,
},
"index": "ot_security",
"sourcetype": "dragos:notification",
"fields": [
"notification_id", "severity", "category", "source_ip",
"destination_ip", "asset_name", "protocol", "description",
"mitre_ics_technique", "threat_group",
],
},
"sentinel": {
"connector_type": "Syslog-CEF",
"workspace_id": "<workspace-id>",
"log_analytics_table": "DragosOTAlerts_CL",
"severity_mapping": {
"critical": "High",
"high": "High",
"medium": "Medium",
"low": "Low",
"info": "Informational",
},
},
}
config = configs.get(siem_type, configs["splunk"])
print(f"\n--- {siem_type.upper()} INTEGRATION CONFIG ---")
print(json.dumps(config, indent=2))
return config
if __name__ == "__main__":
manager = DragosPlatformManager(
base_url="https://dragos-sitestore.plant.local",
api_key="your-api-key",
api_secret="your-api-secret",
verify_ssl=True,
)
manager.validate_deployment()
manager.generate_siem_integration_config("splunk")
print(f"\n--- RECENT HIGH-SEVERITY NOTIFICATIONS ---")
notifications = manager.get_notifications(severity="high", limit=10)
for n in notifications:
print(f" [{n.get('severity', '').upper()}] {n.get('title', 'No title')}")
print(f" Category: {n.get('category')} | Time: {n.get('timestamp')}")
print(f" Assets: {', '.join(n.get('affected_assets', []))}")
print(f" MITRE ICS: {n.get('mitre_technique', 'N/A')}")Step 2: Configure Detection Analytics and Knowledge Packs
步骤2:配置检测分析与Knowledge Pack
yaml
undefinedyaml
undefinedDragos Platform Detection Configuration
Dragos Platform Detection Configuration
Tuned for manufacturing/energy environment
Tuned for manufacturing/energy environment
detection_configuration:
knowledge_pack:
auto_update: true
update_schedule: "weekly"
include_threat_groups:
- "VOLTZITE" # Targets energy sector, exfiltrates OT diagrams
- "GRAPHITE" # New 2025 threat group targeting ICS
- "BAUXITE" # New 2025 threat group targeting ICS
- "CHERNOVITE" # Developed PIPEDREAM/INCONTROLLER framework
- "ELECTRUM" # Linked to Industroyer/CrashOverride
- "KAMACITE" # Targets energy sector initial access
detection_categories:
network_baseline:
enabled: true
learning_period_days: 30
alert_on:
- "new_communication_pair"
- "new_protocol_detected"
- "new_device_on_network"
- "protocol_anomaly"
threat_detection:
enabled: true
alert_on:
- "known_malware_ioc"
- "threat_group_ttp"
- "lateral_movement"
- "command_and_control"
- "data_exfiltration"
vulnerability_correlation:
enabled: true
alert_on:
- "active_exploitation_attempt"
- "vulnerability_with_public_exploit"protocol_monitoring:
modbus:
monitor_writes: true
baseline_function_codes: true
baseline_register_ranges: true
dnp3:
monitor_control_commands: true
detect_firmware_updates: true
s7comm:
detect_cpu_stop: true
detect_program_download: true
opc_ua:
monitor_method_calls: true
detect_browsing: true
ethernet_ip:
monitor_cip_services: true
detect_firmware_flash: true
alert_routing:
critical:
notify: ["ot_soc_team", "plant_manager"]
siem_forward: true
auto_ticket: true
high:
notify: ["ot_soc_team"]
siem_forward: true
auto_ticket: true
medium:
siem_forward: true
low:
siem_forward: true
undefineddetection_configuration:
knowledge_pack:
auto_update: true
update_schedule: "weekly"
include_threat_groups:
- "VOLTZITE" # Targets energy sector, exfiltrates OT diagrams
- "GRAPHITE" # New 2025 threat group targeting ICS
- "BAUXITE" # New 2025 threat group targeting ICS
- "CHERNOVITE" # Developed PIPEDREAM/INCONTROLLER framework
- "ELECTRUM" # Linked to Industroyer/CrashOverride
- "KAMACITE" # Targets energy sector initial access
detection_categories:
network_baseline:
enabled: true
learning_period_days: 30
alert_on:
- "new_communication_pair"
- "new_protocol_detected"
- "new_device_on_network"
- "protocol_anomaly"
threat_detection:
enabled: true
alert_on:
- "known_malware_ioc"
- "threat_group_ttp"
- "lateral_movement"
- "command_and_control"
- "data_exfiltration"
vulnerability_correlation:
enabled: true
alert_on:
- "active_exploitation_attempt"
- "vulnerability_with_public_exploit"protocol_monitoring:
modbus:
monitor_writes: true
baseline_function_codes: true
baseline_register_ranges: true
dnp3:
monitor_control_commands: true
detect_firmware_updates: true
s7comm:
detect_cpu_stop: true
detect_program_download: true
opc_ua:
monitor_method_calls: true
detect_browsing: true
ethernet_ip:
monitor_cip_services: true
detect_firmware_flash: true
alert_routing:
critical:
notify: ["ot_soc_team", "plant_manager"]
siem_forward: true
auto_ticket: true
high:
notify: ["ot_soc_team"]
siem_forward: true
auto_ticket: true
medium:
siem_forward: true
low:
siem_forward: true
undefinedKey Concepts
核心概念
| Term | Definition |
|---|---|
| Dragos Platform | Purpose-built OT cybersecurity platform with asset visibility, threat detection, and vulnerability management for ICS environments |
| Knowledge Pack | Dragos threat intelligence update containing detection analytics for new threats, malware, and vulnerability exploits specific to ICS |
| SiteStore | Dragos central management server aggregating data from all deployed sensors across a site |
| VOLTZITE | Dragos-tracked threat group targeting energy sector OT environments, exfiltrating GIS data and ICS network diagrams |
| PIPEDREAM/INCONTROLLER | Modular ICS attack framework developed by CHERNOVITE, targeting Schneider/OMRON PLCs and OPC UA servers |
| Neighborhood Keeper | Dragos community defense program sharing anonymized threat data across participating OT environments |
| 术语 | 定义 |
|---|---|
| Dragos Platform | 专为OT网络安全打造的平台,具备资产可见性、威胁检测和漏洞管理功能,适用于ICS环境 |
| Knowledge Pack | Dragos威胁情报更新包,包含针对ICS领域新威胁、恶意软件和漏洞利用的检测分析规则 |
| SiteStore | Dragos中央管理服务器,聚合站点内所有已部署传感器的数据 |
| VOLTZITE | Dragos追踪的威胁组织,针对能源行业OT环境,窃取GIS数据和ICS网络拓扑图 |
| PIPEDREAM/INCONTROLLER | CHERNOVITE开发的模块化ICS攻击框架,针对Schneider/OMRON PLC和OPC UA服务器 |
| Neighborhood Keeper | Dragos社区防御计划,在参与的OT环境之间共享匿名威胁数据 |
Common Scenarios
常见场景
Scenario: Detecting VOLTZITE Reconnaissance in Energy Utility
场景:检测能源企业中的VOLTZITE侦察活动
Context: A Dragos sensor deployed at an electric utility detects unusual OPC UA browsing activity and exfiltration of device configuration data from an engineering workstation.
Approach:
- Review the Dragos notification for MITRE ATT&CK ICS technique mapping
- Identify the source host performing OPC UA browsing (check if it is an authorized engineering workstation)
- Check Dragos threat intelligence correlation for VOLTZITE TTPs
- Examine the scope of data accessed (GIS data, network diagrams, ICS configuration files)
- Isolate the compromised workstation from the OT network
- Check for lateral movement indicators to other OT systems
- Engage Dragos Professional Services if threat group attribution is confirmed
- Report to CISA as a critical infrastructure cyber incident
Pitfalls: Do not ignore OPC UA browsing alerts as false positives -- VOLTZITE specifically uses this technique for pre-positioning. Ensure Dragos Knowledge Packs are current to detect the latest VOLTZITE indicators. Do not reimage the compromised workstation before collecting forensic evidence.
背景:部署在电力企业的Dragos传感器检测到异常的OPC UA浏览活动,以及从工程工作站导出设备配置数据的行为。
处理方法:
- 查看Dragos告警中的MITRE ATT&CK ICS技术映射
- 识别执行OPC UA浏览的源主机(检查是否为授权的工程工作站)
- 查看Dragos威胁情报关联的VOLTZITE战术、技术和流程(TTP)
- 检查被访问数据的范围(GIS数据、网络拓扑图、ICS配置文件)
- 将受感染的工作站与OT网络隔离
- 检查是否存在向其他OT系统横向移动的迹象
- 若确认威胁组织归属,联系Dragos专业服务团队
- 作为关键基础设施网络事件向CISA报告
注意事项:不要将OPC UA浏览告警视为误报——VOLTZITE专门使用该技术进行前置部署。确保Dragos Knowledge Pack为最新版本,以检测VOLTZITE的最新指标。在收集取证证据前,不要重新镜像受感染的工作站。
Output Format
输出格式
DRAGOS OT MONITORING DEPLOYMENT REPORT
==========================================
Site: [Site Name]
Date: YYYY-MM-DD
SENSOR DEPLOYMENT:
Total Sensors: [count]
Operational: [count]
Coverage: [percentage of OT segments monitored]
ASSET VISIBILITY:
Total OT Assets: [count]
PLCs: [count] | HMIs: [count] | Network Devices: [count]
Protocols: [list]
THREAT DETECTION:
Active Threat Groups Relevant: [count]
Detection Analytics Loaded: [count]
Alerts (Last 30 Days): [count by severity]
SIEM INTEGRATION:
Status: [Connected/Disconnected]
Events Forwarded (Last 24h): [count]DRAGOS OT MONITORING DEPLOYMENT REPORT
==========================================
Site: [Site Name]
Date: YYYY-MM-DD
SENSOR DEPLOYMENT:
Total Sensors: [count]
Operational: [count]
Coverage: [percentage of OT segments monitored]
ASSET VISIBILITY:
Total OT Assets: [count]
PLCs: [count] | HMIs: [count] | Network Devices: [count]
Protocols: [list]
THREAT DETECTION:
Active Threat Groups Relevant: [count]
Detection Analytics Loaded: [count]
Alerts (Last 30 Days): [count by severity]
SIEM INTEGRATION:
Status: [Connected/Disconnected]
Events Forwarded (Last 24h): [count]