performing-ip-reputation-analysis-with-shodan

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Performing IP Reputation Analysis with Shodan

使用Shodan进行IP信誉分析

Overview

概述

Shodan is the world's first search engine for internet-connected devices, continuously scanning the IPv4 and IPv6 address space to catalog open ports, running services, SSL certificates, and known vulnerabilities. This skill covers using the Shodan API and InternetDB free API to enrich IP addresses from security alerts, assess threat levels based on exposed services and vulnerabilities, identify hosting infrastructure patterns, and integrate IP reputation data into SOC triage and threat intelligence workflows.
Shodan是全球首个面向联网设备的搜索引擎,持续扫描IPv4和IPv6地址空间,记录开放端口、运行服务、SSL证书以及已知漏洞。本技能介绍如何使用Shodan API和免费的InternetDB API来增强安全警报中的IP地址信息,基于暴露的服务和漏洞评估威胁等级,识别托管基础设施模式,并将IP信誉数据整合到SOC分流和威胁情报工作流中。

When to Use

使用场景

  • When conducting security assessments that involve performing ip reputation analysis with shodan
  • When following incident response procedures for related security events
  • When performing scheduled security testing or auditing activities
  • When validating security controls through hands-on testing
  • 当需要使用Shodan进行IP信誉分析的安全评估时
  • 当针对相关安全事件执行事件响应流程时
  • 当执行定期安全测试或审计活动时
  • 当通过实操测试验证安全控制措施时

Prerequisites

前置条件

  • Python 3.9+ with
    shodan
    library (
    pip install shodan
    )
  • Shodan API key (free tier: limited queries; paid plans for higher limits and streaming)
  • Understanding of TCP/UDP ports, common services, and CVE identifiers
  • Familiarity with ASN, CIDR notation, and IP geolocation concepts
  • Network security knowledge for interpreting scan results
  • 安装Python 3.9+及
    shodan
    库(
    pip install shodan
  • Shodan API密钥(免费版:查询次数有限;付费版提供更高限额和流数据功能)
  • 了解TCP/UDP端口、常见服务及CVE标识符
  • 熟悉ASN、CIDR表示法和IP地理定位概念
  • 具备解读扫描结果的网络安全知识

Key Concepts

核心概念

Shodan Data Model

Shodan数据模型

Each IP record in Shodan contains: open ports and protocols, banner data (service responses), SSL/TLS certificate details, known CVE vulnerabilities, hostname(s) and reverse DNS, ASN and ISP information, geographic location, operating system fingerprint, and historical scan data showing changes over time.
Shodan中的每条IP记录包含:开放端口和协议、横幅数据(服务响应)、SSL/TLS证书详情、已知CVE漏洞、主机名及反向DNS、ASN和ISP信息、地理位置、操作系统指纹,以及显示随时间变化的历史扫描数据。

InternetDB API

InternetDB API

Shodan's free InternetDB API (internetdb.shodan.io) provides quick IP lookups without authentication, returning open ports, hostnames, tags, CPEs, and known vulnerabilities. This is useful for high-volume enrichment where the full Shodan API would hit rate limits.
Shodan的免费InternetDB API(internetdb.shodan.io)无需认证即可快速查询IP地址,返回开放端口、主机名、标签、CPE及已知漏洞。这适用于需要大量增强操作的场景,避免触发完整Shodan API的速率限制。

Reputation Scoring

信誉评分

IP reputation is assessed by combining: number and type of open ports (unusual ports indicate compromise), vulnerable services (unpatched software with known CVEs), hosting type (residential, cloud, VPN/proxy, bulletproof hosting), historical activity (past associations with malware, scanning, spam), and geographic context (countries known for specific threat activity).
IP信誉通过以下因素综合评估:开放端口的数量和类型(异常端口表明可能已被入侵)、存在漏洞的服务(未打补丁且存在已知CVE的软件)、托管类型(住宅、云、VPN/代理、高防托管)、历史活动(过去与恶意软件、扫描、垃圾邮件的关联)以及地理环境(以特定威胁活动著称的国家)。

Workflow

工作流程

Step 1: Basic IP Enrichment with Shodan API

步骤1:使用Shodan API进行基础IP信息增强

python
import shodan
import json
from datetime import datetime

class ShodanEnricher:
    def __init__(self, api_key):
        self.api = shodan.Shodan(api_key)
        self.info = self.api.info()
        print(f"[+] Shodan API initialized. Credits: {self.info.get('scan_credits', 0)}")

    def enrich_ip(self, ip_address):
        """Full enrichment of an IP address via Shodan."""
        try:
            host = self.api.host(ip_address)
            enrichment = {
                "ip": ip_address,
                "organization": host.get("org", ""),
                "asn": host.get("asn", ""),
                "isp": host.get("isp", ""),
                "country": host.get("country_name", ""),
                "country_code": host.get("country_code", ""),
                "city": host.get("city", ""),
                "latitude": host.get("latitude"),
                "longitude": host.get("longitude"),
                "os": host.get("os", ""),
                "ports": host.get("ports", []),
                "hostnames": host.get("hostnames", []),
                "domains": host.get("domains", []),
                "vulns": host.get("vulns", []),
                "tags": host.get("tags", []),
                "last_update": host.get("last_update", ""),
                "services": [],
            }

            for service in host.get("data", []):
                svc = {
                    "port": service.get("port", 0),
                    "transport": service.get("transport", "tcp"),
                    "product": service.get("product", ""),
                    "version": service.get("version", ""),
                    "module": service.get("_shodan", {}).get("module", ""),
                    "banner": service.get("data", "")[:200],
                }
                if "ssl" in service:
                    svc["ssl_subject"] = service["ssl"].get("cert", {}).get("subject", {})
                    svc["ssl_issuer"] = service["ssl"].get("cert", {}).get("issuer", {})
                    svc["ssl_expires"] = service["ssl"].get("cert", {}).get("expires", "")
                enrichment["services"].append(svc)

            # Calculate reputation score
            enrichment["reputation"] = self._calculate_reputation(enrichment)
            print(f"[+] {ip_address}: {len(enrichment['ports'])} ports, "
                  f"{len(enrichment['vulns'])} vulns, "
                  f"reputation: {enrichment['reputation']['level']}")
            return enrichment

        except shodan.APIError as e:
            print(f"[-] Shodan error for {ip_address}: {e}")
            return None

    def _calculate_reputation(self, data):
        """Calculate IP reputation score based on Shodan data."""
        score = 0
        factors = []

        # Vulnerability assessment
        vuln_count = len(data.get("vulns", []))
        if vuln_count > 10:
            score += 40
            factors.append(f"{vuln_count} known vulnerabilities")
        elif vuln_count > 5:
            score += 25
            factors.append(f"{vuln_count} known vulnerabilities")
        elif vuln_count > 0:
            score += 10
            factors.append(f"{vuln_count} known vulnerabilities")

        # Suspicious port analysis
        suspicious_ports = {4444, 5555, 6666, 8888, 9090, 1234, 31337,
                           6667, 6697, 8080, 8443, 3128, 1080}
        open_ports = set(data.get("ports", []))
        sus_found = open_ports.intersection(suspicious_ports)
        if sus_found:
            score += 15
            factors.append(f"suspicious ports: {sus_found}")

        # Tag-based assessment
        malicious_tags = {"self-signed", "cloud", "vpn", "proxy", "tor"}
        tags = set(data.get("tags", []))
        mal_tags = tags.intersection(malicious_tags)
        if mal_tags:
            score += 10
            factors.append(f"tags: {mal_tags}")

        # Too many open ports
        port_count = len(data.get("ports", []))
        if port_count > 20:
            score += 15
            factors.append(f"excessive open ports ({port_count})")

        level = (
            "critical" if score >= 50
            else "high" if score >= 35
            else "medium" if score >= 15
            else "low"
        )

        return {"score": score, "level": level, "factors": factors}

    def enrich_ip_free(self, ip_address):
        """Quick IP enrichment using free InternetDB API."""
        import requests
        resp = requests.get(f"https://internetdb.shodan.io/{ip_address}", timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            print(f"[+] InternetDB: {ip_address} -> "
                  f"{len(data.get('ports', []))} ports, "
                  f"{len(data.get('vulns', []))} vulns")
            return data
        return None

enricher = ShodanEnricher("YOUR_SHODAN_API_KEY")
result = enricher.enrich_ip("8.8.8.8")
print(json.dumps(result, indent=2, default=str))
python
import shodan
import json
from datetime import datetime

class ShodanEnricher:
    def __init__(self, api_key):
        self.api = shodan.Shodan(api_key)
        self.info = self.api.info()
        print(f"[+] Shodan API initialized. Credits: {self.info.get('scan_credits', 0)}")

    def enrich_ip(self, ip_address):
        """Full enrichment of an IP address via Shodan."""
        try:
            host = self.api.host(ip_address)
            enrichment = {
                "ip": ip_address,
                "organization": host.get("org", ""),
                "asn": host.get("asn", ""),
                "isp": host.get("isp", ""),
                "country": host.get("country_name", ""),
                "country_code": host.get("country_code", ""),
                "city": host.get("city", ""),
                "latitude": host.get("latitude"),
                "longitude": host.get("longitude"),
                "os": host.get("os", ""),
                "ports": host.get("ports", []),
                "hostnames": host.get("hostnames", []),
                "domains": host.get("domains", []),
                "vulns": host.get("vulns", []),
                "tags": host.get("tags", []),
                "last_update": host.get("last_update", ""),
                "services": [],
            }

            for service in host.get("data", []):
                svc = {
                    "port": service.get("port", 0),
                    "transport": service.get("transport", "tcp"),
                    "product": service.get("product", ""),
                    "version": service.get("version", ""),
                    "module": service.get("_shodan", {}).get("module", ""),
                    "banner": service.get("data", "")[:200],
                }
                if "ssl" in service:
                    svc["ssl_subject"] = service["ssl"].get("cert", {}).get("subject", {})
                    svc["ssl_issuer"] = service["ssl"].get("cert", {}).get("issuer", {})
                    svc["ssl_expires"] = service["ssl"].get("cert", {}).get("expires", "")
                enrichment["services"].append(svc)

            # Calculate reputation score
            enrichment["reputation"] = self._calculate_reputation(enrichment)
            print(f"[+] {ip_address}: {len(enrichment['ports'])} ports, "
                  f"{len(enrichment['vulns'])} vulns, "
                  f"reputation: {enrichment['reputation']['level']}")
            return enrichment

        except shodan.APIError as e:
            print(f"[-] Shodan error for {ip_address}: {e}")
            return None

    def _calculate_reputation(self, data):
        """Calculate IP reputation score based on Shodan data."""
        score = 0
        factors = []

        # Vulnerability assessment
        vuln_count = len(data.get("vulns", []))
        if vuln_count > 10:
            score += 40
            factors.append(f"{vuln_count} known vulnerabilities")
        elif vuln_count > 5:
            score += 25
            factors.append(f"{vuln_count} known vulnerabilities")
        elif vuln_count > 0:
            score += 10
            factors.append(f"{vuln_count} known vulnerabilities")

        # Suspicious port analysis
        suspicious_ports = {4444, 5555, 6666, 8888, 9090, 1234, 31337,
                           6667, 6697, 8080, 8443, 3128, 1080}
        open_ports = set(data.get("ports", []))
        sus_found = open_ports.intersection(suspicious_ports)
        if sus_found:
            score += 15
            factors.append(f"suspicious ports: {sus_found}")

        # Tag-based assessment
        malicious_tags = {"self-signed", "cloud", "vpn", "proxy", "tor"}
        tags = set(data.get("tags", []))
        mal_tags = tags.intersection(malicious_tags)
        if mal_tags:
            score += 10
            factors.append(f"tags: {mal_tags}")

        # Too many open ports
        port_count = len(data.get("ports", []))
        if port_count > 20:
            score += 15
            factors.append(f"excessive open ports ({port_count})")

        level = (
            "critical" if score >= 50
            else "high" if score >= 35
            else "medium" if score >= 15
            else "low"
        )

        return {"score": score, "level": level, "factors": factors}

    def enrich_ip_free(self, ip_address):
        """Quick IP enrichment using free InternetDB API."""
        import requests
        resp = requests.get(f"https://internetdb.shodan.io/{ip_address}", timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            print(f"[+] InternetDB: {ip_address} -> "
                  f"{len(data.get('ports', []))} ports, "
                  f"{len(data.get('vulns', []))} vulns")
            return data
        return None

enricher = ShodanEnricher("YOUR_SHODAN_API_KEY")
result = enricher.enrich_ip("8.8.8.8")
print(json.dumps(result, indent=2, default=str))

Step 2: Batch IP Reputation Check

步骤2:批量IP信誉检查

python
import time

def batch_ip_reputation(enricher, ip_list, output_file="ip_reputation.json"):
    """Check reputation for a list of IP addresses."""
    results = []
    for i, ip in enumerate(ip_list):
        result = enricher.enrich_ip(ip)
        if result:
            results.append(result)
        if (i + 1) % 10 == 0:
            print(f"  [{i+1}/{len(ip_list)}] Processed")
            time.sleep(1)  # Rate limiting

    # Sort by reputation score (highest risk first)
    results.sort(key=lambda x: x.get("reputation", {}).get("score", 0), reverse=True)

    with open(output_file, "w") as f:
        json.dump(results, f, indent=2, default=str)

    # Summary
    levels = {"critical": 0, "high": 0, "medium": 0, "low": 0}
    for r in results:
        level = r.get("reputation", {}).get("level", "low")
        levels[level] += 1

    print(f"\n=== Batch Reputation Summary ===")
    print(f"Total IPs: {len(results)}")
    for level, count in levels.items():
        print(f"  {level.upper()}: {count}")

    return results

suspicious_ips = ["203.0.113.1", "198.51.100.5", "192.0.2.100"]
results = batch_ip_reputation(enricher, suspicious_ips)
python
import time

def batch_ip_reputation(enricher, ip_list, output_file="ip_reputation.json"):
    """Check reputation for a list of IP addresses."""
    results = []
    for i, ip in enumerate(ip_list):
        result = enricher.enrich_ip(ip)
        if result:
            results.append(result)
        if (i + 1) % 10 == 0:
            print(f"  [{i+1}/{len(ip_list)}] Processed")
            time.sleep(1)  # Rate limiting

    # Sort by reputation score (highest risk first)
    results.sort(key=lambda x: x.get("reputation", {}).get("score", 0), reverse=True)

    with open(output_file, "w") as f:
        json.dump(results, f, indent=2, default=str)

    # Summary
    levels = {"critical": 0, "high": 0, "medium": 0, "low": 0}
    for r in results:
        level = r.get("reputation", {}).get("level", "low")
        levels[level] += 1

    print(f"\n=== Batch Reputation Summary ===")
    print(f"Total IPs: {len(results)}")
    for level, count in levels.items():
        print(f"  {level.upper()}: {count}")

    return results

suspicious_ips = ["203.0.113.1", "198.51.100.5", "192.0.2.100"]
results = batch_ip_reputation(enricher, suspicious_ips)

Step 3: Infrastructure Correlation

步骤3:基础设施关联

python
def correlate_infrastructure(enricher, ip_address):
    """Find related infrastructure based on shared attributes."""
    host_data = enricher.enrich_ip(ip_address)
    if not host_data:
        return {}

    correlations = {
        "same_org": [],
        "same_asn": [],
        "shared_ssl": [],
    }

    # Search for same organization
    org = host_data.get("organization", "")
    if org:
        try:
            results = enricher.api.search(f'org:"{org}"', limit=20)
            for match in results.get("matches", []):
                correlations["same_org"].append({
                    "ip": match.get("ip_str", ""),
                    "port": match.get("port", 0),
                    "product": match.get("product", ""),
                })
        except shodan.APIError:
            pass

    # Search for same SSL certificate
    for service in host_data.get("services", []):
        ssl_subject = service.get("ssl_subject", {})
        if ssl_subject:
            cn = ssl_subject.get("CN", "")
            if cn:
                try:
                    results = enricher.api.search(f'ssl.cert.subject.CN:"{cn}"', limit=20)
                    for match in results.get("matches", []):
                        correlations["shared_ssl"].append({
                            "ip": match.get("ip_str", ""),
                            "cn": cn,
                        })
                except shodan.APIError:
                    pass

    print(f"[+] Infrastructure correlations for {ip_address}:")
    print(f"  Same org: {len(correlations['same_org'])} hosts")
    print(f"  Shared SSL: {len(correlations['shared_ssl'])} hosts")
    return correlations
python
def correlate_infrastructure(enricher, ip_address):
    """Find related infrastructure based on shared attributes."""
    host_data = enricher.enrich_ip(ip_address)
    if not host_data:
        return {}

    correlations = {
        "same_org": [],
        "same_asn": [],
        "shared_ssl": [],
    }

    # Search for same organization
    org = host_data.get("organization", "")
    if org:
        try:
            results = enricher.api.search(f'org:"{org}"', limit=20)
            for match in results.get("matches", []):
                correlations["same_org"].append({
                    "ip": match.get("ip_str", ""),
                    "port": match.get("port", 0),
                    "product": match.get("product", ""),
                })
        except shodan.APIError:
            pass

    # Search for same SSL certificate
    for service in host_data.get("services", []):
        ssl_subject = service.get("ssl_subject", {})
        if ssl_subject:
            cn = ssl_subject.get("CN", "")
            if cn:
                try:
                    results = enricher.api.search(f'ssl.cert.subject.CN:"{cn}"', limit=20)
                    for match in results.get("matches", []):
                        correlations["shared_ssl"].append({
                            "ip": match.get("ip_str", ""),
                            "cn": cn,
                        })
                except shodan.APIError:
                    pass

    print(f"[+] Infrastructure correlations for {ip_address}:")
    print(f"  Same org: {len(correlations['same_org'])} hosts")
    print(f"  Shared SSL: {len(correlations['shared_ssl'])} hosts")
    return correlations

Validation Criteria

验证标准

  • Shodan API queried successfully with proper authentication
  • IP enrichment returns ports, services, vulnerabilities, and geolocation
  • Reputation scoring classifies IPs by threat level
  • Batch enrichment handles rate limiting correctly
  • Infrastructure correlation identifies related hosts
  • InternetDB free API used for high-volume lookups
  • 成功通过正确认证调用Shodan API
  • IP信息增强返回端口、服务、漏洞及地理定位信息
  • 信誉评分根据威胁等级对IP进行分类
  • 批量信息增强正确处理速率限制
  • 基础设施关联识别出相关主机
  • 使用InternetDB免费API进行大量查询

References

参考资料