appwrite-python

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Appwrite Python SDK

Appwrite Python SDK

Installation

安装

bash
pip install appwrite
bash
pip install appwrite

Setting Up the Client

客户端设置

python
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider

import os

client = (Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project(os.environ['APPWRITE_PROJECT_ID'])
    .set_key(os.environ['APPWRITE_API_KEY']))
python
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider

import os

client = (Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project(os.environ['APPWRITE_PROJECT_ID'])
    .set_key(os.environ['APPWRITE_API_KEY']))

Code Examples

代码示例

User Management

用户管理

python
users = Users(client)
python
users = Users(client)

Create user

Create user

user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name')
user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name')

List users

List users

result = users.list([Query.limit(25)])
result = users.list([Query.limit(25)])

Get user

Get user

fetched = users.get('[USER_ID]')
fetched = users.get('[USER_ID]')

Delete user

Delete user

users.delete('[USER_ID]')
undefined
users.delete('[USER_ID]')
undefined

Database Operations

数据库操作

Note: Use
TablesDB
(not the deprecated
Databases
class) for all new code. Only use
Databases
if the existing codebase already relies on it or the user explicitly requests it.
Tip: Prefer keyword arguments (e.g.,
database_id='...'
) over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.
python
tables_db = TablesDB(client)
注意: 所有新代码请使用
TablesDB
(而非已弃用的
Databases
类)。仅当现有代码库已依赖
Databases
或用户明确要求时,才使用该类。
提示: 对于所有SDK方法调用,优先使用关键字参数(例如
database_id='...'
)而非位置参数。仅当现有代码库已使用位置参数或用户明确要求时,才使用该风格。
python
tables_db = TablesDB(client)

Create database

Create database

db = tables_db.create(ID.unique(), 'My Database')
db = tables_db.create(ID.unique(), 'My Database')

Create row

Create row

doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), { 'title': 'Hello World' })
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), { 'title': 'Hello World' })

Query rows

Query rows

results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [ Query.equal('title', 'Hello World'), Query.limit(10) ])
results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [ Query.equal('title', 'Hello World'), Query.limit(10) ])

Get row

Get row

row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')

Update row

Update row

tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', { 'title': 'Updated' })
tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', { 'title': 'Updated' })

Delete row

Delete row

tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
undefined
tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
undefined

String Column Types

字符串列类型

Note: The legacy
string
type is deprecated. Use explicit column types for all new columns.
TypeMax charactersIndexingStorage
varchar
16,383Full index (if size ≤ 768)Inline in row
text
16,383Prefix onlyOff-page
mediumtext
4,194,303Prefix onlyOff-page
longtext
1,073,741,823Prefix onlyOff-page
  • varchar
    is stored inline and counts towards the 64 KB row size limit. Prefer for short, indexed fields like names, slugs, or identifiers.
  • text
    ,
    mediumtext
    , and
    longtext
    are stored off-page (only a 20-byte pointer lives in the row), so they don't consume the row size budget.
    size
    is not required for these types.
python
undefined
注意: 旧版
string
类型已被弃用。所有新列请使用明确的列类型。
类型最大字符数索引存储方式
varchar
16,383全索引(若长度≤768)行内存储
text
16,383仅前缀索引页外存储
mediumtext
4,194,303仅前缀索引页外存储
longtext
1,073,741,823仅前缀索引页外存储
  • varchar
    存储在行内,占用64KB行大小限制。适合短的、需要索引的字段,如名称、别名或标识符。
  • text
    mediumtext
    longtext
    存储在页外(行内仅保留20字节指针),因此不会占用行大小配额。这些类型无需指定
    size
python
undefined

Create table with explicit string column types

Create table with explicit string column types

tables_db.create_table( database_id='[DATABASE_ID]', table_id=ID.unique(), name='articles', columns=[ {'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable {'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only {'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars {'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars ] )
undefined
tables_db.create_table( database_id='[DATABASE_ID]', table_id=ID.unique(), name='articles', columns=[ {'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable {'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only {'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars {'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars ] )
undefined

Query Methods

查询方法

python
undefined
python
undefined

Filtering

Filtering

Query.equal('field', 'value') # == (or pass list for IN) Query.not_equal('field', 'value') # != Query.less_than('field', 100) # < Query.less_than_equal('field', 100) # <= Query.greater_than('field', 100) # > Query.greater_than_equal('field', 100) # >= Query.between('field', 1, 100) # 1 <= field <= 100 Query.is_null('field') # is null Query.is_not_null('field') # is not null Query.starts_with('field', 'prefix') # starts with Query.ends_with('field', 'suffix') # ends with Query.contains('field', 'sub') # contains (string or array) Query.search('field', 'keywords') # full-text search (requires index)
Query.equal('field', 'value') # == (or pass list for IN) Query.not_equal('field', 'value') # != Query.less_than('field', 100) # < Query.less_than_equal('field', 100) # <= Query.greater_than('field', 100) # > Query.greater_than_equal('field', 100) # >= Query.between('field', 1, 100) # 1 <= field <= 100 Query.is_null('field') # is null Query.is_not_null('field') # is not null Query.starts_with('field', 'prefix') # starts with Query.ends_with('field', 'suffix') # ends with Query.contains('field', 'sub') # contains (string or array) Query.search('field', 'keywords') # full-text search (requires index)

Sorting

Sorting

Query.order_asc('field') Query.order_desc('field')
Query.order_asc('field') Query.order_desc('field')

Pagination

Pagination

Query.limit(25) # max rows (default 25, max 100) Query.offset(0) # skip N rows Query.cursor_after('[ROW_ID]') # cursor pagination (preferred) Query.cursor_before('[ROW_ID]')
Query.limit(25) # max rows (default 25, max 100) Query.offset(0) # skip N rows Query.cursor_after('[ROW_ID]') # cursor pagination (preferred) Query.cursor_before('[ROW_ID]')

Selection & Logic

Selection & Logic

Query.select(['field1', 'field2']) # return only specified fields Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
undefined
Query.select(['field1', 'field2']) # return only specified fields Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
undefined

File Storage

文件存储

python
from appwrite.input_file import InputFile

storage = Storage(client)
python
from appwrite.input_file import InputFile

storage = Storage(client)

Upload file

Upload file

file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'))
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'))

List files

List files

files = storage.list_files('[BUCKET_ID]')
files = storage.list_files('[BUCKET_ID]')

Delete file

Delete file

storage.delete_file('[BUCKET_ID]', '[FILE_ID]')
undefined
storage.delete_file('[BUCKET_ID]', '[FILE_ID]')
undefined

InputFile Factory Methods

InputFile工厂方法

python
from appwrite.input_file import InputFile

InputFile.from_path('/path/to/file.png')             # from filesystem path
InputFile.from_bytes(byte_data, 'file.png')          # from bytes
InputFile.from_string('Hello world', 'hello.txt')    # from string content
python
from appwrite.input_file import InputFile

InputFile.from_path('/path/to/file.png')             # from filesystem path
InputFile.from_bytes(byte_data, 'file.png')          # from bytes
InputFile.from_string('Hello world', 'hello.txt')    # from string content

Teams

团队管理

python
from appwrite.services.teams import Teams

teams = Teams(client)
python
from appwrite.services.teams import Teams

teams = Teams(client)

Create team

Create team

team = teams.create(ID.unique(), 'Engineering')
team = teams.create(ID.unique(), 'Engineering')

List teams

List teams

team_list = teams.list()
team_list = teams.list()

Create membership (invite user by email)

Create membership (invite user by email)

membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com')
membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com')

List memberships

List memberships

members = teams.list_memberships('[TEAM_ID]')
members = teams.list_memberships('[TEAM_ID]')

Update membership roles

Update membership roles

teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin'])
teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin'])

Delete team

Delete team

teams.delete('[TEAM_ID]')

> **Role-based access:** Use `Role.team('[TEAM_ID]')` for all team members or `Role.team('[TEAM_ID]', 'editor')` for a specific team role when setting permissions.
teams.delete('[TEAM_ID]')

> **基于角色的访问控制:** 设置权限时,使用`Role.team('[TEAM_ID]')`表示所有团队成员,或使用`Role.team('[TEAM_ID]', 'editor')`表示特定团队角色。

Serverless Functions

无服务器函数

python
functions = Functions(client)
python
functions = Functions(client)

Execute function

Execute function

execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}')
execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}')

List executions

List executions

executions = functions.list_executions('[FUNCTION_ID]')
undefined
executions = functions.list_executions('[FUNCTION_ID]')
undefined

Writing a Function Handler (Python runtime)

编写函数处理程序(Python运行时)

python
undefined
python
undefined

src/main.py — Appwrite Function entry point

src/main.py — Appwrite Function entry point

def main(context): # context.req — request object # .body — raw request body (string) # .body_json — parsed JSON body (dict, or None if not JSON) # .headers — request headers (dict) # .method — HTTP method (GET, POST, etc.) # .path — URL path # .query — parsed query parameters (dict) # .query_string — raw query string
context.log('Processing: ' + context.req.method + ' ' + context.req.path)

if context.req.method == 'GET':
    return context.res.json({'message': 'Hello from Appwrite Function!'})

data = context.req.body_json or {}
if 'name' not in data:
    context.error('Missing name field')
    return context.res.json({'error': 'Name is required'}, 400)

# Response methods
return context.res.json({'success': True})                    # JSON response
# return context.res.text('Hello')                           # plain text
# return context.res.empty()                                 # 204 No Content
# return context.res.redirect('https://example.com')         # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'})   # custom response
undefined
def main(context): # context.req — request object # .body — raw request body (string) # .body_json — parsed JSON body (dict, or None if not JSON) # .headers — request headers (dict) # .method — HTTP method (GET, POST, etc.) # .path — URL path # .query — parsed query parameters (dict) # .query_string — raw query string
context.log('Processing: ' + context.req.method + ' ' + context.req.path)

if context.req.method == 'GET':
    return context.res.json({'message': 'Hello from Appwrite Function!'})

data = context.req.body_json or {}
if 'name' not in data:
    context.error('Missing name field')
    return context.res.json({'error': 'Name is required'}, 400)

# Response methods
return context.res.json({'success': True})                    # JSON response
# return context.res.text('Hello')                           # plain text
# return context.res.empty()                                 # 204 No Content
# return context.res.redirect('https://example.com')         # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'})   # custom response
undefined

Server-Side Rendering (SSR) Authentication

服务端渲染(SSR)认证

SSR apps (Flask, Django, FastAPI, etc.) use the server SDK to handle auth. You need two clients:
  • Admin client — uses an API key, creates sessions, bypasses rate limits (reusable singleton)
  • Session client — uses a session cookie, acts on behalf of a user (create per-request, never share)
python
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirect
SSR应用(Flask、Django、FastAPI等)使用服务器SDK处理认证。您需要两个客户端:
  • 管理员客户端 — 使用API密钥,可创建会话,绕过速率限制(可复用单例)
  • 会话客户端 — 使用会话Cookie,代表用户执行操作(每个请求创建一次,切勿共享)
python
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirect

Admin client (reusable)

Admin client (reusable)

admin_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]') .set_key(os.environ['APPWRITE_API_KEY']))
admin_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]') .set_key(os.environ['APPWRITE_API_KEY']))

Session client (create per-request)

Session client (create per-request)

session_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]'))
session = request.cookies.get('a_session_[PROJECT_ID]') if session: session_client.set_session(session)
undefined
session_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]'))
session = request.cookies.get('a_session_[PROJECT_ID]') if session: session_client.set_session(session)
undefined

Email/Password Login

邮箱/密码登录

python
@app.post('/login')
def login():
    account = Account(admin_client)
    session = account.create_email_password_session(
        request.json['email'], request.json['password']
    )

    # Cookie name must be a_session_<PROJECT_ID>
    resp = make_response(jsonify({'success': True}))
    resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                    httponly=True, secure=True, samesite='Strict',
                    expires=session['expire'], path='/')
    return resp
python
@app.post('/login')
def login():
    account = Account(admin_client)
    session = account.create_email_password_session(
        request.json['email'], request.json['password']
    )

    # Cookie name must be a_session_<PROJECT_ID>
    resp = make_response(jsonify({'success': True}))
    resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                    httponly=True, secure=True, samesite='Strict',
                    expires=session['expire'], path='/')
    return resp

Authenticated Requests

已认证请求

python
@app.get('/user')
def get_user():
    session = request.cookies.get('a_session_[PROJECT_ID]')
    if not session:
        return jsonify({'error': 'Unauthorized'}), 401

    session_client = (Client()
        .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
        .set_project('[PROJECT_ID]')
        .set_session(session))

    account = Account(session_client)
    return jsonify(account.get())
python
@app.get('/user')
def get_user():
    session = request.cookies.get('a_session_[PROJECT_ID]')
    if not session:
        return jsonify({'error': 'Unauthorized'}), 401

    session_client = (Client()
        .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
        .set_project('[PROJECT_ID]')
        .set_session(session))

    account = Account(session_client)
    return jsonify(account.get())

OAuth2 SSR Flow

OAuth2 SSR流程

python
undefined
python
undefined

Step 1: Redirect to OAuth provider

Step 1: Redirect to OAuth provider

@app.get('/oauth') def oauth(): account = Account(admin_client) redirect_url = account.create_o_auth2_token( OAuthProvider.Github, 'https://example.com/oauth/success', 'https://example.com/oauth/failure', ) return redirect(redirect_url)
@app.get('/oauth') def oauth(): account = Account(admin_client) redirect_url = account.create_o_auth2_token( OAuthProvider.Github, 'https://example.com/oauth/success', 'https://example.com/oauth/failure', ) return redirect(redirect_url)

Step 2: Handle callback — exchange token for session

Step 2: Handle callback — exchange token for session

@app.get('/oauth/success') def oauth_success(): account = Account(admin_client) session = account.create_session(request.args['userId'], request.args['secret'])
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                httponly=True, secure=True, samesite='Strict',
                expires=session['expire'], path='/')
return resp

> **Cookie security:** Always use `httponly`, `secure`, and `samesite='Strict'` to prevent XSS. The cookie name must be `a_session_<PROJECT_ID>`.

> **Forwarding user agent:** Call `session_client.set_forwarded_user_agent(request.headers.get('user-agent'))` to record the end-user's browser info for debugging and security.
@app.get('/oauth/success') def oauth_success(): account = Account(admin_client) session = account.create_session(request.args['userId'], request.args['secret'])
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                httponly=True, secure=True, samesite='Strict',
                expires=session['expire'], path='/')
return resp

> **Cookie安全:** 始终使用`httponly`、`secure`和`samesite='Strict'`以防止XSS攻击。Cookie名称必须为`a_session_<PROJECT_ID>`。
>
> **转发用户代理:** 调用`session_client.set_forwarded_user_agent(request.headers.get('user-agent'))`以记录终端用户的浏览器信息,用于调试和安全目的。

Error Handling

错误处理

python
from appwrite.exception import AppwriteException

try:
    row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
except AppwriteException as e:
    print(e.message)    # human-readable error message
    print(e.code)       # HTTP status code (int)
    print(e.type)       # Appwrite error type string (e.g. 'document_not_found')
    print(e.response)   # full response body (dict)
Common error codes:
CodeMeaning
401
Unauthorized — missing or invalid session/API key
403
Forbidden — insufficient permissions for this action
404
Not found — resource does not exist
409
Conflict — duplicate ID or unique constraint violation
429
Rate limited — too many requests, retry after backoff
python
from appwrite.exception import AppwriteException

try:
    row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
except AppwriteException as e:
    print(e.message)    # human-readable error message
    print(e.code)       # HTTP status code (int)
    print(e.type)       # Appwrite error type string (e.g. 'document_not_found')
    print(e.response)   # full response body (dict)
常见错误码:
代码含义
401
未授权 — 缺少或无效的会话/API密钥
403
禁止访问 — 操作权限不足
404
未找到 — 资源不存在
409
冲突 — 重复ID或唯一约束违反
429
请求受限 — 请求过多,请在退避后重试

Permissions & Roles (Critical)

权限与角色(关键)

Appwrite uses permission strings to control access to resources. Each permission pairs an action (
read
,
update
,
delete
,
create
, or
write
which grants create + update + delete) with a role target. By default, no user has access unless permissions are explicitly set at the document/file level or inherited from the collection/bucket settings. Permissions are arrays of strings built with the
Permission
and
Role
helpers.
python
from appwrite.permission import Permission
from appwrite.role import Role
Appwrite使用权限字符串控制资源访问。每个权限将操作(
read
update
delete
create
,或
write
(包含create+update+delete))与角色目标配对。默认情况下,所有用户均无访问权限,除非在文档/文件级别明确设置权限,或从集合/存储桶设置继承权限。权限是使用
Permission
Role
助手构建的字符串数组。
python
from appwrite.permission import Permission
from appwrite.role import Role

Database Row with Permissions

带权限的数据库行

python
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
    'title': 'Hello World'
}, [
    Permission.read(Role.user('[USER_ID]')),     # specific user can read
    Permission.update(Role.user('[USER_ID]')),   # specific user can update
    Permission.read(Role.team('[TEAM_ID]')),     # all team members can read
    Permission.read(Role.any()),                 # anyone (including guests) can read
])
python
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
    'title': 'Hello World'
}, [
    Permission.read(Role.user('[USER_ID]')),     # specific user can read
    Permission.update(Role.user('[USER_ID]')),   # specific user can update
    Permission.read(Role.team('[TEAM_ID]')),     # all team members can read
    Permission.read(Role.any()),                 # anyone (including guests) can read
])

File Upload with Permissions

带权限的文件上传

python
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [
    Permission.read(Role.any()),
    Permission.update(Role.user('[USER_ID]')),
    Permission.delete(Role.user('[USER_ID]')),
])
When to set permissions: Set document/file-level permissions when you need per-resource access control. If all documents in a collection share the same rules, configure permissions at the collection/bucket level and leave document permissions empty.
Common mistakes:
  • Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
  • Role.any()
    with
    write
    /
    update
    /
    delete
    — allows any user, including unauthenticated guests, to modify or remove the resource
  • Permission.read(Role.any())
    on sensitive data
    — makes the resource publicly readable
python
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [
    Permission.read(Role.any()),
    Permission.update(Role.user('[USER_ID]')),
    Permission.delete(Role.user('[USER_ID]')),
])
何时设置权限: 当您需要按资源进行访问控制时,设置文档/文件级权限。如果集合中的所有文档共享相同规则,请在集合/存储桶级别配置权限,并保持文档权限为空。
常见错误:
  • 忘记设置权限 — 资源对所有用户(包括创建者)均不可访问
  • write
    /
    update
    /
    delete
    使用
    Role.any()
    — 允许任何用户(包括未认证访客)修改或删除资源
  • 对敏感数据使用
    Permission.read(Role.any())
    — 使资源可公开读取