python-aiomysql
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chineseaiomysql — Async MySQL for Python
aiomysql — Python的异步MySQL库
aiomysql provides asyncio-native access to MySQL databases. It wraps PyMySQL with support and exposes Connection, Cursor, and Pool primitives that mirror the synchronous DBAPI interface.
async/awaitRequirements: Python 3.9+, PyMySQL. Install with:
bash
pip install aiomysqlaiomysql 为MySQL数据库提供原生asyncio支持的访问能力。它基于PyMySQL进行封装,添加了支持,并提供了与同步DBAPI接口一致的Connection、Cursor和Pool核心组件。
async/await环境要求:Python 3.9+、PyMySQL。安装命令如下:
bash
pip install aiomysqlOptional SQLAlchemy expression layer:
Optional SQLAlchemy expression layer:
pip install aiomysql sqlalchemy
undefinedpip install aiomysql sqlalchemy
undefinedCore Principles
核心原则
- Always use a connection pool () in production — never bare
create_poolfor long-lived services.connect() - Always use async context managers () to guarantee connection and cursor release.
async with - Never format SQL strings manually. Always pass parameters as the second argument to .
execute() - Commit explicitly; defaults to
autocommit.False - Close the pool cleanly on shutdown: then
pool.close().await pool.wait_closed()
- 生产环境中务必使用连接池()—— 长期运行的服务绝对不要直接使用
create_pool。connect() - 务必使用异步上下文管理器()来确保连接和游标被正确释放。
async with - 绝对不要手动拼接SQL字符串。务必将参数作为的第二个参数传入。
execute() - 显式提交事务;默认值为
autocommit。False - 服务关闭时要优雅地关闭连接池:先调用,再执行
pool.close()。await pool.wait_closed()
Connection Pool (Preferred Pattern)
连接池(推荐模式)
Create one pool at application startup and share it across the lifetime of the service.
python
import asyncio
import aiomysql
async def create_app_pool() -> aiomysql.Pool:
return await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="appuser",
password="secret",
db="mydb",
minsize=2,
maxsize=10,
autocommit=False,
pool_recycle=3600, # recycle stale connections after 1 h
charset="utf8mb4",
)
async def main() -> None:
pool = await create_app_pool()
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
(result,) = await cur.fetchone()
print(result) # 1
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())Key parameters:
| Parameter | Default | Purpose |
|---|---|---|
| 1 | Connections pre-created at startup |
| 10 | Hard ceiling on pool size |
| -1 (off) | Seconds before a connection is recycled |
| False | Set |
| | Use |
在应用启动时创建一个连接池,并在服务的整个生命周期中复用它。
python
import asyncio
import aiomysql
async def create_app_pool() -> aiomysql.Pool:
return await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="appuser",
password="secret",
db="mydb",
minsize=2,
maxsize=10,
autocommit=False,
pool_recycle=3600, # recycle stale connections after 1 h
charset="utf8mb4",
)
async def main() -> None:
pool = await create_app_pool()
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
(result,) = await cur.fetchone()
print(result) # 1
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())关键参数:
| 参数 | 默认值 | 用途 |
|---|---|---|
| 1 | 启动时预创建的连接数 |
| 10 | 连接池的最大连接数上限 |
| -1(关闭) | 连接被回收前的存活时间(秒) |
| False | 只读工作负载可设置为 |
| | 使用 |
Parameterized Queries — SQL Injection Prevention
参数化查询 — 防止SQL注入
Pass values as the second argument to . Never use f-strings or for SQL parameters.
execute().format()python
undefined将参数作为的第二个参数传入。绝对不要使用f-string或来拼接SQL参数。
execute().format()python
undefinedCORRECT — parameterized
正确写法 — 参数化查询
await cur.execute(
"SELECT id, name FROM users WHERE email = %s AND active = %s",
(email, True),
)
await cur.execute(
"SELECT id, name FROM users WHERE email = %s AND active = %s",
(email, True),
)
CORRECT — bulk insert via executemany (batched automatically)
正确写法 — 通过executemany批量插入(自动分批处理)
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
await cur.executemany(
"INSERT INTO users (name, email) VALUES (%s, %s)",
rows,
)
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
await cur.executemany(
"INSERT INTO users (name, email) VALUES (%s, %s)",
rows,
)
WRONG — string interpolation, never do this
错误写法 — 字符串插值,绝对不要这样做
await cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
undefinedawait cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
undefinedCursor Types
游标类型
Choose the right cursor for the job:
| Cursor | Import | Returns | Use when |
|---|---|---|---|
| built-in | | General queries, small result sets |
| | | Named-column access, readability |
| | | Large result sets (unbuffered) |
| | | Large result sets, named columns |
python
undefined根据需求选择合适的游标:
| 游标 | 导入方式 | 返回值类型 | 使用场景 |
|---|---|---|---|
| 内置 | | 通用查询、小结果集 |
| | | 按列名访问结果、提升可读性 |
| | | 大结果集(无缓冲) |
| | | 大结果集、按列名访问 |
python
undefinedDictCursor — access columns by name
DictCursor — 按列名访问结果
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,))
row = await cur.fetchone()
print(row["name"]) # "Alice"
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,))
row = await cur.fetchone()
print(row["name"]) # "Alice"
SSCursor — stream large result sets without buffering all rows in memory
SSCursor — 流式处理大结果集,无需将所有行加载到内存
async with conn.cursor(aiomysql.SSCursor) as cur:
await cur.execute("SELECT * FROM large_table")
async for row in cur:
process(row)
undefinedasync with conn.cursor(aiomysql.SSCursor) as cur:
await cur.execute("SELECT * FROM large_table")
async for row in cur:
process(row)
undefinedTransaction Management
事务管理
Explicit transaction handling avoids silent data loss and partial writes.
python
async def transfer_funds(
pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await conn.begin()
await cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id),
)
await cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id),
)
await conn.commit()
except Exception:
await conn.rollback()
raiseNever rely on auto-rollback. Always call explicitly in the block when .
conn.rollback()exceptautocommit=False显式处理事务可避免静默数据丢失和部分写入。
python
async def transfer_funds(
pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await conn.begin()
await cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id),
)
await cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id),
)
await conn.commit()
except Exception:
await conn.rollback()
raise绝对不要依赖自动回滚。当时,务必在块中显式调用。
autocommit=Falseexceptconn.rollback()Fetch Strategies
结果获取策略
Choose the fetch method based on result size:
python
undefined根据结果集大小选择合适的获取方法:
python
undefinedSingle row — stops fetching immediately
获取单行 — 立即停止获取
row = await cur.fetchone()
row = await cur.fetchone()
All rows — fine for small to medium result sets
获取所有行 — 适用于小到中等大小的结果集
rows = await cur.fetchall()
rows = await cur.fetchall()
Paginated — process in chunks to bound memory usage
分页获取 — 分块处理以控制内存使用
while True:
chunk = await cur.fetchmany(size=500)
if not chunk:
break
process_chunk(chunk)
Use `SSCursor` (unbuffered) for very large result sets instead of `fetchall()`.while True:
chunk = await cur.fetchmany(size=500)
if not chunk:
break
process_chunk(chunk)
处理超大结果集时,使用`SSCursor`(无缓冲)替代`fetchall()`。Accessing INSERT IDs and Row Counts
获取插入ID和受影响行数
python
await cur.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s)",
(user_id, total),
)
await conn.commit()
new_id = cur.lastrowid # AUTO_INCREMENT value of inserted row
affected = cur.rowcount # rows affected by last DML statementpython
await cur.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s)",
(user_id, total),
)
await conn.commit()
new_id = cur.lastrowid # 插入行的AUTO_INCREMENT值
affected = cur.rowcount # 最后一次DML语句影响的行数Lifecycle: Single Connection (Scripts / Tests)
生命周期:单个连接(脚本/测试)
Use bare only in short-lived scripts or test fixtures — not in services.
connect()python
import aiomysql
async def run_script() -> None:
conn = await aiomysql.connect(
host="127.0.0.1", port=3306,
user="root", password="", db="mydb",
charset="utf8mb4",
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT VERSION()")
(version,) = await cur.fetchone()
print(f"MySQL {version}")
finally:
conn.close() # synchronous; flushes and closes socketNote: is synchronous. Use when you need the async variant that sends a quit command before closing.
conn.close()await conn.ensure_closed()仅在短期运行的脚本或测试夹具中使用直接—— 不要在服务中使用。
connect()python
import aiomysql
async def run_script() -> None:
conn = await aiomysql.connect(
host="127.0.0.1", port=3306,
user="root", password="", db="mydb",
charset="utf8mb4",
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT VERSION()")
(version,) = await cur.fetchone()
print(f"MySQL {version}")
finally:
conn.close() # 同步方法;刷新并关闭套接字注意:是同步方法。如果需要先发送退出命令再关闭的异步版本,请使用。
conn.close()await conn.ensure_closed()SQLAlchemy Expression Layer (aiomysql.sa
)
aiomysql.saSQLAlchemy表达式层(aiomysql.sa
)
aiomysql.saUse for type-safe query construction when raw SQL becomes unwieldy.
aiomysql.sapython
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(128)),
sa.Column("email", sa.String(255)),
)
async def main() -> None:
engine = await create_engine(
user="appuser", password="secret",
host="127.0.0.1", db="mydb",
)
async with engine.acquire() as conn:
async with conn.begin() as tx:
await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
await tx.commit()
result = await conn.execute(users.select())
async for row in result:
print(row.id, row.name)
engine.close()
await engine.wait_closed()
asyncio.run(main())The wraps and provides , (SAVEPOINT), , and with SQLAlchemy expression support.
SAConnectionaiomysql.Connection.begin().begin_nested().scalar().execute()当原生SQL变得繁琐时,可使用进行类型安全的查询构造。
aiomysql.sapython
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(128)),
sa.Column("email", sa.String(255)),
)
async def main() -> None:
engine = await create_engine(
user="appuser", password="secret",
host="127.0.0.1", db="mydb",
)
async with engine.acquire() as conn:
async with conn.begin() as tx:
await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
await tx.commit()
result = await conn.execute(users.select())
async for row in result:
print(row.id, row.name)
engine.close()
await engine.wait_closed()
asyncio.run(main())SAConnectionaiomysql.Connection.begin().begin_nested().scalar().execute()Common Mistakes
常见错误
Forgetting on cursor close: is a coroutine — omitting silently skips cleanup. Prefer to avoid this entirely.
awaitcur.close()awaitasync with conn.cursor() as curNot recycling stale connections: Without , connections held longer than MySQL's (default 8 hours) become invalid. Set to a value shorter than the server's timeout.
pool_recyclewait_timeoutpool_recycleUsing on large tables: Loads the entire result set into memory. Use or for large datasets.
fetchall()fetchmany(size=N)SSCursorMixing autocommit modes: Setting on the pool and calling is harmless but redundant. Be explicit and consistent per workload.
autocommit=Trueawait conn.commit()Passing parameter in Python 3.10+: The parameter is deprecated and ignored in modern Python. Remove it from all and calls.
looploopconnect()create_pool()忘记在关闭游标时使用:是一个协程—— 省略会静默跳过清理操作。建议优先使用来完全避免这个问题。
awaitcur.close()awaitasync with conn.cursor() as cur未回收过期连接:如果没有设置,连接的存活时间超过MySQL的(默认8小时)后会失效。请将设置为比服务器超时时间更短的值。
pool_recyclewait_timeoutpool_recycle在大表上使用:会将整个结果集加载到内存中。处理大型数据集时,请使用或。
fetchall()fetchmany(size=N)SSCursor混合自动提交模式:在连接池上设置同时调用是无害的,但属于冗余操作。请根据工作负载明确且一致地设置模式。
autocommit=Trueawait conn.commit()在Python 3.10+中传递参数:在现代Python中,参数已被废弃并会被忽略。请从所有和调用中移除该参数。
looploopconnect()create_pool()Additional Resources
额外资源
Reference Files
参考文档
- — Pool configuration,
references/connection-pool.md, sizing guidelines, and graceful shutdown patternspool_recycle - — Transaction isolation levels, SAVEPOINT nesting, cursor type comparison, and
references/transactions-cursors.mdusagecallproc
- — 连接池配置、
references/connection-pool.md、大小调整指南和优雅关闭模式pool_recycle - — 事务隔离级别、保存点嵌套、游标类型对比和
references/transactions-cursors.md用法callproc