python-cheatsheet
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePython Cheatsheet
Python速查表
Quick reference for mapping global architecture concepts to Python implementation.
用于将全局架构概念映射到Python实现的速查参考。
Global → Python Mapping
全局概念→Python实现映射
| Global Skill | Python Implementation |
|---|---|
| service-patterns | Service classes, Depends() |
| repository-patterns | SQLAlchemy 2.0, Protocol classes |
| error-handling | Custom exceptions, HTTPException |
| web-handlers | FastAPI routes, Annotated[T, Depends()] |
| 全局技能 | Python实现 |
|---|---|
| service-patterns | Service类、Depends() |
| repository-patterns | SQLAlchemy 2.0、Protocol类 |
| error-handling | 自定义异常、HTTPException |
| web-handlers | FastAPI路由、Annotated[T, Depends()] |
Service Patterns (Python)
服务模式(Python实现)
See: for full details, (global) for concepts
gts-backend-devservice-patternsQuick reference:
python
class ShootoutService:
"""Orchestrates domain logic. Depends on ports, not adapters."""
def __init__(
self,
repo: ShootoutRepository, # Port (Protocol)
job_queue: JobQueuePort, # Port (Protocol)
):
self._repo = repo
self._job_queue = job_queue
async def create_shootout(
self,
user_id: UUID,
data: CreateShootoutData,
) -> Shootout:
"""Domain orchestration - no infrastructure knowledge."""
shootout = Shootout.create(user_id=user_id, title=data.title)
for tone in data.tones:
shootout.add_tone(tone) # Domain logic in entity
await self._repo.save(shootout)
await self._job_queue.enqueue(RenderShootoutJob(shootout.id))
return shootoutDependency injection (composition root):
python
undefined参考: 完整细节请查看,相关概念请查看全局技能
gts-backend-devservice-patterns速查示例:
python
class ShootoutService:
"""Orchestrates domain logic. Depends on ports, not adapters."""
def __init__(
self,
repo: ShootoutRepository, # Port (Protocol)
job_queue: JobQueuePort, # Port (Protocol)
):
self._repo = repo
self._job_queue = job_queue
async def create_shootout(
self,
user_id: UUID,
data: CreateShootoutData,
) -> Shootout:
"""Domain orchestration - no infrastructure knowledge."""
shootout = Shootout.create(user_id=user_id, title=data.title)
for tone in data.tones:
shootout.add_tone(tone) # Domain logic in entity
await self._repo.save(shootout)
await self._job_queue.enqueue(RenderShootoutJob(shootout.id))
return shootout依赖注入(组合根):
python
undefinedbootstrap.py or api/deps.py
bootstrap.py or api/deps.py
def create_shootout_service(session: AsyncSession) -> ShootoutService:
"""Wire adapters to ports at the composition root."""
return ShootoutService(
repo=SQLAlchemyShootoutRepository(session),
job_queue=TaskIQJobQueue(broker),
)
def create_shootout_service(session: AsyncSession) -> ShootoutService:
"""Wire adapters to ports at the composition root."""
return ShootoutService(
repo=SQLAlchemyShootoutRepository(session),
job_queue=TaskIQJobQueue(broker),
)
FastAPI dependency
FastAPI dependency
async def get_shootout_service(
session: AsyncSession = Depends(get_session),
) -> ShootoutService:
return create_shootout_service(session)
---async def get_shootout_service(
session: AsyncSession = Depends(get_session),
) -> ShootoutService:
return create_shootout_service(session)
---Repository Patterns (Python)
仓储模式(Python实现)
See: for full details, (global) for concepts
gts-backend-devrepository-patterns参考: 完整细节请查看,相关概念请查看全局技能
gts-backend-devrepository-patternsProtocol-based ports (preferred):
基于Protocol的端口(推荐):
python
from typing import Protocol
from uuid import UUID
class ShootoutRepository(Protocol):
"""Port for shootout persistence."""
async def get_by_id(self, id: UUID) -> Shootout | None:
"""Retrieve shootout by ID."""
...
async def save(self, shootout: Shootout) -> None:
"""Persist shootout."""
...python
from typing import Protocol
from uuid import UUID
class ShootoutRepository(Protocol):
"""Port for shootout persistence."""
async def get_by_id(self, id: UUID) -> Shootout | None:
"""Retrieve shootout by ID."""
...
async def save(self, shootout: Shootout) -> None:
"""Persist shootout."""
...SQLAlchemy 2.0 adapter:
SQLAlchemy 2.0适配器:
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
class SQLAlchemyShootoutRepository:
"""Adapter: implements ShootoutRepository port with SQLAlchemy."""
def __init__(self, session: AsyncSession):
self._session = session
async def get_by_id(self, id: UUID) -> Shootout | None:
stmt = select(ShootoutModel).where(ShootoutModel.id == id)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()
return ShootoutMapper.to_entity(model) if model else None
async def save(self, shootout: Shootout) -> None:
model = ShootoutMapper.to_model(shootout)
self._session.add(model)
await self._session.flush()python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
class SQLAlchemyShootoutRepository:
"""Adapter: implements ShootoutRepository port with SQLAlchemy."""
def __init__(self, session: AsyncSession):
self._session = session
async def get_by_id(self, id: UUID) -> Shootout | None:
stmt = select(ShootoutModel).where(ShootoutModel.id == id)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()
return ShootoutMapper.to_entity(model) if model else None
async def save(self, shootout: Shootout) -> None:
model = ShootoutMapper.to_model(shootout)
self._session.add(model)
await self._session.flush()SQLAlchemy 2.0 models (typed):
SQLAlchemy 2.0类型化模型:
python
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
tone3000_id: Mapped[int] = mapped_column(unique=True, index=True)
username: Mapped[str] = mapped_column(String(100))
# Relationships
shootouts: Mapped[list["Shootout"]] = relationship(
back_populates="user",
cascade="all, delete-orphan"
)
class Shootout(Base):
__tablename__ = "shootouts"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
title: Mapped[str] = mapped_column(String(200))
config_json: Mapped[str] = mapped_column(Text)
user: Mapped["User"] = relationship(back_populates="shootouts")python
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
tone3000_id: Mapped[int] = mapped_column(unique=True, index=True)
username: Mapped[str] = mapped_column(String(100))
# Relationships
shootouts: Mapped[list["Shootout"]] = relationship(
back_populates="user",
cascade="all, delete-orphan"
)
class Shootout(Base):
__tablename__ = "shootouts"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
title: Mapped[str] = mapped_column(String(200))
config_json: Mapped[str] = mapped_column(Text)
user: Mapped["User"] = relationship(back_populates="shootouts")Queries — Single Query via joinedload:
查询 — 通过joinedload实现单查询:
CRITICAL: See . Never use — always .
.claude/rules/query-patterns.mdselectinloadjoinedloadpython
from sqlalchemy import select
from sqlalchemy.orm import joinedload重要提示: 请查看。禁止使用,必须使用。
.claude/rules/query-patterns.mdselectinloadjoinedloadpython
from sqlalchemy import select
from sqlalchemy.orm import joinedloadSingle entity — full aggregate in ONE query
单个实体 — 一次查询获取完整聚合数据
stmt = (
select(Gear)
.where(Gear.id == gear_id)
.options(
joinedload(Gear.make),
joinedload(Gear.source),
joinedload(Gear.models),
joinedload(Gear.tags),
)
)
result = await db.execute(stmt)
gear = result.unique().scalar_one_or_none() # .unique() REQUIRED
stmt = (
select(Gear)
.where(Gear.id == gear_id)
.options(
joinedload(Gear.make),
joinedload(Gear.source),
joinedload(Gear.models),
joinedload(Gear.tags),
)
)
result = await db.execute(stmt)
gear = result.unique().scalar_one_or_none() # .unique() 是必须的
Paginated list — ID subquery avoids LIMIT on cartesian rows
分页列表 — ID子查询避免笛卡尔积行的LIMIT问题
id_stmt = (
select(Shootout.id)
.where(Shootout.user_id == user_id)
.order_by(Shootout.created_at.desc())
.offset(skip)
.limit(limit)
)
stmt = (
select(Shootout)
.where(Shootout.id.in_(id_stmt))
.options(joinedload(Shootout.chains))
.order_by(Shootout.created_at.desc())
)
result = await db.execute(stmt)
shootouts = result.unique().scalars().all()
id_stmt = (
select(Shootout.id)
.where(Shootout.user_id == user_id)
.order_by(Shootout.created_at.desc())
.offset(skip)
.limit(limit)
)
stmt = (
select(Shootout)
.where(Shootout.id.in_(id_stmt))
.options(joinedload(Shootout.chains))
.order_by(Shootout.created_at.desc())
)
result = await db.execute(stmt)
shootouts = result.unique().scalars().all()
Chained — nested JOINs in one query
链式查询 — 一次查询实现嵌套JOIN
stmt = (
select(User)
.options(
joinedload(User.identities).joinedload(UserIdentity.provider)
)
)
undefinedstmt = (
select(User)
.options(
joinedload(User.identities).joinedload(UserIdentity.provider)
)
)
undefinedORM relationship defaults:
ORM关系默认配置:
python
undefinedpython
undefinedCORRECT — lazy="raise" forces explicit joinedload in repositories
正确方式 — lazy="raise" 强制在仓储中显式使用joinedload
models: Mapped[list[GearModel]] = relationship(
"GearModel", back_populates="gear",
cascade="all, delete-orphan",
lazy="raise",
)
models: Mapped[list[GearModel]] = relationship(
"GearModel", back_populates="gear",
cascade="all, delete-orphan",
lazy="raise",
)
BANNED — fires separate query
禁止使用 — 会触发单独查询
models: Mapped[list[GearModel]] = relationship(..., lazy="selectin")
---models: Mapped[list[GearModel]] = relationship(..., lazy="selectin")
---Error Handling (Python)
错误处理(Python实现)
See: (global) for concepts
error-handling参考: 相关概念请查看全局技能
error-handlingCustom exceptions:
自定义异常:
python
class AppError(Exception):
"""Base application error."""
class NotFoundError(AppError):
"""Resource not found."""
class UnauthorizedError(AppError):
"""User not authorised."""python
class AppError(Exception):
"""Base application error."""
class NotFoundError(AppError):
"""Resource not found."""
class UnauthorizedError(AppError):
"""User not authorised."""FastAPI error handling:
FastAPI错误处理:
python
from fastapi import HTTPException, status
@router.get("/{id}")
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
item = await db.get(Item, id)
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Item {id} not found"
)
return itempython
from fastapi import HTTPException, status
@router.get("/{id}")
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
item = await db.get(Item, id)
if not item:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Item {id} not found"
)
return itemWeb Handlers (Python)
Web处理器(Python实现)
See: for full details, (global) for concepts
gts-backend-devweb-handlers参考: 完整细节请查看,相关概念请查看全局技能
gts-backend-devweb-handlersFastAPI route pattern:
FastAPI路由模式:
python
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/shootouts", tags=["shootouts"])
@router.get("/{shootout_id}", response_model=ShootoutRead)
async def get_shootout(
shootout_id: int,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Shootout:
"""Get a shootout by ID."""
shootout = await db.get(Shootout, shootout_id)
if not shootout or shootout.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Shootout not found"
)
return shootoutpython
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/shootouts", tags=["shootouts"])
@router.get("/{shootout_id}", response_model=ShootoutRead)
async def get_shootout(
shootout_id: int,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
) -> Shootout:
"""Get a shootout by ID."""
shootout = await db.get(Shootout, shootout_id)
if not shootout or shootout.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Shootout not found"
)
return shootoutPydantic schemas:
Pydantic模型:
python
from pydantic import BaseModel, ConfigDict
class ShootoutBase(BaseModel):
title: str
config_json: str
class ShootoutCreate(ShootoutBase):
pass
class ShootoutRead(ShootoutBase):
model_config = ConfigDict(from_attributes=True)
id: int
user_id: intpython
from pydantic import BaseModel, ConfigDict
class ShootoutBase(BaseModel):
title: str
config_json: str
class ShootoutCreate(ShootoutBase):
pass
class ShootoutRead(ShootoutBase):
model_config = ConfigDict(from_attributes=True)
id: int
user_id: intTransaction Patterns (CRITICAL)
事务模式(重要)
See: Section "Transactions" for full details
gts-backend-dev参考: 完整细节请查看中的“事务”章节
gts-backend-devService layer transaction ownership:
服务层事务所有权:
python
undefinedpython
undefinedIn services (gear_item_service.py, signal_chain_service.py, etc.)
在服务层(gear_item_service.py、signal_chain_service.py等)
context_manager = (
session.begin_nested() if session.in_transaction() else session.begin()
)
async with context_manager:
# ... do work, add(), flush() ...
context_manager = (
session.begin_nested() if session.in_transaction() else session.begin()
)
async with context_manager:
# ... 执行操作、add()、flush() ...
Savepoint released (if nested) or committed (if begin)
保存点释放(嵌套事务)或提交(顶级事务)
**The Contract:**
- `in_transaction()=False` → Service owns commit via `begin()`
- `in_transaction()=True` → **Caller must commit** (service uses savepoint)
**Common Pitfall:** Pre-check queries start implicit transactions:
```python
**约定:**
- `in_transaction()=False` → 服务层通过`begin()`负责提交
- `in_transaction()=True` → **调用者必须提交**(服务层使用保存点)
**常见陷阱:** 预检查查询会启动隐式事务:
```pythonAPI endpoint
API端点
@router.post("/items")
async def create_item(db: DbSession, ...):
# This query starts an implicit transaction!
existing = await db.execute(select(Item).where(...))
# Service sees in_transaction()=True, uses begin_nested()
item = await service.create_item(...)
# BUG: Without this, changes are never committed!
await db.commit() # ← REQUIRED when API queried before service call
return item
**Rules:**
1. If API does ANY query before calling a write service → must `await db.commit()` after
2. If API only calls service (no pre-queries) → service handles commit via `begin()`
3. Service-to-service calls → outer service owns commit, inner uses savepoint
---@router.post("/items")
async def create_item(db: DbSession, ...):
# 此查询会启动隐式事务!
existing = await db.execute(select(Item).where(...))
# 服务层检测到in_transaction()=True,使用begin_nested()
item = await service.create_item(...)
# 错误:如果没有这行,更改永远不会提交!
await db.commit() # ← 当API在调用服务前执行查询时,这行是必须的
return item
**规则:**
1. 如果API在调用写入服务前执行了任何查询 → 必须在调用后执行`await db.commit()`
2. 如果API仅调用服务(无预查询) → 服务层通过`begin()`处理提交
3. 服务间调用 → 外层服务负责提交,内层服务使用保存点
---Dependency Injection (FastAPI)
依赖注入(FastAPI)
python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session_maker
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
"""Decode JWT and return current user."""
payload = decode_token(token)
user = await db.execute(
select(User).where(User.tone3000_id == payload["sub"])
)
user = user.scalar_one_or_none()
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return userpython
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session_maker
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
"""Decode JWT and return current user."""
payload = decode_token(token)
user = await db.execute(
select(User).where(User.tone3000_id == payload["sub"])
)
user = user.scalar_one_or_none()
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return userBackground Tasks (TaskIQ)
后台任务(TaskIQ)
python
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
broker = ListQueueBroker(url="redis://redis:6379")
broker.with_result_backend(RedisAsyncResultBackend(redis_url="redis://redis:6379"))
@broker.task
async def process_shootout(job_id: str, shootout_id: int) -> str:
"""Process a shootout through the pipeline."""
await update_job_status(job_id, "running", progress=0)
# Process...
await update_job_status(job_id, "running", progress=50)
await update_job_status(job_id, "completed", progress=100)
return output_pathpython
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
broker = ListQueueBroker(url="redis://redis:6379")
broker.with_result_backend(RedisAsyncResultBackend(redis_url="redis://redis:6379"))
@broker.task
async def process_shootout(job_id: str, shootout_id: int) -> str:
"""Process a shootout through the pipeline."""
await update_job_status(job_id, "running", progress=0)
# Process...
await update_job_status(job_id, "running", progress=50)
await update_job_status(job_id, "completed", progress=100)
return output_pathMigrations (Alembic)
迁移(Alembic)
bash
undefinedbash
undefinedCreate migration
创建迁移
docker compose exec webapp alembic revision --autogenerate -m "add jobs table"
docker compose exec webapp alembic revision --autogenerate -m "add jobs table"
Apply migrations
应用迁移
docker compose exec webapp alembic upgrade head
docker compose exec webapp alembic upgrade head
Rollback
回滚
docker compose exec webapp alembic downgrade -1
---docker compose exec webapp alembic downgrade -1
---Code Quality
代码质量
Commands
命令
bash
undefinedbash
undefinedLint (check only)
代码检查(仅检测)
docker compose exec webapp ruff check app/
docker compose exec webapp ruff check app/
Lint (auto-fix)
代码检查(自动修复)
docker compose exec webapp ruff check --fix app/
docker compose exec webapp ruff check --fix app/
Type check
类型检查
docker compose exec webapp mypy app/
docker compose exec webapp mypy app/
Test
测试
docker compose exec webapp pytest tests/
docker compose exec webapp pytest tests/
All checks
所有检查
just check-backend
undefinedjust check-backend
undefinedYou Must Get Right (can't auto-fix)
必须手动修正的内容(无法自动修复)
| Requirement | Enforced By | Notes |
|---|---|---|
| Type hints on all functions | mypy strict | Can't add them for you |
| snake_case functions/variables | ruff N | Can detect but not rename semantically |
| PascalCase for classes | ruff N | Can detect but not rename semantically |
| pytest conventions | ruff PT | |
| ClassVar for mutable class attrs | ruff RUF012 | |
| 要求 | 检测工具 | 说明 |
|---|---|---|
| 所有函数添加类型提示 | mypy严格模式 | 无法自动添加 |
| 函数/变量使用snake_case命名 | ruff N | 可以检测但无法语义化重命名 |
| 类使用PascalCase命名 | ruff N | 可以检测但无法语义化重命名 |
| 遵循pytest约定 | ruff PT | 测试函数以 |
| 可变类属性使用ClassVar | ruff RUF012 | 类级别的字典需声明为 |
Related Skills
相关技能
Project-Specific
项目特定技能
- - Full FastAPI/SQLAlchemy documentation
gts-backend-dev - - Astro frontend patterns
gts-frontend-dev - - Testing patterns specific to GTS
gts-testing
- - 完整的FastAPI/SQLAlchemy文档
gts-backend-dev - - Astro前端模式
gts-frontend-dev - - GTS项目专属测试模式
gts-testing
Global (Architecture Concepts)
全局架构概念
- - Service layer concepts
service-patterns - - Repository abstraction concepts
repository-patterns - - Error handling concepts
error-handling - - HTTP handler concepts
web-handlers
- - 服务层概念
service-patterns - - 仓储抽象概念
repository-patterns - - 错误处理概念
error-handling - - HTTP处理器概念
web-handlers