Loading...
Loading...
This skill should be used when the user asks to "connect to MySQL with asyncio", "use aiomysql", "set up an async MySQL connection pool", "query MySQL asynchronously in Python", or needs guidance on aiomysql best practices, connection lifecycle, transactions, or cursor types.
npx skill4agent add the-perfect-developer/the-perfect-opencode python-aiomysqlasync/awaitpip install aiomysql
# Optional SQLAlchemy expression layer:
pip install aiomysql sqlalchemycreate_poolconnect()async withexecute()autocommitFalsepool.close()await pool.wait_closed()import asyncio
import aiomysql
async def create_app_pool() -> aiomysql.Pool:
return await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="appuser",
password="secret",
db="mydb",
minsize=2,
maxsize=10,
autocommit=False,
pool_recycle=3600, # recycle stale connections after 1 h
charset="utf8mb4",
)
async def main() -> None:
pool = await create_app_pool()
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
(result,) = await cur.fetchone()
print(result) # 1
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())| Parameter | Default | Purpose |
|---|---|---|
| 1 | Connections pre-created at startup |
| 10 | Hard ceiling on pool size |
| -1 (off) | Seconds before a connection is recycled |
| False | Set |
| | Use |
execute().format()# CORRECT — parameterized
await cur.execute(
"SELECT id, name FROM users WHERE email = %s AND active = %s",
(email, True),
)
# CORRECT — bulk insert via executemany (batched automatically)
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
await cur.executemany(
"INSERT INTO users (name, email) VALUES (%s, %s)",
rows,
)
# WRONG — string interpolation, never do this
await cur.execute(f"SELECT * FROM users WHERE email = '{email}'")| Cursor | Import | Returns | Use when |
|---|---|---|---|
| built-in | | General queries, small result sets |
| | | Named-column access, readability |
| | | Large result sets (unbuffered) |
| | | Large result sets, named columns |
# DictCursor — access columns by name
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,))
row = await cur.fetchone()
print(row["name"]) # "Alice"
# SSCursor — stream large result sets without buffering all rows in memory
async with conn.cursor(aiomysql.SSCursor) as cur:
await cur.execute("SELECT * FROM large_table")
async for row in cur:
process(row)async def transfer_funds(
pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await conn.begin()
await cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id),
)
await cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id),
)
await conn.commit()
except Exception:
await conn.rollback()
raiseconn.rollback()exceptautocommit=False# Single row — stops fetching immediately
row = await cur.fetchone()
# All rows — fine for small to medium result sets
rows = await cur.fetchall()
# Paginated — process in chunks to bound memory usage
while True:
chunk = await cur.fetchmany(size=500)
if not chunk:
break
process_chunk(chunk)SSCursorfetchall()await cur.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s)",
(user_id, total),
)
await conn.commit()
new_id = cur.lastrowid # AUTO_INCREMENT value of inserted row
affected = cur.rowcount # rows affected by last DML statementconnect()import aiomysql
async def run_script() -> None:
conn = await aiomysql.connect(
host="127.0.0.1", port=3306,
user="root", password="", db="mydb",
charset="utf8mb4",
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT VERSION()")
(version,) = await cur.fetchone()
print(f"MySQL {version}")
finally:
conn.close() # synchronous; flushes and closes socketconn.close()await conn.ensure_closed()aiomysql.saaiomysql.saimport asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(128)),
sa.Column("email", sa.String(255)),
)
async def main() -> None:
engine = await create_engine(
user="appuser", password="secret",
host="127.0.0.1", db="mydb",
)
async with engine.acquire() as conn:
async with conn.begin() as tx:
await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
await tx.commit()
result = await conn.execute(users.select())
async for row in result:
print(row.id, row.name)
engine.close()
await engine.wait_closed()
asyncio.run(main())SAConnectionaiomysql.Connection.begin().begin_nested().scalar().execute()awaitcur.close()awaitasync with conn.cursor() as curpool_recyclewait_timeoutpool_recyclefetchall()fetchmany(size=N)SSCursorautocommit=Trueawait conn.commit()looploopconnect()create_pool()references/connection-pool.mdpool_recyclereferences/transactions-cursors.mdcallproc