Loading...
Loading...
SQLAlchemy Python SQL toolkit and ORM with powerful query builder, relationship mapping, and database migrations via Alembic
npx skill4agent add bobmatnyc/claude-mpm-skills sqlalchemy-ormselect()QueryMapped[T]mapped_column()Session.execute()AsyncSession# Core SQLAlchemy
pip install sqlalchemy
# With async support
pip install sqlalchemy[asyncio] aiosqlite # SQLite
pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
# With Alembic for migrations
pip install alembic
# FastAPI integration
pip install fastapi sqlalchemyfrom datetime import datetime
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# Base class for all models
class Base(DeclarativeBase):
pass
# User model with type hints
class User(Base):
__tablename__ = "users"
# Primary key
id: Mapped[int] = mapped_column(primary_key=True)
# Required fields
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))
# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
# Timestamps with server defaults
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
# Relationships
posts: Mapped[list["Post"]] = relationship(back_populates="author")
def __repr__(self) -> str:
return f"User(id={self.id}, email={self.email})"class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# Relationship with back_populates
author: Mapped["User"] = relationship(back_populates="posts")
tags: Mapped[list["Tag"]] = relationship(
secondary="post_tags",
back_populates="posts"
)from sqlalchemy import Table, Column, Integer, ForeignKey
# Association table
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True)
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["Post"]] = relationship(
secondary=post_tags,
back_populates="tags"
)from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
# Database URL formats
# SQLite: sqlite:///./database.db
# PostgreSQL: postgresql://user:pass@localhost/dbname
# MySQL: mysql+pymysql://user:pass@localhost/dbname
DATABASE_URL = "postgresql://user:pass@localhost/mydb"
# Create engine with connection pooling
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Check connection before using
echo=False # Set True for SQL logging
)
# Session factory
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
# Create tables
Base.metadata.create_all(bind=engine)from typing import Generator
def get_db() -> Generator[Session, None, None]:
"""Database session dependency for FastAPI."""
db = SessionLocal()
try:
yield db
finally:
db.close()
# Usage in FastAPI
from fastapi import Depends
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
return db.execute(
select(User).where(User.id == user_id)
).scalar_one_or_none()from sqlalchemy import select, and_, or_, desc, func
# Basic select
stmt = select(User).where(User.email == "user@example.com")
user = session.execute(stmt).scalar_one_or_none()
# Multiple conditions
stmt = select(User).where(
and_(
User.is_active == True,
User.created_at > datetime(2024, 1, 1)
)
)
users = session.execute(stmt).scalars().all()
# OR conditions
stmt = select(User).where(
or_(
User.email.like("%@gmail.com"),
User.email.like("%@yahoo.com")
)
)
# Ordering and limiting
stmt = (
select(User)
.where(User.is_active == True)
.order_by(desc(User.created_at))
.limit(10)
.offset(20)
)
# Counting
stmt = select(func.count()).select_from(User)
count = session.execute(stmt).scalar()# Inner join
stmt = (
select(Post, User)
.join(User, Post.user_id == User.id)
.where(User.is_active == True)
)
results = session.execute(stmt).all()
# Left outer join
stmt = (
select(User, func.count(Post.id).label("post_count"))
.outerjoin(Post)
.group_by(User.id)
)
# Multiple joins
stmt = (
select(Post)
.join(Post.author)
.join(Post.tags)
.where(Tag.name == "python")
)from sqlalchemy.orm import selectinload, joinedload
# selectinload - separate query (better for collections)
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
# Now users[0].posts won't trigger additional queries
# joinedload - single query with join (better for one-to-one)
stmt = select(Post).options(joinedload(Post.author))
posts = session.execute(stmt).unique().scalars().all()
# Nested eager loading
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.tags)
)
# Load only specific columns
from sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))def create_user(db: Session, email: str, username: str, password: str):
"""Create new user."""
user = User(
email=email,
username=username,
hashed_password=hash_password(password)
)
db.add(user)
db.commit()
db.refresh(user) # Get updated fields (id, timestamps)
return user
# Bulk insert
users = [
User(email=f"user{i}@example.com", username=f"user{i}")
for i in range(100)
]
db.add_all(users)
db.commit()def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""Get user by email."""
stmt = select(User).where(User.email == email)
return db.execute(stmt).scalar_one_or_none()
def get_users(
db: Session,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""Get paginated users."""
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()def update_user(db: Session, user_id: int, **kwargs):
"""Update user fields."""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return None
for key, value in kwargs.items():
setattr(user, key, value)
db.commit()
db.refresh(user)
return user
# Bulk update
from sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=datetime.utcnow())
)
db.execute(stmt)
db.commit()def delete_user(db: Session, user_id: int) -> bool:
"""Delete user."""
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
return False
db.delete(user)
db.commit()
return True
# Bulk delete
from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False)
db.execute(stmt)
db.commit()from contextlib import contextmanager
@contextmanager
def get_db_session():
"""Session context manager."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Usage
with get_db_session() as db:
user = create_user(db, "test@example.com", "testuser", "password")
# Auto-commits on success, rollback on exceptiondef transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
"""Transfer money between users with transaction."""
try:
# Begin nested transaction
with db.begin_nested():
# Deduct from sender
stmt = select(User).where(User.id == from_user_id).with_for_update()
sender = db.execute(stmt).scalar_one()
sender.balance -= amount
# Add to receiver
stmt = select(User).where(User.id == to_user_id).with_for_update()
receiver = db.execute(stmt).scalar_one()
receiver.balance += amount
db.commit()
except Exception as e:
db.rollback()
raisefrom sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
# Async engine (note: asyncpg for PostgreSQL, aiosqlite for SQLite)
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
async_engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=5,
max_overflow=10
)
# Async session factory
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
# Create tables
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)async def get_user_async(user_id: int) -> Optional[User]:
"""Get user asynchronously."""
async with AsyncSessionLocal() as session:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def create_user_async(email: str, username: str) -> User:
"""Create user asynchronously."""
async with AsyncSessionLocal() as session:
user = User(email=email, username=username)
session.add(user)
await session.commit()
await session.refresh(user)
return user
# FastAPI async dependency
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user_endpoint(
user_id: int,
db: AsyncSession = Depends(get_async_db)
):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
return result.scalar_one_or_none()# Initialize Alembic
alembic init alembic
# Edit alembic.ini - set database URL
# sqlalchemy.url = postgresql://user:pass@localhost/mydb
# Or use env variable in alembic/env.py# alembic/env.py
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base # Import your Base
# Add your model's MetaData
target_metadata = Base.metadata
def run_migrations_online():
"""Run migrations in 'online' mode."""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add users table"
# Review generated migration in alembic/versions/
# Apply migration
alembic upgrade head
# Rollback one version
alembic downgrade -1
# Show current version
alembic current
# Show migration history
alembic history# alembic/versions/xxx_add_users.py
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('username', sa.String(50), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_users_email', 'users', ['email'], unique=True)
def downgrade():
op.drop_index('ix_users_email', table_name='users')
op.drop_table('users')from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from typing import List
app = FastAPI()
# Pydantic schemas
class UserBase(BaseModel):
email: EmailStr
username: str
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True # SQLAlchemy 2.0 (was orm_mode)
# CRUD operations
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
# Check if user exists
stmt = select(User).where(User.email == user.email)
if db.execute(stmt).scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create user
db_user = User(
email=user.email,
username=user.username,
hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@app.get("/users/", response_model=List[UserResponse])
def list_users(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
stmt = (
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
)
return db.execute(stmt).scalars().all()
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
user_update: UserBase,
db: Session = Depends(get_db)
):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
db_user = db.execute(stmt).scalar_one_or_none()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
db.delete(db_user)
db.commit()import pytest
from sqlalchemy import create_engine, StaticPool
from sqlalchemy.orm import sessionmaker
# In-memory SQLite for testing
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="function")
def db_session():
"""Create test database session."""
engine = create_engine(
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
# Create tables
Base.metadata.create_all(bind=engine)
TestingSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def test_user(db_session):
"""Create test user."""
user = User(
email="test@example.com",
username="testuser",
hashed_password="hashed"
)
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return userdef test_create_user(db_session):
"""Test user creation."""
user = User(email="new@example.com", username="newuser")
db_session.add(user)
db_session.commit()
assert user.id is not None
assert user.email == "new@example.com"
assert user.created_at is not None
def test_query_user(db_session, test_user):
"""Test user query."""
stmt = select(User).where(User.email == "test@example.com")
found_user = db_session.execute(stmt).scalar_one()
assert found_user.id == test_user.id
assert found_user.username == test_user.username
def test_update_user(db_session, test_user):
"""Test user update."""
test_user.username = "updated"
db_session.commit()
stmt = select(User).where(User.id == test_user.id)
updated_user = db_session.execute(stmt).scalar_one()
assert updated_user.username == "updated"
def test_delete_user(db_session, test_user):
"""Test user deletion."""
user_id = test_user.id
db_session.delete(test_user)
db_session.commit()
stmt = select(User).where(User.id == user_id)
assert db_session.execute(stmt).scalar_one_or_none() is None# Use indexes
class User(Base):
__tablename__ = "users"
email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)
# Composite index
__table_args__ = (
Index('ix_user_email_active', 'email', 'is_active'),
)
# Use select_from for complex queries
stmt = (
select(func.count(Post.id))
.select_from(User)
.join(Post)
.where(User.is_active == True)
)
# Use contains_eager for joined loads
from sqlalchemy.orm import contains_eager
stmt = (
select(Post)
.join(Post.author)
.options(contains_eager(Post.author))
.where(User.is_active == True)
)# Configure pool
engine = create_engine(
DATABASE_URL,
pool_size=20, # Number of connections to keep
max_overflow=10, # Additional connections when pool full
pool_timeout=30, # Seconds to wait for connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True # Verify connections before use
)
# Monitor pool
from sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
print("New connection established")
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
print("Connection checked out from pool")# Bulk insert with executemany
from sqlalchemy import insert
data = [
{"email": f"user{i}@example.com", "username": f"user{i}"}
for i in range(1000)
]
stmt = insert(User)
db.execute(stmt, data)
db.commit()
# Bulk update
from sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == False)
.values(deleted_at=func.now())
)
db.execute(stmt)Mapped[T]mapped_column()selectinloadjoinedloadAsyncSessionNoResultFoundMultipleResultsFoundfrom typing import Generic, TypeVar, Type
from sqlalchemy.orm import Session
T = TypeVar('T', bound=Base)
class BaseRepository(Generic[T]):
def __init__(self, model: Type[T], db: Session):
self.model = model
self.db = db
def get(self, id: int) -> Optional[T]:
stmt = select(self.model).where(self.model.id == id)
return self.db.execute(stmt).scalar_one_or_none()
def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
stmt = select(self.model).offset(skip).limit(limit)
return self.db.execute(stmt).scalars().all()
def create(self, obj: T) -> T:
self.db.add(obj)
self.db.commit()
self.db.refresh(obj)
return obj
def delete(self, id: int) -> bool:
obj = self.get(id)
if obj:
self.db.delete(obj)
self.db.commit()
return True
return False
# Usage
user_repo = BaseRepository(User, db)
user = user_repo.get(1)class SoftDeleteMixin:
deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)
@property
def is_deleted(self) -> bool:
return self.deleted_at is not None
class User(Base, SoftDeleteMixin):
__tablename__ = "users"
# ... fields
# Query only active records
stmt = select(User).where(User.deleted_at.is_(None))
# Soft delete
user.deleted_at = datetime.utcnow()
db.commit()class AuditMixin:
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))
class Post(Base, AuditMixin):
__tablename__ = "posts"
# ... fields