cancel-async-tasks

Cancel Async Tasks

Overview

This skill provides guidance for implementing robust asyncio task cancellation in Python, particularly when dealing with signal handling (SIGINT/KeyboardInterrupt), semaphore-based concurrency limiting, and ensuring proper cleanup of all tasks including those waiting in queues.

Key Concepts

Signal Propagation in Asyncio

Understanding how signals interact with asyncio is critical:
  1. KeyboardInterrupt vs CancelledError: When SIGINT is received during `asyncio.run()`, the behavior differs from catching exceptions inside async code. The event loop typically converts the interrupt into a `CancelledError` that propagates through tasks (on Python 3.11+, `asyncio.run()` does this by cancelling the main task and raising `KeyboardInterrupt` itself once that task has finished).
  2. Signal handler context: Signal handlers run in the main thread, but asyncio tasks may be in various states (running, waiting on a semaphore, waiting on I/O).
  3. Event loop state: How SIGINT is handled depends on whether the loop is driven by `asyncio.run()` or managed manually.

Task Lifecycle States

When cancellation occurs, tasks can be in different states:
  1. Running tasks: Currently executing code
  2. Awaiting tasks: Blocked on I/O or other coroutines
  3. Semaphore-waiting tasks: Waiting to acquire a semaphore for concurrency limiting
  4. Not-yet-started tasks: Created but not yet run for the first time
Each state requires different handling for proper cleanup.
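A minimal sketch of how three of these states look at cancellation time. The `worker` and `demo` names are illustrative only, and the short sleeps stand in for real work. The point it tries to show: a task waiting on the semaphore is cancelled at the acquire step, and a task cancelled before its first step never runs any of its body, so cleanup written inside the coroutine does not run for it.

```python
import asyncio


async def worker(name, sem, log):
    log.append(f"{name}: started")
    try:
        async with sem:
            log.append(f"{name}: acquired")
            await asyncio.sleep(10)  # stands in for real work
    except asyncio.CancelledError:
        log.append(f"{name}: cancelled")
        raise  # always re-raise so the task ends up cancelled


async def demo():
    log = []
    sem = asyncio.Semaphore(1)
    # "a" will hold the semaphore (running/awaiting), "b" will wait for it.
    tasks = [asyncio.create_task(worker(n, sem, log)) for n in ("a", "b")]
    await asyncio.sleep(0.01)
    # "c" is created and cancelled before the loop ever runs it.
    tasks.append(asyncio.create_task(worker("c", sem, log)))
    for task in tasks:
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return log, results


if __name__ == "__main__":
    log, results = asyncio.run(demo())
    print(log)
```

In this sketch "b" never logs "acquired" and "c" logs nothing at all, which is why cleanup that must always happen is better owned by the code that created the tasks.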

Potential Approaches

Approach 1: Task Group with Exception Handling

Use `asyncio.TaskGroup` (Python 3.11+) for automatic cancellation propagation:
  • TaskGroup automatically cancels remaining tasks when one fails
  • Provides structured concurrency guarantees
  • Consider whether this matches the cleanup requirements

Approach 2: Manual Task Tracking with Shield

Track all task objects explicitly and handle cancellation:
  • Maintain a list of all created task objects
  • Use `asyncio.shield()` for cleanup operations that must complete
  • Implement an explicit cancellation loop for all tracked tasks
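One way the tracking and cancellation loop might look, as a sketch rather than a definitive implementation. `job`, `run_all`, and `demo` are hypothetical names. Each job keeps its `try/finally` outside the `async with sem` block so that the cleanup also runs for jobs still queued on the semaphore, and `run_all` cancels and awaits every tracked task itself.

```python
import asyncio


async def job(i, sem, log):
    try:
        async with sem:
            await asyncio.sleep(10)  # stands in for real work
    finally:
        # Outside the semaphore block: runs for running and queued jobs alike.
        log.append(i)


async def run_all(n_jobs, max_concurrent, log):
    sem = asyncio.Semaphore(max_concurrent)
    tasks = [asyncio.create_task(job(i, sem, log)) for i in range(n_jobs)]
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Explicit cancellation loop over every tracked task.
        for task in tasks:
            task.cancel()
        # Wait until each task has finished unwinding before re-raising.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def demo():
    log = []
    runner = asyncio.create_task(run_all(5, 2, log))
    await asyncio.sleep(0.01)  # two jobs running, three queued
    runner.cancel()  # stands in for the cancellation triggered by an interrupt
    try:
        await runner
    except asyncio.CancelledError:
        pass
    return sorted(log)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```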

Approach 3: Signal Handler Registration

Register explicit signal handlers for SIGINT/SIGTERM:
  • Use `loop.add_signal_handler()` to register custom handlers
  • Set a cancellation flag or event that tasks check
  • Coordinate shutdown through the event loop
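A hedged sketch of this approach using an `asyncio.Event` as the shared flag. `install_handlers`, `worker`, and the `auto_stop_after` parameter are assumptions introduced for the example; `auto_stop_after` only exists so the sketch can be run without sending a signal. `loop.add_signal_handler()` is available on Unix event loops only, so the sketch falls back quietly elsewhere.

```python
import asyncio
import signal


def install_handlers(loop, stop):
    """Ask the loop to set `stop` on SIGINT/SIGTERM; return False if unsupported."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except (NotImplementedError, RuntimeError, ValueError):
            # e.g. Windows event loops, or a loop not running in the main thread
            return False
    return True


async def worker(name, stop, log):
    while not stop.is_set():
        await asyncio.sleep(0.01)  # stands in for one unit of work
    log.append(f"{name} stopped cleanly")


async def main(auto_stop_after=None):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    install_handlers(loop, stop)
    if auto_stop_after is not None:
        # Demo/test stand-in for a real signal.
        loop.call_later(auto_stop_after, stop.set)
    log = []
    tasks = [asyncio.create_task(worker(f"w{i}", stop, log)) for i in range(3)]
    await stop.wait()
    await asyncio.gather(*tasks)  # workers notice the flag and return
    return sorted(log)


if __name__ == "__main__":
    print(asyncio.run(main(auto_stop_after=0.05)))
```

With a flag like this, tasks exit at points they choose, which suits work that must not be abandoned mid-step; tasks blocked on a long await would still need cancelling.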

Approach 4: Context Manager Pattern

Wrap task execution in a context manager that handles cleanup:
  • `__aenter__` sets up tasks and tracking
  • `__aexit__` ensures all tasks are cancelled and awaited
  • Handles exceptions uniformly
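The pattern could be sketched as below. `TaskScope` and its `spawn` method are hypothetical names invented for this example, not part of asyncio. The exit path is the same whether the body finished, raised, or was cancelled.

```python
import asyncio


class TaskScope:
    """Illustrative helper: cancels and awaits every spawned task on exit."""

    def __init__(self):
        self._tasks = []

    async def __aenter__(self):
        return self

    def spawn(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.append(task)
        return task

    async def __aexit__(self, exc_type, exc, tb):
        for task in self._tasks:
            if not task.done():
                task.cancel()
        # Await everything so cleanup inside the tasks has finished.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        return False  # never swallow the exception from the body


async def job(i, log):
    try:
        await asyncio.sleep(10)
    finally:
        log.append(i)


async def demo():
    log = []
    try:
        async with TaskScope() as scope:
            for i in range(3):
                scope.spawn(job(i, log))
            await asyncio.sleep(0.01)  # let the jobs start
            raise RuntimeError("simulated failure in the body")
    except RuntimeError:
        pass
    return sorted(log)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike `asyncio.TaskGroup`, this sketch cancels outstanding tasks even on a normal exit; whether that is desirable depends on the cleanup requirements.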

Verification Strategies

Testing with Real Signals

Critical: Test with actual signals, not timeouts:

```python
# Correct approach: use subprocess with an actual SIGINT
import signal
import subprocess
import sys
import time

# Capture output as text so the cleanup messages can be inspected afterwards
proc = subprocess.Popen(
    [sys.executable, 'script.py'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
)
time.sleep(1)  # Let tasks start
proc.send_signal(signal.SIGINT)
stdout, stderr = proc.communicate(timeout=5)
# Verify cleanup messages in output
```

**Incorrect approach** (gives false confidence):
- Using `asyncio.wait_for()` with timeout does not replicate SIGINT behavior
- Using `asyncio.CancelledError` directly differs from signal-triggered cancellation

Verification Checklist

  1. Running task cleanup: Verify tasks actively executing receive cancellation
  2. Waiting task cleanup: Verify tasks blocked on I/O are cancelled
  3. Semaphore queue cleanup: Verify tasks waiting on semaphore acquisition are cancelled
  4. Cleanup code execution: Verify finally blocks and cleanup handlers run
  5. No resource leaks: Verify file handles, connections, etc. are closed
  6. Exit code verification: Verify process exits with expected code after interrupt
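Items 4 and 6 can be checked together with a real signal. The following is a sketch for POSIX systems only; the child program in `CHILD` and the `interrupt_child` helper are invented for illustration. Instead of sleeping for a fixed time, the parent waits for a "started" marker before sending SIGINT, which avoids interrupting the child before its task is running.

```python
import signal
import subprocess
import sys

# Hypothetical child program: reports when it is running, cleans up in `finally`.
CHILD = r"""
import asyncio, signal
# Restore default SIGINT handling in case the parent process ignores SIGINT.
signal.signal(signal.SIGINT, signal.default_int_handler)

async def main():
    try:
        print("started", flush=True)
        await asyncio.sleep(30)
    finally:
        print("cleanup done", flush=True)

asyncio.run(main())
"""


def interrupt_child(timeout=10):
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
    )
    first_line = proc.stdout.readline()  # blocks until the child is running
    proc.send_signal(signal.SIGINT)
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # the child ignored the interrupt; do not hang the test
        stdout, stderr = proc.communicate()
    return proc.returncode, first_line + stdout, stderr


if __name__ == "__main__":
    code, out, err = interrupt_child()
    print("exit code:", code)
    print(out)
```

The exact exit code depends on how the script handles the interrupt, so a test should assert the value the program is specified to return rather than a fixed number.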

Test Scenarios to Cover

  • Interrupt when all slots are filled (max_concurrent tasks running)
  • Interrupt when tasks are queued waiting for semaphore
  • Interrupt during cleanup phase itself
  • Rapid repeated interrupts
  • Interrupt before any task starts

Common Pitfalls

Pitfall 1: Catching KeyboardInterrupt Inside Async Functions

Problem: `KeyboardInterrupt` doesn't propagate normally through asyncio; the event loop typically converts it to `CancelledError`.
Symptom: Exception handlers for `KeyboardInterrupt` inside async functions usually do not trigger during an actual Ctrl+C.
Solution: Handle `CancelledError` instead, or register explicit signal handlers at the event loop level.
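The difference between the two handlers can be illustrated with a sketch that models only the cancellation step, using `task.cancel()` as a stand-in for what `asyncio.run()` does to the main task on SIGINT in Python 3.11+. It is not a substitute for testing with a real signal, and the function names are illustrative.

```python
import asyncio


async def main(log):
    try:
        await asyncio.sleep(10)
    except KeyboardInterrupt:
        # Not reached when the task is cancelled.
        log.append("KeyboardInterrupt handler ran")
        raise
    except asyncio.CancelledError:
        log.append("CancelledError handler ran")
        raise


async def simulate_runner_cancel():
    log = []
    task = asyncio.create_task(main(log))
    await asyncio.sleep(0.01)
    task.cancel()  # assumption: models the runner cancelling the main task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(simulate_runner_cancel()))
```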

Pitfall 2: asyncio.gather Doesn't Cancel Queued Tasks

Problem: When using `asyncio.gather` with more tasks than can run concurrently (via semaphore), cancelling gather doesn't automatically cancel tasks waiting to acquire the semaphore.
Symptom: Tasks that haven't started don't have their cleanup code run.
Solution: Explicitly track all task objects and cancel them individually, not just rely on gather's cancellation.

Pitfall 3: Testing with Timeouts Instead of Signals

Problem: Using an `asyncio.wait_for()` timeout to simulate interruption doesn't replicate actual signal handling behavior.
Symptom: Tests pass but actual Ctrl+C behavior differs.
Solution: Use `subprocess` with `signal.SIGINT` to test actual signal handling behavior.

Pitfall 4: Cleanup During Cancellation

Problem: Cleanup code itself may be cancelled if not protected.
Symptom: Partial cleanup, resources not released.
Solution: Use `asyncio.shield()` for critical cleanup operations, or handle `CancelledError` and re-raise after cleanup.
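A sketch of the shield variant, under the assumption that a second cancellation (for example a repeated Ctrl+C) arrives while cleanup is still in progress. `release_resources`, `worker`, and `demo` are illustrative names. The shielded task keeps running when the awaiting task is cancelled again, and the worker waits for it before re-raising.

```python
import asyncio


async def release_resources(log):
    await asyncio.sleep(0.05)  # stands in for closing connections, flushing files
    log.append("released")


async def worker(log):
    try:
        await asyncio.sleep(10)
    finally:
        cleanup = asyncio.ensure_future(release_resources(log))
        try:
            await asyncio.shield(cleanup)
        except asyncio.CancelledError:
            # Cancelled again mid-cleanup: the shielded task is still running,
            # so wait for it to finish and then propagate the cancellation.
            await cleanup
            raise


async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)
    task.cancel()  # first cancellation: worker enters its finally block
    await asyncio.sleep(0.01)
    task.cancel()  # second cancellation lands during cleanup
    try:
        await task
    except asyncio.CancelledError:
        log.append("worker cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A third cancellation during the unshielded `await cleanup` would still interrupt the wait, so this protects against one extra interrupt, not an unlimited number.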

Pitfall 5: Duplicate Exception Handling Code

Problem: Identical cleanup code in multiple exception handlers (`CancelledError`, `KeyboardInterrupt`, etc.).
Symptom: Code duplication, maintenance burden.
Solution: Use a single handler with `except (asyncio.CancelledError, KeyboardInterrupt)` or abstract cleanup into a helper function.

Pitfall 6: Not Awaiting Cancelled Tasks

Problem: Cancelling a task and not awaiting it leaves the task in a partially-cleaned-up state.
Symptom: Resource leaks, warnings about pending tasks.
Solution: Always `await asyncio.gather(*cancelled_tasks, return_exceptions=True)` after cancelling.

Decision Framework

When implementing async task cancellation, consider:
  1. Python version: TaskGroup (3.11+) vs manual management
  2. Concurrency model: Fixed pool, semaphore-limited, or unlimited
  3. Cleanup requirements: What must happen before exit?
  4. Signal handling needs: Just SIGINT, or also SIGTERM, SIGHUP?
  5. Testing environment: Can tests send real signals?

Debugging Tips

  • Add logging at task entry, exit, and cancellation points
  • Log the task state when cancellation is received
  • Use `asyncio.current_task()` to identify which task is executing
  • Check `task.cancelled()` vs `task.done()` states
  • Enable asyncio debug mode: `asyncio.run(main(), debug=True)`
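A brief sketch of the `task.cancelled()` vs `task.done()` distinction, with task names chosen for the example. A cancelled task is also done, while a task that returned normally is done but not cancelled.

```python
import asyncio


async def sleeper():
    await asyncio.sleep(10)


async def quick():
    # current_task() identifies the task this coroutine is running in.
    return asyncio.current_task().get_name()


async def demo():
    slow = asyncio.create_task(sleeper(), name="sleeper")
    fast = asyncio.create_task(quick(), name="quick")
    await asyncio.sleep(0.01)
    slow.cancel()
    await asyncio.gather(slow, fast, return_exceptions=True)
    # Map each task name to its (done, cancelled) pair.
    return {t.get_name(): (t.done(), t.cancelled()) for t in (slow, fast)}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```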