performing-ip-reputation-analysis-with-shodan
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePerforming IP Reputation Analysis with Shodan
使用Shodan进行IP信誉分析
Overview
概述
Shodan is the world's first search engine for internet-connected devices, continuously scanning the IPv4 and IPv6 address space to catalog open ports, running services, SSL certificates, and known vulnerabilities. This skill covers using the Shodan API and InternetDB free API to enrich IP addresses from security alerts, assess threat levels based on exposed services and vulnerabilities, identify hosting infrastructure patterns, and integrate IP reputation data into SOC triage and threat intelligence workflows.
Shodan是全球首个面向联网设备的搜索引擎,持续扫描IPv4和IPv6地址空间,记录开放端口、运行服务、SSL证书以及已知漏洞。本技能介绍如何使用Shodan API和免费的InternetDB API来增强安全警报中的IP地址信息,基于暴露的服务和漏洞评估威胁等级,识别托管基础设施模式,并将IP信誉数据整合到SOC分流和威胁情报工作流中。
When to Use
使用场景
- When conducting security assessments that involve performing ip reputation analysis with shodan
- When following incident response procedures for related security events
- When performing scheduled security testing or auditing activities
- When validating security controls through hands-on testing
- 当需要使用Shodan进行IP信誉分析的安全评估时
- 当针对相关安全事件执行事件响应流程时
- 当执行定期安全测试或审计活动时
- 当通过实操测试验证安全控制措施时
Prerequisites
前置条件
- Python 3.9+ with library (
shodan)pip install shodan - Shodan API key (free tier: limited queries; paid plans for higher limits and streaming)
- Understanding of TCP/UDP ports, common services, and CVE identifiers
- Familiarity with ASN, CIDR notation, and IP geolocation concepts
- Network security knowledge for interpreting scan results
- 安装Python 3.9+及库(
shodan)pip install shodan - Shodan API密钥(免费版:查询次数有限;付费版提供更高限额和流数据功能)
- 了解TCP/UDP端口、常见服务及CVE标识符
- 熟悉ASN、CIDR表示法和IP地理定位概念
- 具备解读扫描结果的网络安全知识
Key Concepts
核心概念
Shodan Data Model
Shodan数据模型
Each IP record in Shodan contains: open ports and protocols, banner data (service responses), SSL/TLS certificate details, known CVE vulnerabilities, hostname(s) and reverse DNS, ASN and ISP information, geographic location, operating system fingerprint, and historical scan data showing changes over time.
Shodan中的每条IP记录包含:开放端口和协议、横幅数据(服务响应)、SSL/TLS证书详情、已知CVE漏洞、主机名及反向DNS、ASN和ISP信息、地理位置、操作系统指纹,以及显示随时间变化的历史扫描数据。
InternetDB API
InternetDB API
Shodan's free InternetDB API (internetdb.shodan.io) provides quick IP lookups without authentication, returning open ports, hostnames, tags, CPEs, and known vulnerabilities. This is useful for high-volume enrichment where the full Shodan API would hit rate limits.
Shodan的免费InternetDB API(internetdb.shodan.io)无需认证即可快速查询IP地址,返回开放端口、主机名、标签、CPE及已知漏洞。这适用于需要大量增强操作的场景,避免触发完整Shodan API的速率限制。
Reputation Scoring
信誉评分
IP reputation is assessed by combining: number and type of open ports (unusual ports indicate compromise), vulnerable services (unpatched software with known CVEs), hosting type (residential, cloud, VPN/proxy, bulletproof hosting), historical activity (past associations with malware, scanning, spam), and geographic context (countries known for specific threat activity).
IP信誉通过以下因素综合评估:开放端口的数量和类型(异常端口表明可能已被入侵)、存在漏洞的服务(未打补丁且存在已知CVE的软件)、托管类型(住宅、云、VPN/代理、高防托管)、历史活动(过去与恶意软件、扫描、垃圾邮件的关联)以及地理环境(以特定威胁活动著称的国家)。
Workflow
工作流程
Step 1: Basic IP Enrichment with Shodan API
步骤1:使用Shodan API进行基础IP信息增强
python
import shodan
import json
from datetime import datetime
class ShodanEnricher:
def __init__(self, api_key):
self.api = shodan.Shodan(api_key)
self.info = self.api.info()
print(f"[+] Shodan API initialized. Credits: {self.info.get('scan_credits', 0)}")
def enrich_ip(self, ip_address):
"""Full enrichment of an IP address via Shodan."""
try:
host = self.api.host(ip_address)
enrichment = {
"ip": ip_address,
"organization": host.get("org", ""),
"asn": host.get("asn", ""),
"isp": host.get("isp", ""),
"country": host.get("country_name", ""),
"country_code": host.get("country_code", ""),
"city": host.get("city", ""),
"latitude": host.get("latitude"),
"longitude": host.get("longitude"),
"os": host.get("os", ""),
"ports": host.get("ports", []),
"hostnames": host.get("hostnames", []),
"domains": host.get("domains", []),
"vulns": host.get("vulns", []),
"tags": host.get("tags", []),
"last_update": host.get("last_update", ""),
"services": [],
}
for service in host.get("data", []):
svc = {
"port": service.get("port", 0),
"transport": service.get("transport", "tcp"),
"product": service.get("product", ""),
"version": service.get("version", ""),
"module": service.get("_shodan", {}).get("module", ""),
"banner": service.get("data", "")[:200],
}
if "ssl" in service:
svc["ssl_subject"] = service["ssl"].get("cert", {}).get("subject", {})
svc["ssl_issuer"] = service["ssl"].get("cert", {}).get("issuer", {})
svc["ssl_expires"] = service["ssl"].get("cert", {}).get("expires", "")
enrichment["services"].append(svc)
# Calculate reputation score
enrichment["reputation"] = self._calculate_reputation(enrichment)
print(f"[+] {ip_address}: {len(enrichment['ports'])} ports, "
f"{len(enrichment['vulns'])} vulns, "
f"reputation: {enrichment['reputation']['level']}")
return enrichment
except shodan.APIError as e:
print(f"[-] Shodan error for {ip_address}: {e}")
return None
def _calculate_reputation(self, data):
"""Calculate IP reputation score based on Shodan data."""
score = 0
factors = []
# Vulnerability assessment
vuln_count = len(data.get("vulns", []))
if vuln_count > 10:
score += 40
factors.append(f"{vuln_count} known vulnerabilities")
elif vuln_count > 5:
score += 25
factors.append(f"{vuln_count} known vulnerabilities")
elif vuln_count > 0:
score += 10
factors.append(f"{vuln_count} known vulnerabilities")
# Suspicious port analysis
suspicious_ports = {4444, 5555, 6666, 8888, 9090, 1234, 31337,
6667, 6697, 8080, 8443, 3128, 1080}
open_ports = set(data.get("ports", []))
sus_found = open_ports.intersection(suspicious_ports)
if sus_found:
score += 15
factors.append(f"suspicious ports: {sus_found}")
# Tag-based assessment
malicious_tags = {"self-signed", "cloud", "vpn", "proxy", "tor"}
tags = set(data.get("tags", []))
mal_tags = tags.intersection(malicious_tags)
if mal_tags:
score += 10
factors.append(f"tags: {mal_tags}")
# Too many open ports
port_count = len(data.get("ports", []))
if port_count > 20:
score += 15
factors.append(f"excessive open ports ({port_count})")
level = (
"critical" if score >= 50
else "high" if score >= 35
else "medium" if score >= 15
else "low"
)
return {"score": score, "level": level, "factors": factors}
def enrich_ip_free(self, ip_address):
"""Quick IP enrichment using free InternetDB API."""
import requests
resp = requests.get(f"https://internetdb.shodan.io/{ip_address}", timeout=10)
if resp.status_code == 200:
data = resp.json()
print(f"[+] InternetDB: {ip_address} -> "
f"{len(data.get('ports', []))} ports, "
f"{len(data.get('vulns', []))} vulns")
return data
return None
enricher = ShodanEnricher("YOUR_SHODAN_API_KEY")
result = enricher.enrich_ip("8.8.8.8")
print(json.dumps(result, indent=2, default=str))python
import shodan
import json
from datetime import datetime
class ShodanEnricher:
def __init__(self, api_key):
self.api = shodan.Shodan(api_key)
self.info = self.api.info()
print(f"[+] Shodan API initialized. Credits: {self.info.get('scan_credits', 0)}")
def enrich_ip(self, ip_address):
"""Full enrichment of an IP address via Shodan."""
try:
host = self.api.host(ip_address)
enrichment = {
"ip": ip_address,
"organization": host.get("org", ""),
"asn": host.get("asn", ""),
"isp": host.get("isp", ""),
"country": host.get("country_name", ""),
"country_code": host.get("country_code", ""),
"city": host.get("city", ""),
"latitude": host.get("latitude"),
"longitude": host.get("longitude"),
"os": host.get("os", ""),
"ports": host.get("ports", []),
"hostnames": host.get("hostnames", []),
"domains": host.get("domains", []),
"vulns": host.get("vulns", []),
"tags": host.get("tags", []),
"last_update": host.get("last_update", ""),
"services": [],
}
for service in host.get("data", []):
svc = {
"port": service.get("port", 0),
"transport": service.get("transport", "tcp"),
"product": service.get("product", ""),
"version": service.get("version", ""),
"module": service.get("_shodan", {}).get("module", ""),
"banner": service.get("data", "")[:200],
}
if "ssl" in service:
svc["ssl_subject"] = service["ssl"].get("cert", {}).get("subject", {})
svc["ssl_issuer"] = service["ssl"].get("cert", {}).get("issuer", {})
svc["ssl_expires"] = service["ssl"].get("cert", {}).get("expires", "")
enrichment["services"].append(svc)
# Calculate reputation score
enrichment["reputation"] = self._calculate_reputation(enrichment)
print(f"[+] {ip_address}: {len(enrichment['ports'])} ports, "
f"{len(enrichment['vulns'])} vulns, "
f"reputation: {enrichment['reputation']['level']}")
return enrichment
except shodan.APIError as e:
print(f"[-] Shodan error for {ip_address}: {e}")
return None
def _calculate_reputation(self, data):
"""Calculate IP reputation score based on Shodan data."""
score = 0
factors = []
# Vulnerability assessment
vuln_count = len(data.get("vulns", []))
if vuln_count > 10:
score += 40
factors.append(f"{vuln_count} known vulnerabilities")
elif vuln_count > 5:
score += 25
factors.append(f"{vuln_count} known vulnerabilities")
elif vuln_count > 0:
score += 10
factors.append(f"{vuln_count} known vulnerabilities")
# Suspicious port analysis
suspicious_ports = {4444, 5555, 6666, 8888, 9090, 1234, 31337,
6667, 6697, 8080, 8443, 3128, 1080}
open_ports = set(data.get("ports", []))
sus_found = open_ports.intersection(suspicious_ports)
if sus_found:
score += 15
factors.append(f"suspicious ports: {sus_found}")
# Tag-based assessment
malicious_tags = {"self-signed", "cloud", "vpn", "proxy", "tor"}
tags = set(data.get("tags", []))
mal_tags = tags.intersection(malicious_tags)
if mal_tags:
score += 10
factors.append(f"tags: {mal_tags}")
# Too many open ports
port_count = len(data.get("ports", []))
if port_count > 20:
score += 15
factors.append(f"excessive open ports ({port_count})")
level = (
"critical" if score >= 50
else "high" if score >= 35
else "medium" if score >= 15
else "low"
)
return {"score": score, "level": level, "factors": factors}
def enrich_ip_free(self, ip_address):
"""Quick IP enrichment using free InternetDB API."""
import requests
resp = requests.get(f"https://internetdb.shodan.io/{ip_address}", timeout=10)
if resp.status_code == 200:
data = resp.json()
print(f"[+] InternetDB: {ip_address} -> "
f"{len(data.get('ports', []))} ports, "
f"{len(data.get('vulns', []))} vulns")
return data
return None
enricher = ShodanEnricher("YOUR_SHODAN_API_KEY")
result = enricher.enrich_ip("8.8.8.8")
print(json.dumps(result, indent=2, default=str))Step 2: Batch IP Reputation Check
步骤2:批量IP信誉检查
python
import time
def batch_ip_reputation(enricher, ip_list, output_file="ip_reputation.json"):
"""Check reputation for a list of IP addresses."""
results = []
for i, ip in enumerate(ip_list):
result = enricher.enrich_ip(ip)
if result:
results.append(result)
if (i + 1) % 10 == 0:
print(f" [{i+1}/{len(ip_list)}] Processed")
time.sleep(1) # Rate limiting
# Sort by reputation score (highest risk first)
results.sort(key=lambda x: x.get("reputation", {}).get("score", 0), reverse=True)
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
# Summary
levels = {"critical": 0, "high": 0, "medium": 0, "low": 0}
for r in results:
level = r.get("reputation", {}).get("level", "low")
levels[level] += 1
print(f"\n=== Batch Reputation Summary ===")
print(f"Total IPs: {len(results)}")
for level, count in levels.items():
print(f" {level.upper()}: {count}")
return results
suspicious_ips = ["203.0.113.1", "198.51.100.5", "192.0.2.100"]
results = batch_ip_reputation(enricher, suspicious_ips)python
import time
def batch_ip_reputation(enricher, ip_list, output_file="ip_reputation.json"):
"""Check reputation for a list of IP addresses."""
results = []
for i, ip in enumerate(ip_list):
result = enricher.enrich_ip(ip)
if result:
results.append(result)
if (i + 1) % 10 == 0:
print(f" [{i+1}/{len(ip_list)}] Processed")
time.sleep(1) # Rate limiting
# Sort by reputation score (highest risk first)
results.sort(key=lambda x: x.get("reputation", {}).get("score", 0), reverse=True)
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
# Summary
levels = {"critical": 0, "high": 0, "medium": 0, "low": 0}
for r in results:
level = r.get("reputation", {}).get("level", "low")
levels[level] += 1
print(f"\n=== Batch Reputation Summary ===")
print(f"Total IPs: {len(results)}")
for level, count in levels.items():
print(f" {level.upper()}: {count}")
return results
suspicious_ips = ["203.0.113.1", "198.51.100.5", "192.0.2.100"]
results = batch_ip_reputation(enricher, suspicious_ips)Step 3: Infrastructure Correlation
步骤3:基础设施关联
python
def correlate_infrastructure(enricher, ip_address):
"""Find related infrastructure based on shared attributes."""
host_data = enricher.enrich_ip(ip_address)
if not host_data:
return {}
correlations = {
"same_org": [],
"same_asn": [],
"shared_ssl": [],
}
# Search for same organization
org = host_data.get("organization", "")
if org:
try:
results = enricher.api.search(f'org:"{org}"', limit=20)
for match in results.get("matches", []):
correlations["same_org"].append({
"ip": match.get("ip_str", ""),
"port": match.get("port", 0),
"product": match.get("product", ""),
})
except shodan.APIError:
pass
# Search for same SSL certificate
for service in host_data.get("services", []):
ssl_subject = service.get("ssl_subject", {})
if ssl_subject:
cn = ssl_subject.get("CN", "")
if cn:
try:
results = enricher.api.search(f'ssl.cert.subject.CN:"{cn}"', limit=20)
for match in results.get("matches", []):
correlations["shared_ssl"].append({
"ip": match.get("ip_str", ""),
"cn": cn,
})
except shodan.APIError:
pass
print(f"[+] Infrastructure correlations for {ip_address}:")
print(f" Same org: {len(correlations['same_org'])} hosts")
print(f" Shared SSL: {len(correlations['shared_ssl'])} hosts")
return correlationspython
def correlate_infrastructure(enricher, ip_address):
"""Find related infrastructure based on shared attributes."""
host_data = enricher.enrich_ip(ip_address)
if not host_data:
return {}
correlations = {
"same_org": [],
"same_asn": [],
"shared_ssl": [],
}
# Search for same organization
org = host_data.get("organization", "")
if org:
try:
results = enricher.api.search(f'org:"{org}"', limit=20)
for match in results.get("matches", []):
correlations["same_org"].append({
"ip": match.get("ip_str", ""),
"port": match.get("port", 0),
"product": match.get("product", ""),
})
except shodan.APIError:
pass
# Search for same SSL certificate
for service in host_data.get("services", []):
ssl_subject = service.get("ssl_subject", {})
if ssl_subject:
cn = ssl_subject.get("CN", "")
if cn:
try:
results = enricher.api.search(f'ssl.cert.subject.CN:"{cn}"', limit=20)
for match in results.get("matches", []):
correlations["shared_ssl"].append({
"ip": match.get("ip_str", ""),
"cn": cn,
})
except shodan.APIError:
pass
print(f"[+] Infrastructure correlations for {ip_address}:")
print(f" Same org: {len(correlations['same_org'])} hosts")
print(f" Shared SSL: {len(correlations['shared_ssl'])} hosts")
return correlationsValidation Criteria
验证标准
- Shodan API queried successfully with proper authentication
- IP enrichment returns ports, services, vulnerabilities, and geolocation
- Reputation scoring classifies IPs by threat level
- Batch enrichment handles rate limiting correctly
- Infrastructure correlation identifies related hosts
- InternetDB free API used for high-volume lookups
- 成功通过正确认证调用Shodan API
- IP信息增强返回端口、服务、漏洞及地理定位信息
- 信誉评分根据威胁等级对IP进行分类
- 批量信息增强正确处理速率限制
- 基础设施关联识别出相关主机
- 使用InternetDB免费API进行大量查询