azure-cosmos-py
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAzure Cosmos DB SDK for Python
Azure Cosmos DB SDK for Python
Client library for Azure Cosmos DB NoSQL API — globally distributed, multi-model database.
适用于Azure Cosmos DB NoSQL API的客户端库——一款全局分布式的多模型数据库。
Installation
安装
bash
pip install azure-cosmos azure-identitybash
pip install azure-cosmos azure-identityEnvironment Variables
环境变量
bash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mycontainerbash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mycontainerAuthentication
身份验证
python
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient
credential = DefaultAzureCredential()
endpoint = "https://<account>.documents.azure.com:443/"
client = CosmosClient(url=endpoint, credential=credential)python
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient
credential = DefaultAzureCredential()
endpoint = "https://<account>.documents.azure.com:443/"
client = CosmosClient(url=endpoint, credential=credential)Client Hierarchy
客户端层级结构
| Client | Purpose | Get From |
|---|---|---|
| Account-level operations | Direct instantiation |
| Database operations | |
| Container/item operations | |
| 客户端 | 用途 | 获取方式 |
|---|---|---|
| 账户级操作 | 直接实例化 |
| 数据库操作 | |
| 容器/项操作 | |
Core Workflow
核心工作流
Setup Database and Container
配置数据库与容器
python
undefinedpython
undefinedGet or create database
Get or create database
database = client.create_database_if_not_exists(id="mydb")
database = client.create_database_if_not_exists(id="mydb")
Get or create container with partition key
Get or create container with partition key
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/category")
)
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/category")
)
Get existing
Get existing
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
undefineddatabase = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
undefinedCreate Item
创建项
python
item = {
"id": "item-001", # Required: unique within partition
"category": "electronics", # Partition key value
"name": "Laptop",
"price": 999.99,
"tags": ["computer", "portable"]
}
created = container.create_item(body=item)
print(f"Created: {created['id']}")python
item = {
"id": "item-001", # Required: unique within partition
"category": "electronics", # Partition key value
"name": "Laptop",
"price": 999.99,
"tags": ["computer", "portable"]
}
created = container.create_item(body=item)
print(f"Created: {created['id']}")Read Item
读取项
python
undefinedpython
undefinedRead requires id AND partition key
Read requires id AND partition key
item = container.read_item(
item="item-001",
partition_key="electronics"
)
print(f"Name: {item['name']}")
undefineditem = container.read_item(
item="item-001",
partition_key="electronics"
)
print(f"Name: {item['name']}")
undefinedUpdate Item (Replace)
更新项(替换)
python
item = container.read_item(item="item-001", partition_key="electronics")
item["price"] = 899.99
item["on_sale"] = True
updated = container.replace_item(item=item["id"], body=item)python
item = container.read_item(item="item-001", partition_key="electronics")
item["price"] = 899.99
item["on_sale"] = True
updated = container.replace_item(item=item["id"], body=item)Upsert Item
插入更新项(Upsert)
python
undefinedpython
undefinedCreate if not exists, replace if exists
Create if not exists, replace if exists
item = {
"id": "item-002",
"category": "electronics",
"name": "Tablet",
"price": 499.99
}
result = container.upsert_item(body=item)
undefineditem = {
"id": "item-002",
"category": "electronics",
"name": "Tablet",
"price": 499.99
}
result = container.upsert_item(body=item)
undefinedDelete Item
删除项
python
container.delete_item(
item="item-001",
partition_key="electronics"
)python
container.delete_item(
item="item-001",
partition_key="electronics"
)Queries
查询
Basic Query
基础查询
python
undefinedpython
undefinedQuery within a partition (efficient)
Query within a partition (efficient)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
partition_key="electronics"
)
for item in items:
print(f"{item['name']}: ${item['price']}")
undefinedquery = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
partition_key="electronics"
)
for item in items:
print(f"{item['name']}: ${item['price']}")
undefinedCross-Partition Query
跨分区查询
python
undefinedpython
undefinedCross-partition (more expensive, use sparingly)
Cross-partition (more expensive, use sparingly)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
enable_cross_partition_query=True
)
for item in items:
print(item)
undefinedquery = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
enable_cross_partition_query=True
)
for item in items:
print(item)
undefinedQuery with Projection
带投影的查询
python
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
query=query,
parameters=[{"name": "@category", "value": "electronics"}],
partition_key="electronics"
)python
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
query=query,
parameters=[{"name": "@category", "value": "electronics"}],
partition_key="electronics"
)Read All Items
读取所有项
python
undefinedpython
undefinedRead all in a partition
Read all in a partition
items = container.read_all_items() # Cross-partition
items = container.read_all_items() # Cross-partition
Or with partition key
Or with partition key
items = container.query_items(
query="SELECT * FROM c",
partition_key="electronics"
)
undefineditems = container.query_items(
query="SELECT * FROM c",
partition_key="electronics"
)
undefinedPartition Keys
分区键
Critical: Always include partition key for efficient operations.
python
from azure.cosmos import PartitionKey关键提示:执行高效操作时务必指定分区键。
python
from azure.cosmos import PartitionKeySingle partition key
Single partition key
container = database.create_container_if_not_exists(
id="orders",
partition_key=PartitionKey(path="/customer_id")
)
container = database.create_container_if_not_exists(
id="orders",
partition_key=PartitionKey(path="/customer_id")
)
Hierarchical partition key (preview)
Hierarchical partition key (preview)
container = database.create_container_if_not_exists(
id="events",
partition_key=PartitionKey(path=["/tenant_id", "/user_id"])
)
undefinedcontainer = database.create_container_if_not_exists(
id="events",
partition_key=PartitionKey(path=["/tenant_id", "/user_id"])
)
undefinedThroughput
吞吐量
python
undefinedpython
undefinedCreate container with provisioned throughput
Create container with provisioned throughput
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/pk"),
offer_throughput=400 # RU/s
)
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/pk"),
offer_throughput=400 # RU/s
)
Read current throughput
Read current throughput
offer = container.read_offer()
print(f"Throughput: {offer.offer_throughput} RU/s")
offer = container.read_offer()
print(f"Throughput: {offer.offer_throughput} RU/s")
Update throughput
Update throughput
container.replace_throughput(throughput=1000)
undefinedcontainer.replace_throughput(throughput=1000)
undefinedAsync Client
异步客户端
python
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential
async def cosmos_operations():
credential = DefaultAzureCredential()
async with CosmosClient(endpoint, credential=credential) as client:
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# Create
await container.create_item(body={"id": "1", "pk": "test"})
# Read
item = await container.read_item(item="1", partition_key="test")
# Query
async for item in container.query_items(
query="SELECT * FROM c",
partition_key="test"
):
print(item)
import asyncio
asyncio.run(cosmos_operations())python
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential
async def cosmos_operations():
credential = DefaultAzureCredential()
async with CosmosClient(endpoint, credential=credential) as client:
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# Create
await container.create_item(body={"id": "1", "pk": "test"})
# Read
item = await container.read_item(item="1", partition_key="test")
# Query
async for item in container.query_items(
query="SELECT * FROM c",
partition_key="test"
):
print(item)
import asyncio
asyncio.run(cosmos_operations())Error Handling
错误处理
python
from azure.cosmos.exceptions import CosmosHttpResponseError
try:
item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
if e.status_code == 404:
print("Item not found")
elif e.status_code == 429:
print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
else:
raisepython
from azure.cosmos.exceptions import CosmosHttpResponseError
try:
item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
if e.status_code == 404:
print("Item not found")
elif e.status_code == 429:
print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
else:
raiseBest Practices
最佳实践
- Always specify partition key for point reads and queries
- Use parameterized queries to prevent injection and improve caching
- Avoid cross-partition queries when possible
- Use for idempotent writes
upsert_item - Use async client for high-throughput scenarios
- Design partition key for even data distribution
- Use instead of query for single document retrieval
read_item
- 始终指定分区键,用于点读和查询操作
- 使用参数化查询,防止注入攻击并提升缓存效率
- 尽可能避免跨分区查询
- **使用**实现幂等写入
upsert_item - 高吞吐量场景下使用异步客户端
- 设计分区键以实现数据均匀分布
- **读取单个文档时使用**而非查询
read_item
Reference Files
参考文件
| File | Contents |
|---|---|
| references/partitioning.md | Partition key strategies, hierarchical keys, hot partition detection and mitigation |
| references/query-patterns.md | Query optimization, aggregations, pagination, transactions, change feed |
| scripts/setup_cosmos_container.py | CLI tool for creating containers with partitioning, throughput, and indexing |
| 文件 | 内容 |
|---|---|
| references/partitioning.md | 分区键策略、分层键、热点分区检测与缓解方案 |
| references/query-patterns.md | 查询优化、聚合、分页、事务、变更源 |
| scripts/setup_cosmos_container.py | 用于创建容器的CLI工具,支持分区、吞吐量和索引配置 |