Loading...
Loading...
Provides Python async/await patterns and asyncio best practices. Activated when the user asks about async/await patterns, asyncio best practices, concurrent tasks, async generators, task groups, async context managers, event loops, running blocking code in async, or async testing. Covers asyncio, concurrency, async iterators, semaphores, and asynchronous programming patterns in Python.
npx skill4agent add ingpdw/pdw-python-dev-tool async-patterns

`asyncio`, `asyncio`, `multiprocessing`, `async def`, `asyncio.run()`, `await`, `Task`, `Future`

import asyncio
async def fetch_data() -> str:
    """Return the string ``"result"`` after a one-second simulated I/O wait."""
    await asyncio.sleep(1)  # simulate I/O
    return "result"
async def main() -> None:
    """Await ``fetch_data()`` and print the value it returns."""
    data = await fetch_data()
    print(data)
asyncio.run(main())

`asyncio.run()`, `asyncio.run(main())`, `Task`

async def main() -> None:
    task_a = asyncio.create_task(fetch_data("a"))
    task_b = asyncio.create_task(fetch_data("b"))
    result_a = await task_a
    result_b = await task_b

`TaskGroup`, `async with`

async def main() -> None:
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_data("a"))
        task_b = tg.create_task(fetch_data("b"))
    # Both tasks are done here
    print(task_a.result(), task_b.result())

`ExceptionGroup`

results = await asyncio.gather(
    fetch_data("a"),
    fetch_data("b"),
    fetch_data("c"),
    return_exceptions=True, # exceptions returned as values instead of raised
)

`return_exceptions=False`, `return_exceptions=True`

| Feature | `gather()` | `TaskGroup` |
|---|---|---|
| Python version | 3.4+ | 3.11+ |
| Error handling | first exception or return_exceptions | ExceptionGroup, cancels siblings |
| Structured concurrency | No | Yes |
| Dynamic task creation | No (fixed at call time) | Yes (create_task inside block) |
`TaskGroup`, `gather()`, `return_exceptions=True`

import time
def cpu_light_blocking() -> str:
    """Block the calling thread for two seconds, then return ``"done"``."""
    time.sleep(2)  # blocking I/O
    return "done"
async def main() -> None:
    """Run ``cpu_light_blocking`` via ``asyncio.to_thread`` and await its result."""
    result = await asyncio.to_thread(cpu_light_blocking)


from concurrent.futures import ProcessPoolExecutor
async def main() -> None:
    """Offload blocking calls to executors from the running event loop.

    Runs ``blocking_io_func`` on the loop's default executor, then
    ``cpu_heavy_func(arg1)`` in a ``ProcessPoolExecutor``.
    """
    loop = asyncio.get_running_loop()
    # Thread pool (default)
    result = await loop.run_in_executor(None, blocking_io_func)
    # Process pool for CPU-bound work
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_func, arg1)


class AsyncDBConnection:
    """Async context manager that opens a connection on entry and closes it on exit."""

    async def __aenter__(self):
        """Await ``connect_to_db()``, keep the result on ``self.conn`` and return it."""
        self.conn = await connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stored connection; return ``False`` so exceptions propagate."""
        await self.conn.close()
        return False  # do not suppress exceptions


from contextlib import asynccontextmanager
@asynccontextmanager
async def db_connection(url: str):
    """Yield a connection opened with ``connect_to_db(url)``.

    The connection is closed when the ``async with`` block exits, including
    when the block raises (the close sits in ``finally``).
    """
    conn = await connect_to_db(url)
    try:
        yield conn
    finally:
        await conn.close()
async def main() -> None:
    """Open a connection with ``db_connection`` and run ``SELECT 1`` on it."""
    async with db_connection("postgres://...") as conn:
        await conn.execute("SELECT 1")


async def stream_rows(query: str, url: str = "postgres://..."):
    """Yield the rows produced by ``query`` one at a time.

    Args:
        query: Statement passed to ``conn.execute``.
        url: Connection URL forwarded to ``db_connection``, which requires
            one; the default is the same placeholder used by ``main``.
    """
    async with db_connection(url) as conn:
        cursor = await conn.execute(query)
        async for row in cursor:
            yield row
async def main() -> None:
    """Consume ``stream_rows`` with ``async for`` and pass each row to ``process``."""
    async for row in stream_rows("SELECT * FROM users"):
        process(row)


class Countdown:
    """Async iterator that yields ``start``, ``start - 1``, ..., ``1``."""

    def __init__(self, start: int):
        # Number of values still to be produced.
        self.current = start

    def __aiter__(self):
        """Return the iterator itself."""
        return self

    async def __anext__(self) -> int:
        """Return the next value after a 0.1 s pause, or stop once exhausted."""
        if self.current <= 0:
            raise StopAsyncIteration
        self.current -= 1
        await asyncio.sleep(0.1)
        # The counter was already decremented, so add one back for this step.
        return self.current + 1


sem = asyncio.Semaphore(10)
async def rate_limited_fetch(url: str) -> bytes:
    """Fetch ``url`` with ``http_get`` while holding a slot of the semaphore ``sem``."""
    async with sem:
        return await http_get(url)


lock = asyncio.Lock()
async def update_shared_state():
    """Increment ``state["counter"]`` while holding ``lock``."""
    async with lock:
        # only one coroutine at a time
        state["counter"] += 1


event = asyncio.Event()
async def waiter():
    """Wait until ``event`` is set, then print a notice."""
    await event.wait()
    print("event fired")
async def trigger():
    """Set ``event`` so that pending ``event.wait()`` calls can return."""
    event.set()


condition = asyncio.Condition()
async def consumer():
    async with condition:
        await condition.wait_for(lambda: len(queue) > 0)
        item = queue.pop()

async def main() -> None:
    async with asyncio.timeout(5.0):
        data = await slow_operation()

try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("operation timed out")

`asyncio.timeout()`, `TaskGroup`, `ExceptionGroup`

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(may_fail_a())
        tg.create_task(may_fail_b())
except* ValueError as eg:
    for exc in eg.exceptions:
        log_error(exc)
except* TypeError as eg:
    for exc in eg.exceptions:
        log_error(exc)

`except*`

task = asyncio.create_task(long_running())
task.cancel()
try:
    await task
except asyncio.CancelledError:
    print("task was cancelled")

`CancelledError`, `CancelledError`, `time.sleep()`, `asyncio.to_thread()`, `run_in_executor()`, `async def`, `await`, `-W default`, `asyncio`, `asyncio.create_task(coro())`

background_tasks: set[asyncio.Task] = set()
def schedule(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

`asyncio.run()`, `RuntimeError`, `await`, `create_task()`, `asyncio.run()`, `to_thread()`, `run_in_executor()`, `pytest-asyncio`

import pytest
@pytest.mark.asyncio
async def test_fetch_data():
    """Check that awaiting ``fetch_data()`` yields the string it returns."""
    result = await fetch_data()
    # fetch_data() as defined in this document returns "result".
    assert result == "result"


@pytest.fixture
async def db_session():
    """Yield a session from ``create_session()`` and close it after the test."""
    session = await create_session()
    yield session
    await session.close()
@pytest.mark.asyncio
async def test_query(db_session):
    rows = await db_session.execute("SELECT 1")
    assert len(rows) == 1

`pytest-asyncio`, `pyproject.toml`

[tool.pytest.ini_options]
asyncio_mode = "auto" # all async tests detected automatically

`asyncio_mode = "auto"`, `@pytest.mark.asyncio`, `async def test_*`, `fastapi`