python-aiomysql

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

aiomysql — Async MySQL for Python

aiomysql — Python的异步MySQL库

aiomysql provides asyncio-native access to MySQL databases. It wraps PyMySQL with
async/await
support and exposes Connection, Cursor, and Pool primitives that mirror the synchronous DBAPI interface.
Requirements: Python 3.9+, PyMySQL. Install with:
bash
pip install aiomysql
aiomysql 为MySQL数据库提供原生asyncio支持的访问能力。它基于PyMySQL进行封装,添加了
async/await
支持,并提供了与同步DBAPI接口一致的Connection、Cursor和Pool核心组件。
环境要求:Python 3.9+、PyMySQL。安装命令如下:
bash
pip install aiomysql

Optional SQLAlchemy expression layer:

Optional SQLAlchemy expression layer:

pip install aiomysql sqlalchemy
undefined
pip install aiomysql sqlalchemy
undefined

Core Principles

核心原则

  • Always use a connection pool (
    create_pool
    ) in production — never bare
    connect()
    for long-lived services.
  • Always use async context managers (
    async with
    ) to guarantee connection and cursor release.
  • Never format SQL strings manually. Always pass parameters as the second argument to
    execute()
    .
  • Commit explicitly;
    autocommit
    defaults to
    False
    .
  • Close the pool cleanly on shutdown:
    pool.close()
    then
    await pool.wait_closed()
    .
  • 生产环境中务必使用连接池(
    create_pool
    )—— 长期运行的服务绝对不要直接使用
    connect()
  • 务必使用异步上下文管理器(
    async with
    )来确保连接和游标被正确释放。
  • 绝对不要手动拼接SQL字符串。务必将参数作为
    execute()
    的第二个参数传入。
  • 显式提交事务;
    autocommit
    默认值为
    False
  • 服务关闭时要优雅地关闭连接池:先调用
    pool.close()
    ,再执行
    await pool.wait_closed()

Connection Pool (Preferred Pattern)

连接池(推荐模式)

Create one pool at application startup and share it across the lifetime of the service.
python
import asyncio
import aiomysql

async def create_app_pool() -> aiomysql.Pool:
    return await aiomysql.create_pool(
        host="127.0.0.1",
        port=3306,
        user="appuser",
        password="secret",
        db="mydb",
        minsize=2,
        maxsize=10,
        autocommit=False,
        pool_recycle=3600,  # recycle stale connections after 1 h
        charset="utf8mb4",
    )

async def main() -> None:
    pool = await create_app_pool()
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                (result,) = await cur.fetchone()
                print(result)  # 1
    finally:
        pool.close()
        await pool.wait_closed()

asyncio.run(main())
Key parameters:
ParameterDefaultPurpose
minsize
1Connections pre-created at startup
maxsize
10Hard ceiling on pool size
pool_recycle
-1 (off)Seconds before a connection is recycled
autocommit
FalseSet
True
for read-only workloads
charset
''
Use
utf8mb4
for full Unicode support
在应用启动时创建一个连接池,并在服务的整个生命周期中复用它。
python
import asyncio
import aiomysql

async def create_app_pool() -> aiomysql.Pool:
    return await aiomysql.create_pool(
        host="127.0.0.1",
        port=3306,
        user="appuser",
        password="secret",
        db="mydb",
        minsize=2,
        maxsize=10,
        autocommit=False,
        pool_recycle=3600,  # recycle stale connections after 1 h
        charset="utf8mb4",
    )

async def main() -> None:
    pool = await create_app_pool()
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                (result,) = await cur.fetchone()
                print(result)  # 1
    finally:
        pool.close()
        await pool.wait_closed()

asyncio.run(main())
关键参数
参数默认值用途
minsize
1启动时预创建的连接数
maxsize
10连接池的最大连接数上限
pool_recycle
-1(关闭)连接被回收前的存活时间(秒)
autocommit
False只读工作负载可设置为
True
charset
''
使用
utf8mb4
以支持完整Unicode

Parameterized Queries — SQL Injection Prevention

参数化查询 — 防止SQL注入

Pass values as the second argument to
execute()
. Never use f-strings or
.format()
for SQL parameters.
python
undefined
将参数作为
execute()
的第二个参数传入。绝对不要使用f-string或
.format()
来拼接SQL参数。
python
undefined

CORRECT — parameterized

正确写法 — 参数化查询

await cur.execute( "SELECT id, name FROM users WHERE email = %s AND active = %s", (email, True), )
await cur.execute( "SELECT id, name FROM users WHERE email = %s AND active = %s", (email, True), )

CORRECT — bulk insert via executemany (batched automatically)

正确写法 — 通过executemany批量插入(自动分批处理)

rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")] await cur.executemany( "INSERT INTO users (name, email) VALUES (%s, %s)", rows, )
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")] await cur.executemany( "INSERT INTO users (name, email) VALUES (%s, %s)", rows, )

WRONG — string interpolation, never do this

错误写法 — 字符串插值,绝对不要这样做

await cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
undefined
await cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
undefined

Cursor Types

游标类型

Choose the right cursor for the job:
CursorImportReturnsUse when
Cursor
(default)
built-in
tuple
General queries, small result sets
DictCursor
aiomysql.DictCursor
dict
Named-column access, readability
SSCursor
aiomysql.SSCursor
tuple
Large result sets (unbuffered)
SSDictCursor
aiomysql.SSDictCursor
dict
Large result sets, named columns
python
undefined
根据需求选择合适的游标:
游标导入方式返回值类型使用场景
Cursor
(默认)
内置
tuple
通用查询、小结果集
DictCursor
aiomysql.DictCursor
dict
按列名访问结果、提升可读性
SSCursor
aiomysql.SSCursor
tuple
大结果集(无缓冲)
SSDictCursor
aiomysql.SSDictCursor
dict
大结果集、按列名访问
python
undefined

DictCursor — access columns by name

DictCursor — 按列名访问结果

async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,)) row = await cur.fetchone() print(row["name"]) # "Alice"
async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,)) row = await cur.fetchone() print(row["name"]) # "Alice"

SSCursor — stream large result sets without buffering all rows in memory

SSCursor — 流式处理大结果集,无需将所有行加载到内存

async with conn.cursor(aiomysql.SSCursor) as cur: await cur.execute("SELECT * FROM large_table") async for row in cur: process(row)
undefined
async with conn.cursor(aiomysql.SSCursor) as cur: await cur.execute("SELECT * FROM large_table") async for row in cur: process(row)
undefined

Transaction Management

事务管理

Explicit transaction handling avoids silent data loss and partial writes.
python
async def transfer_funds(
    pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await conn.begin()
                await cur.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_id),
                )
                await cur.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_id),
                )
                await conn.commit()
            except Exception:
                await conn.rollback()
                raise
Never rely on auto-rollback. Always call
conn.rollback()
explicitly in the
except
block when
autocommit=False
.
显式处理事务可避免静默数据丢失和部分写入。
python
async def transfer_funds(
    pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await conn.begin()
                await cur.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_id),
                )
                await cur.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_id),
                )
                await conn.commit()
            except Exception:
                await conn.rollback()
                raise
绝对不要依赖自动回滚。当
autocommit=False
时,务必在
except
块中显式调用
conn.rollback()

Fetch Strategies

结果获取策略

Choose the fetch method based on result size:
python
undefined
根据结果集大小选择合适的获取方法:
python
undefined

Single row — stops fetching immediately

获取单行 — 立即停止获取

row = await cur.fetchone()
row = await cur.fetchone()

All rows — fine for small to medium result sets

获取所有行 — 适用于小到中等大小的结果集

rows = await cur.fetchall()
rows = await cur.fetchall()

Paginated — process in chunks to bound memory usage

分页获取 — 分块处理以控制内存使用

while True: chunk = await cur.fetchmany(size=500) if not chunk: break process_chunk(chunk)

Use `SSCursor` (unbuffered) for very large result sets instead of `fetchall()`.
while True: chunk = await cur.fetchmany(size=500) if not chunk: break process_chunk(chunk)

处理超大结果集时,使用`SSCursor`(无缓冲)替代`fetchall()`。

Accessing INSERT IDs and Row Counts

获取插入ID和受影响行数

python
await cur.execute(
    "INSERT INTO orders (user_id, total) VALUES (%s, %s)",
    (user_id, total),
)
await conn.commit()

new_id = cur.lastrowid      # AUTO_INCREMENT value of inserted row
affected = cur.rowcount     # rows affected by last DML statement
python
await cur.execute(
    "INSERT INTO orders (user_id, total) VALUES (%s, %s)",
    (user_id, total),
)
await conn.commit()

new_id = cur.lastrowid      # 插入行的AUTO_INCREMENT值
affected = cur.rowcount     # 最后一次DML语句影响的行数

Lifecycle: Single Connection (Scripts / Tests)

生命周期:单个连接(脚本/测试)

Use bare
connect()
only in short-lived scripts or test fixtures — not in services.
python
import aiomysql

async def run_script() -> None:
    conn = await aiomysql.connect(
        host="127.0.0.1", port=3306,
        user="root", password="", db="mydb",
        charset="utf8mb4",
    )
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT VERSION()")
            (version,) = await cur.fetchone()
            print(f"MySQL {version}")
    finally:
        conn.close()  # synchronous; flushes and closes socket
Note:
conn.close()
is synchronous. Use
await conn.ensure_closed()
when you need the async variant that sends a quit command before closing.
仅在短期运行的脚本或测试夹具中使用直接
connect()
—— 不要在服务中使用。
python
import aiomysql

async def run_script() -> None:
    conn = await aiomysql.connect(
        host="127.0.0.1", port=3306,
        user="root", password="", db="mydb",
        charset="utf8mb4",
    )
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT VERSION()")
            (version,) = await cur.fetchone()
            print(f"MySQL {version}")
    finally:
        conn.close()  # 同步方法;刷新并关闭套接字
注意:
conn.close()
同步方法。如果需要先发送退出命令再关闭的异步版本,请使用
await conn.ensure_closed()

SQLAlchemy Expression Layer (
aiomysql.sa
)

SQLAlchemy表达式层(
aiomysql.sa

Use
aiomysql.sa
for type-safe query construction when raw SQL becomes unwieldy.
python
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine

metadata = sa.MetaData()
users = sa.Table(
    "users",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(128)),
    sa.Column("email", sa.String(255)),
)

async def main() -> None:
    engine = await create_engine(
        user="appuser", password="secret",
        host="127.0.0.1", db="mydb",
    )
    async with engine.acquire() as conn:
        async with conn.begin() as tx:
            await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
            await tx.commit()

        result = await conn.execute(users.select())
        async for row in result:
            print(row.id, row.name)

    engine.close()
    await engine.wait_closed()

asyncio.run(main())
The
SAConnection
wraps
aiomysql.Connection
and provides
.begin()
,
.begin_nested()
(SAVEPOINT),
.scalar()
, and
.execute()
with SQLAlchemy expression support.
当原生SQL变得繁琐时,可使用
aiomysql.sa
进行类型安全的查询构造。
python
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine

metadata = sa.MetaData()
users = sa.Table(
    "users",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(128)),
    sa.Column("email", sa.String(255)),
)

async def main() -> None:
    engine = await create_engine(
        user="appuser", password="secret",
        host="127.0.0.1", db="mydb",
    )
    async with engine.acquire() as conn:
        async with conn.begin() as tx:
            await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
            await tx.commit()

        result = await conn.execute(users.select())
        async for row in result:
            print(row.id, row.name)

    engine.close()
    await engine.wait_closed()

asyncio.run(main())
SAConnection
封装了
aiomysql.Connection
,提供了
.begin()
.begin_nested()
(保存点)、
.scalar()
以及支持SQLAlchemy表达式的
.execute()
方法。

Common Mistakes

常见错误

Forgetting
await
on cursor close
:
cur.close()
is a coroutine — omitting
await
silently skips cleanup. Prefer
async with conn.cursor() as cur
to avoid this entirely.
Not recycling stale connections: Without
pool_recycle
, connections held longer than MySQL's
wait_timeout
(default 8 hours) become invalid. Set
pool_recycle
to a value shorter than the server's timeout.
Using
fetchall()
on large tables
: Loads the entire result set into memory. Use
fetchmany(size=N)
or
SSCursor
for large datasets.
Mixing autocommit modes: Setting
autocommit=True
on the pool and calling
await conn.commit()
is harmless but redundant. Be explicit and consistent per workload.
Passing
loop
parameter in Python 3.10+
: The
loop
parameter is deprecated and ignored in modern Python. Remove it from all
connect()
and
create_pool()
calls.
忘记在关闭游标时使用
await
cur.close()
是一个协程—— 省略
await
会静默跳过清理操作。建议优先使用
async with conn.cursor() as cur
来完全避免这个问题。
未回收过期连接:如果没有设置
pool_recycle
,连接的存活时间超过MySQL的
wait_timeout
(默认8小时)后会失效。请将
pool_recycle
设置为比服务器超时时间更短的值。
在大表上使用
fetchall()
:会将整个结果集加载到内存中。处理大型数据集时,请使用
fetchmany(size=N)
SSCursor
混合自动提交模式:在连接池上设置
autocommit=True
同时调用
await conn.commit()
是无害的,但属于冗余操作。请根据工作负载明确且一致地设置模式。
在Python 3.10+中传递
loop
参数
:在现代Python中,
loop
参数已被废弃并会被忽略。请从所有
connect()
create_pool()
调用中移除该参数。

Additional Resources

额外资源

Reference Files

参考文档

  • references/connection-pool.md
    — Pool configuration,
    pool_recycle
    , sizing guidelines, and graceful shutdown patterns
  • references/transactions-cursors.md
    — Transaction isolation levels, SAVEPOINT nesting, cursor type comparison, and
    callproc
    usage
  • references/connection-pool.md
    — 连接池配置、
    pool_recycle
    、大小调整指南和优雅关闭模式
  • references/transactions-cursors.md
    — 事务隔离级别、保存点嵌套、游标类型对比和
    callproc
    用法

External Documentation

外部文档