ln-652-transaction-correctness-auditor

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Transaction Correctness Auditor (L3 Worker)

事务正确性审计器(L3工作器)

Specialized worker auditing database transaction patterns for correctness, scope, and trigger interaction.
专注于审计数据库事务模式的正确性、范围以及触发器交互的工作器。

Purpose & Scope

目的与范围

  • Worker in ln-650 coordinator pipeline - invoked by ln-650-persistence-performance-auditor
  • Audit transaction correctness (Priority: HIGH)
  • Check commit patterns, transaction boundaries, rollback handling, trigger/notify semantics
  • Return structured findings with severity, location, effort, recommendations
  • Calculate compliance score (X/10) for Transaction Correctness category
  • ln-650协调器流水线中的工作器 - 由ln-650-persistence-performance-auditor调用
  • 审计事务正确性(优先级:高)
  • 检查提交模式、事务边界、回滚处理、触发器/通知语义
  • 返回包含严重程度、位置、修复工作量和建议的结构化审计结果
  • 计算事务正确性类别的合规分数(X/10)

Inputs (from Coordinator)

输入(来自协调器)

MANDATORY READ: Load
shared/references/task_delegation_pattern.md#audit-coordinator--worker-contract
for contextStore structure.
Receives
contextStore
with:
tech_stack
,
best_practices
,
db_config
(database type, ORM settings, trigger/notify patterns),
codebase_root
.
Domain-aware: Supports
domain_mode
+
current_domain
.
必读: 加载
shared/references/task_delegation_pattern.md#audit-coordinator--worker-contract
以了解contextStore结构。
接收包含以下内容的
contextStore
tech_stack
best_practices
db_config
(数据库类型、ORM设置、触发器/通知模式)、
codebase_root
领域感知: 支持
domain_mode
+
current_domain

Workflow

工作流程

  1. Parse context from contextStore
    • Extract tech_stack, best_practices, db_config
    • Determine scan_path
  2. Discover transaction infrastructure
    • Find migration files with triggers (
      pg_notify
      ,
      CREATE TRIGGER
      ,
      NOTIFY
      )
    • Find session/transaction configuration (
      expire_on_commit
      ,
      autocommit
      , isolation level)
    • Map trigger-affected tables
  3. Scan codebase for violations
    • Trace UPDATE paths for trigger-affected tables
    • Analyze transaction boundaries (begin/commit scope)
    • Check error handling around commits
  4. Collect findings with severity, location, effort, recommendation
  5. Calculate score using penalty algorithm
  6. Return JSON result to coordinator
  1. 解析contextStore中的上下文
    • 提取tech_stack、best_practices、db_config
    • 确定扫描路径
  2. 发现事务基础设施
    • 查找包含触发器的迁移文件(
      pg_notify
      CREATE TRIGGER
      NOTIFY
    • 查找会话/事务配置(
      expire_on_commit
      autocommit
      、隔离级别)
    • 映射受触发器影响的表
  3. 扫描代码库以查找违规情况
    • 跟踪受触发器影响表的UPDATE路径
    • 分析事务边界(begin/commit范围)
    • 检查提交周围的错误处理
  4. 收集包含严重程度、位置、修复工作量和建议的审计结果
  5. 使用惩罚算法计算分数
  6. 向协调器返回JSON结果

Audit Rules (Priority: HIGH)

审计规则(优先级:高)

1. Missing Intermediate Commits

1. 缺失中间提交

What: UPDATE without commit when DB trigger/NOTIFY depends on transaction commit
Detection:
  • Step 1: Find triggers in migrations:
    • Grep for
      pg_notify|NOTIFY|CREATE TRIGGER|CREATE OR REPLACE FUNCTION.*trigger
      in
      alembic/versions/
      ,
      migrations/
    • Extract: trigger function name, table name, trigger event (INSERT/UPDATE)
  • Step 2: Find code that UPDATEs trigger-affected tables:
    • Grep for
      repo.*update|session\.execute.*update|\.progress|\.status
      related to trigger tables
  • Step 3: Check for
    commit()
    between sequential updates:
    • If multiple UPDATEs to trigger table occur in a loop/sequence without intermediate
      commit()
      , NOTIFY events are deferred until final commit
    • Real-time progress tracking breaks without intermediate commits
Severity:
  • CRITICAL: Missing commit for NOTIFY/LISTEN-based real-time features (SSE, WebSocket)
  • HIGH: Missing commit for triggers that update materialized data
Recommendation:
  • Add
    session.commit()
    at progress milestones (throttled: every N%, every T seconds)
  • Or move real-time notifications out of DB triggers (Redis pub/sub, in-process events)
Effort: S-M (add strategic commits or redesign notification path)
描述: 当数据库触发器/NOTIFY依赖事务提交时,执行UPDATE但未提交
检测方式:
  • 步骤1: 在迁移文件中查找触发器:
    • alembic/versions/
      migrations/
      中搜索
      pg_notify|NOTIFY|CREATE TRIGGER|CREATE OR REPLACE FUNCTION.*trigger
    • 提取:触发器函数名称、表名、触发器事件(INSERT/UPDATE)
  • 步骤2: 查找更新受触发器影响表的代码:
    • 搜索与触发器表相关的
      repo.*update|session\.execute.*update|\.progress|\.status
  • 步骤3: 检查连续更新之间是否存在
    commit()
    • 如果在循环/序列中对触发器表执行多次UPDATE但未进行中间
      commit()
      ,NOTIFY事件将延迟到最终提交时才触发
    • 没有中间提交会导致实时进度跟踪失效
严重程度:
  • CRITICAL(严重): 基于NOTIFY/LISTEN的实时功能(SSE、WebSocket)缺失提交
  • HIGH(高): 更新物化数据的触发器缺失提交
建议:
  • 在进度里程碑处添加
    session.commit()
    (限流:每N%或每T秒执行一次)
  • 或者将实时通知移出数据库触发器(使用Redis发布/订阅、进程内事件)
修复工作量: S-M(添加策略性提交或重新设计通知路径)

2. Transaction Scope Too Wide

2. 事务范围过宽

What: Single transaction wraps unrelated operations, including slow external calls
Detection:
  • Find
    async with session.begin()
    or explicit transaction blocks
  • Check if block contains external calls:
    await httpx.
    ,
    await aiohttp.
    ,
    await requests.
    ,
    await grpc.
  • Check if block contains file I/O:
    open(
    ,
    .read(
    ,
    .write(
  • Pattern: DB write + external API call + another DB write in same transaction
Severity:
  • HIGH: External HTTP/gRPC call inside transaction (holds DB connection during network latency)
  • MEDIUM: File I/O inside transaction
Recommendation: Split into separate transactions; use Saga/Outbox pattern for cross-service consistency
Effort: M-L (restructure transaction boundaries)
描述: 单个事务包含不相关操作,包括缓慢的外部调用
检测方式:
  • 查找
    async with session.begin()
    或显式事务块
  • 检查块中是否包含外部调用:
    await httpx.
    await aiohttp.
    await requests.
    await grpc.
  • 检查块中是否包含文件I/O:
    open(
    .read(
    .write(
  • 模式:数据库写入 + 外部API调用 + 另一个数据库写入在同一个事务中
严重程度:
  • HIGH(高): 事务内部包含外部HTTP/gRPC调用(在网络延迟期间占用数据库连接)
  • MEDIUM(中): 事务内部包含文件I/O
建议: 拆分为单独的事务;使用Saga/Outbox模式实现跨服务一致性
修复工作量: M-L(重构事务边界)

3. Transaction Scope Too Narrow

3. 事务范围过窄

What: Logically atomic operations split across multiple commits
Detection:
  • Multiple
    session.commit()
    calls for operations that should be atomic
  • Pattern: create parent entity, commit, create child entities, commit (should be single transaction)
  • Pattern: update status + create audit log in separate commits
Severity:
  • HIGH: Parent-child creation in separate commits (orphan risk on failure)
  • MEDIUM: Related updates in separate commits (inconsistent state on failure)
Recommendation: Wrap related operations in single transaction using
async with session.begin()
or unit-of-work pattern
Effort: M (restructure commit boundaries)
描述: 逻辑上原子的操作被拆分为多个提交
检测方式:
  • 多个
    session.commit()
    调用用于应该是原子性的操作
  • 模式:创建父实体,提交,创建子实体,提交(应在单个事务中完成)
  • 模式:更新状态 + 创建审计日志在单独的提交中
严重程度:
  • HIGH(高): 父子实体创建在单独的提交中(失败时存在孤儿风险)
  • MEDIUM(中): 相关更新在单独的提交中(失败时会导致状态不一致)
建议: 使用
async with session.begin()
或工作单元模式将相关操作包装在单个事务中
修复工作量: M(重构提交边界)

4. Missing Rollback Handling

4. 缺失回滚处理

What:
session.commit()
without proper error handling and rollback
Detection:
  • Find
    session.commit()
    not inside
    try/except
    block or context manager
  • Find
    session.commit()
    in
    try
    without
    session.rollback()
    in
    except
  • Pattern: bare
    await session.commit()
    in service methods
  • Exception:
    async with session.begin()
    auto-rollbacks (safe)
Severity:
  • MEDIUM: Missing rollback (session left in broken state on failure)
  • LOW: Missing explicit rollback when using context manager (auto-handled)
Recommendation: Use
async with session.begin()
(auto-rollback), or add explicit
try/except/rollback
pattern
Effort: S (wrap in context manager or add error handling)
描述:
session.commit()
没有适当的错误处理和回滚
检测方式:
  • 查找未在
    try/except
    块或上下文管理器中的
    session.commit()
  • 查找在
    try
    块中但
    except
    块中没有
    session.rollback()
    session.commit()
  • 模式:服务方法中存在裸
    await session.commit()
  • 例外情况:
    async with session.begin()
    会自动回滚(安全)
严重程度:
  • MEDIUM(中): 缺失回滚(失败时会话处于损坏状态)
  • LOW(低): 使用上下文管理器时缺失显式回滚(已自动处理)
建议: 使用
async with session.begin()
(自动回滚),或添加显式
try/except/rollback
模式
修复工作量: S(包装在上下文管理器中或添加错误处理)

5. Long-Held Transaction

5. 长期持有的事务

What: Transaction open during slow/blocking operations
Detection:
  • Measure scope: count lines between transaction start and commit
  • Flag if >50 lines of code between
    begin()
    and
    commit()
  • Flag if transaction contains
    await
    calls to external services (network latency)
  • Flag if transaction contains
    time.sleep()
    or
    asyncio.sleep()
Severity:
  • HIGH: Transaction held during external API call (connection pool exhaustion risk)
  • MEDIUM: Transaction spans >50 lines (complex logic, high chance of lock contention)
Recommendation: Minimize transaction scope; prepare data before opening transaction, commit immediately after DB operations
Effort: M (restructure code to minimize transaction window)
描述: 事务在缓慢/阻塞操作期间保持打开状态
检测方式:
  • 测量范围:统计事务开始和提交之间的代码行数
  • 如果
    begin()
    commit()
    之间的代码行数超过50行则标记
  • 如果事务包含对外部服务的
    await
    调用(网络延迟)则标记
  • 如果事务包含
    time.sleep()
    asyncio.sleep()
    则标记
严重程度:
  • HIGH(高): 事务在外部API调用期间保持打开(存在连接池耗尽风险)
  • MEDIUM(中): 事务跨越超过50行代码(逻辑复杂,锁竞争概率高)
建议: 最小化事务范围;在打开事务前准备好数据,数据库操作完成后立即提交
修复工作量: M(重构代码以最小化事务窗口)

Scoring Algorithm

评分算法

See
shared/references/audit_scoring.md
for unified formula and score interpretation.
有关统一公式和分数解释,请参阅
shared/references/audit_scoring.md

Output Format

输出格式

Return JSON to coordinator:
json
{
  "category": "Transaction Correctness",
  "score": 5,
  "total_issues": 6,
  "critical": 1,
  "high": 2,
  "medium": 2,
  "low": 1,
  "findings": [
    {
      "severity": "CRITICAL",
      "location": "app/infrastructure/messaging/job_processor.py:412",
      "issue": "Missing intermediate commits: progress UPDATEs trigger pg_notify but no commit() between updates; real-time SSE events deferred",
      "principle": "Transaction Correctness / Trigger Semantics",
      "recommendation": "Add session.commit() at progress milestones (throttled every 5%)",
      "effort": "S"
    }
  ]
}
向协调器返回JSON:
json
{
  "category": "Transaction Correctness",
  "score": 5,
  "total_issues": 6,
  "critical": 1,
  "high": 2,
  "medium": 2,
  "low": 1,
  "findings": [
    {
      "severity": "CRITICAL",
      "location": "app/infrastructure/messaging/job_processor.py:412",
      "issue": "Missing intermediate commits: progress UPDATEs trigger pg_notify but no commit() between updates; real-time SSE events deferred",
      "principle": "Transaction Correctness / Trigger Semantics",
      "recommendation": "Add session.commit() at progress milestones (throttled every 5%)",
      "effort": "S"
    }
  ]
}

Critical Rules

关键规则

  • Do not auto-fix: Report only
  • Trigger discovery first: Always scan migrations for triggers/NOTIFY before analyzing transaction patterns
  • ORM-aware: Check if ORM context manager auto-rollbacks (
    async with session.begin()
    is safe)
  • Exclude test transactions: Do not flag test fixtures with manual commit/rollback
  • Database-specific: PostgreSQL NOTIFY semantics differ from MySQL event scheduler
  • 请勿自动修复: 仅报告问题
  • 先发现触发器: 在分析事务模式前,始终先扫描迁移文件中的触发器/NOTIFY
  • ORM感知: 检查ORM上下文管理器是否自动回滚(
    async with session.begin()
    是安全的)
  • 排除测试事务: 不要标记带有手动提交/回滚的测试夹具
  • 数据库特定: PostgreSQL NOTIFY语义与MySQL事件调度器不同

Definition of Done

完成定义

  • contextStore parsed (tech_stack, db_config, trigger patterns)
  • scan_path determined
  • Trigger/NOTIFY infrastructure discovered from migrations
  • All 5 checks completed:
    • missing intermediate commits, scope too wide, scope too narrow, missing rollback, long-held
  • Findings collected with severity, location, effort, recommendation
  • Score calculated
  • JSON returned to coordinator

Version: 1.0.0 Last Updated: 2026-02-04
  • 已解析contextStore(tech_stack、db_config、触发器模式)
  • 已确定扫描路径
  • 已从迁移文件中发现触发器/NOTIFY基础设施
  • 已完成所有5项检查:
    • 缺失中间提交、范围过宽、范围过窄、缺失回滚、长期持有
  • 已收集包含严重程度、位置、修复工作量和建议的审计结果
  • 已计算分数
  • 已向协调器返回JSON结果

版本: 1.0.0 最后更新: 2026-02-04