stride-analysis-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

STRIDE Analysis Patterns

STRIDE分析模式

Systematic threat identification using the STRIDE methodology.
使用STRIDE方法论进行系统性威胁识别。

When to Use This Skill

何时使用此方法

  • Starting new threat modeling sessions
  • Analyzing existing system architecture
  • Reviewing security design decisions
  • Creating threat documentation
  • Training teams on threat identification
  • Compliance and audit preparation
  • 启动新的威胁建模会议
  • 分析现有系统架构
  • 评审安全设计决策
  • 创建威胁文档
  • 培训团队掌握威胁识别方法
  • 合规性与审计准备

Core Concepts

核心概念

1. STRIDE Categories

1. STRIDE分类

S - Spoofing       → Authentication threats
T - Tampering      → Integrity threats
R - Repudiation    → Non-repudiation threats
I - Information    → Confidentiality threats
    Disclosure
D - Denial of      → Availability threats
    Service
E - Elevation of   → Authorization threats
    Privilege
S - Spoofing       → 身份仿冒类威胁(认证威胁)
T - Tampering      → 篡改类威胁(完整性威胁)
R - Repudiation    → 抵赖类威胁(不可抵赖性威胁)
I - Information    → 信息泄露类威胁(保密性威胁)
    Disclosure
D - Denial of      → 拒绝服务类威胁(可用性威胁)
    Service
E - Elevation of   → 权限提升类威胁(授权威胁)
    Privilege

2. Threat Analysis Matrix

2. 威胁分析矩阵

CategoryQuestionControl Family
SpoofingCan attacker pretend to be someone else?Authentication
TamperingCan attacker modify data in transit/rest?Integrity
RepudiationCan attacker deny actions?Logging/Audit
Info DisclosureCan attacker access unauthorized data?Encryption
DoSCan attacker disrupt availability?Rate limiting
ElevationCan attacker gain higher privileges?Authorization
分类问题控制类别
身份仿冒(Spoofing)攻击者能否伪装成其他身份?身份认证
篡改(Tampering)攻击者能否修改传输/存储中的数据?完整性保障
抵赖(Repudiation)攻击者能否否认其执行的操作?日志/审计
信息泄露(Info Disclosure)攻击者能否访问未授权的数据?加密
拒绝服务(DoS)攻击者能否破坏系统可用性?速率限制
权限提升(Elevation)攻击者能否获取更高权限?授权控制

Templates

模板

Template 1: STRIDE Threat Model Document

模板1:STRIDE威胁模型文档

markdown
undefined
markdown
undefined

Threat Model: [System Name]

威胁模型:[系统名称]

1. System Overview

1. 系统概述

1.1 Description

1.1 描述

[Brief description of the system and its purpose]
[系统及其用途的简要说明]

1.2 Data Flow Diagram

1.2 数据流图


[User] --> [Web App] --> [API Gateway] --> [Backend Services]
|
v
[Database]

[用户] --> [Web应用] --> [API网关] --> [后端服务]
|
v
[数据库]

1.3 Trust Boundaries

1.3 信任边界

  • External Boundary: Internet to DMZ
  • Internal Boundary: DMZ to Internal Network
  • Data Boundary: Application to Database
  • 外部边界:互联网到DMZ区域
  • 内部边界:DMZ区域到内部网络
  • 数据边界:应用到数据库

2. Assets

2. 资产

AssetSensitivityDescription
User CredentialsHighAuthentication tokens, passwords
Personal DataHighPII, financial information
Session DataMediumActive user sessions
Application LogsMediumSystem activity records
ConfigurationHighSystem settings, secrets
资产敏感度描述
用户凭证认证令牌、密码
个人数据PII(个人可识别信息)、财务信息
会话数据活跃用户会话
应用日志系统活动记录
配置信息系统设置、机密信息

3. STRIDE Analysis

3. STRIDE分析

3.1 Spoofing Threats

3.1 身份仿冒威胁

IDThreatTargetImpactLikelihood
S1Session hijackingUser sessionsHighMedium
S2Token forgeryJWT tokensHighLow
S3Credential stuffingLogin endpointHighHigh
Mitigations:
  • Implement MFA
  • Use secure session management
  • Implement account lockout policies
ID威胁目标影响发生概率
S1会话劫持用户会话
S2令牌伪造JWT令牌
S3凭证填充攻击登录端点
缓解措施:
  • 实现多因素认证(MFA)
  • 使用安全的会话管理机制
  • 实施账户锁定策略

3.2 Tampering Threats

3.2 篡改威胁

IDThreatTargetImpactLikelihood
T1SQL injectionDatabase queriesCriticalMedium
T2Parameter manipulationAPI requestsHighHigh
T3File upload abuseFile storageHighMedium
Mitigations:
  • Input validation on all endpoints
  • Parameterized queries
  • File type validation
ID威胁目标影响发生概率
T1SQL注入数据库查询严重
T2参数篡改API请求
T3文件上传滥用文件存储
缓解措施:
  • 对所有端点进行输入验证
  • 使用参数化查询
  • 文件类型验证

3.3 Repudiation Threats

3.3 抵赖威胁

IDThreatTargetImpactLikelihood
R1Transaction denialFinancial opsHighMedium
R2Access log tamperingAudit logsMediumLow
R3Action attributionUser actionsMediumMedium
Mitigations:
  • Comprehensive audit logging
  • Log integrity protection
  • Digital signatures for critical actions
ID威胁目标影响发生概率
R1交易抵赖金融操作
R2访问日志篡改审计日志
R3操作归属争议用户操作
缓解措施:
  • 全面的审计日志记录
  • 日志完整性保护
  • 关键操作使用数字签名

3.4 Information Disclosure Threats

3.4 信息泄露威胁

IDThreatTargetImpactLikelihood
I1Data breachUser PIICriticalMedium
I2Error message leakageSystem infoLowHigh
I3Insecure transmissionNetwork trafficHighMedium
Mitigations:
  • Encryption at rest and in transit
  • Sanitize error messages
  • Implement TLS 1.3
ID威胁目标影响发生概率
I1数据泄露用户PII严重
I2错误信息泄露系统信息
I3不安全传输网络流量
缓解措施:
  • 静态数据与传输数据加密
  • 清理错误信息
  • 实施TLS 1.3

3.5 Denial of Service Threats

3.5 拒绝服务威胁

IDThreatTargetImpactLikelihood
D1Resource exhaustionAPI serversHighHigh
D2Database overloadDatabaseCriticalMedium
D3Bandwidth saturationNetworkHighMedium
Mitigations:
  • Rate limiting
  • Auto-scaling
  • DDoS protection
ID威胁目标影响发生概率
D1资源耗尽API服务器
D2数据库过载数据库严重
D3带宽饱和网络
缓解措施:
  • 速率限制
  • 自动扩缩容
  • DDoS防护

3.6 Elevation of Privilege Threats

3.6 权限提升威胁

IDThreatTargetImpactLikelihood
E1IDOR vulnerabilitiesUser resourcesHighHigh
E2Role manipulationAdmin accessCriticalLow
E3JWT claim tamperingAuthorizationHighMedium
Mitigations:
  • Proper authorization checks
  • Principle of least privilege
  • Server-side role validation
ID威胁目标影响发生概率
E1IDOR漏洞用户资源
E2角色篡改管理员权限严重
E3JWT声明篡改授权验证
缓解措施:
  • 正确的授权检查
  • 遵循最小权限原则
  • 服务器端角色验证

4. Risk Assessment

4. 风险评估

4.1 Risk Matrix

4.1 风险矩阵


              IMPACT
         Low  Med  High Crit
    Low   1    2    3    4

L Med 2 4 6 8
I High 3 6 9 12
K Crit 4 8 12 16

              影响程度
         低  中  高  严重
    低   1    2    3    4

概 中  2    4    6    8
率 高  3    6    9   12
   严重 4    8   12   16

4.2 Prioritized Risks

4.2 优先级风险

RankThreatRisk ScorePriority
1SQL Injection (T1)12Critical
2IDOR (E1)9High
3Credential Stuffing (S3)9High
4Data Breach (I1)8High
排名威胁风险评分优先级
1SQL注入(T1)12严重
2IDOR漏洞(E1)9
3凭证填充攻击(S3)9
4数据泄露(I1)8

5. Recommendations

5. 建议

Immediate Actions

立即行动

  1. Implement input validation framework
  2. Add rate limiting to authentication endpoints
  3. Enable comprehensive audit logging
  1. 实现输入验证框架
  2. 为认证端点添加速率限制
  3. 启用全面的审计日志

Short-term (30 days)

短期(30天)

  1. Deploy WAF with OWASP ruleset
  2. Implement MFA for sensitive operations
  3. Encrypt all PII at rest
  1. 部署包含OWASP规则集的WAF
  2. 为敏感操作实施MFA
  3. 加密所有静态存储的PII数据

Long-term (90 days)

长期(90天)

  1. Security awareness training
  2. Penetration testing
  3. Bug bounty program
undefined
  1. 安全意识培训
  2. 渗透测试
  3. 漏洞悬赏计划
undefined

Template 2: STRIDE Analysis Code

模板2:STRIDE分析代码

python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional
import json

class StrideCategory(Enum):
    SPOOFING = "S"
    TAMPERING = "T"
    REPUDIATION = "R"
    INFORMATION_DISCLOSURE = "I"
    DENIAL_OF_SERVICE = "D"
    ELEVATION_OF_PRIVILEGE = "E"


class Impact(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


class Likelihood(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Threat:
    id: str
    category: StrideCategory
    title: str
    description: str
    target: str
    impact: Impact
    likelihood: Likelihood
    mitigations: List[str] = field(default_factory=list)
    status: str = "open"

    @property
    def risk_score(self) -> int:
        return self.impact.value * self.likelihood.value

    @property
    def risk_level(self) -> str:
        score = self.risk_score
        if score >= 12:
            return "Critical"
        elif score >= 6:
            return "High"
        elif score >= 3:
            return "Medium"
        return "Low"


@dataclass
class Asset:
    name: str
    sensitivity: str
    description: str
    data_classification: str


@dataclass
class TrustBoundary:
    name: str
    description: str
    from_zone: str
    to_zone: str


@dataclass
class ThreatModel:
    name: str
    version: str
    description: str
    assets: List[Asset] = field(default_factory=list)
    boundaries: List[TrustBoundary] = field(default_factory=list)
    threats: List[Threat] = field(default_factory=list)

    def add_threat(self, threat: Threat) -> None:
        self.threats.append(threat)

    def get_threats_by_category(self, category: StrideCategory) -> List[Threat]:
        return [t for t in self.threats if t.category == category]

    def get_critical_threats(self) -> List[Threat]:
        return [t for t in self.threats if t.risk_level in ("Critical", "High")]

    def generate_report(self) -> Dict:
        """Generate threat model report."""
        return {
            "summary": {
                "name": self.name,
                "version": self.version,
                "total_threats": len(self.threats),
                "critical_threats": len([t for t in self.threats if t.risk_level == "Critical"]),
                "high_threats": len([t for t in self.threats if t.risk_level == "High"]),
            },
            "by_category": {
                cat.name: len(self.get_threats_by_category(cat))
                for cat in StrideCategory
            },
            "top_risks": [
                {
                    "id": t.id,
                    "title": t.title,
                    "risk_score": t.risk_score,
                    "risk_level": t.risk_level
                }
                for t in sorted(self.threats, key=lambda x: x.risk_score, reverse=True)[:10]
            ]
        }


class StrideAnalyzer:
    """Automated STRIDE analysis helper."""

    STRIDE_QUESTIONS = {
        StrideCategory.SPOOFING: [
            "Can an attacker impersonate a legitimate user?",
            "Are authentication tokens properly validated?",
            "Can session identifiers be predicted or stolen?",
            "Is multi-factor authentication available?",
        ],
        StrideCategory.TAMPERING: [
            "Can data be modified in transit?",
            "Can data be modified at rest?",
            "Are input validation controls sufficient?",
            "Can an attacker manipulate application logic?",
        ],
        StrideCategory.REPUDIATION: [
            "Are all security-relevant actions logged?",
            "Can logs be tampered with?",
            "Is there sufficient attribution for actions?",
            "Are timestamps reliable and synchronized?",
        ],
        StrideCategory.INFORMATION_DISCLOSURE: [
            "Is sensitive data encrypted at rest?",
            "Is sensitive data encrypted in transit?",
            "Can error messages reveal sensitive information?",
            "Are access controls properly enforced?",
        ],
        StrideCategory.DENIAL_OF_SERVICE: [
            "Are rate limits implemented?",
            "Can resources be exhausted by malicious input?",
            "Is there protection against amplification attacks?",
            "Are there single points of failure?",
        ],
        StrideCategory.ELEVATION_OF_PRIVILEGE: [
            "Are authorization checks performed consistently?",
            "Can users access other users' resources?",
            "Can privilege escalation occur through parameter manipulation?",
            "Is the principle of least privilege followed?",
        ],
    }

    def generate_questionnaire(self, component: str) -> List[Dict]:
        """Generate STRIDE questionnaire for a component."""
        questionnaire = []
        for category, questions in self.STRIDE_QUESTIONS.items():
            for q in questions:
                questionnaire.append({
                    "component": component,
                    "category": category.name,
                    "question": q,
                    "answer": None,
                    "notes": ""
                })
        return questionnaire

    def suggest_mitigations(self, category: StrideCategory) -> List[str]:
        """Suggest common mitigations for a STRIDE category."""
        mitigations = {
            StrideCategory.SPOOFING: [
                "Implement multi-factor authentication",
                "Use secure session management",
                "Implement account lockout policies",
                "Use cryptographically secure tokens",
                "Validate authentication at every request",
            ],
            StrideCategory.TAMPERING: [
                "Implement input validation",
                "Use parameterized queries",
                "Apply integrity checks (HMAC, signatures)",
                "Implement Content Security Policy",
                "Use immutable infrastructure",
            ],
            StrideCategory.REPUDIATION: [
                "Enable comprehensive audit logging",
                "Protect log integrity",
                "Implement digital signatures",
                "Use centralized, tamper-evident logging",
                "Maintain accurate timestamps",
            ],
            StrideCategory.INFORMATION_DISCLOSURE: [
                "Encrypt data at rest and in transit",
                "Implement proper access controls",
                "Sanitize error messages",
                "Use secure defaults",
                "Implement data classification",
            ],
            StrideCategory.DENIAL_OF_SERVICE: [
                "Implement rate limiting",
                "Use auto-scaling",
                "Deploy DDoS protection",
                "Implement circuit breakers",
                "Set resource quotas",
            ],
            StrideCategory.ELEVATION_OF_PRIVILEGE: [
                "Implement proper authorization",
                "Follow principle of least privilege",
                "Validate permissions server-side",
                "Use role-based access control",
                "Implement security boundaries",
            ],
        }
        return mitigations.get(category, [])
python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional
import json

class StrideCategory(Enum):
    SPOOFING = "S"
    TAMPERING = "T"
    REPUDIATION = "R"
    INFORMATION_DISCLOSURE = "I"
    DENIAL_OF_SERVICE = "D"
    ELEVATION_OF_PRIVILEGE = "E"


class Impact(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


class Likelihood(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Threat:
    id: str
    category: StrideCategory
    title: str
    description: str
    target: str
    impact: Impact
    likelihood: Likelihood
    mitigations: List[str] = field(default_factory=list)
    status: str = "open"

    @property
    def risk_score(self) -> int:
        return self.impact.value * self.likelihood.value

    @property
    def risk_level(self) -> str:
        score = self.risk_score
        if score >= 12:
            return "Critical"
        elif score >= 6:
            return "High"
        elif score >= 3:
            return "Medium"
        return "Low"


@dataclass
class Asset:
    name: str
    sensitivity: str
    description: str
    data_classification: str


@dataclass
class TrustBoundary:
    name: str
    description: str
    from_zone: str
    to_zone: str


@dataclass
class ThreatModel:
    name: str
    version: str
    description: str
    assets: List[Asset] = field(default_factory=list)
    boundaries: List[TrustBoundary] = field(default_factory=list)
    threats: List[Threat] = field(default_factory=list)

    def add_threat(self, threat: Threat) -> None:
        self.threats.append(threat)

    def get_threats_by_category(self, category: StrideCategory) -> List[Threat]:
        return [t for t in self.threats if t.category == category]

    def get_critical_threats(self) -> List[Threat]:
        return [t for t in self.threats if t.risk_level in ("Critical", "High")]

    def generate_report(self) -> Dict:
        """Generate threat model report."""
        return {
            "summary": {
                "name": self.name,
                "version": self.version,
                "total_threats": len(self.threats),
                "critical_threats": len([t for t in self.threats if t.risk_level == "Critical"]),
                "high_threats": len([t for t in self.threats if t.risk_level == "High"]),
            },
            "by_category": {
                cat.name: len(self.get_threats_by_category(cat))
                for cat in StrideCategory
            },
            "top_risks": [
                {
                    "id": t.id,
                    "title": t.title,
                    "risk_score": t.risk_score,
                    "risk_level": t.risk_level
                }
                for t in sorted(self.threats, key=lambda x: x.risk_score, reverse=True)[:10]
            ]
        }


class StrideAnalyzer:
    """Automated STRIDE analysis helper."""

    STRIDE_QUESTIONS = {
        StrideCategory.SPOOFING: [
            "Can an attacker impersonate a legitimate user?",
            "Are authentication tokens properly validated?",
            "Can session identifiers be predicted or stolen?",
            "Is multi-factor authentication available?",
        ],
        StrideCategory.TAMPERING: [
            "Can data be modified in transit?",
            "Can data be modified at rest?",
            "Are input validation controls sufficient?",
            "Can an attacker manipulate application logic?",
        ],
        StrideCategory.REPUDIATION: [
            "Are all security-relevant actions logged?",
            "Can logs be tampered with?",
            "Is there sufficient attribution for actions?",
            "Are timestamps reliable and synchronized?",
        ],
        StrideCategory.INFORMATION_DISCLOSURE: [
            "Is sensitive data encrypted at rest?",
            "Is sensitive data encrypted in transit?",
            "Can error messages reveal sensitive information?",
            "Are access controls properly enforced?",
        ],
        StrideCategory.DENIAL_OF_SERVICE: [
            "Are rate limits implemented?",
            "Can resources be exhausted by malicious input?",
            "Is there protection against amplification attacks?",
            "Are there single points of failure?",
        ],
        StrideCategory.ELEVATION_OF_PRIVILEGE: [
            "Are authorization checks performed consistently?",
            "Can users access other users' resources?",
            "Can privilege escalation occur through parameter manipulation?",
            "Is the principle of least privilege followed?",
        ],
    }

    def generate_questionnaire(self, component: str) -> List[Dict]:
        """Generate STRIDE questionnaire for a component."""
        questionnaire = []
        for category, questions in self.STRIDE_QUESTIONS.items():
            for q in questions:
                questionnaire.append({
                    "component": component,
                    "category": category.name,
                    "question": q,
                    "answer": None,
                    "notes": ""
                })
        return questionnaire

    def suggest_mitigations(self, category: StrideCategory) -> List[str]:
        """Suggest common mitigations for a STRIDE category."""
        mitigations = {
            StrideCategory.SPOOFING: [
                "Implement multi-factor authentication",
                "Use secure session management",
                "Implement account lockout policies",
                "Use cryptographically secure tokens",
                "Validate authentication at every request",
            ],
            StrideCategory.TAMPERING: [
                "Implement input validation",
                "Use parameterized queries",
                "Apply integrity checks (HMAC, signatures)",
                "Implement Content Security Policy",
                "Use immutable infrastructure",
            ],
            StrideCategory.REPUDIATION: [
                "Enable comprehensive audit logging",
                "Protect log integrity",
                "Implement digital signatures",
                "Use centralized, tamper-evident logging",
                "Maintain accurate timestamps",
            ],
            StrideCategory.INFORMATION_DISCLOSURE: [
                "Encrypt data at rest and in transit",
                "Implement proper access controls",
                "Sanitize error messages",
                "Use secure defaults",
                "Implement data classification",
            ],
            StrideCategory.DENIAL_OF_SERVICE: [
                "Implement rate limiting",
                "Use auto-scaling",
                "Deploy DDoS protection",
                "Implement circuit breakers",
                "Set resource quotas",
            ],
            StrideCategory.ELEVATION_OF_PRIVILEGE: [
                "Implement proper authorization",
                "Follow principle of least privilege",
                "Validate permissions server-side",
                "Use role-based access control",
                "Implement security boundaries",
            ],
        }
        return mitigations.get(category, [])

Template 3: Data Flow Diagram Analysis

模板3:数据流图分析

python
from dataclasses import dataclass
from typing import List, Set, Tuple
from enum import Enum

class ElementType(Enum):
    EXTERNAL_ENTITY = "external"
    PROCESS = "process"
    DATA_STORE = "datastore"
    DATA_FLOW = "dataflow"


@dataclass
class DFDElement:
    id: str
    name: str
    type: ElementType
    trust_level: int  # 0 = untrusted, higher = more trusted
    description: str = ""


@dataclass
class DataFlow:
    id: str
    name: str
    source: str
    destination: str
    data_type: str
    protocol: str
    encrypted: bool = False


class DFDAnalyzer:
    """Analyze Data Flow Diagrams for STRIDE threats."""

    def __init__(self):
        self.elements: Dict[str, DFDElement] = {}
        self.flows: List[DataFlow] = []

    def add_element(self, element: DFDElement) -> None:
        self.elements[element.id] = element

    def add_flow(self, flow: DataFlow) -> None:
        self.flows.append(flow)

    def find_trust_boundary_crossings(self) -> List[Tuple[DataFlow, int]]:
        """Find data flows that cross trust boundaries."""
        crossings = []
        for flow in self.flows:
            source = self.elements.get(flow.source)
            dest = self.elements.get(flow.destination)
            if source and dest and source.trust_level != dest.trust_level:
                trust_diff = abs(source.trust_level - dest.trust_level)
                crossings.append((flow, trust_diff))
        return sorted(crossings, key=lambda x: x[1], reverse=True)

    def identify_threats_per_element(self) -> Dict[str, List[StrideCategory]]:
        """Map applicable STRIDE categories to element types."""
        threat_mapping = {
            ElementType.EXTERNAL_ENTITY: [
                StrideCategory.SPOOFING,
                StrideCategory.REPUDIATION,
            ],
            ElementType.PROCESS: [
                StrideCategory.SPOOFING,
                StrideCategory.TAMPERING,
                StrideCategory.REPUDIATION,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
                StrideCategory.ELEVATION_OF_PRIVILEGE,
            ],
            ElementType.DATA_STORE: [
                StrideCategory.TAMPERING,
                StrideCategory.REPUDIATION,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
            ],
            ElementType.DATA_FLOW: [
                StrideCategory.TAMPERING,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
            ],
        }

        result = {}
        for elem_id, elem in self.elements.items():
            result[elem_id] = threat_mapping.get(elem.type, [])
        return result

    def analyze_unencrypted_flows(self) -> List[DataFlow]:
        """Find unencrypted data flows crossing trust boundaries."""
        risky_flows = []
        for flow in self.flows:
            if not flow.encrypted:
                source = self.elements.get(flow.source)
                dest = self.elements.get(flow.destination)
                if source and dest and source.trust_level != dest.trust_level:
                    risky_flows.append(flow)
        return risky_flows

    def generate_threat_enumeration(self) -> List[Dict]:
        """Generate comprehensive threat enumeration."""
        threats = []
        element_threats = self.identify_threats_per_element()

        for elem_id, categories in element_threats.items():
            elem = self.elements[elem_id]
            for category in categories:
                threats.append({
                    "element_id": elem_id,
                    "element_name": elem.name,
                    "element_type": elem.type.value,
                    "stride_category": category.name,
                    "description": f"{category.name} threat against {elem.name}",
                    "trust_level": elem.trust_level
                })

        return threats
python
from dataclasses import dataclass
from typing import List, Set, Tuple
from enum import Enum

class ElementType(Enum):
    EXTERNAL_ENTITY = "external"
    PROCESS = "process"
    DATA_STORE = "datastore"
    DATA_FLOW = "dataflow"


@dataclass
class DFDElement:
    id: str
    name: str
    type: ElementType
    trust_level: int  # 0 = untrusted, higher = more trusted
    description: str = ""


@dataclass
class DataFlow:
    id: str
    name: str
    source: str
    destination: str
    data_type: str
    protocol: str
    encrypted: bool = False


class DFDAnalyzer:
    """Analyze Data Flow Diagrams for STRIDE threats."""

    def __init__(self):
        self.elements: Dict[str, DFDElement] = {}
        self.flows: List[DataFlow] = []

    def add_element(self, element: DFDElement) -> None:
        self.elements[element.id] = element

    def add_flow(self, flow: DataFlow) -> None:
        self.flows.append(flow)

    def find_trust_boundary_crossings(self) -> List[Tuple[DataFlow, int]]:
        """Find data flows that cross trust boundaries."""
        crossings = []
        for flow in self.flows:
            source = self.elements.get(flow.source)
            dest = self.elements.get(flow.destination)
            if source and dest and source.trust_level != dest.trust_level:
                trust_diff = abs(source.trust_level - dest.trust_level)
                crossings.append((flow, trust_diff))
        return sorted(crossings, key=lambda x: x[1], reverse=True)

    def identify_threats_per_element(self) -> Dict[str, List[StrideCategory]]:
        """Map applicable STRIDE categories to element types."""
        threat_mapping = {
            ElementType.EXTERNAL_ENTITY: [
                StrideCategory.SPOOFING,
                StrideCategory.REPUDIATION,
            ],
            ElementType.PROCESS: [
                StrideCategory.SPOOFING,
                StrideCategory.TAMPERING,
                StrideCategory.REPUDIATION,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
                StrideCategory.ELEVATION_OF_PRIVILEGE,
            ],
            ElementType.DATA_STORE: [
                StrideCategory.TAMPERING,
                StrideCategory.REPUDIATION,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
            ],
            ElementType.DATA_FLOW: [
                StrideCategory.TAMPERING,
                StrideCategory.INFORMATION_DISCLOSURE,
                StrideCategory.DENIAL_OF_SERVICE,
            ],
        }

        result = {}
        for elem_id, elem in self.elements.items():
            result[elem_id] = threat_mapping.get(elem.type, [])
        return result

    def analyze_unencrypted_flows(self) -> List[DataFlow]:
        """Find unencrypted data flows crossing trust boundaries."""
        risky_flows = []
        for flow in self.flows:
            if not flow.encrypted:
                source = self.elements.get(flow.source)
                dest = self.elements.get(flow.destination)
                if source and dest and source.trust_level != dest.trust_level:
                    risky_flows.append(flow)
        return risky_flows

    def generate_threat_enumeration(self) -> List[Dict]:
        """Generate comprehensive threat enumeration."""
        threats = []
        element_threats = self.identify_threats_per_element()

        for elem_id, categories in element_threats.items():
            elem = self.elements[elem_id]
            for category in categories:
                threats.append({
                    "element_id": elem_id,
                    "element_name": elem.name,
                    "element_type": elem.type.value,
                    "stride_category": category.name,
                    "description": f"{category.name} threat against {elem.name}",
                    "trust_level": elem.trust_level
                })

        return threats

Template 4: STRIDE per Interaction

模板4:基于交互的STRIDE分析

python
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class Interaction:
    """Represents an interaction between two components."""
    id: str
    source: str
    target: str
    action: str
    data: str
    protocol: str


class StridePerInteraction:
    """Apply STRIDE to each interaction in the system."""

    INTERACTION_THREATS = {
        # Source type -> Target type -> Applicable threats
        ("external", "process"): {
            "S": "External entity spoofing identity to process",
            "T": "Tampering with data sent to process",
            "R": "External entity denying sending data",
            "I": "Data exposure during transmission",
            "D": "Flooding process with requests",
            "E": "Exploiting process to gain privileges",
        },
        ("process", "datastore"): {
            "T": "Process tampering with stored data",
            "R": "Process denying data modifications",
            "I": "Unauthorized data access by process",
            "D": "Process exhausting storage resources",
        },
        ("process", "process"): {
            "S": "Process spoofing another process",
            "T": "Tampering with inter-process data",
            "I": "Data leakage between processes",
            "D": "One process overwhelming another",
            "E": "Process gaining elevated access",
        },
    }

    def analyze_interaction(
        self,
        interaction: Interaction,
        source_type: str,
        target_type: str
    ) -> List[Dict]:
        """Analyze a single interaction for STRIDE threats."""
        threats = []
        key = (source_type, target_type)

        applicable_threats = self.INTERACTION_THREATS.get(key, {})

        for stride_code, description in applicable_threats.items():
            threats.append({
                "interaction_id": interaction.id,
                "source": interaction.source,
                "target": interaction.target,
                "stride_category": stride_code,
                "threat_description": description,
                "context": f"{interaction.action} - {interaction.data}",
            })

        return threats

    def generate_threat_matrix(
        self,
        interactions: List[Interaction],
        element_types: Dict[str, str]
    ) -> List[Dict]:
        """Generate complete threat matrix for all interactions."""
        all_threats = []

        for interaction in interactions:
            source_type = element_types.get(interaction.source, "unknown")
            target_type = element_types.get(interaction.target, "unknown")

            threats = self.analyze_interaction(
                interaction, source_type, target_type
            )
            all_threats.extend(threats)

        return all_threats
python
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class Interaction:
    """Represents an interaction between two components."""
    id: str
    source: str
    target: str
    action: str
    data: str
    protocol: str


class StridePerInteraction:
    """Apply STRIDE to each interaction in the system."""

    INTERACTION_THREATS = {
        # Source type -> Target type -> Applicable threats
        ("external", "process"): {
            "S": "External entity spoofing identity to process",
            "T": "Tampering with data sent to process",
            "R": "External entity denying sending data",
            "I": "Data exposure during transmission",
            "D": "Flooding process with requests",
            "E": "Exploiting process to gain privileges",
        },
        ("process", "datastore"): {
            "T": "Process tampering with stored data",
            "R": "Process denying data modifications",
            "I": "Unauthorized data access by process",
            "D": "Process exhausting storage resources",
        },
        ("process", "process"): {
            "S": "Process spoofing another process",
            "T": "Tampering with inter-process data",
            "I": "Data leakage between processes",
            "D": "One process overwhelming another",
            "E": "Process gaining elevated access",
        },
    }

    def analyze_interaction(
        self,
        interaction: Interaction,
        source_type: str,
        target_type: str
    ) -> List[Dict]:
        """Analyze a single interaction for STRIDE threats."""
        threats = []
        key = (source_type, target_type)

        applicable_threats = self.INTERACTION_THREATS.get(key, {})

        for stride_code, description in applicable_threats.items():
            threats.append({
                "interaction_id": interaction.id,
                "source": interaction.source,
                "target": interaction.target,
                "stride_category": stride_code,
                "threat_description": description,
                "context": f"{interaction.action} - {interaction.data}",
            })

        return threats

    def generate_threat_matrix(
        self,
        interactions: List[Interaction],
        element_types: Dict[str, str]
    ) -> List[Dict]:
        """Generate complete threat matrix for all interactions."""
        all_threats = []

        for interaction in interactions:
            source_type = element_types.get(interaction.source, "unknown")
            target_type = element_types.get(interaction.target, "unknown")

            threats = self.analyze_interaction(
                interaction, source_type, target_type
            )
            all_threats.extend(threats)

        return all_threats

Best Practices

最佳实践

Do's

建议做法

  • Involve stakeholders - Security, dev, and ops perspectives
  • Be systematic - Cover all STRIDE categories
  • Prioritize realistically - Focus on high-impact threats
  • Update regularly - Threat models are living documents
  • Use visual aids - DFDs help communication
  • 引入利益相关者 - 涵盖安全、开发和运维视角
  • 系统化执行 - 覆盖所有STRIDE分类
  • 合理优先级排序 - 聚焦高影响威胁
  • 定期更新 - 威胁模型是动态文档
  • 使用可视化工具 - 数据流图有助于沟通

Don'ts

避免做法

  • Don't skip categories - Each reveals different threats
  • Don't assume security - Question every component
  • Don't work in isolation - Collaborative modeling is better
  • Don't ignore low-probability - High-impact threats matter
  • Don't stop at identification - Follow through with mitigations
  • 不要跳过分类 - 每个分类揭示不同类型的威胁
  • 不要假设系统安全 - 对每个组件提出质疑
  • 不要孤立工作 - 协作建模效果更好
  • 不要忽视低概率高影响威胁 - 此类威胁同样重要
  • 不要仅停留在识别阶段 - 需跟进缓解措施

Resources

参考资源