azure-cosmosdb
Azure Cosmos DB Skill
Load with: base.md + [typescript.md | python.md]
Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.
Core Principle
Choose partition key wisely, design for your access patterns, understand consistency tradeoffs.
Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.
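To make the effect of cardinality concrete, the sketch below spreads items over a fixed number of buckets by hashing their partition key value. The FNV-1a hash, the bucket count of 10, and the helper names are assumptions for illustration; the hash and partition layout Cosmos DB actually uses are internal to the service.

```typescript
// Illustrative only: FNV-1a stands in for Cosmos DB's internal hash function.
function fnv1a(value: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < value.length; i++) {
    hash ^= value.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Count how many items land in each of `bucketCount` hypothetical partitions.
function distribution(keys: string[], bucketCount: number): number[] {
  const counts = new Array<number>(bucketCount).fill(0);
  for (const key of keys) {
    counts[fnv1a(key) % bucketCount] += 1;
  }
  return counts;
}

const usedBuckets = (counts: number[]): number => counts.filter(c => c > 0).length;

// Low cardinality: only three distinct key values exist.
const statuses = ['pending', 'completed', 'cancelled'];
const byStatus = distribution(
  Array.from({ length: 9000 }, (_, i) => statuses[i % 3]),
  10
);

// High cardinality: every item has its own user id.
const byUser = distribution(
  Array.from({ length: 9000 }, (_, i) => `user-${i}`),
  10
);
```

With three status values, at most three buckets can ever receive data, however many partitions exist. Distinct user ids are free to spread across all of them.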
Cosmos DB APIs
| API | Use Case |
|---|---|
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |
This skill focuses on the NoSQL (Core) API, the most common choice.
Key Concepts
| Concept | Description |
|---|---|
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with same partition key |
| Physical Partition | Storage unit (max 50GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |
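The physical partition limits in the table translate into a simple lower bound on how many physical partitions a container needs. The following is a rough sizing sketch; the function names are hypothetical, and the service may allocate more partitions than this minimum.

```typescript
// Limits taken from the Key Concepts table.
const MAX_STORAGE_GB_PER_PHYSICAL_PARTITION = 50;
const MAX_RUS_PER_PHYSICAL_PARTITION = 10000;

// Lower bound on physical partitions for a given data size and throughput.
function minPhysicalPartitions(storageGb: number, provisionedRus: number): number {
  const byStorage = Math.ceil(storageGb / MAX_STORAGE_GB_PER_PHYSICAL_PARTITION);
  const byThroughput = Math.ceil(provisionedRus / MAX_RUS_PER_PHYSICAL_PARTITION);
  return Math.max(1, byStorage, byThroughput);
}

// Provisioned throughput is divided evenly across physical partitions,
// so this is the ceiling any single partition can serve.
function rusPerPartition(provisionedRus: number, partitions: number): number {
  return provisionedRus / partitions;
}

const partitions = minPhysicalPartitions(120, 4000); // storage-bound: 120 GB needs 3 partitions
const perPartition = rusPerPartition(4000, partitions);
```

Because throughput is split evenly, a hot partition key can be throttled at its own share of the RU/s even when the container as a whole has capacity to spare.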
Partition Key Design
Good Partition Keys
typescript
// High cardinality, even distribution, used in queries
// E-commerce: userId for user data
{ "id": "order-123", "userId": "user-456", ... } // PK: /userId
// Multi-tenant: tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... } // PK: /tenantId
// IoT: deviceId for telemetry
{ "id": "reading-1", "deviceId": "device-789", ... } // PK: /deviceId
// Logs: synthetic key (date + category)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... } // PK: /partitionKey
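A synthetic key like the log example can be produced by a small helper. This is a minimal sketch: `logPartitionKey` and `bucketedLogPartitionKey` are hypothetical names, and the bucket suffix is an optional extension not present in the example above.

```typescript
// Builds a "<date>_<category>" key in the same shape as the log example.
function logPartitionKey(timestamp: Date, category: string): string {
  const day = timestamp.toISOString().slice(0, 10); // YYYY-MM-DD in UTC
  return `${day}_${category}`;
}

// Optional variant: append a bucket number derived from the item id so that
// one busy day/category pair is spread over several logical partitions.
function bucketedLogPartitionKey(
  timestamp: Date,
  category: string,
  itemId: string,
  buckets: number
): string {
  let bucket = 0;
  for (let i = 0; i < itemId.length; i++) {
    bucket = (bucket + itemId.charCodeAt(i)) % buckets;
  }
  return `${logPartitionKey(timestamp, category)}_${bucket}`;
}

const key = logPartitionKey(new Date('2024-01-15T10:30:00Z'), 'errors');
```

The bucketed variant trades read convenience for write distribution: reading a full day of errors then means querying every bucket suffix.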
Hierarchical Partition Keys
typescript
// For multi-level distribution (e.g., tenant → user)
// Container created with: /tenantId, /userId
{
"id": "order-123",
"tenantId": "acme-corp",
"userId": "user-456",
"items": [...]
}
// Query within tenant and user efficiently
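With a hierarchical key, a query is routed narrowly only when its filter pins down a prefix of the key paths. The sketch below models that rule in plain TypeScript; `routedPrefix` is a hypothetical helper for reasoning about queries, not an SDK call.

```typescript
type EqualityFilter = Record<string, string>;

// Returns the values of the longest key-path prefix fixed by the filter.
// An empty result means the query cannot be routed by partition key.
function routedPrefix(keyPaths: string[], filter: EqualityFilter): string[] {
  const prefix: string[] = [];
  for (const path of keyPaths) {
    const property = path.replace(/^\//, '');
    if (!(property in filter)) break;
    prefix.push(filter[property]);
  }
  return prefix;
}

const keyPaths = ['/tenantId', '/userId'];

// Full key: targets a single logical partition.
const full = routedPrefix(keyPaths, { tenantId: 'acme-corp', userId: 'user-456' });

// Prefix only: limited to the partitions that hold this tenant.
const tenantOnly = routedPrefix(keyPaths, { tenantId: 'acme-corp' });

// Second level without the first: no usable prefix, so the query fans out.
const userOnly = routedPrefix(keyPaths, { userId: 'user-456' });
```

The order of the paths therefore matters: put the level that every query can supply first.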
Bad Partition Keys
typescript
// Avoid:
// - Low cardinality (status, type, boolean)
// - Monotonically increasing (timestamp, auto-increment)
// - Frequently updated fields
// - Fields not used in queries
// Bad: Only 3 values → hot partitions
{ "status": "pending" | "completed" | "cancelled" }
// Bad: All writes go to latest partition
{ "timestamp": "2024-01-15T10:30:00Z" }
SDK Setup (TypeScript)
Install
bash
npm install @azure/cosmos
Initialize Client
typescript
// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';
const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;
const client = new CosmosClient({ endpoint, key });
// Or with connection string
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
export const database: Database = client.database(databaseId);
export function getContainer(containerId: string): Container {
return database.container(containerId);
}
Type Definitions
typescript
// types/cosmos.ts
export interface BaseItem {
id: string;
_ts?: number; // Auto-generated timestamp
_etag?: string; // For optimistic concurrency
}
export interface User extends BaseItem {
userId: string; // Partition key
email: string;
name: string;
createdAt: string;
updatedAt: string;
}
export interface Order extends BaseItem {
userId: string; // Partition key
orderId: string;
items: OrderItem[];
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
createdAt: string;
}
export interface OrderItem {
productId: string;
name: string;
quantity: number;
price: number;
}
CRUD Operations
Create Item
typescript
import { getContainer } from './cosmosdb';
import { User } from './types';
const usersContainer = getContainer('users');
async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
const now = new Date().toISOString();
const user: User = {
id: crypto.randomUUID(),
...data,
createdAt: now,
updatedAt: now
};
const { resource } = await usersContainer.items.create(user);
return resource as User;
}
Read Item (Point Read)
typescript
// Most efficient read - requires id AND partition key
async function getUser(userId: string, id: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(id, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}
// If id equals partition key value
async function getUserById(userId: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(userId, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}
Query Items
typescript
// Query within partition (efficient)
async function getUserOrders(userId: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
})
.fetchAll();
return resources;
}
// Cross-partition query (use sparingly)
async function getOrdersByStatus(status: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.status = @status',
parameters: [{ name: '@status', value: status }]
})
.fetchAll();
return resources;
}
// Paginated query
async function getOrdersPaginated(
userId: string,
pageSize: number = 10,
continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
const ordersContainer = getContainer('orders');
const queryIterator = ordersContainer.items.query<Order>(
{
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
},
{
maxItemCount: pageSize,
continuationToken
}
);
const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();
return {
items: resources,
continuationToken: nextToken
};
}
Update Item
typescript
// Replace entire item
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
const { resource } = await usersContainer.item(id, userId).replace(updated);
return resource as User;
}
// Partial update (patch operations)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
const { resource } = await usersContainer.item(id, userId).patch(operations);
return resource as User;
}
// Usage:
await patchUser('user-123', 'user-123', [
{ op: 'set', path: '/name', value: 'New Name' },
{ op: 'set', path: '/updatedAt', value: new Date().toISOString() },
{ op: 'incr', path: '/loginCount', value: 1 }
]);
Delete Item
typescript
async function deleteUser(userId: string, id: string): Promise<void> {
await usersContainer.item(id, userId).delete();
}
Optimistic Concurrency (ETags)
typescript
async function updateUserWithETag(
userId: string,
id: string,
updates: Partial<User>,
etag: string
): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
try {
const { resource } = await usersContainer.item(id, userId).replace(updated, {
accessCondition: { type: 'IfMatch', condition: etag }
});
return resource as User;
} catch (error: any) {
if (error.code === 412) {
throw new Error('Document was modified by another process');
}
throw error;
}
}
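A common follow-up to a 412 response is to re-read the item and apply the change again. The sketch below shows that read-modify-write loop against an in-memory stand-in, so it runs without a Cosmos DB account; `FakeStore` and `updateWithRetry` are hypothetical names, and only the 412 status code is taken from the example above.

```typescript
interface Versioned<T> {
  value: T;
  etag: string;
}

// In-memory stand-in for a single stored item (an assumption for illustration).
class FakeStore<T> {
  private value: T;
  private version = 1;

  constructor(initial: T) {
    this.value = initial;
  }

  read(): Versioned<T> {
    return { value: this.value, etag: `v${this.version}` };
  }

  // Mimics an IfMatch replace: fails with code 412 when the etag is stale.
  replace(next: T, ifMatch: string): Versioned<T> {
    if (ifMatch !== `v${this.version}`) {
      throw Object.assign(new Error('Precondition failed'), { code: 412 });
    }
    this.value = next;
    this.version += 1;
    return this.read();
  }
}

// Re-reads and re-applies `change` whenever another writer got in first.
function updateWithRetry<T>(
  store: FakeStore<T>,
  change: (current: T) => T,
  maxAttempts = 3
): Versioned<T> {
  for (let attempt = 1; ; attempt++) {
    const current = store.read();
    try {
      return store.replace(change(current.value), current.etag);
    } catch (error) {
      const isConflict = (error as { code?: number }).code === 412;
      if (!isConflict || attempt >= maxAttempts) throw error;
    }
  }
}
```

Passing the change as a function matters here: it is recomputed from the fresh value on every attempt, so a concurrent update is built upon rather than overwritten.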
Consistency Levels
| Level | Guarantees | Latency | Use Case |
|---|---|---|---|
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |
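The table lists the levels from strongest to weakest. A single request can only relax the account's default level, never strengthen it, and the sketch below encodes that rule as a check. `canUsePerRequest` is a hypothetical helper, not part of the SDK.

```typescript
// Ordered strongest to weakest, matching the table.
const LEVELS = ['Strong', 'BoundedStaleness', 'Session', 'ConsistentPrefix', 'Eventual'] as const;
type ConsistencyLevel = typeof LEVELS[number];

// True when `requested` is the same as, or weaker than, the account default.
function canUsePerRequest(accountDefault: ConsistencyLevel, requested: ConsistencyLevel): boolean {
  return LEVELS.indexOf(requested) >= LEVELS.indexOf(accountDefault);
}

const relaxOk = canUsePerRequest('Session', 'Eventual');   // weaker than default
const strengthen = canUsePerRequest('Session', 'Strong');  // stronger than default
```

If a few operations truly need Strong reads, the account default has to be Strong (or those operations need their own account), with the other requests relaxed individually.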
Set Consistency Per Request
typescript
// Override the default consistency for a single request.
// A request may only use a level equal to or weaker than the account default,
// so these values assume an account configured for Strong consistency.
const { resource } = await usersContainer.item(id, userId).read<User>({
  consistencyLevel: 'Strong'
});
// For queries
const { resources } = await container.items.query(
  { query: 'SELECT * FROM c' },
  { consistencyLevel: 'BoundedStaleness' }
).fetchAll();
Batch Operations
Transactional Batch (Same Partition)
typescript
async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
const ordersContainer = getContainer('orders');
const operations = [
{ operationType: 'Create' as const, resourceBody: order },
...items.map(item => ({
operationType: 'Create' as const,
resourceBody: { ...item, userId, orderId: order.orderId }
}))
];
const { result } = await ordersContainer.items.batch(operations, userId);
// Check if any operation failed
if (result.some(r => r.statusCode >= 400)) {
throw new Error('Batch operation failed');
}
}
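A transactional batch only works when every operation targets the same logical partition, and the service caps a batch at 100 operations. A pre-flight check along the following lines can surface both problems before the request is sent. `BatchCandidate` and `validateBatch` are hypothetical names for this sketch.

```typescript
interface BatchCandidate {
  partitionKeyValue: string;
  body: unknown;
}

const MAX_BATCH_OPERATIONS = 100;

// Returns a list of problems; an empty list means the operations fit one batch.
function validateBatch(operations: BatchCandidate[]): string[] {
  const problems: string[] = [];
  if (operations.length === 0) {
    problems.push('batch is empty');
  }
  if (operations.length > MAX_BATCH_OPERATIONS) {
    problems.push(`batch has ${operations.length} operations; the limit is ${MAX_BATCH_OPERATIONS}`);
  }
  const keys = new Set(operations.map(op => op.partitionKeyValue));
  if (keys.size > 1) {
    problems.push(`batch spans ${keys.size} partition key values: ${Array.from(keys).join(', ')}`);
  }
  return problems;
}

const problems = validateBatch([
  { partitionKeyValue: 'user-456', body: { id: 'order-123' } },
  { partitionKeyValue: 'user-789', body: { id: 'order-124' } }
]);
```

Work that fails this check is a candidate for the non-transactional bulk path, or for a data model that keeps related items under one partition key value.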
Bulk Operations
typescript
// For large-scale imports (not transactional)
async function bulkImportUsers(users: User[]): Promise<void> {
const operations = users.map(user => ({
operationType: 'Create' as const,
resourceBody: user,
partitionKey: user.userId
}));
// Process in chunks
const chunkSize = 100;
for (let i = 0; i < operations.length; i += chunkSize) {
const chunk = operations.slice(i, i + chunkSize);
await usersContainer.items.bulk(chunk);
}
}
Change Feed
Process Changes
typescript
import { ChangeFeedStartFrom } from '@azure/cosmos';

// Small helper used while polling for new changes
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');
  // Pull-model iterator (@azure/cosmos v4), starting from the beginning of the feed
  const changeFeedIterator = container.items.getChangeFeedIterator({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });
  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.readNext();
    if (statusCode === 304) {
      // No new changes
      await sleep(1000);
      continue;
    }
    for (const item of items) {
      console.log('Changed item:', item);
      // Process the change...
    }
  }
}
// For production, use a change feed processor with a lease container
Change Feed Processor Pattern
typescript
// Pseudocode sketch of the processor pattern: a change handler plus a lease
// container that stores progress. The builder calls below are illustrative and
// are not a documented @azure/cosmos API; check which processor your SDK or
// hosting platform provides.
async function startChangeFeedProcessor(): Promise<void> {
  const sourceContainer = getContainer('orders');
  const leaseContainer = getContainer('leases');
  const changeFeedProcessor = sourceContainer.items.changeFeed
    .for(item => {
      // Process each change
      console.log('Processing:', item);
    })
    .withLeaseContainer(leaseContainer)
    .build();
  await changeFeedProcessor.start();
}
Python SDK
Install
bash
pip install azure-cosmos
Setup and Operations
python
# cosmos_db.py
import os
import uuid
from datetime import datetime, timezone
from typing import List, Optional

from azure.cosmos import CosmosClient
from azure.cosmos.exceptions import CosmosResourceNotFoundError

# Initialize client
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']

client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)


def get_container(container_name: str):
    return database.get_container_client(container_name)


# CRUD Operations
users_container = get_container('users')


def create_user(email: str, name: str, user_id: Optional[str] = None) -> dict:
    user_id = user_id or str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    user = {
        'id': user_id,
        'userId': user_id,  # Partition key
        'email': email,
        'name': name,
        'createdAt': now,
        'updatedAt': now
    }
    return users_container.create_item(user)


def get_user(user_id: str) -> Optional[dict]:
    try:
        return users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None


def query_users(email_domain: str) -> List[dict]:
    query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
    parameters = [{'name': '@domain', 'value': email_domain}]
    return list(users_container.query_items(
        query=query,
        parameters=parameters,
        enable_cross_partition_query=True
    ))


def update_user(user_id: str, **updates) -> dict:
    user = get_user(user_id)
    if not user:
        raise ValueError('User not found')
    user.update(updates)
    user['updatedAt'] = datetime.now(timezone.utc).isoformat()
    return users_container.replace_item(item=user_id, body=user)


def delete_user(user_id: str) -> None:
    users_container.delete_item(item=user_id, partition_key=user_id)


# Paginated query
def get_users_paginated(page_size: int = 10, continuation_token: Optional[str] = None):
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"
    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size
    )
    # The continuation token is passed to by_page(), which resumes from that page
    pager = items.by_page(continuation_token)
    results = list(next(pager))
    return {
        'items': results,
        'continuation_token': pager.continuation_token
    }
Indexing
Custom Indexing Policy
json
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/*" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}
Create Container with Index
typescript
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
indexingPolicy: {
indexingMode: 'consistent',
includedPaths: [
{ path: '/userId/?' },
{ path: '/status/?' },
{ path: '/createdAt/?' }
],
excludedPaths: [
{ path: '/*' } // Exclude all by default
]
}
});
Throughput Management
Provisioned Throughput
typescript
// Container level
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
throughput: 1000 // RU/s
});
// Scale throughput: read the container's offer, update it, then replace it.
// `client` is the CosmosClient instance created in lib/cosmosdb.ts.
const container = database.container('orders');
const { resource: offer } = await container.readOffer();
if (offer?.id && offer.content) {
  offer.content.offerThroughput = 2000;
  await client.offer(offer.id).replace(offer);
}
Autoscale
typescript
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
maxThroughput: 10000 // Auto-scales 10% to 100%
});
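The range implied by a given `maxThroughput` can be worked out ahead of time. This is a minimal sketch: `autoscaleRange` is a hypothetical helper, and it assumes the autoscale maximum must be at least 1000 RU/s and set in steps of 1000.

```typescript
// Returns the RU/s range an autoscale container moves within.
function autoscaleRange(maxThroughput: number): { min: number; max: number } {
  if (maxThroughput < 1000 || maxThroughput % 1000 !== 0) {
    throw new Error('autoscale max throughput must be a multiple of 1000 RU/s');
  }
  // Autoscale scales between 10% of the maximum and the maximum itself.
  return { min: maxThroughput / 10, max: maxThroughput };
}

const range = autoscaleRange(10000);
```

The lower bound is the floor you are billed for during quiet periods, so it is worth checking it against the steady-state load before choosing the maximum.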
Serverless
typescript
// Serverless is chosen when the account is created, not per container.
// On a serverless account, create containers without any throughput setting.
// Pay per request (good for dev/test, intermittent workloads)
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] }
  // No throughput option: serverless accounts reject provisioned throughput
});
CLI Quick Reference
bash
# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
  --account-name myaccount \
  --resource-group mygroup \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400
# Query (check `az cosmosdb sql --help`; a query subcommand may not be available in your CLI version)
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"
# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings
Cost Optimization
| Strategy | Impact |
|---|---|
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |
Time-to-Live (TTL)
typescript
// Enable TTL on container
await database.containers.createIfNotExists({
id: 'sessions',
partitionKey: { paths: ['/userId'] },
defaultTtl: 3600 // 1 hour
});
// Per-item TTL
const session = {
id: 'session-123',
userId: 'user-456',
ttl: 1800 // Override: 30 minutes
};
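How the container default and the per-item `ttl` combine can be summarized in one function. This sketch models the rules in plain TypeScript: `effectiveTtl` is a hypothetical helper, `undefined` stands for "not set", and `-1` means "never expire".

```typescript
// Returns the seconds an item lives after its last write, or null if it never expires.
function effectiveTtl(
  containerDefaultTtl: number | undefined,
  itemTtl: number | undefined
): number | null {
  // TTL is off for the whole container: any ttl on the item is ignored.
  if (containerDefaultTtl === undefined) return null;
  // The item's own ttl, when present, overrides the container default.
  const ttl = itemTtl ?? containerDefaultTtl;
  return ttl === -1 ? null : ttl;
}

const sessionTtl = effectiveTtl(3600, 1800);     // item override wins
const defaultTtl = effectiveTtl(3600, undefined); // container default applies
```

A container default of `-1` is a useful middle ground: nothing expires unless an item opts in with its own `ttl`.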
Anti-Patterns
- Bad partition key - Low cardinality causes hot partitions
- Cross-partition queries - Expensive; design for single-partition queries
- Over-indexing - Increases write cost; index only queried paths
- Large items - Max 2MB; store blobs in Azure Blob Storage
- Ignoring RU cost - Monitor and optimize expensive queries
- Strong consistency everywhere - Use Session (default) unless required
- No retry logic - Handle 429 (throttling) with exponential backoff
- Missing TTL - Set TTL for temporary/session data
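The retry item in the list above can be sketched as a small wrapper. This is a minimal example under stated assumptions: `withBackoff` and `RetryOptions` are hypothetical names, throttling is detected through a numeric `code` of 429 on the thrown error (as in the error handling shown earlier), and the delay function is injected so the logic can be exercised without waiting.

```typescript
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number;
  sleep: (ms: number) => Promise<void>;
}

// Default delay function for real use.
const realSleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

function isThrottled(error: unknown): boolean {
  return typeof error === 'object' && error !== null &&
    (error as { code?: unknown }).code === 429;
}

// Retries `operation` on 429 responses, doubling the delay after each attempt.
async function withBackoff<T>(operation: () => Promise<T>, options: RetryOptions): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const lastAttempt = attempt + 1 >= options.maxAttempts;
      if (!isThrottled(error) || lastAttempt) throw error;
      await options.sleep(options.baseDelayMs * Math.pow(2, attempt));
    }
  }
}
```

A call might look like `withBackoff(() => getUser(userId, id), { maxAttempts: 5, baseDelayMs: 100, sleep: realSleep })`. The SDK clients already retry throttled requests a limited number of times on their own, so a wrapper like this mainly covers the case where those built-in retries are exhausted. Adding random jitter to the delay is a common refinement.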