Loading...
Loading...
Appwrite Python SDK skill. Use when building server-side Python applications with Appwrite, including Django, Flask, and FastAPI integrations. Covers user management, database/table CRUD, file storage, and functions via API keys.
npx skill4agent add appwrite/agent-skills appwrite-python
pip install appwrite
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider
import os
# Server-side client configured with the endpoint, the project ID and an
# API key; the project ID and key are read from the environment.
client = (
    Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project(os.environ['APPWRITE_PROJECT_ID'])
    .set_key(os.environ['APPWRITE_API_KEY'])
)

# User-management service bound to the client above.
users = Users(client)

# Create user (no phone number is supplied)
user = users.create(
    user_id=ID.unique(),
    email='user@example.com',
    password='password123',
    name='User Name',
)

# List users
result = users.list(queries=[Query.limit(25)])

# Get user
fetched = users.get(user_id='[USER_ID]')

# Delete user
users.delete('[USER_ID]')
Note: Use `TablesDB` (not the deprecated `Databases` class) for all new code. Only use `Databases` if the existing codebase already relies on it or the user explicitly requests it.
Tip: Prefer keyword arguments (e.g., `database_id='...'`) over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.
# Table/row service bound to the shared client.
tables_db = TablesDB(client)

# Create database
db = tables_db.create(database_id=ID.unique(), name='My Database')

# Create row
doc = tables_db.create_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id=ID.unique(),
    data={'title': 'Hello World'},
)

# Query rows
results = tables_db.list_rows(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    queries=[
        Query.equal('title', 'Hello World'),
        Query.limit(10),
    ],
)

# Get row
row = tables_db.get_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id='[ROW_ID]',
)

# Update row
tables_db.update_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id='[ROW_ID]',
    data={'title': 'Updated'},
)

# Delete row
tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
Note: The legacy `string` type is deprecated. Use explicit column types for all new columns.
| Type | Max characters | Indexing | Storage |
|---|---|---|---|
| `varchar` | 16,383 | Full index (if `size` ≤ 768) | Inline in row |
| `text` | 16,383 | Prefix only | Off-page |
| `mediumtext` | 4,194,303 | Prefix only | Off-page |
| `longtext` | 1,073,741,823 | Prefix only | Off-page |
# Create table with explicit string column types
# One explicit string column type per column of the example table.
article_columns = [
    # inline, fully indexable
    {'key': 'title', 'type': 'varchar', 'size': 255, 'required': True},
    # off-page, prefix index only
    {'key': 'summary', 'type': 'text', 'required': False},
    # up to ~4 M chars
    {'key': 'body', 'type': 'mediumtext', 'required': False},
    # up to ~1 B chars
    {'key': 'raw_data', 'type': 'longtext', 'required': False},
]

tables_db.create_table(
    database_id='[DATABASE_ID]',
    table_id=ID.unique(),
    name='articles',
    columns=article_columns,
)

# Filtering
Query.equal('field', 'value')               # == (or pass list for IN)
Query.not_equal('field', 'value')           # !=
Query.less_than('field', 100)               # <
Query.less_than_equal('field', 100)         # <=
Query.greater_than('field', 100)            # >
Query.greater_than_equal('field', 100)      # >=
Query.between('field', 1, 100)              # 1 <= field <= 100
Query.is_null('field')                      # is null
Query.is_not_null('field')                  # is not null
Query.starts_with('field', 'prefix')        # starts with
Query.ends_with('field', 'suffix')          # ends with
Query.contains('field', 'sub')              # contains (string or array)
Query.search('field', 'keywords')           # full-text search (requires index)

# Sorting
Query.order_asc('field')
Query.order_desc('field')

# Pagination
Query.limit(25)                             # max rows (default 25, max 100)
Query.offset(0)                             # skip N rows
Query.cursor_after('[ROW_ID]')              # cursor pagination (preferred)
Query.cursor_before('[ROW_ID]')

# Selection & Logic
Query.select(['field1', 'field2'])          # return only specified fields
Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)])  # OR
Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)])  # AND (default)

# Import used by the file-upload examples; kept on its own line so it is
# not swallowed by the trailing comment above.
from appwrite.input_file import InputFile
# File-storage service bound to the shared client.
storage = Storage(client)

# Upload file
file = storage.create_file(
    bucket_id='[BUCKET_ID]',
    file_id=ID.unique(),
    file=InputFile.from_path('/path/to/file.png'),
)

# List files
files = storage.list_files(bucket_id='[BUCKET_ID]')

# Delete file
storage.delete_file(bucket_id='[BUCKET_ID]', file_id='[FILE_ID]')

from appwrite.input_file import InputFile

# Ways to build the upload payload (byte_data is a placeholder for your bytes)
InputFile.from_path('/path/to/file.png')            # from filesystem path
InputFile.from_bytes(byte_data, 'file.png')         # from bytes
InputFile.from_string('Hello world', 'hello.txt')   # from string content

# Import used by the team examples; kept on its own line so it is not
# swallowed by the trailing comment above.
from appwrite.services.teams import Teams
# Team-management service bound to the shared client.
teams = Teams(client)

# Create team
team = teams.create(team_id=ID.unique(), name='Engineering')

# List teams
team_list = teams.list()

# Create membership (invite user by email)
membership = teams.create_membership(
    team_id='[TEAM_ID]',
    roles=['editor'],
    email='user@example.com',
)

# List memberships
members = teams.list_memberships(team_id='[TEAM_ID]')

# Update membership roles
teams.update_membership(
    team_id='[TEAM_ID]',
    membership_id='[MEMBERSHIP_ID]',
    roles=['admin'],
)

# Delete team
teams.delete('[TEAM_ID]')
Role-based access: Use `Role.team('[TEAM_ID]')` for all team members or `Role.team('[TEAM_ID]', 'editor')` for a specific team role when setting permissions.
# Functions service bound to the shared client.
functions = Functions(client)

# Execute function
execution = functions.create_execution(
    function_id='[FUNCTION_ID]',
    body='{"key": "value"}',
)

# List executions
executions = functions.list_executions(function_id='[FUNCTION_ID]')

# src/main.py — Appwrite Function entry point
def main(context):
    """Handle a single Appwrite Function invocation.

    Logs the incoming method and path, answers GET requests with a greeting,
    and for any other method requires a ``name`` key in the JSON body.

    Args:
        context: Invocation context. This handler uses ``context.req``
            (``method``, ``path``, ``body_json``), ``context.res.json``,
            ``context.log`` and ``context.error``.

    Returns:
        Whatever ``context.res.json`` returns: the greeting for GET, a 400
        error payload when ``name`` is missing, otherwise a success payload.
    """
    # context.req — request object
    #   .body         — raw request body (string)
    #   .body_json    — parsed JSON body (dict, or None if not JSON)
    #   .headers      — request headers (dict)
    #   .method       — HTTP method (GET, POST, etc.)
    #   .path         — URL path
    #   .query        — parsed query parameters (dict)
    #   .query_string — raw query string
    context.log(f'Processing: {context.req.method} {context.req.path}')

    if context.req.method == 'GET':
        return context.res.json({'message': 'Hello from Appwrite Function!'})

    # A missing or non-JSON body is treated as an empty payload so the
    # membership test below cannot fail on None.
    data = context.req.body_json or {}
    if 'name' not in data:
        context.error('Missing name field')
        return context.res.json({'error': 'Name is required'}, 400)

    # Response methods
    return context.res.json({'success': True})  # JSON response
    # return context.res.text('Hello')                         # plain text
    # return context.res.empty()                               # 204 No Content
    # return context.res.redirect('https://example.com')       # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'})  # custom response
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirect

# Admin client (reusable)
admin_client = (
    Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project('[PROJECT_ID]')
    .set_key(os.environ['APPWRITE_API_KEY'])
)

# Session client (create per-request, i.e. where `request` is bound)
session_client = (
    Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project('[PROJECT_ID]')
)
session = request.cookies.get('a_session_[PROJECT_ID]')
if session:
    session_client.set_session(session)


@app.post('/login')
def login():
    """Create an email/password session and store its secret in a cookie.

    Reads ``email`` and ``password`` from the JSON request body, creates the
    session through the admin client, and returns a JSON success response
    carrying the session cookie.
    """
    account = Account(admin_client)
    session = account.create_email_password_session(
        request.json['email'], request.json['password']
    )

    # Cookie name must be a_session_<PROJECT_ID>
    resp = make_response(jsonify({'success': True}))
    resp.set_cookie(
        'a_session_[PROJECT_ID]',
        session['secret'],
        httponly=True,
        secure=True,
        samesite='Strict',
        expires=session['expire'],
        path='/',
    )
    return resp


@app.get('/user')
def get_user():
    """Return the account that owns the session cookie, or 401 without one."""
    session = request.cookies.get('a_session_[PROJECT_ID]')
    if not session:
        return jsonify({'error': 'Unauthorized'}), 401

    # A fresh client per request, scoped to this caller's session.
    session_client = (
        Client()
        .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
        .set_project('[PROJECT_ID]')
        .set_session(session)
    )
    account = Account(session_client)
    return jsonify(account.get())


# Step 1: Redirect to OAuth provider
@app.get('/oauth')
def oauth():
    """Start the OAuth2 flow by redirecting the browser to the provider."""
    account = Account(admin_client)
    # The Python SDK's enum members are upper-case (GITHUB, not Github).
    redirect_url = account.create_o_auth2_token(
        OAuthProvider.GITHUB,
        'https://example.com/oauth/success',
        'https://example.com/oauth/failure',
    )
    return redirect(redirect_url)


# Step 2: Handle callback — exchange token for session
@app.get('/oauth/success')
def oauth_success():
    """Exchange the callback's userId/secret for a session and set the cookie."""
    account = Account(admin_client)
    session = account.create_session(
        request.args['userId'], request.args['secret']
    )

    resp = make_response(jsonify({'success': True}))
    resp.set_cookie(
        'a_session_[PROJECT_ID]',
        session['secret'],
        httponly=True,
        secure=True,
        samesite='Strict',
        expires=session['expire'],
        path='/',
    )
    return resp


# Cookie security: Always use httponly, secure, and samesite='Strict' to
# prevent XSS. The cookie name must be a_session_<PROJECT_ID>.
Forwarding user agent: Call `session_client.set_forwarded_user_agent(request.headers.get('user-agent'))` to record the end-user's browser info for debugging and security.
from appwrite.exception import AppwriteException

# Failed SDK calls raise AppwriteException; its attributes describe the failure.
try:
    row = tables_db.get_row(
        database_id='[DATABASE_ID]',
        table_id='[TABLE_ID]',
        row_id='[ROW_ID]',
    )
except AppwriteException as e:
    print(e.message)   # human-readable error message
    print(e.code)      # HTTP status code (int)
    print(e.type)      # Appwrite error type string (e.g. 'document_not_found')
    print(e.response)  # full response body (dict)

# Common error codes:
# | Code | Meaning |
# |---|---|
# | 401 | Unauthorized — missing or invalid session/API key |
# | 403 | Forbidden — insufficient permissions for this action |
# | 404 | Not found — resource does not exist |
# | 409 | Conflict — duplicate ID or unique constraint violation |
# | 429 | Rate limited — too many requests, retry after backoff |
# Permission actions: read, update, delete, create, write.
# Permissions are built with the Permission and Role helpers.
from appwrite.permission import Permission
from appwrite.role import Role

# Row-level permissions
doc = tables_db.create_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id=ID.unique(),
    data={'title': 'Hello World'},
    permissions=[
        Permission.read(Role.user('[USER_ID]')),    # specific user can read
        Permission.update(Role.user('[USER_ID]')),  # specific user can update
        Permission.read(Role.team('[TEAM_ID]')),    # all team members can read
        Permission.read(Role.any()),                # anyone (including guests) can read
    ],
)

# File-level permissions
file = storage.create_file(
    bucket_id='[BUCKET_ID]',
    file_id=ID.unique(),
    file=InputFile.from_path('/path/to/file.png'),
    permissions=[
        Permission.read(Role.any()),
        Permission.update(Role.user('[USER_ID]')),
        Permission.delete(Role.user('[USER_ID]')),
    ],
)

# When to set permissions: Set document/file-level permissions when you need
# per-resource access control. If all documents in a collection share the
# same rules, configure permissions at the collection/bucket level and leave
# document permissions empty.
Common mistakes:
- Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
- `Role.any()` with `write`/`update`/`delete` — allows any user, including unauthenticated guests, to modify or remove the resource
- `Permission.read(Role.any())` on sensitive data — makes the resource publicly readable