Research-to-implement pipeline chaining 5 MCP tools with graceful degradation
npx skill4agent add parcadei/continuous-claude-v3 mcp-chaining

| Step | Server | Tool ID | Purpose |
|---|---|---|---|
| 1 | nia | | Search library documentation |
| 2 | ast-grep | | Find AST code patterns |
| 3 | morph | | Fast codebase search |
| 4 | qlty | | Code quality validation |
| 5 | git | | Git operations |
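
To make the step order concrete, here is a minimal sketch that models the five steps as data. The Tool ID column is empty in this copy of the table, so the IDs below are assumptions: each is picked from the per-server tool lists in this document and may not be the one the pipeline script actually calls. `PlannedStep`, `PLAN`, and `describe` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class PlannedStep:
    """One row of the step table (hypothetical helper type)."""
    order: int
    tool_id: str  # "<server>__<tool>", the ID format used in this document
    purpose: str

    @property
    def server(self) -> str:
        # The server name is everything before the first "__".
        return self.tool_id.split("__", 1)[0]


# Assumed pairing of steps and tool IDs; the source table leaves them blank.
PLAN: List[PlannedStep] = [
    PlannedStep(1, "nia__search", "Search library documentation"),
    PlannedStep(2, "ast-grep__find_code", "Find AST code patterns"),
    PlannedStep(3, "morph__warpgrep_codebase_search", "Fast codebase search"),
    PlannedStep(4, "qlty__qlty_check", "Code quality validation"),
    PlannedStep(5, "git__git_status", "Git operations"),
]


def describe(plan: List[PlannedStep]) -> List[str]:
    """Render the plan as one line per step, similar to a dry-run preview."""
    return [f"{step.order}. {step.server}: {step.purpose}" for step in plan]


if __name__ == "__main__":
    print("\n".join(describe(PLAN)))
```

Keeping the plan as plain data means a dry run can print it without contacting any server.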
scripts/research_implement_pipeline.py
scripts/test_research_pipeline.py
workspace/pipeline-test/sample_code.py

# Dry-run pipeline (preview plan without changes)
uv run python -m runtime.harness scripts/research_implement_pipeline.py \
    --topic "async error handling python" \
    --target-dir "./workspace/pipeline-test" \
    --dry-run --verbose
# Run tests
uv run python -m runtime.harness scripts/test_research_pipeline.py --test all
# View the pipeline script
cat scripts/research_implement_pipeline.py

The MCP SDK's get_default_environment() passes only a small set of variables to a stdio server, not the full os.environ, so src/runtime/mcp_client.py merges the full environment in:

# In _connect_stdio method:
full_env = {**os.environ, **(resolved_env or {})}

Environment file: ~/.claude/.env

async def check_tool_available(tool_id: str) -> bool:
    """Check if an MCP tool is available."""
    # Tool IDs have the form "<server>__<tool>"; only the server part is checked.
    server_name = tool_id.split("__")[0]
    server_config = manager._config.get_server(server_name)
    if not server_config or server_config.disabled:
        return False
    return True

# In step function:
if not await check_tool_available("nia__search"):
    return StepResult(status=StepStatus.SKIPPED, message="Nia not available")

nia__search - Universal documentation search
nia__nia_research - Research with sources
nia__nia_grep - Grep-style doc search
nia__nia_explore - Explore package structure

ast-grep__find_code - Find code by AST pattern
ast-grep__find_code_by_rule - Find by YAML rule
ast-grep__scan_code - Scan with multiple patterns

morph__warpgrep_codebase_search - 20x faster grep
morph__edit_file - Smart file editing

qlty__qlty_check - Run quality checks
qlty__qlty_fmt - Auto-format code
qlty__qlty_metrics - Get code metrics
qlty__smells - Detect code smells

git__git_status - Get repo status
git__git_diff - Show differences
git__git_log - View commit history
git__git_add - Stage files

            +----------------+
            |   CLI Args     |
            | (topic, dir)   |
            +-------+--------+
                    |
            +-------v--------+
            | PipelineContext|
            | (shared state) |
            +-------+--------+
                    |
    +-------+-------+-------+-------+
    |       |       |       |       |
+---v---+---v---+---v---+---v---+---v---+
| nia   |ast-grp| morph | qlty  | git   |
|search |pattern|search |check  |status |
+---+---+---+---+---+---+---+---+---+---+
    |       |       |       |       |
    +-------v-------v-------v-------+
                    |
            +-------v--------+
            | StepResult[]   |
            |  (aggregated)  |
            +----------------+

try:
    result = await call_mcp_tool("nia__search", {"query": topic})
    return StepResult(status=StepStatus.SUCCESS, data=result)
except Exception as e:
    ctx.errors.append(f"nia: {e}")
    return StepResult(status=StepStatus.FAILED, error=str(e))

scripts/research_implement_pipeline.py
check_tool_available()
PipelineContext
print_summary()
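
The availability check and the try/except pattern above can be combined into one step runner. The following is a self-contained sketch under stated assumptions, not the code in scripts/research_implement_pipeline.py. The field layouts of `StepResult` and `PipelineContext`, the `AVAILABLE_SERVERS` set, the stubbed `call_mcp_tool`, and the `path` and `repo_path` parameter names are all hypothetical stand-ins so the example runs without any MCP server.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class StepStatus(Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"


@dataclass
class StepResult:
    status: StepStatus
    data: Any = None
    message: str = ""
    error: Optional[str] = None


@dataclass
class PipelineContext:
    """Shared state passed to every step (assumed layout)."""
    topic: str
    target_dir: str
    errors: List[str] = field(default_factory=list)
    results: Dict[str, StepResult] = field(default_factory=dict)


# Hypothetical stand-in for the server configuration lookup.
AVAILABLE_SERVERS = {"nia", "git"}


async def check_tool_available(tool_id: str) -> bool:
    return tool_id.split("__")[0] in AVAILABLE_SERVERS


async def call_mcp_tool(tool_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
    # Stub: one tool fails on purpose to exercise the error path.
    if tool_id == "git__git_status":
        raise RuntimeError("not a git repository")
    return {"tool": tool_id, "params": params}


async def run_step(ctx: PipelineContext, tool_id: str, params: Dict[str, Any]) -> StepResult:
    server = tool_id.split("__")[0]
    # Skip instead of failing when the server is not configured.
    if not await check_tool_available(tool_id):
        return StepResult(status=StepStatus.SKIPPED, message=f"{server} not available")
    try:
        data = await call_mcp_tool(tool_id, params)
        return StepResult(status=StepStatus.SUCCESS, data=data)
    except Exception as e:
        # Record the error and keep going so later steps still run.
        ctx.errors.append(f"{server}: {e}")
        return StepResult(status=StepStatus.FAILED, error=str(e))


async def run_pipeline(topic: str, target_dir: str) -> PipelineContext:
    ctx = PipelineContext(topic=topic, target_dir=target_dir)
    steps = [
        ("nia__search", {"query": topic}),
        ("qlty__qlty_check", {"path": target_dir}),      # parameter name assumed
        ("git__git_status", {"repo_path": target_dir}),  # parameter name assumed
    ]
    for tool_id, params in steps:
        ctx.results[tool_id] = await run_step(ctx, tool_id, params)
    return ctx


def print_summary(ctx: PipelineContext) -> None:
    for tool_id, result in ctx.results.items():
        print(f"{tool_id}: {result.status.value}")
    for err in ctx.errors:
        print(f"error: {err}")


if __name__ == "__main__":
    final_ctx = asyncio.run(
        run_pipeline("async error handling python", "./workspace/pipeline-test")
    )
    print_summary(final_ctx)
```

Because each step returns a `StepResult` rather than raising, a missing server or a failing tool never aborts the run, and the summary shows which steps were skipped or failed.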