Loading...
Loading...
Builds a structured vulnerability scanning workflow using tools like Nessus, Qualys, and OpenVAS to discover, prioritize, and track remediation of security vulnerabilities across infrastructure. Use when SOC teams need to establish recurring vulnerability assessment processes, integrate scan results with SIEM alerting, and build remediation tracking dashboards.
npx skill4agent add mukul975/anthropic-cybersecurity-skills building-vulnerability-scanning-workflow

import requests
# Create a recurring, credentialed Nessus scan through the REST API.
# `access_key` and `secret_key` must already be defined when this runs.
import os  # needed for the TLS toggle below; the snippet used os.environ without importing os

nessus_url = "https://nessus.company.com:8834"
headers = {"X-ApiKeys": f"accessKey={access_key};secretKey={secret_key}"}

# Certificate verification stays enabled unless SKIP_TLS_VERIFY=true is set
# (intended for self-signed certs in lab environments).
verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"

# Create scan policy
policy = {
    # NOTE(review): Nessus normally expects a scan-template UUID here —
    # confirm that "advanced" resolves on the target scanner.
    "uuid": "advanced",
    "settings": {
        "name": "SOC Weekly Infrastructure Scan",
        "description": "Weekly credentialed scan of all server and workstation segments",
        "scanner_id": 1,
        "policy_id": 0,
        "text_targets": "10.0.0.0/16, 172.16.0.0/12",
        "launch": "WEEKLY",
        "starttime": "20240315T020000",
        "rrules": "FREQ=WEEKLY;INTERVAL=1;BYDAY=SA",
        "enabled": True,
    },
    "credentials": {
        "add": {
            "Host": {
                "Windows": [{
                    "domain": "company.local",
                    "username": "nessus_svc",
                    # Presumably a placeholder: supply the real secret at
                    # runtime rather than committing it.
                    "password": "SCAN_SERVICE_PASSWORD",
                    "auth_method": "Password",
                }],
                "SSH": [{
                    "username": "nessus_svc",
                    "private_key": "/path/to/nessus_key",
                    "auth_method": "public key",
                }],
            }
        }
    },
}

response = requests.post(
    f"{nessus_url}/scans",
    headers=headers,
    json=policy,
    verify=verify_tls,
    timeout=30,  # never hang indefinitely on an unreachable scanner
)
# Stop on authentication/validation errors instead of failing with a KeyError
# when the error payload has no "scan" entry.
response.raise_for_status()
scan_id = response.json()["scan"]["id"]
print(f"Scan created: ID {scan_id}")

import qualysapi
# Open a Qualys API session with the service account credentials.
qualys_login = {
    "hostname": "qualysapi.qualys.com",
    "username": "api_user",
    "password": "API_PASSWORD",
}
conn = qualysapi.connect(**qualys_login)

# Launch vulnerability scan
params = dict(
    action="launch",
    scan_title="Weekly_Infrastructure_Scan",
    ip="10.0.0.0/16",
    option_id="123456",  # Scan profile ID
    iscanner_name="Internal_Scanner_01",
    priority="0",
)
scan_endpoint = "/api/2.0/fo/scan/"
response = conn.request(scan_endpoint, params)
print(f"Scan launched: {response}")

import requests
import csv
# Export Nessus results
# Requires `nessus_url`, `headers`, `scan_id`, `cisa_kev_list`,
# `get_asset_criticality` and `asset_criticality_multiplier` to be defined
# before this runs.
import io
import os  # the snippet used os.environ without importing os
import time

# Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
verify_tls = os.environ.get("SKIP_TLS_VERIFY", "").lower() != "true"
export_url = f"{nessus_url}/scans/{scan_id}/export"

# Nessus exports are asynchronous: request the export, poll until the file is
# ready, then download it. A bare GET on /export does not return the report.
export_request = requests.post(
    export_url,
    headers=headers,
    json={"format": "csv"},
    verify=verify_tls,
    timeout=30,
)
export_request.raise_for_status()
file_id = export_request.json()["file"]

for _ in range(120):  # bounded wait: 120 polls x 5 s = 10 minutes
    export_status = requests.get(
        f"{export_url}/{file_id}/status",
        headers=headers,
        verify=verify_tls,
        timeout=30,
    )
    export_status.raise_for_status()
    if export_status.json().get("status") == "ready":
        break
    time.sleep(5)
else:
    raise TimeoutError(f"Nessus export {file_id} for scan {scan_id} was not ready in time")

response = requests.get(
    f"{export_url}/{file_id}/download",
    headers=headers,
    verify=verify_tls,
    timeout=120,
)
response.raise_for_status()

# Parse and prioritize
vulns = []
# StringIO with newline="" keeps line breaks inside quoted multi-line fields
# intact; splitlines() would silently drop them.
reader = csv.DictReader(io.StringIO(response.text, newline=""))
for row in reader:
    # `or 0` covers both a missing column and an empty cell; float("") would
    # raise ValueError.
    cvss = float(row.get("CVSS v3.0 Base Score") or 0)
    asset_criticality = get_asset_criticality(row["Host"])  # From asset inventory
    cve_id = row.get("CVE")
    in_kev = cve_id in cisa_kev_list

    # Risk-based priority calculation
    risk_score = cvss * asset_criticality_multiplier(asset_criticality)
    # Boost score if actively exploited (check CISA KEV)
    if in_kev:
        risk_score *= 1.5

    vulns.append({
        "host": row["Host"],
        "plugin_name": row["Name"],
        "severity": row["Risk"],
        "cvss": cvss,
        # A blank CVE cell is reported as "N/A", same as a missing column.
        "cve": cve_id or "N/A",
        "risk_score": round(risk_score, 1),
        "asset_criticality": asset_criticality,
        "kev": in_kev,
    })
# Sort by risk score
vulns.sort(key=lambda x: x["risk_score"], reverse=True)

import requests
# Download the CISA Known Exploited Vulnerabilities (KEV) catalog and collect
# its CVE IDs for membership checks.
kev_response = requests.get(
    "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json",
    timeout=30,  # do not block the whole workflow on a stalled download
)
# Fail here on an HTTP error rather than on a confusing JSON/KeyError below.
kev_response.raise_for_status()
kev_data = kev_response.json()
# Built as a set (despite the name) so `cve in cisa_kev_list` is a constant-time lookup.
cisa_kev_list = {v["cveID"] for v in kev_data["vulnerabilities"]}
# Check if vulnerability is actively exploited
def is_actively_exploited(cve_id):
    return cve_id in cisa_kev_list

| Priority | CVSS Range | Asset Type | SLA | Examples |
|---|---|---|---|---|
| P1 Critical | 9.0-10.0 + KEV | All assets | 24 hours | Log4Shell, EternalBlue on prod servers |
| P2 High | 7.0-8.9 or 9.0+ non-KEV | Business-critical | 7 days | RCE without known exploit |
| P3 Medium | 4.0-6.9 | Business-critical | 30 days | Authenticated privilege escalation |
| P4 Low | 0.1-3.9 | Standard | 90 days | Information disclosure, low-impact DoS |
| P5 Informational | 0.0 | Development | Next cycle | Best practice findings, config hardening |
index=vulnerability sourcetype="nessus:scan"
| eval vuln_key = Host.":".CVE
| join vuln_key type=left [
search index=ids_ips sourcetype="snort" OR sourcetype="suricata"
| eval vuln_key = dest_ip.":".cve_id
| stats count AS exploit_attempts, latest(_time) AS last_exploit_attempt by vuln_key
]
| where isnotnull(exploit_attempts)
| eval risk = "CRITICAL — Vulnerability being actively exploited"
| sort - exploit_attempts
| table Host, CVE, plugin_name, cvss_score, exploit_attempts, last_exploit_attempt, risk

index=vulnerability sourcetype="nessus:scan" severity="Critical"
| lookup cisa_kev_lookup.csv cve_id AS CVE OUTPUT kev_status, due_date
| where kev_status="active"
| lookup asset_criticality_lookup.csv ip AS Host OUTPUT criticality
| where criticality IN ("business-critical", "mission-critical")
| table Host, CVE, plugin_name, cvss_score, kev_status, due_date, criticality

-- Open vulnerabilities by severity
index=vulnerability sourcetype="nessus:scan" status="open"
| stats count by severity
| eval order = case(severity="Critical", 1, severity="High", 2, severity="Medium", 3,
severity="Low", 4, 1=1, 5)
| sort order
-- SLA compliance tracking
index=vulnerability sourcetype="nessus:scan" status="open"
| eval sla_days = case(
severity="Critical", 1,
severity="High", 7,
severity="Medium", 30,
severity="Low", 90
)
| eval days_open = round((now() - first_detected) / 86400)
| eval sla_status = if(days_open > sla_days, "OVERDUE", "Within SLA")
| stats count by severity, sla_status
-- Remediation trend over 90 days
index=vulnerability sourcetype="nessus:scan"
| eval is_open = if(status="open", 1, 0)
| eval is_closed = if(status="fixed", 1, 0)
| timechart span=1w sum(is_open) AS opened, sum(is_closed) AS remediated

import requests
# Open a ServiceNow incident for every finding in `vulns` whose risk score is
# at least 8.0. Requires `vulns` and `snow_token` to be defined beforehand.
servicenow_url = "https://company.service-now.com/api/now/table/incident"
# NOTE(review): this rebinds `headers`; if the Nessus calls run in the same
# session afterwards they would send these ServiceNow headers instead.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {snow_token}",
}
# Both top criticality tiers get the highest impact; previously only
# "business-critical" did, so "mission-critical" assets were under-ranked.
high_impact_tiers = ("mission-critical", "business-critical")

for vuln in vulns:
    if vuln["risk_score"] >= 8.0:
        ticket = {
            "short_description": f"[VULN] {vuln['cve']} — {vuln['plugin_name']} on {vuln['host']}",
            "description": (
                f"Vulnerability: {vuln['plugin_name']}\n"
                f"CVE: {vuln['cve']}\n"
                f"CVSS: {vuln['cvss']}\n"
                f"Host: {vuln['host']}\n"
                f"Asset Criticality: {vuln['asset_criticality']}\n"
                f"CISA KEV: {'YES' if vuln['kev'] else 'NO'}\n"
                f"Risk Score: {vuln['risk_score']}\n"
                f"Remediation SLA: {'24 hours' if vuln['kev'] else '7 days'}"
            ),
            # Known-exploited findings are the most urgent.
            "urgency": "1" if vuln["kev"] else "2",
            "impact": "1" if vuln["asset_criticality"] in high_impact_tiers else "2",
            "assignment_group": "IT Infrastructure",
            "category": "Vulnerability",
        }
        response = requests.post(servicenow_url, headers=headers, json=ticket, timeout=30)
        # Surface a rejected request immediately instead of failing later on
        # a response body that has no "result" entry.
        response.raise_for_status()
        print(f"Ticket created: {response.json()['result']['number']}")

| Term | Definition |
|---|---|
| CVSS | Common Vulnerability Scoring System — standardized severity rating (0-10) for vulnerabilities |
| CISA KEV | Known Exploited Vulnerabilities catalog — CISA-maintained list of vulnerabilities with confirmed active exploitation |
| Credentialed Scan | Vulnerability scan using authenticated access for deeper detection than network-only scanning |
| Asset Criticality | Business impact classification determining remediation priority (mission-critical, business-critical, standard) |
| Remediation SLA | Service Level Agreement defining maximum time allowed to patch vulnerabilities by severity |
| EPSS | Exploit Prediction Scoring System — ML-based probability score predicting likelihood of exploitation |
VULNERABILITY SCAN REPORT — Weekly Summary
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Scan Date: 2024-03-16 02:00 UTC
Scan Scope: 10.0.0.0/16 (1,247 hosts scanned)
Duration: 4h 23m
Coverage: 98.7% (16 hosts unreachable)
Findings:
Severity Count New CISA KEV
Critical 23 5 3
High 187 34 12
Medium 892 78 0
Low 1,456 112 0
Info 3,891 201 0
Top Priority (P1 — 24hr SLA):
CVE-2024-21762 FortiOS RCE 3 hosts KEV: YES
CVE-2024-1709 ConnectWise RCE 1 host KEV: YES
CVE-2024-3400 Palo Alto PAN-OS RCE 2 hosts KEV: YES
SLA Compliance:
Critical: 82% within SLA (4 overdue)
High: 91% within SLA (17 overdue)
Medium: 88% within SLA (107 overdue)
Tickets Created: 39 (ServiceNow)