python-cheatsheet

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Python Cheatsheet

Python速查表

Quick reference for mapping global architecture concepts to Python implementation.
用于将全局架构概念映射到Python实现的速查参考。

Global → Python Mapping

全局概念→Python实现映射

Global SkillPython Implementation
service-patternsService classes, Depends()
repository-patternsSQLAlchemy 2.0, Protocol classes
error-handlingCustom exceptions, HTTPException
web-handlersFastAPI routes, Annotated[T, Depends()]

全局技能Python实现
service-patternsService类、Depends()
repository-patternsSQLAlchemy 2.0、Protocol类
error-handling自定义异常、HTTPException
web-handlersFastAPI路由、Annotated[T, Depends()]

Service Patterns (Python)

服务模式(Python实现)

See:
gts-backend-dev
for full details,
service-patterns
(global) for concepts
Quick reference:
python
class ShootoutService:
    """Orchestrates domain logic. Depends on ports, not adapters."""

    def __init__(
        self,
        repo: ShootoutRepository,      # Port (Protocol)
        job_queue: JobQueuePort,       # Port (Protocol)
    ):
        self._repo = repo
        self._job_queue = job_queue

    async def create_shootout(
        self,
        user_id: UUID,
        data: CreateShootoutData,
    ) -> Shootout:
        """Domain orchestration - no infrastructure knowledge."""
        shootout = Shootout.create(user_id=user_id, title=data.title)

        for tone in data.tones:
            shootout.add_tone(tone)  # Domain logic in entity

        await self._repo.save(shootout)
        await self._job_queue.enqueue(RenderShootoutJob(shootout.id))

        return shootout
Dependency injection (composition root):
python
undefined
参考: 完整细节请查看
gts-backend-dev
,相关概念请查看全局技能
service-patterns
速查示例:
python
class ShootoutService:
    """Orchestrates domain logic. Depends on ports, not adapters."""

    def __init__(
        self,
        repo: ShootoutRepository,      # Port (Protocol)
        job_queue: JobQueuePort,       # Port (Protocol)
    ):
        self._repo = repo
        self._job_queue = job_queue

    async def create_shootout(
        self,
        user_id: UUID,
        data: CreateShootoutData,
    ) -> Shootout:
        """Domain orchestration - no infrastructure knowledge."""
        shootout = Shootout.create(user_id=user_id, title=data.title)

        for tone in data.tones:
            shootout.add_tone(tone)  # Domain logic in entity

        await self._repo.save(shootout)
        await self._job_queue.enqueue(RenderShootoutJob(shootout.id))

        return shootout
依赖注入(组合根):
python
undefined

bootstrap.py or api/deps.py

bootstrap.py or api/deps.py

def create_shootout_service(session: AsyncSession) -> ShootoutService: """Wire adapters to ports at the composition root.""" return ShootoutService( repo=SQLAlchemyShootoutRepository(session), job_queue=TaskIQJobQueue(broker), )
def create_shootout_service(session: AsyncSession) -> ShootoutService: """Wire adapters to ports at the composition root.""" return ShootoutService( repo=SQLAlchemyShootoutRepository(session), job_queue=TaskIQJobQueue(broker), )

FastAPI dependency

FastAPI dependency

async def get_shootout_service( session: AsyncSession = Depends(get_session), ) -> ShootoutService: return create_shootout_service(session)

---
async def get_shootout_service( session: AsyncSession = Depends(get_session), ) -> ShootoutService: return create_shootout_service(session)

---

Repository Patterns (Python)

仓储模式(Python实现)

See:
gts-backend-dev
for full details,
repository-patterns
(global) for concepts
参考: 完整细节请查看
gts-backend-dev
,相关概念请查看全局技能
repository-patterns

Protocol-based ports (preferred):

基于Protocol的端口(推荐):

python
from typing import Protocol
from uuid import UUID

class ShootoutRepository(Protocol):
    """Port for shootout persistence."""

    async def get_by_id(self, id: UUID) -> Shootout | None:
        """Retrieve shootout by ID."""
        ...

    async def save(self, shootout: Shootout) -> None:
        """Persist shootout."""
        ...
python
from typing import Protocol
from uuid import UUID

class ShootoutRepository(Protocol):
    """Port for shootout persistence."""

    async def get_by_id(self, id: UUID) -> Shootout | None:
        """Retrieve shootout by ID."""
        ...

    async def save(self, shootout: Shootout) -> None:
        """Persist shootout."""
        ...

SQLAlchemy 2.0 adapter:

SQLAlchemy 2.0适配器:

python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

class SQLAlchemyShootoutRepository:
    """Adapter: implements ShootoutRepository port with SQLAlchemy."""

    def __init__(self, session: AsyncSession):
        self._session = session

    async def get_by_id(self, id: UUID) -> Shootout | None:
        stmt = select(ShootoutModel).where(ShootoutModel.id == id)
        result = await self._session.execute(stmt)
        model = result.scalar_one_or_none()
        return ShootoutMapper.to_entity(model) if model else None

    async def save(self, shootout: Shootout) -> None:
        model = ShootoutMapper.to_model(shootout)
        self._session.add(model)
        await self._session.flush()
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

class SQLAlchemyShootoutRepository:
    """Adapter: implements ShootoutRepository port with SQLAlchemy."""

    def __init__(self, session: AsyncSession):
        self._session = session

    async def get_by_id(self, id: UUID) -> Shootout | None:
        stmt = select(ShootoutModel).where(ShootoutModel.id == id)
        result = await self._session.execute(stmt)
        model = result.scalar_one_or_none()
        return ShootoutMapper.to_entity(model) if model else None

    async def save(self, shootout: Shootout) -> None:
        model = ShootoutMapper.to_model(shootout)
        self._session.add(model)
        await self._session.flush()

SQLAlchemy 2.0 models (typed):

SQLAlchemy 2.0类型化模型:

python
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    tone3000_id: Mapped[int] = mapped_column(unique=True, index=True)
    username: Mapped[str] = mapped_column(String(100))

    # Relationships
    shootouts: Mapped[list["Shootout"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan"
    )

class Shootout(Base):
    __tablename__ = "shootouts"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    title: Mapped[str] = mapped_column(String(200))
    config_json: Mapped[str] = mapped_column(Text)

    user: Mapped["User"] = relationship(back_populates="shootouts")
python
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    tone3000_id: Mapped[int] = mapped_column(unique=True, index=True)
    username: Mapped[str] = mapped_column(String(100))

    # Relationships
    shootouts: Mapped[list["Shootout"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan"
    )

class Shootout(Base):
    __tablename__ = "shootouts"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    title: Mapped[str] = mapped_column(String(200))
    config_json: Mapped[str] = mapped_column(Text)

    user: Mapped["User"] = relationship(back_populates="shootouts")

Queries — Single Query via joinedload:

查询 — 通过joinedload实现单查询:

CRITICAL: See
.claude/rules/query-patterns.md
. Never use
selectinload
— always
joinedload
.
python
from sqlalchemy import select
from sqlalchemy.orm import joinedload
重要提示: 请查看
.claude/rules/query-patterns.md
。禁止使用
selectinload
,必须使用
joinedload
python
from sqlalchemy import select
from sqlalchemy.orm import joinedload

Single entity — full aggregate in ONE query

单个实体 — 一次查询获取完整聚合数据

stmt = ( select(Gear) .where(Gear.id == gear_id) .options( joinedload(Gear.make), joinedload(Gear.source), joinedload(Gear.models), joinedload(Gear.tags), ) ) result = await db.execute(stmt) gear = result.unique().scalar_one_or_none() # .unique() REQUIRED
stmt = ( select(Gear) .where(Gear.id == gear_id) .options( joinedload(Gear.make), joinedload(Gear.source), joinedload(Gear.models), joinedload(Gear.tags), ) ) result = await db.execute(stmt) gear = result.unique().scalar_one_or_none() # .unique() 是必须的

Paginated list — ID subquery avoids LIMIT on cartesian rows

分页列表 — ID子查询避免笛卡尔积行的LIMIT问题

id_stmt = ( select(Shootout.id) .where(Shootout.user_id == user_id) .order_by(Shootout.created_at.desc()) .offset(skip) .limit(limit) ) stmt = ( select(Shootout) .where(Shootout.id.in_(id_stmt)) .options(joinedload(Shootout.chains)) .order_by(Shootout.created_at.desc()) ) result = await db.execute(stmt) shootouts = result.unique().scalars().all()
id_stmt = ( select(Shootout.id) .where(Shootout.user_id == user_id) .order_by(Shootout.created_at.desc()) .offset(skip) .limit(limit) ) stmt = ( select(Shootout) .where(Shootout.id.in_(id_stmt)) .options(joinedload(Shootout.chains)) .order_by(Shootout.created_at.desc()) ) result = await db.execute(stmt) shootouts = result.unique().scalars().all()

Chained — nested JOINs in one query

链式查询 — 一次查询实现嵌套JOIN

stmt = ( select(User) .options( joinedload(User.identities).joinedload(UserIdentity.provider) ) )
undefined
stmt = ( select(User) .options( joinedload(User.identities).joinedload(UserIdentity.provider) ) )
undefined

ORM relationship defaults:

ORM关系默认配置:

python
undefined
python
undefined

CORRECT — lazy="raise" forces explicit joinedload in repositories

正确方式 — lazy="raise" 强制在仓储中显式使用joinedload

models: Mapped[list[GearModel]] = relationship( "GearModel", back_populates="gear", cascade="all, delete-orphan", lazy="raise", )
models: Mapped[list[GearModel]] = relationship( "GearModel", back_populates="gear", cascade="all, delete-orphan", lazy="raise", )

BANNED — fires separate query

禁止使用 — 会触发单独查询

models: Mapped[list[GearModel]] = relationship(..., lazy="selectin")

---
models: Mapped[list[GearModel]] = relationship(..., lazy="selectin")

---

Error Handling (Python)

错误处理(Python实现)

See:
error-handling
(global) for concepts
参考: 相关概念请查看全局技能
error-handling

Custom exceptions:

自定义异常:

python
class AppError(Exception):
    """Base application error."""

class NotFoundError(AppError):
    """Resource not found."""

class UnauthorizedError(AppError):
    """User not authorised."""
python
class AppError(Exception):
    """Base application error."""

class NotFoundError(AppError):
    """Resource not found."""

class UnauthorizedError(AppError):
    """User not authorised."""

FastAPI error handling:

FastAPI错误处理:

python
from fastapi import HTTPException, status

@router.get("/{id}")
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
    item = await db.get(Item, id)
    if not item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item {id} not found"
        )
    return item

python
from fastapi import HTTPException, status

@router.get("/{id}")
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
    item = await db.get(Item, id)
    if not item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item {id} not found"
        )
    return item

Web Handlers (Python)

Web处理器(Python实现)

See:
gts-backend-dev
for full details,
web-handlers
(global) for concepts
参考: 完整细节请查看
gts-backend-dev
,相关概念请查看全局技能
web-handlers

FastAPI route pattern:

FastAPI路由模式:

python
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/shootouts", tags=["shootouts"])

@router.get("/{shootout_id}", response_model=ShootoutRead)
async def get_shootout(
    shootout_id: int,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> Shootout:
    """Get a shootout by ID."""
    shootout = await db.get(Shootout, shootout_id)
    if not shootout or shootout.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Shootout not found"
        )
    return shootout
python
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/shootouts", tags=["shootouts"])

@router.get("/{shootout_id}", response_model=ShootoutRead)
async def get_shootout(
    shootout_id: int,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> Shootout:
    """Get a shootout by ID."""
    shootout = await db.get(Shootout, shootout_id)
    if not shootout or shootout.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Shootout not found"
        )
    return shootout

Pydantic schemas:

Pydantic模型:

python
from pydantic import BaseModel, ConfigDict

class ShootoutBase(BaseModel):
    title: str
    config_json: str

class ShootoutCreate(ShootoutBase):
    pass

class ShootoutRead(ShootoutBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    user_id: int

python
from pydantic import BaseModel, ConfigDict

class ShootoutBase(BaseModel):
    title: str
    config_json: str

class ShootoutCreate(ShootoutBase):
    pass

class ShootoutRead(ShootoutBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    user_id: int

Transaction Patterns (CRITICAL)

事务模式(重要)

See:
gts-backend-dev
Section "Transactions" for full details
参考: 完整细节请查看
gts-backend-dev
中的“事务”章节

Service layer transaction ownership:

服务层事务所有权:

python
undefined
python
undefined

In services (gear_item_service.py, signal_chain_service.py, etc.)

在服务层(gear_item_service.py、signal_chain_service.py等)

context_manager = ( session.begin_nested() if session.in_transaction() else session.begin() ) async with context_manager: # ... do work, add(), flush() ...
context_manager = ( session.begin_nested() if session.in_transaction() else session.begin() ) async with context_manager: # ... 执行操作、add()、flush() ...

Savepoint released (if nested) or committed (if begin)

保存点释放(嵌套事务)或提交(顶级事务)


**The Contract:**
- `in_transaction()=False` → Service owns commit via `begin()`
- `in_transaction()=True` → **Caller must commit** (service uses savepoint)

**Common Pitfall:** Pre-check queries start implicit transactions:

```python

**约定:**
- `in_transaction()=False` → 服务层通过`begin()`负责提交
- `in_transaction()=True` → **调用者必须提交**(服务层使用保存点)

**常见陷阱:** 预检查查询会启动隐式事务:

```python

API endpoint

API端点

@router.post("/items") async def create_item(db: DbSession, ...): # This query starts an implicit transaction! existing = await db.execute(select(Item).where(...))
# Service sees in_transaction()=True, uses begin_nested()
item = await service.create_item(...)

# BUG: Without this, changes are never committed!
await db.commit()  # ← REQUIRED when API queried before service call

return item

**Rules:**
1. If API does ANY query before calling a write service → must `await db.commit()` after
2. If API only calls service (no pre-queries) → service handles commit via `begin()`
3. Service-to-service calls → outer service owns commit, inner uses savepoint

---
@router.post("/items") async def create_item(db: DbSession, ...): # 此查询会启动隐式事务! existing = await db.execute(select(Item).where(...))
# 服务层检测到in_transaction()=True,使用begin_nested()
item = await service.create_item(...)

# 错误:如果没有这行,更改永远不会提交!
await db.commit()  # ← 当API在调用服务前执行查询时,这行是必须的

return item

**规则:**
1. 如果API在调用写入服务前执行了任何查询 → 必须在调用后执行`await db.commit()`
2. 如果API仅调用服务(无预查询) → 服务层通过`begin()`处理提交
3. 服务间调用 → 外层服务负责提交,内层服务使用保存点

---

Dependency Injection (FastAPI)

依赖注入(FastAPI)

python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session_maker

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Decode JWT and return current user."""
    payload = decode_token(token)
    user = await db.execute(
        select(User).where(User.tone3000_id == payload["sub"])
    )
    user = user.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session_maker

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Decode JWT and return current user."""
    payload = decode_token(token)
    user = await db.execute(
        select(User).where(User.tone3000_id == payload["sub"])
    )
    user = user.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

Background Tasks (TaskIQ)

后台任务(TaskIQ)

python
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker(url="redis://redis:6379")
broker.with_result_backend(RedisAsyncResultBackend(redis_url="redis://redis:6379"))

@broker.task
async def process_shootout(job_id: str, shootout_id: int) -> str:
    """Process a shootout through the pipeline."""
    await update_job_status(job_id, "running", progress=0)

    # Process...
    await update_job_status(job_id, "running", progress=50)

    await update_job_status(job_id, "completed", progress=100)
    return output_path

python
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker(url="redis://redis:6379")
broker.with_result_backend(RedisAsyncResultBackend(redis_url="redis://redis:6379"))

@broker.task
async def process_shootout(job_id: str, shootout_id: int) -> str:
    """Process a shootout through the pipeline."""
    await update_job_status(job_id, "running", progress=0)

    # Process...
    await update_job_status(job_id, "running", progress=50)

    await update_job_status(job_id, "completed", progress=100)
    return output_path

Migrations (Alembic)

迁移(Alembic)

bash
undefined
bash
undefined

Create migration

创建迁移

docker compose exec webapp alembic revision --autogenerate -m "add jobs table"
docker compose exec webapp alembic revision --autogenerate -m "add jobs table"

Apply migrations

应用迁移

docker compose exec webapp alembic upgrade head
docker compose exec webapp alembic upgrade head

Rollback

回滚

docker compose exec webapp alembic downgrade -1

---
docker compose exec webapp alembic downgrade -1

---

Code Quality

代码质量

Commands

命令

bash
undefined
bash
undefined

Lint (check only)

代码检查(仅检测)

docker compose exec webapp ruff check app/
docker compose exec webapp ruff check app/

Lint (auto-fix)

代码检查(自动修复)

docker compose exec webapp ruff check --fix app/
docker compose exec webapp ruff check --fix app/

Type check

类型检查

docker compose exec webapp mypy app/
docker compose exec webapp mypy app/

Test

测试

docker compose exec webapp pytest tests/
docker compose exec webapp pytest tests/

All checks

所有检查

just check-backend
undefined
just check-backend
undefined

You Must Get Right (can't auto-fix)

必须手动修正的内容(无法自动修复)

RequirementEnforced ByNotes
Type hints on all functionsmypy strictCan't add them for you
snake_case functions/variablesruff NCan detect but not rename semantically
PascalCase for classesruff NCan detect but not rename semantically
pytest conventionsruff PT
test_
prefix, fixtures via conftest.py
ClassVar for mutable class attrsruff RUF012
ClassVar[dict[...]]
for class-level dicts

要求检测工具说明
所有函数添加类型提示mypy严格模式无法自动添加
函数/变量使用snake_case命名ruff N可以检测但无法语义化重命名
类使用PascalCase命名ruff N可以检测但无法语义化重命名
遵循pytest约定ruff PT测试函数以
test_
开头,夹具通过conftest.py定义
可变类属性使用ClassVarruff RUF012类级别的字典需声明为
ClassVar[dict[...]]

Related Skills

相关技能

Project-Specific

项目特定技能

  • gts-backend-dev
    - Full FastAPI/SQLAlchemy documentation
  • gts-frontend-dev
    - Astro frontend patterns
  • gts-testing
    - Testing patterns specific to GTS
  • gts-backend-dev
    - 完整的FastAPI/SQLAlchemy文档
  • gts-frontend-dev
    - Astro前端模式
  • gts-testing
    - GTS项目专属测试模式

Global (Architecture Concepts)

全局架构概念

  • service-patterns
    - Service layer concepts
  • repository-patterns
    - Repository abstraction concepts
  • error-handling
    - Error handling concepts
  • web-handlers
    - HTTP handler concepts
  • service-patterns
    - 服务层概念
  • repository-patterns
    - 仓储抽象概念
  • error-handling
    - 错误处理概念
  • web-handlers
    - HTTP处理器概念