Loading...
Loading...
Compare original and translation side by side
domain/repositories/search_history_repository.pyinfrastructure/neo4j/search_history_repository.pydomain/repositories/search_history_repository.pyinfrastructure/neo4j/search_history_repository.pysrc/project_watch_mcp/domain/repositories/{name}_repository.pyfrom abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult
class {Name}Repository(ABC):
"""Port for {purpose} storage and retrieval.
This interface defines the contract for {operations}.
Concrete implementations will be provided in the infrastructure layer.
"""
@abstractmethod
async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
"""Brief description of operation.
Args:
param: Description
Returns:
ServiceResult[ReturnType]: Success with data or Failure on errors
"""
passABC@abstractmethodServiceResult[T]src/project_watch_mcp/domain/repositories/{name}_repository.pyfrom abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult
class {Name}Repository(ABC):
"""用于{purpose}存储与检索的端口。
此接口定义了{operations}的契约。
具体实现将在基础设施层提供。
"""
@abstractmethod
async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
"""操作的简要描述。
参数:
param: 参数描述
返回:
ServiceResult[ReturnType]: 成功时返回数据,失败时返回错误信息
"""
passABC@abstractmethodServiceResult[T]src/project_watch_mcp/infrastructure/neo4j/{name}_repository.pyfrom neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource
class Neo4j{Name}Repository({Name}Repository, ManagedResource):
"""Neo4j adapter implementing {Name}Repository interface."""
def __init__(self, driver: AsyncDriver, settings: Settings):
if not driver:
raise ValueError("Neo4j driver is required")
if not settings:
raise ValueError("Settings is required")
self.driver = driver
self.settings = settings
self.database = settings.neo4j.database_name
async def _execute_with_retry(
self,
query: str,
parameters: dict[str, Any] | None = None,
routing: RoutingControl = RoutingControl.WRITE,
) -> ServiceResult[list[dict]]:
"""Execute query with parameter validation and retry logic."""
# Validate before executing
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")
validated_query = validation_result.data
try:
records, _, _ = await self.driver.execute_query(
cast(LiteralString, validated_query.query),
validated_query.parameters,
database_=self.database,
routing_=routing,
)
return ServiceResult.ok([dict(record) for record in records])
except Neo4jError as e:
return ServiceResult.fail(f"Database error: {str(e)}")
async def close(self) -> None:
"""Close and cleanup resources (ManagedResource protocol)."""
# Repository-specific cleanup if needed
passManagedResourcedriver: AsyncDriver, settings: Settingsif not driver: raise ValueError_execute_with_retry()close()ServiceResult[T]src/project_watch_mcp/infrastructure/neo4j/{name}_repository.pyfrom neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource
class Neo4j{Name}Repository({Name}Repository, ManagedResource):
"""实现{Name}Repository接口的Neo4j适配器。"""
def __init__(self, driver: AsyncDriver, settings: Settings):
if not driver:
raise ValueError("Neo4j driver是必填项")
if not settings:
raise ValueError("Settings是必填项")
self.driver = driver
self.settings = settings
self.database = settings.neo4j.database_name
async def _execute_with_retry(
self,
query: str,
parameters: dict[str, Any] | None = None,
routing: RoutingControl = RoutingControl.WRITE,
) -> ServiceResult[list[dict]]:
"""带参数验证与重试逻辑的查询执行方法。"""
# 执行前验证
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"验证失败: {validation_result.error}")
validated_query = validation_result.data
try:
records, _, _ = await self.driver.execute_query(
cast(LiteralString, validated_query.query),
validated_query.parameters,
database_=self.database,
routing_=routing,
)
return ServiceResult.ok([dict(record) for record in records])
except Neo4jError as e:
return ServiceResult.fail(f"数据库错误: {str(e)}")
async def close(self) -> None:
"""关闭并清理资源(遵循ManagedResource协议)。"""
# 如有需要,添加仓库特定的清理逻辑
passManagedResourcedriver: AsyncDriver, settings: Settingsif not driver: raise ValueError_execute_with_retry()close()ServiceResult[T]src/project_watch_mcp/infrastructure/neo4j/queries.pyclass CypherQueries:
# Existing queries...
# {Name}Repository Queries
GET_{ENTITY} = """
MATCH (e:{Label} {project_name: $project_name, id: $id})
RETURN e
"""
SAVE_{ENTITY} = """
MERGE (e:{Label} {project_name: $project_name, id: $id})
SET e += $properties
SET e.updated_at = datetime()
RETURN e
"""src/project_watch_mcp/infrastructure/neo4j/queries.pyclass CypherQueries:
# 已有的查询语句...
# {Name}Repository 查询语句
GET_{ENTITY} = """
MATCH (e:{Label} {project_name: $project_name, id: $id})
RETURN e
"""
SAVE_{ENTITY} = """
MERGE (e:{Label} {project_name: $project_name, id: $id})
SET e += $properties
SET e.updated_at = datetime()
RETURN e
"""src/project_watch_mcp/infrastructure/container.pyasync def {name}_repository(self) -> {Name}Repository:
"""Provide {Name}Repository implementation."""
driver = await self.neo4j_driver()
settings = await self.settings()
return Neo4j{Name}Repository(driver, settings){name}_repository()src/project_watch_mcp/infrastructure/container.pyasync def {name}_repository(self) -> {Name}Repository:
"""提供{Name}Repository的实现。"""
driver = await self.neo4j_driver()
settings = await self.settings()
return Neo4j{Name}Repository(driver, settings){name}_repository()tests/unit/infrastructure/neo4j/test_{name}_repository.pytests/integration/infrastructure/neo4j/test_{name}_repository.py@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
"""Test successful {entity} save operation."""
# Arrange
repo = Neo4j{Name}Repository(mock_driver, settings)
mock_driver.execute_query.return_value = (
[{"e": {"id": "test", "name": "Test"}}],
None,
None,
)
# Act
result = await repo.save_{entity}(entity_data)
# Assert
assert result.is_success
assert result.data is not Nonetests/unit/infrastructure/neo4j/test_{name}_repository.pytests/integration/infrastructure/neo4j/test_{name}_repository.py@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
"""测试{entity}保存操作的成功场景。"""
# 准备
repo = Neo4j{Name}Repository(mock_driver, settings)
mock_driver.execute_query.return_value = (
[{"e": {"id": "test", "name": "Test"}}],
None,
None,
)
# 执行
result = await repo.save_{entity}(entity_data)
# 断言
assert result.is_success
assert result.data is not Noneclass SearchHistoryRepository(ABC):
@abstractmethod
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
pass
@abstractmethod
async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
passclass Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
cypher = """
CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
"""
result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
return ServiceResult.ok(None) if result.is_success else resultclass SearchHistoryRepository(ABC):
@abstractmethod
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
pass
@abstractmethod
async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
passclass Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
cypher = """
CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
"""
result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
return ServiceResult.ok(None) if result.is_success else resultneo4j>=5.0.0project_watch_mcp.domain.commonproject_watch_mcp.domain.services.resource_managerproject_watch_mcp.config.settingssrc/project_watch_mcp/
├── domain/
│ └── repositories/
│ └── {name}_repository.py # Protocol (ABC)
├── infrastructure/
│ └── neo4j/
│ ├── {name}_repository.py # Implementation
│ └── queries.py # Cypher queries
└── tests/
├── unit/infrastructure/neo4j/
│ └── test_{name}_repository.py
└── integration/infrastructure/neo4j/
└── test_{name}_repository.pyneo4j>=5.0.0project_watch_mcp.domain.commonproject_watch_mcp.domain.services.resource_managerproject_watch_mcp.config.settingssrc/project_watch_mcp/
├── domain/
│ └── repositories/
│ └── {name}_repository.py # Protocol (ABC)
├── infrastructure/
│ └── neo4j/
│ ├── {name}_repository.py # 实现
│ └── queries.py # Cypher查询语句
└── tests/
├── unit/infrastructure/neo4j/
│ └── test_{name}_repository.py
└── integration/infrastructure/neo4j/
└── test_{name}_repository.pyvalidation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"验证失败: {validation_result.error}")result = await self._execute_with_retry(query, params)
if result.is_failure:
return ServiceResult.fail(f"Failed to save: {result.error}")result = await self._execute_with_retry(query, params)
if result.is_failure:
return ServiceResult.fail(f"保存失败: {result.error}")undefinedundefinedasync def close(self) -> None:
"""Cleanup repository-specific resources."""
# Driver is managed externally by container
# Only cleanup repository-specific resources here
logger.debug(f"{self.__class__.__name__} cleanup complete")async def close(self) -> None:
"""清理仓库特定资源。"""
# Driver由容器外部管理
# 仅在此处清理仓库特定的资源
logger.debug(f"{self.__class__.__name__} 清理完成")NoneServiceResult.fail()settings_execute_with_retry()CypherQueriesasyncManagedResourceNoneServiceResult.fail()settings_execute_with_retry()CypherQueriesasyncManagedResourcedomain/repositories/infrastructure/neo4j/driversettingsServiceResult[T]ManagedResourceclose()_execute_with_retry()CypherQueries./scripts/check_all.shdomain/repositories/infrastructure/neo4j/driversettingsServiceResult[T]ManagedResourceclose()_execute_with_retry()CypherQueries./scripts/check_all.shimplement-dependency-injectionvalidate-layer-boundariesrun-quality-gatesimplement-dependency-injectionvalidate-layer-boundariesrun-quality-gates