Loading...
Loading...
Python asyncio patterns for concurrent programming. Triggers on: asyncio, async, await, coroutine, gather, semaphore, TaskGroup, event loop, aiohttp, concurrent.
npx skill4agent add 0xdarkmatter/claude-mods python-async-patterns
import asyncio
# Coroutine (must be awaited)
async def fetch(url: str) -> str:
    """Fetch ``url`` with a GET request and return the response body as text.

    A new ``aiohttp.ClientSession`` is opened for this single request; both
    the session and the response are scoped by ``async with`` blocks.

    Args:
        url: Address to request.

    Returns:
        The value of ``await response.text()``.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
# Entry point
async def main():
    """Await ``fetch`` for https://example.com and return what it returns."""
    result = await fetch("https://example.com")
    return result
asyncio.run(main())


async def fetch_all(urls: list[str]) -> list[str | BaseException]:
    """Fetch multiple URLs concurrently.

    One ``aiohttp.ClientSession`` is shared by every request. Because
    ``gather`` is called with ``return_exceptions=True``, a failed fetch does
    not raise here: its exception object is returned in that URL's position.

    Args:
        urls: Addresses to fetch.

    Returns:
        One entry per URL, in input order: the value returned by
        ``fetch_one`` or the exception it raised.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)


async def fetch_with_limit(urls: list[str], limit: int = 10):
    """Limit concurrent requests.

    Args:
        urls: Addresses to fetch.
        limit: Maximum number of fetches allowed to run at the same time.

    Returns:
        The ``fetch_one`` results, in input order.

    Raises:
        ValueError: If ``limit`` is less than 1 (a zero-slot semaphore would
            make every fetch wait forever).
    """
    if limit < 1:
        raise ValueError("limit must be >= 1")

    semaphore = asyncio.Semaphore(limit)

    # NOTE(review): fetch_one is not defined on this page. It is called here
    # as fetch_one(session, url) to agree with fetch_all above - confirm
    # against the helper's real signature.
    async with aiohttp.ClientSession() as session:

        async def bounded_fetch(url):
            # At most `limit` coroutines hold the semaphore at once.
            async with semaphore:
                return await fetch_one(session, url)

        return await asyncio.gather(*(bounded_fetch(url) for url in urls))


async def process_items(items):
    """Structured concurrency with automatic cleanup.

    Creates one task per item inside an ``asyncio.TaskGroup`` (Python 3.11+)
    and leaves the ``async with`` block only after the group has finished.
    """
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(process_one(item))
# All tasks complete here, or exception raised


async def with_timeout():
    """Await ``slow_operation`` for at most five seconds.

    Returns:
        The result of ``slow_operation()``, or ``None`` if the timeout
        expired first.
    """
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await slow_operation()
    except asyncio.TimeoutError:
        # Timing out is treated as "no result" rather than as an error.
        result = None
    return result


# WRONG - blocks event loop
async def bad():
    """Anti-pattern, intentionally left wrong: blocking calls in a coroutine.

    Both calls below are synchronous, so nothing else on the event loop can
    run while they execute. ``url`` is a placeholder that this snippet does
    not define.
    """
    time.sleep(5)  # Never use time.sleep!
    requests.get(url)  # Blocking I/O!
# CORRECT
async def good():
    """Non-blocking counterpart: awaitable sleep and an aiohttp request.

    ``url`` is a placeholder that this snippet does not define.
    """
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as s:
        await s.get(url)


# WRONG - orphaned task
async def bad():
    """Anti-pattern, intentionally left wrong: fire-and-forget task.

    The task object returned by ``create_task`` is discarded, so nothing in
    this coroutine keeps a reference to it or awaits its completion.
    """
    asyncio.create_task(work())  # May be garbage collected!
# CORRECT - keep reference
async def good():
task = asyncio.create_task(work())
await task| Pattern | Use Case |
|---|---|
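| `asyncio.gather()` | Multiple independent operations |
| `asyncio.Semaphore` | Rate limiting, resource constraints |
| `asyncio.TaskGroup` | Structured concurrency (3.11+) |
| `asyncio.Queue` | Producer-consumer |
| `asyncio.timeout()` | Timeout wrapper (3.11+) |
| `asyncio.Lock` | Shared mutable state |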
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_connection():
conn = await create_connection()
try:
yield conn
finally:
await conn.close()./references/concurrency-patterns.md./references/aiohttp-patterns.md./references/mixing-sync-async.md./references/debugging-async.md./references/production-patterns.md./references/error-handling.md./references/performance.md./scripts/find-blocking-calls.sh./assets/async-project-template.pypython-typing-patternspython-fastapi-patternspython-observability-patternspython-database-patterns