azure-cosmosdb

Azure Cosmos DB Skill

Load with: base.md + [typescript.md | python.md]
Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.

Core Principle

Choose the partition key wisely, design for your access patterns, and understand the consistency tradeoffs.
Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.

Cosmos DB APIs

| API | Use Case |
| --- | --- |
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |

This skill focuses on the NoSQL (Core) API, the most common choice.

Key Concepts

| Concept | Description |
| --- | --- |
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with the same partition key value |
| Physical Partition | Storage unit (max 50 GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |
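
The physical partition limits above (50 GB and 10K RU/s) give a rough lower bound on how many physical partitions a container needs. The sketch below is only an estimate under that assumption; `estimatePhysicalPartitions` is a hypothetical helper, and the service decides the real partition count.

```typescript
// Limits taken from the Key Concepts table.
const MAX_STORAGE_GB_PER_PARTITION = 50;
const MAX_RU_PER_PARTITION = 10_000;

// Lower-bound estimate: whichever limit (storage or throughput) needs more partitions wins.
function estimatePhysicalPartitions(storageGb: number, provisionedRu: number): number {
  const byStorage = Math.ceil(storageGb / MAX_STORAGE_GB_PER_PARTITION);
  const byThroughput = Math.ceil(provisionedRu / MAX_RU_PER_PARTITION);
  return Math.max(1, byStorage, byThroughput);
}

const small = estimatePhysicalPartitions(10, 400);
const large = estimatePhysicalPartitions(120, 25_000);
```

Provisioned throughput is divided across physical partitions, so a higher partition count means each one receives a smaller share of the RU/s budget.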

Partition Key Design

Good Partition Keys

typescript
// High cardinality, even distribution, used in queries

// E-commerce: userId for user data
{ "id": "order-123", "userId": "user-456", ... }  // PK: /userId

// Multi-tenant: tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... }  // PK: /tenantId

// IoT: deviceId for telemetry
{ "id": "reading-1", "deviceId": "device-789", ... }  // PK: /deviceId

// Logs: synthetic key (date + category)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... }  // PK: /partitionKey
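
The synthetic key in the logs example can be built by a small helper so that every writer produces the same bucket name. This is a minimal sketch; `buildLogPartitionKey` and the `LogCategory` values are hypothetical names, not part of any SDK.

```typescript
// Hypothetical category set, for illustration only.
type LogCategory = 'errors' | 'warnings' | 'info';

// Builds a "YYYY-MM-DD_category" partition key value from a timestamp.
function buildLogPartitionKey(timestamp: Date, category: LogCategory): string {
  // Use the UTC calendar day so writers in different time zones agree on the bucket.
  const day = timestamp.toISOString().slice(0, 10);
  return `${day}_${category}`;
}

const logItem = {
  id: 'log-1',
  partitionKey: buildLogPartitionKey(new Date('2024-01-15T10:30:00Z'), 'errors'),
  message: 'Example log line'
};
```

A day-plus-category key still sends one category's writes for a day to a single logical partition, which is capped at 20 GB. If volume is high, consider appending a further suffix such as the hour or a hash bucket.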

Hierarchical Partition Keys

typescript
// For multi-level distribution (e.g., tenant → user)
// Container created with: /tenantId, /userId

{
  "id": "order-123",
  "tenantId": "acme-corp",
  "userId": "user-456",
  "items": [...]
}

// Query within tenant and user efficiently
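
With a hierarchical key, a query is routed to a subset of partitions only when it filters on a prefix of the key path (the tenant alone, or the tenant and user together). A filter on a lower level alone still fans out. The sketch below illustrates that rule with a hypothetical helper, `pinnedPrefixLength`; it does not call the SDK.

```typescript
// Counts how many leading levels of a hierarchical partition key are pinned
// by equality filters. 0 means the query fans out to every partition.
function pinnedPrefixLength(keyPaths: string[], equalityFilters: string[]): number {
  let pinned = 0;
  for (const path of keyPaths) {
    if (equalityFilters.indexOf(path) === -1) break; // a gap ends the usable prefix
    pinned++;
  }
  return pinned;
}

const hierarchy = ['/tenantId', '/userId'];
const tenantOnly = pinnedPrefixLength(hierarchy, ['/tenantId']);
const tenantAndUser = pinnedPrefixLength(hierarchy, ['/userId', '/tenantId']);
const userOnly = pinnedPrefixLength(hierarchy, ['/userId']);
```

Here `tenantOnly` is 1 and `tenantAndUser` is 2, so both queries are targeted. `userOnly` is 0, which means a query that filters only on `userId` is a cross-partition query.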

Bad Partition Keys

typescript
// Avoid:
// - Low cardinality (status, type, boolean)
// - Monotonically increasing (timestamp, auto-increment)
// - Frequently updated fields
// - Fields not used in queries

// Bad: Only 3 values → hot partitions
{ "status": "pending" | "completed" | "cancelled" }

// Bad: All writes go to latest partition
{ "timestamp": "2024-01-15T10:30:00Z" }
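
One way to spot a low-cardinality key before committing to it is to measure how much of a data sample lands on the most common key value. The function below is a rough sketch for that check; `hottestKeyShare` is a hypothetical name and the sample arrays are made up.

```typescript
// Returns the fraction (0..1) of items that share the most common partition key value.
function hottestKeyShare(keys: string[]): number {
  if (keys.length === 0) return 0;
  const counts = new Map<string, number>();
  let max = 0;
  for (const key of keys) {
    const next = (counts.get(key) ?? 0) + 1;
    counts.set(key, next);
    if (next > max) max = next;
  }
  return max / keys.length;
}

// A status field concentrates items; a user id spreads them out.
const byStatus = hottestKeyShare(['pending', 'pending', 'pending', 'completed']);
const byUser = hottestKeyShare(['u1', 'u2', 'u3', 'u4']);
```

In this sample `byStatus` is 0.75 and `byUser` is 0.25. A share that stays high as the sample grows suggests the key will produce a hot partition.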

SDK Setup (TypeScript)

Install

bash
npm install @azure/cosmos

Initialize Client

typescript
// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';

const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;

const client = new CosmosClient({ endpoint, key });

// Or with connection string
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);

export const database: Database = client.database(databaseId);

export function getContainer(containerId: string): Container {
  return database.container(containerId);
}

Type Definitions

typescript
// types/cosmos.ts
export interface BaseItem {
  id: string;
  _ts?: number;      // Auto-generated timestamp
  _etag?: string;    // For optimistic concurrency
}

export interface User extends BaseItem {
  userId: string;    // Partition key
  email: string;
  name: string;
  createdAt: string;
  updatedAt: string;
}

export interface Order extends BaseItem {
  userId: string;    // Partition key
  orderId: string;
  items: OrderItem[];
  total: number;
  status: 'pending' | 'paid' | 'shipped' | 'delivered';
  createdAt: string;
}

export interface OrderItem {
  productId: string;
  name: string;
  quantity: number;
  price: number;
}

CRUD Operations

Create Item

typescript
import { randomUUID } from 'node:crypto';
import { getContainer } from './cosmosdb';
import { User } from '../types/cosmos'; // types/cosmos.ts, relative to lib/

const usersContainer = getContainer('users');

async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
  const now = new Date().toISOString();
  const user: User = {
    id: randomUUID(),
    ...data,
    createdAt: now,
    updatedAt: now
  };

  const { resource } = await usersContainer.items.create(user);
  return resource as User;
}

Read Item (Point Read)

typescript
// Most efficient read - requires id AND partition key
async function getUser(userId: string, id: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(id, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

// If id equals partition key value
async function getUserById(userId: string): Promise<User | null> {
  try {
    const { resource } = await usersContainer.item(userId, userId).read<User>();
    return resource || null;
  } catch (error: any) {
    if (error.code === 404) return null;
    throw error;
  }
}

Query Items

typescript
// Query within partition (efficient): passing partitionKey scopes the query to one partition
async function getUserOrders(userId: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>(
      {
        query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
        parameters: [{ name: '@userId', value: userId }]
      },
      { partitionKey: userId }
    )
    .fetchAll();

  return resources;
}

// Cross-partition query (use sparingly)
async function getOrdersByStatus(status: string): Promise<Order[]> {
  const ordersContainer = getContainer('orders');

  const { resources } = await ordersContainer.items
    .query<Order>({
      query: 'SELECT * FROM c WHERE c.status = @status',
      parameters: [{ name: '@status', value: status }]
    })
    .fetchAll();

  return resources;
}

// Paginated query
async function getOrdersPaginated(
  userId: string,
  pageSize: number = 10,
  continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
  const ordersContainer = getContainer('orders');

  const queryIterator = ordersContainer.items.query<Order>(
    {
      query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
      parameters: [{ name: '@userId', value: userId }]
    },
    {
      maxItemCount: pageSize,
      continuationToken
    }
  );

  const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();

  return {
    items: resources,
    continuationToken: nextToken
  };
}
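
A caller that needs every page can feed each continuation token back into a paginated function until no token is returned. The helper below is a sketch of that loop; `collectAllPages`, the `Page` shape, and the `maxPages` cap are assumptions introduced for illustration.

```typescript
interface Page<T> {
  items: T[];
  continuationToken?: string;
}

// Calls fetchPage repeatedly, passing the previous continuation token,
// until the response carries no token or the safety cap is reached.
async function collectAllPages<T>(
  fetchPage: (token?: string) => Promise<Page<T>>,
  maxPages = 100 // assumed safety cap so a misbehaving token cannot loop forever
): Promise<T[]> {
  const all: T[] = [];
  let token: string | undefined;
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(token);
    all.push(...page.items);
    token = page.continuationToken;
    if (!token) break;
  }
  return all;
}
```

With the paginated function above, usage could look like `collectAllPages(token => getOrdersPaginated('user-456', 10, token))`. For user-facing lists, returning one page and its token to the client usually costs fewer RUs than draining everything.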

Update Item

typescript
// Replace entire item
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  const { resource } = await usersContainer.item(id, userId).replace(updated);
  return resource as User;
}

// Partial update (patch operations)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
  const { resource } = await usersContainer.item(id, userId).patch(operations);
  return resource as User;
}

// Usage:
await patchUser('user-123', 'user-123', [
  { op: 'set', path: '/name', value: 'New Name' },
  { op: 'set', path: '/updatedAt', value: new Date().toISOString() },
  { op: 'incr', path: '/loginCount', value: 1 }
]);

Delete Item

typescript
async function deleteUser(userId: string, id: string): Promise<void> {
  await usersContainer.item(id, userId).delete();
}

Optimistic Concurrency (ETags)

typescript
async function updateUserWithETag(
  userId: string,
  id: string,
  updates: Partial<User>,
  etag: string
): Promise<User> {
  const existing = await getUser(userId, id);
  if (!existing) throw new Error('User not found');

  const updated: User = {
    ...existing,
    ...updates,
    updatedAt: new Date().toISOString()
  };

  try {
    const { resource } = await usersContainer.item(id, userId).replace(updated, {
      accessCondition: { type: 'IfMatch', condition: etag }
    });
    return resource as User;
  } catch (error: any) {
    if (error.code === 412) {
      throw new Error('Document was modified by another process');
    }
    throw error;
  }
}
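
When a 412 means "someone else wrote first", a common follow-up is to re-read the item and re-apply the change a bounded number of times. The sketch below shows that loop against a small store interface. `ConditionalStore`, `updateWithRetry`, and `maxAttempts` are hypothetical, and the interface stands in for the `read` and `replace` calls above.

```typescript
interface Versioned<T> {
  value: T;
  etag: string;
}

interface ConditionalStore<T> {
  read(): Promise<Versioned<T>>;
  // Expected to throw an error whose code is 412 when the etag no longer matches.
  replaceIfMatch(value: T, etag: string): Promise<Versioned<T>>;
}

// Read-modify-write loop: retries only on precondition failures (412).
async function updateWithRetry<T>(
  store: ConditionalStore<T>,
  mutate: (current: T) => T,
  maxAttempts = 3
): Promise<Versioned<T>> {
  for (let attempt = 1; ; attempt++) {
    const current = await store.read();
    try {
      return await store.replaceIfMatch(mutate(current.value), current.etag);
    } catch (error: any) {
      if (error?.code !== 412 || attempt >= maxAttempts) throw error;
      // Another writer won the race; loop to re-read and re-apply the change.
    }
  }
}
```

The `mutate` callback runs against the freshly read value on every attempt, so it should be a pure function of the current state.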

Consistency Levels

| Level | Guarantees | Latency | Use Case |
| --- | --- | --- | --- |
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |

Set Consistency Per Request

typescript
// A per-request override can only relax the account's default consistency,
// never strengthen it. With a Session default, for example:
const { resource } = await usersContainer.item(id, userId).read<User>({
  consistencyLevel: 'Eventual'
});

// For queries
const { resources } = await container.items.query(
  { query: 'SELECT * FROM c' },
  { consistencyLevel: 'ConsistentPrefix' }
).fetchAll();
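
Because a request can only ask for the account default or something weaker, it can help to validate an override before sending it. The check below is a sketch of that rule using the ordering from the table above; `isAllowedOverride` is a hypothetical helper, not an SDK function.

```typescript
type Consistency = 'Strong' | 'BoundedStaleness' | 'Session' | 'ConsistentPrefix' | 'Eventual';

// Ordered from strongest to weakest, matching the Consistency Levels table.
const STRONGEST_TO_WEAKEST: Consistency[] = [
  'Strong',
  'BoundedStaleness',
  'Session',
  'ConsistentPrefix',
  'Eventual'
];

// True when `requested` is the same as, or weaker than, the account default.
function isAllowedOverride(accountDefault: Consistency, requested: Consistency): boolean {
  return STRONGEST_TO_WEAKEST.indexOf(requested) >= STRONGEST_TO_WEAKEST.indexOf(accountDefault);
}

const relax = isAllowedOverride('Session', 'Eventual');
const strengthen = isAllowedOverride('Session', 'Strong');
```

`relax` is true and `strengthen` is false. To read with Strong consistency, the account default itself has to be Strong.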

Batch Operations

Transactional Batch (Same Partition)

typescript
async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
  const ordersContainer = getContainer('orders');

  const operations = [
    { operationType: 'Create' as const, resourceBody: order },
    ...items.map(item => ({
      operationType: 'Create' as const,
      resourceBody: { ...item, userId, orderId: order.orderId }
    }))
  ];

  const { result } = await ordersContainer.items.batch(operations, userId);

  // Check if any operation failed
  if (result.some(r => r.statusCode >= 400)) {
    throw new Error('Batch operation failed');
  }
}

Bulk Operations

typescript
// For large-scale imports (not transactional)
async function bulkImportUsers(users: User[]): Promise<void> {
  const operations = users.map(user => ({
    operationType: 'Create' as const,
    resourceBody: user,
    partitionKey: user.userId
  }));

  // Process in chunks
  const chunkSize = 100;
  for (let i = 0; i < operations.length; i += chunkSize) {
    const chunk = operations.slice(i, i + chunkSize);
    await usersContainer.items.bulk(chunk);
  }
}
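
Bulk operations are not transactional, so individual operations can fail while the call as a whole succeeds. The sketch below sorts per-operation results into retryable (429) and failed entries. It assumes the results array lines up by index with the submitted operations; `OpResult` and `partitionBulkResults` are hypothetical names.

```typescript
interface OpResult {
  statusCode: number;
}

// Splits bulk results into throttled operations (worth retrying) and hard failures.
// Assumption: results[i] corresponds to operations[i].
function partitionBulkResults<T>(operations: T[], results: OpResult[]) {
  const retryable: T[] = [];
  const failed: { operation: T; statusCode: number }[] = [];
  results.forEach((result, i) => {
    const operation = operations[i];
    if (operation === undefined) return;
    if (result.statusCode === 429) {
      retryable.push(operation);
    } else if (result.statusCode >= 400) {
      failed.push({ operation, statusCode: result.statusCode });
    }
  });
  return { retryable, failed };
}

const outcome = partitionBulkResults(
  ['a', 'b', 'c'],
  [{ statusCode: 201 }, { statusCode: 429 }, { statusCode: 409 }]
);
```

In the import loop above, the value returned by each `bulk` call could be passed through a function like this so that throttled items are resubmitted and conflicts are logged.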

Change Feed

Process Changes

typescript
import { ChangeFeedStartFrom } from '@azure/cosmos';

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function processChangeFeed(): Promise<void> {
  const container = getContainer('orders');

  // Pull-model iterator; this is the call that accepts changeFeedStartFrom
  const changeFeedIterator = container.items.getChangeFeedIterator({
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
  });

  while (changeFeedIterator.hasMoreResults) {
    const { result: items, statusCode } = await changeFeedIterator.readNext();

    if (statusCode === 304) {
      // No new changes
      await sleep(1000);
      continue;
    }

    for (const item of items) {
      console.log('Changed item:', item);
      // Process the change...
    }
  }
}

// For production, persist the continuation token between runs so processing can resume

Change Feed Processor Pattern

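typescript
// Note: @azure/cosmos has no fluent change feed processor builder; the lease-based
// processor ships with the .NET and Java SDKs. In Node.js, use an Azure Functions
// Cosmos DB trigger, or checkpoint the pull model yourself as sketched below.
// loadToken/saveToken are hypothetical helpers backed by your own checkpoint store.
import { ChangeFeedStartFrom } from '@azure/cosmos';

async function startChangeFeedWorker(
  loadToken: () => Promise<string | undefined>,
  saveToken: (token: string) => Promise<void>
): Promise<void> {
  const sourceContainer = getContainer('orders');
  const saved = await loadToken();

  const iterator = sourceContainer.items.getChangeFeedIterator({
    changeFeedStartFrom: saved
      ? ChangeFeedStartFrom.Continuation(saved)
      : ChangeFeedStartFrom.Beginning()
  });

  while (iterator.hasMoreResults) {
    const { result: items, statusCode, continuationToken } = await iterator.readNext();
    if (statusCode === 304) {
      await new Promise(resolve => setTimeout(resolve, 1000)); // no new changes
      continue;
    }
    for (const item of items) console.log('Processing:', item); // process each change
    await saveToken(continuationToken); // checkpoint after the batch succeeds
  }
}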

Python SDK

Install

bash
pip install azure-cosmos

Setup and Operations

python
# cosmos_db.py
import os
import uuid
from datetime import datetime
from typing import List, Optional

from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError

# Initialize client
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']

client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)


def get_container(container_name: str):
    return database.get_container_client(container_name)


# CRUD Operations
users_container = get_container('users')


def create_user(email: str, name: str, user_id: Optional[str] = None) -> dict:
    user_id = user_id or str(uuid.uuid4())
    now = datetime.utcnow().isoformat()

    user = {
        'id': user_id,
        'userId': user_id,  # Partition key
        'email': email,
        'name': name,
        'createdAt': now,
        'updatedAt': now
    }

    return users_container.create_item(user)


def get_user(user_id: str) -> Optional[dict]:
    try:
        return users_container.read_item(item=user_id, partition_key=user_id)
    except CosmosResourceNotFoundError:
        return None


def query_users(email_domain: str) -> List[dict]:
    query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
    parameters = [{'name': '@domain', 'value': email_domain}]

    return list(users_container.query_items(
        query=query,
        parameters=parameters,
        enable_cross_partition_query=True
    ))


def update_user(user_id: str, **updates) -> dict:
    user = get_user(user_id)
    if not user:
        raise ValueError('User not found')

    user.update(updates)
    user['updatedAt'] = datetime.utcnow().isoformat()

    return users_container.replace_item(item=user_id, body=user)


def delete_user(user_id: str) -> None:
    users_container.delete_item(item=user_id, partition_key=user_id)


# Paginated query
def get_users_paginated(page_size: int = 10, continuation_token: Optional[str] = None):
    query = "SELECT * FROM c ORDER BY c.createdAt DESC"

    items = users_container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=page_size
    )

    # Resume from a previous page by passing the token to by_page()
    pages = items.by_page(continuation_token)
    results = list(next(pages))

    return {
        'items': results,
        'continuation_token': pages.continuation_token
    }

Indexing

Custom Indexing Policy

json
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [
    { "path": "/userId/?" },
    { "path": "/status/?" },
    { "path": "/createdAt/?" }
  ],
  "excludedPaths": [
    { "path": "/*" }
  ],
  "compositeIndexes": [
    [
      { "path": "/userId", "order": "ascending" },
      { "path": "/createdAt", "order": "descending" }
    ]
  ]
}

Create Container with Index

typescript
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  indexingPolicy: {
    indexingMode: 'consistent',
    includedPaths: [
      { path: '/userId/?' },
      { path: '/status/?' },
      { path: '/createdAt/?' }
    ],
    excludedPaths: [
      { path: '/*' }  // Exclude all by default
    ]
  }
});
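
The "include a few paths, exclude everything else" shape can be generated from the list of properties your queries actually filter or sort on. The helper below is a sketch of that idea; `buildIndexingPolicy` is a hypothetical name, and it returns a plain object in the same shape as the policy above.

```typescript
// Builds an opt-in indexing policy: only the listed scalar paths are indexed.
function buildIndexingPolicy(queriedPaths: string[]) {
  return {
    indexingMode: 'consistent' as const,
    // "/?" indexes the scalar value stored at each path.
    includedPaths: queriedPaths.map(path => ({ path: `${path}/?` })),
    // The root path must appear in either the included or the excluded list.
    excludedPaths: [{ path: '/*' }]
  };
}

const ordersPolicy = buildIndexingPolicy(['/userId', '/status', '/createdAt']);
```

The resulting object could be passed as `indexingPolicy` when creating the container. Queries that filter on a path left out of the list still run, but they fall back to scans and cost more RUs.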

Throughput Management

Provisioned Throughput

typescript
// Container level
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  throughput: 1000  // RU/s
});

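// Scale throughput: read the container's offer, then replace it
// (assumes the CosmosClient instance `client` from lib/cosmosdb.ts is in scope)
const container = database.container('orders');
const { resource: offer } = await container.readOffer();
if (offer?.content) {
  offer.content.offerThroughput = 2000;
  await client.offer(offer.id!).replace(offer);
}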

Autoscale

typescript
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] },
  maxThroughput: 10000  // Auto-scales 10% to 100%
});

Serverless

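typescript
// Serverless is a capacity mode chosen when the account is created,
// not a per-container setting. On a serverless account, omit throughput.
// Pay per request (good for dev/test, intermittent workloads)
await database.containers.createIfNotExists({
  id: 'orders',
  partitionKey: { paths: ['/userId'] }
  // No throughput setting on serverless accounts
});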

CLI Quick Reference

bash
# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
  --account-name myaccount \
  --resource-group mygroup \
  --database-name mydb \
  --name orders \
  --partition-key-path /userId \
  --throughput 400

# Query
# (confirm this subcommand exists in your CLI version with `az cosmosdb sql --help`)
az cosmosdb sql query --account-name myaccount --database-name mydb \
  --container-name orders --query "SELECT * FROM c"

# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings

Cost Optimization

| Strategy | Impact |
| --- | --- |
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |
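
A back-of-the-envelope RU/s budget makes the point-read saving concrete. The sketch below multiplies operation rates by per-operation charges. The point-read and query figures come from the table above, while the write charge and the workload numbers are assumptions for illustration only; measured values from the `requestCharge` on your own responses should replace them.

```typescript
interface Workload {
  pointReadsPerSec: number;
  queriesPerSec: number;
  writesPerSec: number;
}

// Assumed per-operation charges in RUs; the write figure is a placeholder.
const ASSUMED_RU = { pointRead: 1, query: 3, write: 6 };

// Sums the RU/s needed to sustain the given operation rates.
function estimateRuPerSecond(w: Workload): number {
  return (
    w.pointReadsPerSec * ASSUMED_RU.pointRead +
    w.queriesPerSec * ASSUMED_RU.query +
    w.writesPerSec * ASSUMED_RU.write
  );
}

const before = estimateRuPerSecond({ pointReadsPerSec: 100, queriesPerSec: 20, writesPerSec: 10 });
// The same workload after turning the 20 queries into point reads.
const after = estimateRuPerSecond({ pointReadsPerSec: 120, queriesPerSec: 0, writesPerSec: 10 });
```

Under these assumed charges the budget drops from 220 to 180 RU/s.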

Time-to-Live (TTL)

typescript
// Enable TTL on container
await database.containers.createIfNotExists({
  id: 'sessions',
  partitionKey: { paths: ['/userId'] },
  defaultTtl: 3600  // 1 hour
});

// Per-item TTL
const session = {
  id: 'session-123',
  userId: 'user-456',
  ttl: 1800  // Override: 30 minutes
};
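
How the container default and the per-item `ttl` combine is easy to get wrong, so the sketch below spells out the rules as a pure function. `effectiveTtl` is a hypothetical helper; it returns the number of seconds after the last write at which the item expires, or `null` when it never expires.

```typescript
// containerDefaultTtl: undefined = TTL disabled, -1 = enabled with no default expiry.
// itemTtl: optional per-item override; -1 = this item never expires.
function effectiveTtl(containerDefaultTtl: number | undefined, itemTtl?: number): number | null {
  // With TTL disabled on the container, a per-item ttl is ignored.
  if (containerDefaultTtl === undefined) return null;
  const ttl = itemTtl ?? containerDefaultTtl;
  return ttl === -1 ? null : ttl;
}

const overridden = effectiveTtl(3600, 1800);    // item override wins
const inherited = effectiveTtl(3600);           // container default applies
const disabled = effectiveTtl(undefined, 1800); // TTL off on the container
const optIn = effectiveTtl(-1, 60);             // only items with a ttl expire
```

For the session example above, the item expires after 1800 seconds even though the container default is 3600.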

Anti-Patterns

  • Bad partition key - Low cardinality causes hot partitions
  • Cross-partition queries - Expensive; design for single-partition queries
  • Over-indexing - Increases write cost; index only queried paths
  • Large items - Max 2MB; store blobs in Azure Blob Storage
  • Ignoring RU cost - Monitor and optimize expensive queries
  • Strong consistency everywhere - Use Session (default) unless required
  • No retry logic - Handle 429 (throttling) with exponential backoff
  • Missing TTL - Set TTL for temporary/session data
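
For the retry item in the list above, the following is a minimal sketch of exponential backoff around any async operation. The SDK already retries throttled requests by default, so a wrapper like this mainly covers the case where those built-in retries are exhausted. `withThrottleRetry` and its options are hypothetical, and reading a `retryAfterInMs` hint from the error is an assumption about the error shape.

```typescript
interface RetryOptions {
  maxAttempts?: number;
  baseDelayMs?: number;
}

const wait = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Retries the operation on 429 responses, doubling the delay on each attempt.
async function withThrottleRetry<T>(
  operation: () => Promise<T>,
  { maxAttempts = 5, baseDelayMs = 100 }: RetryOptions = {}
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error: any) {
      if (error?.code !== 429 || attempt >= maxAttempts) throw error;
      // Prefer the server's hint when present (assumed field), else back off exponentially.
      const backoff = baseDelayMs * 2 ** (attempt - 1);
      await wait(error.retryAfterInMs ?? backoff);
    }
  }
}
```

Usage could look like `withThrottleRetry(() => usersContainer.items.create(user))`. Errors other than 429 are rethrown immediately so that real failures are not masked.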