Loading...
Loading...
Python backend patterns for asyncio, FastAPI, SQLAlchemy 2.0 async, and connection pooling. Use when building async Python services, FastAPI endpoints, database sessions, or connection pool tuning.
npx skill4agent add yonatangross/orchestkit python-backendrules/| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Asyncio | 3 | HIGH | TaskGroup, structured concurrency, cancellation handling |
| FastAPI | 3 | HIGH | Dependencies, middleware, background tasks |
| SQLAlchemy | 3 | HIGH | Async sessions, relationships, migrations |
| Pooling | 3 | MEDIUM | Database pools, HTTP sessions, tuning |
# FastAPI + SQLAlchemy async session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()# Asyncio TaskGroup with timeout
async def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]gather()asyncio.timeout()except*asyncio.to_thread()| Decision | Recommendation |
|---|---|
| Task spawning | TaskGroup not gather() |
| Timeouts | asyncio.timeout() context manager |
| Concurrency limit | asyncio.Semaphore |
| Sync bridge | asyncio.to_thread() |
| Cancellation | Always re-raise CancelledError |
asynccontextmanagerDepends().env| Decision | Recommendation |
|---|---|
| Lifespan | asynccontextmanager (not events) |
| Dependencies | Class-based services with DI |
| Settings | Pydantic Settings with .env |
| Response | ORJSONResponse for performance |
| Health | Check all critical dependencies |
expire_on_commit=Falselazy="raise"selectinload| Decision | Recommendation |
|---|---|
| Session scope | One AsyncSession per request |
| Lazy loading | lazy="raise" + explicit loads |
| Eager loading | selectinload for collections |
| expire_on_commit | False (prevents lazy load errors) |
| Pool | pool_pre_ping=True |
pool_sizemax_overflowpool_pre_pingmin_sizemax_sizeTCPConnectorpool_size = (concurrent_requests / avg_queries_per_request) * 1.5# NEVER use gather() for new code - no structured concurrency
# NEVER swallow CancelledError - breaks TaskGroup and timeout
# NEVER block the event loop with sync calls (time.sleep, requests.get)
# NEVER use global mutable state for db sessions
# NEVER skip dependency injection (create sessions in routes)
# NEVER share AsyncSession across tasks (race condition)
# NEVER use sync Session in async code (blocks event loop)
# NEVER create engine/pool per request
# NEVER forget to close pools on shutdownarchitecture-patternsasync-jobsstreaming-api-patternsdatabase-patterns