supabase-python

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Supabase + Python Skill

Supabase + Python 实践方案

Load with: base.md + supabase.md + python.md
FastAPI patterns with Supabase Auth and SQLAlchemy/SQLModel for database access.

加载依赖:base.md + supabase.md + python.md
结合Supabase身份验证与SQLAlchemy/SQLModel数据库访问的FastAPI实践模式。

Core Principle

核心原则

SQLAlchemy/SQLModel for queries, Supabase for auth/storage.
Use SQLAlchemy or SQLModel for type-safe database access. Use supabase-py for auth, storage, and realtime. FastAPI for the API layer.

使用SQLAlchemy/SQLModel执行查询,使用Supabase处理身份验证/存储。
使用SQLAlchemy或SQLModel实现类型安全的数据库访问。使用supabase-py处理身份验证、存储和实时功能。使用FastAPI作为API层。

Project Structure

项目结构

project/
├── src/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── posts.py
│   │   │   └── users.py
│   │   └── deps.py              # Dependencies (auth, db)
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py            # Settings
│   │   └── security.py          # Auth helpers
│   ├── db/
│   │   ├── __init__.py
│   │   ├── session.py           # Database session
│   │   └── models.py            # SQLModel models
│   ├── services/
│   │   ├── __init__.py
│   │   └── supabase.py          # Supabase client
│   └── main.py                  # FastAPI app
├── supabase/
│   ├── migrations/
│   └── config.toml
├── alembic/                     # Alembic migrations (alternative)
├── alembic.ini
├── pyproject.toml
└── .env

project/
├── src/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── posts.py
│   │   │   └── users.py
│   │   └── deps.py              # Dependencies (auth, db)
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py            # Settings
│   │   └── security.py          # Auth helpers
│   ├── db/
│   │   ├── __init__.py
│   │   ├── session.py           # Database session
│   │   └── models.py            # SQLModel models
│   ├── services/
│   │   ├── __init__.py
│   │   └── supabase.py          # Supabase client
│   └── main.py                  # FastAPI app
├── supabase/
│   ├── migrations/
│   └── config.toml
├── alembic/                     # Alembic migrations (alternative)
├── alembic.ini
├── pyproject.toml
└── .env

Setup

环境搭建

Install Dependencies

安装依赖

bash
pip install fastapi uvicorn supabase python-dotenv sqlmodel asyncpg alembic
bash
pip install fastapi uvicorn supabase python-dotenv sqlmodel asyncpg alembic

pyproject.toml

pyproject.toml

toml
[project]
name = "my-app"
version = "0.1.0"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn[standard]>=0.27.0",
    "supabase>=2.0.0",
    "python-dotenv>=1.0.0",
    "sqlmodel>=0.0.14",
    "asyncpg>=0.29.0",
    "alembic>=1.13.0",
    "pydantic-settings>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.26.0",
]
toml
[project]
name = "my-app"
version = "0.1.0"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn[standard]>=0.27.0",
    "supabase>=2.0.0",
    "python-dotenv>=1.0.0",
    "sqlmodel>=0.0.14",
    "asyncpg>=0.29.0",
    "alembic>=1.13.0",
    "pydantic-settings>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.26.0",
]

Environment Variables

环境变量

bash
undefined
bash
undefined

.env

.env

SUPABASE_URL=http://localhost:54321 SUPABASE_ANON_KEY=<from supabase start> SUPABASE_SERVICE_ROLE_KEY=<from supabase start> DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:54322/postgres

---
SUPABASE_URL=http://localhost:54321 SUPABASE_ANON_KEY=<from supabase start> SUPABASE_SERVICE_ROLE_KEY=<from supabase start> DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:54322/postgres

---

Configuration

配置

src/core/config.py

src/core/config.py

python
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    # Supabase
    supabase_url: str
    supabase_anon_key: str
    supabase_service_role_key: str

    # Database
    database_url: str

    # App
    debug: bool = False

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache
def get_settings() -> Settings:
    return Settings()

python
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    # Supabase
    supabase_url: str
    supabase_anon_key: str
    supabase_service_role_key: str

    # Database
    database_url: str

    # App
    debug: bool = False

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache
def get_settings() -> Settings:
    return Settings()

Database Setup

数据库搭建

src/db/session.py

src/db/session.py

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.core.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_pre_ping=True,
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.core.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_pre_ping=True,
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

src/db/models.py

src/db/models.py

python
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Field


class ProfileBase(SQLModel):
    email: str
    name: Optional[str] = None
    avatar_url: Optional[str] = None


class Profile(ProfileBase, table=True):
    __tablename__ = "profiles"

    id: UUID = Field(primary_key=True)  # References auth.users
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)


class ProfileCreate(ProfileBase):
    id: UUID


class ProfileRead(ProfileBase):
    id: UUID
    created_at: datetime


class PostBase(SQLModel):
    title: str
    content: Optional[str] = None
    published: bool = False


class Post(PostBase, table=True):
    __tablename__ = "posts"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    author_id: UUID = Field(foreign_key="profiles.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)


class PostCreate(PostBase):
    pass


class PostRead(PostBase):
    id: UUID
    author_id: UUID
    created_at: datetime

python
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Field


class ProfileBase(SQLModel):
    email: str
    name: Optional[str] = None
    avatar_url: Optional[str] = None


class Profile(ProfileBase, table=True):
    __tablename__ = "profiles"

    id: UUID = Field(primary_key=True)  # References auth.users
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)


class ProfileCreate(ProfileBase):
    id: UUID


class ProfileRead(ProfileBase):
    id: UUID
    created_at: datetime


class PostBase(SQLModel):
    title: str
    content: Optional[str] = None
    published: bool = False


class Post(PostBase, table=True):
    __tablename__ = "posts"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    author_id: UUID = Field(foreign_key="profiles.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)


class PostCreate(PostBase):
    pass


class PostRead(PostBase):
    id: UUID
    author_id: UUID
    created_at: datetime

Supabase Client

Supabase 客户端

src/services/supabase.py

src/services/supabase.py

python
from supabase import create_client, Client
from src.core.config import get_settings

settings = get_settings()


def get_supabase_client() -> Client:
    """Get Supabase client with anon key (respects RLS)."""
    return create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )


def get_supabase_admin() -> Client:
    """Get Supabase client with service role (bypasses RLS)."""
    return create_client(
        settings.supabase_url,
        settings.supabase_service_role_key
    )

python
from supabase import create_client, Client
from src.core.config import get_settings

settings = get_settings()


def get_supabase_client() -> Client:
    """Get Supabase client with anon key (respects RLS)."""
    return create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )


def get_supabase_admin() -> Client:
    """Get Supabase client with service role (bypasses RLS)."""
    return create_client(
        settings.supabase_url,
        settings.supabase_service_role_key
    )

Auth Dependencies

身份验证依赖

src/api/deps.py

src/api/deps.py

python
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from supabase import Client

from src.db.session import get_db
from src.services.supabase import get_supabase_client

security = HTTPBearer()


async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> dict:
    """Validate JWT and return user."""
    supabase = get_supabase_client()

    try:
        # Verify token with Supabase
        user = supabase.auth.get_user(credentials.credentials)
        if not user or not user.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
            )
        return user.user
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
python
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from supabase import Client

from src.db.session import get_db
from src.services.supabase import get_supabase_client

security = HTTPBearer()


async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> dict:
    """Validate JWT and return user."""
    supabase = get_supabase_client()

    try:
        # Verify token with Supabase
        user = supabase.auth.get_user(credentials.credentials)
        if not user or not user.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
            )
        return user.user
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )

Type alias for dependency injection

Type alias for dependency injection

CurrentUser = Annotated[dict, Depends(get_current_user)] DbSession = Annotated[AsyncSession, Depends(get_db)]

---
CurrentUser = Annotated[dict, Depends(get_current_user)] DbSession = Annotated[AsyncSession, Depends(get_db)]

---

API Routes

API 路由

src/api/routes/auth.py

src/api/routes/auth.py

python
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, EmailStr

from src.services.supabase import get_supabase_client

router = APIRouter(prefix="/auth", tags=["auth"])


class SignUpRequest(BaseModel):
    email: EmailStr
    password: str


class SignInRequest(BaseModel):
    email: EmailStr
    password: str


class AuthResponse(BaseModel):
    access_token: str
    refresh_token: str
    user_id: str


@router.post("/signup", response_model=AuthResponse)
async def sign_up(request: SignUpRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_up({
            "email": request.email,
            "password": request.password,
        })

        if response.user is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Signup failed",
            )

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )


@router.post("/signin", response_model=AuthResponse)
async def sign_in(request: SignInRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_in_with_password({
            "email": request.email,
            "password": request.password,
        })

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )


@router.post("/signout")
async def sign_out():
    supabase = get_supabase_client()
    supabase.auth.sign_out()
    return {"message": "Signed out"}
python
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, EmailStr

from src.services.supabase import get_supabase_client

router = APIRouter(prefix="/auth", tags=["auth"])


class SignUpRequest(BaseModel):
    email: EmailStr
    password: str


class SignInRequest(BaseModel):
    email: EmailStr
    password: str


class AuthResponse(BaseModel):
    access_token: str
    refresh_token: str
    user_id: str


@router.post("/signup", response_model=AuthResponse)
async def sign_up(request: SignUpRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_up({
            "email": request.email,
            "password": request.password,
        })

        if response.user is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Signup failed",
            )

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )


@router.post("/signin", response_model=AuthResponse)
async def sign_in(request: SignInRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_in_with_password({
            "email": request.email,
            "password": request.password,
        })

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )


@router.post("/signout")
async def sign_out():
    supabase = get_supabase_client()
    supabase.auth.sign_out()
    return {"message": "Signed out"}

src/api/routes/posts.py

src/api/routes/posts.py

python
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from sqlmodel import select

from src.api.deps import CurrentUser, DbSession
from src.db.models import Post, PostCreate, PostRead

router = APIRouter(prefix="/posts", tags=["posts"])


@router.get("/", response_model=list[PostRead])
async def list_posts(
    db: DbSession,
    published_only: bool = True,
):
    query = select(Post)
    if published_only:
        query = query.where(Post.published == True)
    query = query.order_by(Post.created_at.desc())

    result = await db.execute(query)
    return result.scalars().all()


@router.get("/me", response_model=list[PostRead])
async def list_my_posts(
    db: DbSession,
    user: CurrentUser,
):
    query = select(Post).where(Post.author_id == UUID(user.id))
    result = await db.execute(query)
    return result.scalars().all()


@router.post("/", response_model=PostRead, status_code=status.HTTP_201_CREATED)
async def create_post(
    db: DbSession,
    user: CurrentUser,
    post_in: PostCreate,
):
    post = Post(
        **post_in.model_dump(),
        author_id=UUID(user.id),
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post


@router.get("/{post_id}", response_model=PostRead)
async def get_post(
    db: DbSession,
    post_id: UUID,
):
    result = await db.execute(select(Post).where(Post.id == post_id))
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    return post


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    db: DbSession,
    user: CurrentUser,
    post_id: UUID,
):
    result = await db.execute(
        select(Post).where(Post.id == post_id, Post.author_id == UUID(user.id))
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    await db.delete(post)
    await db.commit()

python
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from sqlmodel import select

from src.api.deps import CurrentUser, DbSession
from src.db.models import Post, PostCreate, PostRead

router = APIRouter(prefix="/posts", tags=["posts"])


@router.get("/", response_model=list[PostRead])
async def list_posts(
    db: DbSession,
    published_only: bool = True,
):
    query = select(Post)
    if published_only:
        query = query.where(Post.published == True)
    query = query.order_by(Post.created_at.desc())

    result = await db.execute(query)
    return result.scalars().all()


@router.get("/me", response_model=list[PostRead])
async def list_my_posts(
    db: DbSession,
    user: CurrentUser,
):
    query = select(Post).where(Post.author_id == UUID(user.id))
    result = await db.execute(query)
    return result.scalars().all()


@router.post("/", response_model=PostRead, status_code=status.HTTP_201_CREATED)
async def create_post(
    db: DbSession,
    user: CurrentUser,
    post_in: PostCreate,
):
    post = Post(
        **post_in.model_dump(),
        author_id=UUID(user.id),
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post


@router.get("/{post_id}", response_model=PostRead)
async def get_post(
    db: DbSession,
    post_id: UUID,
):
    result = await db.execute(select(Post).where(Post.id == post_id))
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    return post


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    db: DbSession,
    user: CurrentUser,
    post_id: UUID,
):
    result = await db.execute(
        select(Post).where(Post.id == post_id, Post.author_id == UUID(user.id))
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Post not found",
        )

    await db.delete(post)
    await db.commit()

Main Application

主应用

src/main.py

src/main.py

python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from src.api.routes import auth, posts

app = FastAPI(title="My API")
python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from src.api.routes import auth, posts

app = FastAPI(title="My API")

CORS

CORS

app.add_middleware( CORSMiddleware, allow_origins=[""], # Configure for production allow_credentials=True, allow_methods=[""], allow_headers=["*"], )
app.add_middleware( CORSMiddleware, allow_origins=[""], # Configure for production allow_credentials=True, allow_methods=[""], allow_headers=["*"], )

Routes

Routes

app.include_router(auth.router, prefix="/api") app.include_router(posts.router, prefix="/api")
@app.get("/health") async def health_check(): return {"status": "healthy"}

---
app.include_router(auth.router, prefix="/api") app.include_router(posts.router, prefix="/api")
@app.get("/health") async def health_check(): return {"status": "healthy"}

---

Alembic Migrations

Alembic 迁移

Initialize Alembic

初始化 Alembic

bash
alembic init alembic
bash
alembic init alembic

alembic/env.py (key changes)

alembic/env.py(关键修改)

python
from src.db.models import SQLModel
from src.core.config import get_settings

settings = get_settings()
python
from src.db.models import SQLModel
from src.core.config import get_settings

settings = get_settings()

Use async engine

Use async engine

config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = SQLModel.metadata
def run_migrations_online(): # For async import asyncio from sqlalchemy.ext.asyncio import create_async_engine
connectable = create_async_engine(settings.database_url)

async def do_run_migrations():
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations_sync)

def do_run_migrations_sync(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )
    with context.begin_transaction():
        context.run_migrations()

asyncio.run(do_run_migrations())
undefined
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = SQLModel.metadata
def run_migrations_online(): # For async import asyncio from sqlalchemy.ext.asyncio import create_async_engine
connectable = create_async_engine(settings.database_url)

async def do_run_migrations():
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations_sync)

def do_run_migrations_sync(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )
    with context.begin_transaction():
        context.run_migrations()

asyncio.run(do_run_migrations())
undefined

Migration Commands

迁移命令

bash
undefined
bash
undefined

Create migration

创建迁移

alembic revision --autogenerate -m "create posts table"
alembic revision --autogenerate -m "create posts table"

Apply migrations

应用迁移

alembic upgrade head
alembic upgrade head

Rollback

回滚

alembic downgrade -1

---
alembic downgrade -1

---

Storage

存储功能

Upload File

上传文件

python
from fastapi import UploadFile
from src.services.supabase import get_supabase_client


async def upload_avatar(user_id: str, file: UploadFile) -> str:
    supabase = get_supabase_client()

    file_content = await file.read()
    file_path = f"{user_id}/avatar.{file.filename.split('.')[-1]}"

    response = supabase.storage.from_("avatars").upload(
        file_path,
        file_content,
        {"content-type": file.content_type, "upsert": "true"},
    )

    # Get public URL
    url = supabase.storage.from_("avatars").get_public_url(file_path)
    return url
python
from fastapi import UploadFile
from src.services.supabase import get_supabase_client


async def upload_avatar(user_id: str, file: UploadFile) -> str:
    supabase = get_supabase_client()

    file_content = await file.read()
    file_path = f"{user_id}/avatar.{file.filename.split('.')[-1]}"

    response = supabase.storage.from_("avatars").upload(
        file_path,
        file_content,
        {"content-type": file.content_type, "upsert": "true"},
    )

    # Get public URL
    url = supabase.storage.from_("avatars").get_public_url(file_path)
    return url

Download File

下载文件

python
def get_avatar_url(user_id: str) -> str:
    supabase = get_supabase_client()
    return supabase.storage.from_("avatars").get_public_url(f"{user_id}/avatar.png")

python
def get_avatar_url(user_id: str) -> str:
    supabase = get_supabase_client()
    return supabase.storage.from_("avatars").get_public_url(f"{user_id}/avatar.png")

Realtime (Async)

实时功能(异步)

python
import asyncio
from supabase import create_client


async def listen_to_posts():
    supabase = create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )

    def handle_change(payload):
        print(f"Change received: {payload}")

    channel = supabase.channel("posts")
    channel.on_postgres_changes(
        event="*",
        schema="public",
        table="posts",
        callback=handle_change,
    ).subscribe()

    # Keep listening
    while True:
        await asyncio.sleep(1)

python
import asyncio
from supabase import create_client


async def listen_to_posts():
    supabase = create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )

    def handle_change(payload):
        print(f"Change received: {payload}")

    channel = supabase.channel("posts")
    channel.on_postgres_changes(
        event="*",
        schema="public",
        table="posts",
        callback=handle_change,
    ).subscribe()

    # Keep listening
    while True:
        await asyncio.sleep(1)

Testing

测试

tests/conftest.py

tests/conftest.py

python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from src.main import app
from src.db.session import get_db
from src.db.models import SQLModel

TEST_DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:54322/postgres_test"

engine = create_async_engine(TEST_DATABASE_URL)
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(scope="function")
async def db_session():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    async with TestingSessionLocal() as session:
        yield session

    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.drop_all)


@pytest.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as ac:
        yield ac

    app.dependency_overrides.clear()
python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from src.main import app
from src.db.session import get_db
from src.db.models import SQLModel

TEST_DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:54322/postgres_test"

engine = create_async_engine(TEST_DATABASE_URL)
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(scope="function")
async def db_session():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    async with TestingSessionLocal() as session:
        yield session

    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.drop_all)


@pytest.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as ac:
        yield ac

    app.dependency_overrides.clear()

tests/test_posts.py

tests/test_posts.py

python
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_list_posts(client: AsyncClient):
    response = await client.get("/api/posts/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

python
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_list_posts(client: AsyncClient):
    response = await client.get("/api/posts/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

Running the App

运行应用

bash
undefined
bash
undefined

Development

开发环境

uvicorn src.main:app --reload --port 8000
uvicorn src.main:app --reload --port 8000

Production

生产环境

uvicorn src.main:app --host 0.0.0.0 --port 8000 --workers 4

---
uvicorn src.main:app --host 0.0.0.0 --port 8000 --workers 4

---

Anti-Patterns

反模式

  • Using Supabase client for DB queries - Use SQLAlchemy/SQLModel
  • Sync database calls - Use async with asyncpg
  • Hardcoded credentials - Use environment variables
  • No connection pooling - asyncpg handles this
  • Missing auth dependency - Always validate JWT
  • Not closing sessions - Use context managers
  • Blocking I/O in async - Use async libraries
  • 使用Supabase客户端执行数据库查询 - 应使用SQLAlchemy/SQLModel
  • 同步数据库调用 - 应结合asyncpg使用异步方式
  • 硬编码凭据 - 应使用环境变量
  • 未使用连接池 - asyncpg会自动处理连接池
  • 缺少身份验证依赖 - 始终要验证JWT
  • 未关闭会话 - 使用上下文管理器
  • 异步代码中存在阻塞I/O - 使用异步库