Loading...
Loading...
Creates a repository following Clean Architecture, with the Protocol in the domain layer and the implementation in the infrastructure layer. Use when adding a new data access layer, creating database interactions, implementing persistence, or needing to store/retrieve domain models. Enforces the Protocol/ABC pattern with ServiceResult, the ManagedResource lifecycle, and proper layer separation. Triggers on "create repository for X", "implement data access for Y", "add persistence layer", or "store/retrieve domain model".
npx skill4agent add dawiddutoit/custom-claude implement-repository-pattern
domain/repositories/search_history_repository.py
infrastructure/neo4j/search_history_repository.py
src/project_watch_mcp/domain/repositories/{name}_repository.py
from abc import ABC, abstractmethod
from project_watch_mcp.domain.common import ServiceResult
class {Name}Repository(ABC):
"""Port for {purpose} storage and retrieval.
This interface defines the contract for {operations}.
Concrete implementations will be provided in the infrastructure layer.
"""
@abstractmethod
async def {operation}(self, param: Type) -> ServiceResult[ReturnType]:
"""Brief description of operation.
Args:
param: Description
Returns:
ServiceResult[ReturnType]: Success with data or Failure on errors
"""
pass
ABC
@abstractmethod
ServiceResult[T]
src/project_watch_mcp/infrastructure/neo4j/{name}_repository.py
from neo4j import AsyncDriver, RoutingControl
from project_watch_mcp.config.settings import Settings
from project_watch_mcp.domain.common import ServiceResult
from project_watch_mcp.domain.repositories.{name}_repository import {Name}Repository
from project_watch_mcp.domain.services.resource_manager import ManagedResource
class Neo4j{Name}Repository({Name}Repository, ManagedResource):
"""Neo4j adapter implementing {Name}Repository interface."""
def __init__(self, driver: AsyncDriver, settings: Settings):
if not driver:
raise ValueError("Neo4j driver is required")
if not settings:
raise ValueError("Settings is required")
self.driver = driver
self.settings = settings
self.database = settings.neo4j.database_name
async def _execute_with_retry(
self,
query: str,
parameters: dict[str, Any] | None = None,
routing: RoutingControl = RoutingControl.WRITE,
) -> ServiceResult[list[dict]]:
"""Execute query with parameter validation and retry logic."""
# Validate before executing
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")
validated_query = validation_result.data
try:
records, _, _ = await self.driver.execute_query(
cast(LiteralString, validated_query.query),
validated_query.parameters,
database_=self.database,
routing_=routing,
)
return ServiceResult.ok([dict(record) for record in records])
except Neo4jError as e:
return ServiceResult.fail(f"Database error: {str(e)}")
async def close(self) -> None:
"""Close and cleanup resources (ManagedResource protocol)."""
# Repository-specific cleanup if needed
pass
ManagedResource
driver: AsyncDriver, settings: Settings
if not driver: raise ValueError
_execute_with_retry()
close()
ServiceResult[T]
src/project_watch_mcp/infrastructure/neo4j/queries.py
class CypherQueries:
# Existing queries...
# {Name}Repository Queries
GET_{ENTITY} = """
MATCH (e:{Label} {project_name: $project_name, id: $id})
RETURN e
"""
SAVE_{ENTITY} = """
MERGE (e:{Label} {project_name: $project_name, id: $id})
SET e += $properties
SET e.updated_at = datetime()
RETURN e
"""
src/project_watch_mcp/infrastructure/container.py
async def {name}_repository(self) -> {Name}Repository:
"""Provide {Name}Repository implementation."""
driver = await self.neo4j_driver()
settings = await self.settings()
return Neo4j{Name}Repository(driver, settings)
{name}_repository()
tests/unit/infrastructure/neo4j/test_{name}_repository.py
tests/integration/infrastructure/neo4j/test_{name}_repository.py
@pytest.mark.asyncio
async def test_save_{entity}_success(mock_driver, settings):
"""Test successful {entity} save operation."""
# Arrange
repo = Neo4j{Name}Repository(mock_driver, settings)
mock_driver.execute_query.return_value = (
[{"e": {"id": "test", "name": "Test"}}],
None,
None,
)
# Act
result = await repo.save_{entity}(entity_data)
# Assert
assert result.is_success
assert result.data is not None
class SearchHistoryRepository(ABC):
@abstractmethod
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
pass
@abstractmethod
async def get_recent_queries(self, user_id: str, limit: int) -> ServiceResult[list[str]]:
pass
class Neo4jSearchHistoryRepository(SearchHistoryRepository, ManagedResource):
async def save_query(self, query: str, user_id: str) -> ServiceResult[None]:
cypher = """
CREATE (q:SearchQuery {query: $query, user_id: $user_id, timestamp: datetime()})
"""
result = await self._execute_with_retry(cypher, {"query": query, "user_id": user_id})
return ServiceResult.ok(None) if result.is_success else result
neo4j>=5.0.0
project_watch_mcp.domain.common
project_watch_mcp.domain.services.resource_manager
project_watch_mcp.config.settings
src/project_watch_mcp/
├── domain/
│ └── repositories/
│ └── {name}_repository.py # Protocol (ABC)
├── infrastructure/
│ └── neo4j/
│ ├── {name}_repository.py # Implementation
│ └── queries.py # Cypher queries
└── tests/
├── unit/infrastructure/neo4j/
│ └── test_{name}_repository.py
└── integration/infrastructure/neo4j/
└── test_{name}_repository.py
validation_result = validate_and_build_query(query, parameters, strict=True)
if validation_result.is_failure:
return ServiceResult.fail(f"Validation failed: {validation_result.error}")
result = await self._execute_with_retry(query, params)
if result.is_failure:
return ServiceResult.fail(f"Failed to save: {result.error}")
# Transform data and return success
return ServiceResult.ok(transformed_data)
async def close(self) -> None:
"""Cleanup repository-specific resources."""
# Driver is managed externally by container
# Only cleanup repository-specific resources here
logger.debug(f"{self.__class__.__name__} cleanup complete")
None
ServiceResult.fail()
settings
_execute_with_retry()
CypherQueries
async
ManagedResource
domain/repositories/
infrastructure/neo4j/
driver
settings
ServiceResult[T]
ManagedResource
close()
_execute_with_retry()
CypherQueries
./scripts/check_all.sh
implement-dependency-injection
validate-layer-boundaries
run-quality-gates