azure-cosmos-db-py

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Cosmos DB Service Implementation

Cosmos DB 服务实现

Build production-grade Azure Cosmos DB NoSQL services following clean code, security best practices, and TDD principles.
遵循整洁代码、安全最佳实践和TDD原则,构建生产级Azure Cosmos DB NoSQL服务。

Installation

安装

bash
pip install azure-cosmos azure-identity
bash
pip install azure-cosmos azure-identity

Environment Variables

环境变量

bash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE_NAME=<database-name>
COSMOS_CONTAINER_ID=<container-id>
bash
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE_NAME=<database-name>
COSMOS_CONTAINER_ID=<container-id>

For emulator only (not production)

仅适用于模拟器(生产环境不使用)

COSMOS_KEY=<emulator-key>
undefined
COSMOS_KEY=<emulator-key>
undefined

Authentication

认证方式

DefaultAzureCredential (preferred):
python
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential

client = CosmosClient(
    url=os.environ["COSMOS_ENDPOINT"],
    credential=DefaultAzureCredential()
)
Emulator (local development):
python
from azure.cosmos import CosmosClient

client = CosmosClient(
    url="https://localhost:8081",
    credential=os.environ["COSMOS_KEY"],
    connection_verify=False
)
DefaultAzureCredential(推荐):
python
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential

client = CosmosClient(
    url=os.environ["COSMOS_ENDPOINT"],
    credential=DefaultAzureCredential()
)
模拟器(本地开发):
python
from azure.cosmos import CosmosClient

client = CosmosClient(
    url="https://localhost:8081",
    credential=os.environ["COSMOS_KEY"],
    connection_verify=False
)

Architecture Overview

架构概述

┌─────────────────────────────────────────────────────────────────┐
│                         FastAPI Router                          │
│  - Auth dependencies (get_current_user, get_current_user_required)
│  - HTTP error responses (HTTPException)                         │
└──────────────────────────────┬──────────────────────────────────┘
┌──────────────────────────────▼──────────────────────────────────┐
│                        Service Layer                            │
│  - Business logic and validation                                │
│  - Document ↔ Model conversion                                  │
│  - Graceful degradation when Cosmos unavailable                 │
└──────────────────────────────┬──────────────────────────────────┘
┌──────────────────────────────▼──────────────────────────────────┐
│                     Cosmos DB Client Module                     │
│  - Singleton container initialization                           │
│  - Dual auth: DefaultAzureCredential (Azure) / Key (emulator)   │
│  - Async wrapper via run_in_threadpool                          │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                     FastAPI 路由层                             │
│  - 认证依赖项 (get_current_user, get_current_user_required)
│  - HTTP错误响应 (HTTPException)                                │
└──────────────────────────────┬──────────────────────────────────┘
┌──────────────────────────────▼──────────────────────────────────┐
│                        服务层                                  │
│  - 业务逻辑与验证                                              │
│  - 文档 ↔ 模型转换                                            │
│  - Cosmos不可用时的优雅降级                                    │
└──────────────────────────────┬──────────────────────────────────┘
┌──────────────────────────────▼──────────────────────────────────┐
│                     Cosmos DB 客户端模块                       │
│  - 单例容器初始化                                              │
│  - 双重认证:DefaultAzureCredential(Azure环境)/ 密钥(模拟器)│
│  - 通过run_in_threadpool实现异步包装                           │
└─────────────────────────────────────────────────────────────────┘

Quick Start

快速开始

1. Client Module Setup

1. 客户端模块配置

Create a singleton Cosmos client with dual authentication:
python
undefined
创建支持双重认证的单例Cosmos客户端:
python
undefined

db/cosmos.py

db/cosmos.py

from azure.cosmos import CosmosClient from azure.identity import DefaultAzureCredential from starlette.concurrency import run_in_threadpool
_cosmos_container = None
def _is_emulator_endpoint(endpoint: str) -> bool: return "localhost" in endpoint or "127.0.0.1" in endpoint
async def get_container(): global _cosmos_container if _cosmos_container is None: if _is_emulator_endpoint(settings.cosmos_endpoint): client = CosmosClient( url=settings.cosmos_endpoint, credential=settings.cosmos_key, connection_verify=False ) else: client = CosmosClient( url=settings.cosmos_endpoint, credential=DefaultAzureCredential() ) db = client.get_database_client(settings.cosmos_database_name) _cosmos_container = db.get_container_client(settings.cosmos_container_id) return _cosmos_container

**Full implementation**: See [references/client-setup.md](references/client-setup.md)
from azure.cosmos import CosmosClient from azure.identity import DefaultAzureCredential from starlette.concurrency import run_in_threadpool
_cosmos_container = None
def _is_emulator_endpoint(endpoint: str) -> bool: return "localhost" in endpoint or "127.0.0.1" in endpoint
async def get_container(): global _cosmos_container if _cosmos_container is None: if _is_emulator_endpoint(settings.cosmos_endpoint): client = CosmosClient( url=settings.cosmos_endpoint, credential=settings.cosmos_key, connection_verify=False ) else: client = CosmosClient( url=settings.cosmos_endpoint, credential=DefaultAzureCredential() ) db = client.get_database_client(settings.cosmos_database_name) _cosmos_container = db.get_container_client(settings.cosmos_container_id) return _cosmos_container

**完整实现**:查看 [references/client-setup.md](references/client-setup.md)

2. Pydantic Model Hierarchy

2. Pydantic 模型层级

Use five-tier model pattern for clean separation:
python
class ProjectBase(BaseModel):           # Shared fields
    name: str = Field(..., min_length=1, max_length=200)

class ProjectCreate(ProjectBase):       # Creation request
    workspace_id: str = Field(..., alias="workspaceId")

class ProjectUpdate(BaseModel):         # Partial updates (all optional)
    name: Optional[str] = Field(None, min_length=1)

class Project(ProjectBase):             # API response
    id: str
    created_at: datetime = Field(..., alias="createdAt")

class ProjectInDB(Project):             # Internal with docType
    doc_type: str = "project"
使用五层模型模式实现清晰的职责分离:
python
class ProjectBase(BaseModel):           # 共享字段
    name: str = Field(..., min_length=1, max_length=200)

class ProjectCreate(ProjectBase):       # 创建请求模型
    workspace_id: str = Field(..., alias="workspaceId")

class ProjectUpdate(BaseModel):         # 部分更新模型(所有字段可选)
    name: Optional[str] = Field(None, min_length=1)

class Project(ProjectBase):             # API响应模型
    id: str
    created_at: datetime = Field(..., alias="createdAt")

class ProjectInDB(Project):             # 内部数据库模型(含docType)
    doc_type: str = "project"

3. Service Layer Pattern

3. 服务层模式

python
class ProjectService:
    def _use_cosmos(self) -> bool:
        return get_container() is not None
    
    async def get_by_id(self, project_id: str, workspace_id: str) -> Project | None:
        if not self._use_cosmos():
            return None
        doc = await get_document(project_id, partition_key=workspace_id)
        if doc is None:
            return None
        return self._doc_to_model(doc)
Full patterns: See references/service-layer.md
python
class ProjectService:
    def _use_cosmos(self) -> bool:
        return get_container() is not None
    
    async def get_by_id(self, project_id: str, workspace_id: str) -> Project | None:
        if not self._use_cosmos():
            return None
        doc = await get_document(project_id, partition_key=workspace_id)
        if doc is None:
            return None
        return self._doc_to_model(doc)
完整模式:查看 references/service-layer.md

Core Principles

核心原则

Security Requirements

安全要求

  1. RBAC Authentication: Use
    DefaultAzureCredential
    in Azure — never store keys in code
  2. Emulator-Only Keys: Hardcode the well-known emulator key only for local development
  3. Parameterized Queries: Always use
    @parameter
    syntax — never string concatenation
  4. Partition Key Validation: Validate partition key access matches user authorization
  1. RBAC认证:在Azure环境中使用
    DefaultAzureCredential
    ——绝不在代码中存储密钥
  2. 仅模拟器使用密钥:仅在本地开发时使用众所周知的模拟器密钥
  3. 参数化查询:始终使用
    @parameter
    语法——绝不使用字符串拼接
  4. 分区键验证:验证分区键访问权限与用户授权匹配

Clean Code Conventions

整洁代码规范

  1. Single Responsibility: Client module handles connection; services handle business logic
  2. Graceful Degradation: Services return
    None
    /
    []
    when Cosmos unavailable
  3. Consistent Naming:
    _doc_to_model()
    ,
    _model_to_doc()
    ,
    _use_cosmos()
  4. Type Hints: Full typing on all public methods
  5. CamelCase Aliases: Use
    Field(alias="camelCase")
    for JSON serialization
  1. 单一职责:客户端模块处理连接;服务层处理业务逻辑
  2. 优雅降级:Cosmos不可用时,服务返回
    None
    /
    []
  3. 命名一致
    _doc_to_model()
    _model_to_doc()
    _use_cosmos()
  4. 类型提示:所有公共方法都添加完整类型注解
  5. 驼峰别名:使用
    Field(alias="camelCase")
    实现JSON序列化

TDD Requirements

TDD 要求

Write tests BEFORE implementation using these patterns:
python
@pytest.fixture
def mock_cosmos_container(mocker):
    container = mocker.MagicMock()
    mocker.patch("app.db.cosmos.get_container", return_value=container)
    return container

@pytest.mark.asyncio
async def test_get_project_by_id_returns_project(mock_cosmos_container):
    # Arrange
    mock_cosmos_container.read_item.return_value = {"id": "123", "name": "Test"}
    
    # Act
    result = await project_service.get_by_id("123", "workspace-1")
    
    # Assert
    assert result.id == "123"
    assert result.name == "Test"
Full testing guide: See references/testing.md
在实现前编写测试,遵循以下模式:
python
@pytest.fixture
def mock_cosmos_container(mocker):
    container = mocker.MagicMock()
    mocker.patch("app.db.cosmos.get_container", return_value=container)
    return container

@pytest.mark.asyncio
async def test_get_project_by_id_returns_project(mock_cosmos_container):
    # 准备
    mock_cosmos_container.read_item.return_value = {"id": "123", "name": "Test"}
    
    # 执行
    result = await project_service.get_by_id("123", "workspace-1")
    
    # 断言
    assert result.id == "123"
    assert result.name == "Test"
完整测试指南:查看 references/testing.md

Reference Files

参考文档

FileWhen to Read
references/client-setup.mdSetting up Cosmos client with dual auth, SSL config, singleton pattern
references/service-layer.mdImplementing full service class with CRUD, conversions, graceful degradation
references/testing.mdWriting pytest tests, mocking Cosmos, integration test setup
references/partitioning.mdChoosing partition keys, cross-partition queries, move operations
references/error-handling.mdHandling CosmosResourceNotFoundError, logging, HTTP error mapping
文件阅读场景
references/client-setup.md配置带有双重认证、SSL设置、单例模式的Cosmos客户端
references/service-layer.md实现包含CRUD、转换逻辑、优雅降级的完整服务类
references/testing.md编写pytest测试、模拟Cosmos、集成测试配置
references/partitioning.md选择分区键、跨分区查询、数据迁移操作
references/error-handling.md处理CosmosResourceNotFoundError、日志记录、HTTP错误映射

Template Files

模板文件

FilePurpose
assets/cosmos_client_template.pyReady-to-use client module
assets/service_template.pyService class skeleton
assets/conftest_template.pypytest fixtures for Cosmos mocking
文件用途
assets/cosmos_client_template.py可直接使用的客户端模块
assets/service_template.py服务类骨架
assets/conftest_template.py用于Cosmos模拟的pytest夹具

Quality Attributes (NFRs)

质量属性(非功能需求)

Reliability

可靠性

  • Graceful degradation when Cosmos unavailable
  • Retry logic with exponential backoff for transient failures
  • Connection pooling via singleton pattern
  • Cosmos不可用时的优雅降级
  • 针对瞬时故障的指数退避重试逻辑
  • 通过单例模式实现连接池

Security

安全性

  • Zero secrets in code (RBAC via DefaultAzureCredential)
  • Parameterized queries prevent injection
  • Partition key isolation enforces data boundaries
  • 代码中无敏感信息(通过DefaultAzureCredential实现RBAC)
  • 参数化查询防止注入攻击
  • 分区键隔离强化数据边界

Maintainability

可维护性

  • Five-tier model pattern enables schema evolution
  • Service layer decouples business logic from storage
  • Consistent patterns across all entity services
  • 五层模型模式支持 schema 演进
  • 服务层解耦业务逻辑与存储实现
  • 所有实体服务遵循一致模式

Testability

可测试性

  • Dependency injection via
    get_container()
  • Easy mocking with module-level globals
  • Clear separation enables unit testing without Cosmos
  • 通过
    get_container()
    实现依赖注入
  • 模块级全局变量便于模拟
  • 清晰的职责分离无需Cosmos即可进行单元测试

Performance

性能

  • Partition key queries avoid cross-partition scans
  • Async wrapping prevents blocking FastAPI event loop
  • Minimal document conversion overhead
  • 分区键查询避免跨分区扫描
  • 异步包装避免阻塞FastAPI事件循环
  • 最小化文档转换开销