mcp
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMCP (Model Context Protocol) - AI-Native Server Development
MCP(Model Context Protocol,模型上下文协议)- 原生AI服务器开发
Overview
概述
Model Context Protocol (MCP) is an open standard for connecting AI assistants to external data sources and tools. Build servers that expose tools (functions LLMs can call), resources (data LLMs can read), and prompts (templates LLMs can use).
Key Concepts:
- Tools: Functions LLMs can execute (read files, query APIs, run commands)
- Resources: Data sources LLMs can access (files, databases, APIs)
- Prompts: Reusable templates with arguments for common tasks
- Client-Server: MCP servers expose capabilities, clients (like Claude Desktop) consume them
- Transport: STDIO (local), SSE (Server-Sent Events), HTTP (network)
Official SDKs:
- TypeScript:
@modelcontextprotocol/sdk - Python:
mcp
Installation:
bash
undefinedModel Context Protocol(MCP)是用于将AI助手连接到外部数据源和工具的开放标准。构建可对外暴露工具(LLM可调用的函数)、资源(LLM可读取的数据)和提示词(LLM可使用的模板)的服务器。
核心概念:
- 工具:LLM可执行的函数(读取文件、查询API、运行命令)
- 资源:LLM可访问的数据源(文件、数据库、API)
- 提示词:带参数的可复用模板,适用于常见任务
- 客户端-服务器架构:MCP服务器对外提供能力,客户端(如Claude Desktop)调用这些能力
- 传输方式:STDIO(本地)、SSE(Server-Sent Events,服务器推送事件)、HTTP(网络)
官方SDK:
- TypeScript:
@modelcontextprotocol/sdk - Python:
mcp
安装方式:
bash
undefinedTypeScript server
TypeScript服务器
npx @modelcontextprotocol/create-server@latest my-server
cd my-server && npm install
npx @modelcontextprotocol/create-server@latest my-server
cd my-server && npm install
Python server
Python服务器
pip install mcp
pip install mcp
Or use uv (recommended)
推荐使用uv
uv pip install mcp
undefineduv pip install mcp
undefinedQuick Start - TypeScript Server
快速开始 - TypeScript服务器
1. Create Server with CLI
1. 使用CLI创建服务器
bash
undefinedbash
undefinedInteractive setup
交互式设置
npx @modelcontextprotocol/create-server@latest my-filesystem-server
npx @modelcontextprotocol/create-server@latest my-filesystem-server
Options prompt:
选项提示:
- Server name: my-filesystem-server
- 服务器名称: my-filesystem-server
- Language: TypeScript
- 开发语言: TypeScript
- Include example tools: Yes
- 包含示例工具: 是
undefinedundefined2. Define Tools
2. 定义工具
typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
const server = new Server(
{
name: "filesystem-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "read_file",
description: "Read contents of a file",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "Path to the file to read",
},
},
required: ["path"],
},
},
{
name: "list_directory",
description: "List contents of a directory",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "Directory path to list",
},
},
required: ["path"],
},
},
],
};
});
// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "read_file": {
const filePath = args.path as string;
const content = await fs.readFile(filePath, "utf-8");
return {
content: [{ type: "text", text: content }],
};
}
case "list_directory": {
const dirPath = args.path as string;
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const listing = entries
.map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
.join("\n");
return {
content: [{ type: "text", text: listing }],
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
});
// Start server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Filesystem MCP server running on stdio");
}
main();typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
const server = new Server(
{
name: "filesystem-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// 列出可用工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "read_file",
description: "Read contents of a file",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "Path to the file to read",
},
},
required: ["path"],
},
},
{
name: "list_directory",
description: "List contents of a directory",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "Directory path to list",
},
},
required: ["path"],
},
},
],
};
});
// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "read_file": {
const filePath = args.path as string;
const content = await fs.readFile(filePath, "utf-8");
return {
content: [{ type: "text", text: content }],
};
}
case "list_directory": {
const dirPath = args.path as string;
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const listing = entries
.map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
.join("\n");
return {
content: [{ type: "text", text: listing }],
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
});
// 启动服务器
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Filesystem MCP server running on stdio");
}
main();3. Configure Claude Desktop
3. 配置Claude Desktop
json
// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
// %APPDATA%/Claude/claude_desktop_config.json (Windows)
{
"mcpServers": {
"filesystem": {
"command": "node",
"args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
}
}
}json
// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
// %APPDATA%/Claude/claude_desktop_config.json (Windows)
{
"mcpServers": {
"filesystem": {
"command": "node",
"args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
}
}
}4. Build and Test
4. 构建并测试
bash
undefinedbash
undefinedBuild TypeScript
构建TypeScript代码
npm run build
npm run build
Restart Claude Desktop (Cmd+Q and reopen)
重启Claude Desktop(Cmd+Q后重新打开)
Server appears in 🔌 menu
服务器会出现在🔌菜单中
undefinedundefinedQuick Start - Python Server
快速开始 - Python服务器
1. Create Server
1. 创建服务器
python
undefinedpython
undefinedserver.py
server.py
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
import os
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
import os
Create server instance
创建服务器实例
app = Server("filesystem-server")
app = Server("filesystem-server")
Define tools
定义工具
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="read_file",
description="Read contents of a file",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path to read"}
},
"required": ["path"],
},
),
Tool(
name="list_directory",
description="List directory contents",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "Directory path"}
},
"required": ["path"],
},
),
]
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="read_file",
description="Read contents of a file",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path to read"}
},
"required": ["path"],
},
),
Tool(
name="list_directory",
description="List directory contents",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "Directory path"}
},
"required": ["path"],
},
),
]
Handle tool calls
处理工具调用
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "read_file":
file_path = arguments["path"]
with open(file_path, "r") as f:
content = f.read()
return [TextContent(type="text", text=content)]
elif name == "list_directory":
dir_path = arguments["path"]
entries = os.listdir(dir_path)
listing = "\n".join(
f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
for e in entries
)
return [TextContent(type="text", text=listing)]
else:
raise ValueError(f"Unknown tool: {name}")@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "read_file":
file_path = arguments["path"]
with open(file_path, "r") as f:
content = f.read()
return [TextContent(type="text", text=content)]
elif name == "list_directory":
dir_path = arguments["path"]
entries = os.listdir(dir_path)
listing = "\n".join(
f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
for e in entries
)
return [TextContent(type="text", text=listing)]
else:
raise ValueError(f"Unknown tool: {name}")Start server
启动服务器
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main":
asyncio.run(main())
undefinedasync def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main":
asyncio.run(main())
undefined2. Configure Claude Desktop
2. 配置Claude Desktop
json
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["/absolute/path/to/server.py"]
}
}
}json
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["/absolute/path/to/server.py"]
}
}
}3. Test
3. 测试
bash
undefinedbash
undefinedTest server standalone
单独测试服务器
python server.py
python server.py
Restart Claude Desktop
重启Claude Desktop
Tools appear in 🔌 menu
工具会出现在🔌菜单中
undefinedundefinedResources - Exposing Data to LLMs
资源 - 向LLM暴露数据
Resources provide read-only access to data sources.
资源提供对数据源的只读访问权限。
TypeScript Resource Example
TypeScript资源示例
typescript
import {
ListResourcesRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// List available resources
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "file:///docs/readme.md",
name: "README",
description: "Project README documentation",
mimeType: "text/markdown",
},
{
uri: "file:///config/settings.json",
name: "Settings",
description: "Application settings",
mimeType: "application/json",
},
],
};
});
// Handle resource reads
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri;
if (uri.startsWith("file://")) {
const filePath = uri.replace("file://", "");
const content = await fs.readFile(filePath, "utf-8");
return {
contents: [
{
uri,
mimeType: "text/plain",
text: content,
},
],
};
}
throw new Error(`Unknown resource: ${uri}`);
});typescript
import {
ListResourcesRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// 列出可用资源
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "file:///docs/readme.md",
name: "README",
description: "Project README documentation",
mimeType: "text/markdown",
},
{
uri: "file:///config/settings.json",
name: "Settings",
description: "Application settings",
mimeType: "application/json",
},
],
};
});
// 处理资源读取请求
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri;
if (uri.startsWith("file://")) {
const filePath = uri.replace("file://", "");
const content = await fs.readFile(filePath, "utf-8");
return {
contents: [
{
uri,
mimeType: "text/plain",
text: content,
},
],
};
}
throw new Error(`Unknown resource: ${uri}`);
});Python Resource Example
Python资源示例
python
from mcp.types import Resource, ResourceContent
@app.list_resources()
async def list_resources() -> list[Resource]:
return [
Resource(
uri="file:///docs/readme.md",
name="README",
description="Project README",
mimeType="text/markdown",
),
Resource(
uri="file:///config/settings.json",
name="Settings",
description="App settings",
mimeType="application/json",
),
]
@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
if uri.startswith("file://"):
file_path = uri.replace("file://", "")
with open(file_path, "r") as f:
content = f.read()
return ResourceContent(uri=uri, mimeType="text/plain", text=content)
raise ValueError(f"Unknown resource: {uri}")python
from mcp.types import Resource, ResourceContent
@app.list_resources()
async def list_resources() -> list[Resource]:
return [
Resource(
uri="file:///docs/readme.md",
name="README",
description="Project README",
mimeType="text/markdown",
),
Resource(
uri="file:///config/settings.json",
name="Settings",
description="App settings",
mimeType="application/json",
),
]
@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
if uri.startswith("file://"):
file_path = uri.replace("file://", "")
with open(file_path, "r") as f:
content = f.read()
return ResourceContent(uri=uri, mimeType="text/plain", text=content)
raise ValueError(f"Unknown resource: {uri}")Prompts - Reusable Templates
提示词 - 可复用模板
Prompts are templates that LLMs can use with arguments.
提示词是LLM可使用的带参数模板。
TypeScript Prompt Example
TypeScript提示词示例
typescript
import {
ListPromptsRequestSchema,
GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// List prompts
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [
{
name: "code_review",
description: "Review code for best practices",
arguments: [
{
name: "language",
description: "Programming language",
required: true,
},
{
name: "code",
description: "Code to review",
required: true,
},
],
},
],
};
});
// Handle prompt requests
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "code_review") {
const language = args?.language || "unknown";
const code = args?.code || "";
return {
messages: [
{
role: "user",
content: {
type: "text",
text: `Review this ${language} code for best practices, security issues, and improvements:\n\n\`\`\`${language}\n${code}\n\`\`\``,
},
},
],
};
}
throw new Error(`Unknown prompt: ${name}`);
});typescript
import {
ListPromptsRequestSchema,
GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
// 列出提示词
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [
{
name: "code_review",
description: "Review code for best practices",
arguments: [
{
name: "language",
description: "Programming language",
required: true,
},
{
name: "code",
description: "Code to review",
required: true,
},
],
},
],
};
});
// 处理提示词请求
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "code_review") {
const language = args?.language || "unknown";
const code = args?.code || "";
return {
messages: [
{
role: "user",
content: {
type: "text",
text: `Review this ${language} code for best practices, security issues, and improvements:\n\n\`\`\`${language}\n${code}\n\`\`\``,
},
},
],
};
}
throw new Error(`Unknown prompt: ${name}`);
});Python Prompt Example
Python提示词示例
python
from mcp.types import Prompt, PromptMessage, PromptArgument
@app.list_prompts()
async def list_prompts() -> list[Prompt]:
return [
Prompt(
name="code_review",
description="Review code for best practices",
arguments=[
PromptArgument(
name="language", description="Programming language", required=True
),
PromptArgument(name="code", description="Code to review", required=True),
],
)
]
@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
if name == "code_review":
language = arguments.get("language", "unknown")
code = arguments.get("code", "")
return [
PromptMessage(
role="user",
content={
"type": "text",
"text": f"Review this {language} code:\n\n```{language}\n{code}\n```",
},
)
]
raise ValueError(f"Unknown prompt: {name}")python
from mcp.types import Prompt, PromptMessage, PromptArgument
@app.list_prompts()
async def list_prompts() -> list[Prompt]:
return [
Prompt(
name="code_review",
description="Review code for best practices",
arguments=[
PromptArgument(
name="language", description="Programming language", required=True
),
PromptArgument(name="code", description="Code to review", required=True),
],
)
]
@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
if name == "code_review":
language = arguments.get("language", "unknown")
code = arguments.get("code", "")
return [
PromptMessage(
role="user",
content={
"type": "text",
"text": f"Review this {language} code:\n\n```{language}\n{code}\n```",
},
)
]
raise ValueError(f"Unknown prompt: {name}")Advanced Patterns
高级模式
1. Error Handling
1. 错误处理
typescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
const { name, arguments: args } = request.params;
if (name === "risky_operation") {
// Validate inputs
if (!args.required_param) {
throw new Error("Missing required parameter: required_param");
}
// Perform operation with proper error handling
const result = await performRiskyOperation(args.required_param);
return {
content: [{ type: "text", text: JSON.stringify(result) }],
};
}
} catch (error) {
// Return error to LLM with helpful message
return {
content: [
{
type: "text",
text: `Error: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
});python
undefinedtypescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
const { name, arguments: args } = request.params;
if (name === "risky_operation") {
// 验证输入
if (!args.required_param) {
throw new Error("Missing required parameter: required_param");
}
// 执行操作并妥善处理错误
const result = await performRiskyOperation(args.required_param);
return {
content: [{ type: "text", text: JSON.stringify(result) }],
};
}
} catch (error) {
// 向LLM返回包含有用信息的错误
return {
content: [
{
type: "text",
text: `Error: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
});python
undefinedPython
Python
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "risky_operation":
if "required_param" not in arguments:
raise ValueError("Missing required parameter: required_param")
result = await perform_risky_operation(arguments["required_param"])
return [TextContent(type="text", text=json.dumps(result))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}", isError=True)]undefined@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "risky_operation":
if "required_param" not in arguments:
raise ValueError("Missing required parameter: required_param")
result = await perform_risky_operation(arguments["required_param"])
return [TextContent(type="text", text=json.dumps(result))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}", isError=True)]undefined2. Async Operations
2. 异步操作
typescript
// TypeScript - Async API calls
import axios from "axios";
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "fetch_api_data") {
const url = args.url as string;
const response = await axios.get(url);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
});python
undefinedtypescript
// TypeScript - 异步API调用
import axios from "axios";
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "fetch_api_data") {
const url = args.url as string;
const response = await axios.get(url);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
});python
undefinedPython - Async database queries
Python - 异步数据库查询
import aiosqlite
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "query_database":
query = arguments["query"]
async with aiosqlite.connect("database.db") as db:
async with db.execute(query) as cursor:
rows = await cursor.fetchall()
result = json.dumps(rows)
return [TextContent(type="text", text=result)]undefinedimport aiosqlite
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "query_database":
query = arguments["query"]
async with aiosqlite.connect("database.db") as db:
async with db.execute(query) as cursor:
rows = await cursor.fetchall()
result = json.dumps(rows)
return [TextContent(type="text", text=result)]undefined3. Environment Variables & Configuration
3. 环境变量与配置
typescript
// TypeScript - Load config from environment
import dotenv from "dotenv";
dotenv.config();
const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "api_call") {
const response = await fetch(`${BASE_URL}/endpoint`, {
headers: { Authorization: `Bearer ${API_KEY}` },
});
// ...
}
});python
undefinedtypescript
// TypeScript - 从环境变量加载配置
import dotenv from "dotenv";
dotenv.config();
const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "api_call") {
const response = await fetch(`${BASE_URL}/endpoint`, {
headers: { Authorization: `Bearer ${API_KEY}` },
});
// ...
}
});python
undefinedPython - Load config from environment
Python - 从环境变量加载配置
import os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "api_call":
headers = {"Authorization": f"Bearer {API_KEY}"}
async with aiohttp.ClientSession() as session:
async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp:
data = await resp.json()
return [TextContent(type="text", text=json.dumps(data))]
undefinedimport os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "api_call":
headers = {"Authorization": f"Bearer {API_KEY}"}
async with aiohttp.ClientSession() as session:
async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp:
data = await resp.json()
return [TextContent(type="text", text=json.dumps(data))]
undefined4. Streaming Responses (Large Data)
4. 流式响应(大数据)
typescript
// TypeScript - Stream large file contents
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "read_large_file") {
const filePath = request.params.arguments.path as string;
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
let content = "";
for await (const chunk of stream) {
content += chunk;
// Could yield chunks incrementally in future MCP versions
}
return {
content: [{ type: "text", text: content }],
};
}
});typescript
// TypeScript - 流式传输大文件内容
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "read_large_file") {
const filePath = request.params.arguments.path as string;
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
let content = "";
for await (const chunk of stream) {
content += chunk;
// 未来MCP版本可支持增量返回数据块
}
return {
content: [{ type: "text", text: content }],
};
}
});5. Dynamic Tool Registration
5. 动态工具注册
typescript
// TypeScript - Register tools from config
interface ToolConfig {
name: string;
description: string;
schema: object;
handler: (args: any) => Promise<any>;
}
const toolRegistry = new Map<string, ToolConfig>();
function registerTool(config: ToolConfig) {
toolRegistry.set(config.name, config);
}
// Register custom tools
registerTool({
name: "custom_tool",
description: "Dynamically registered tool",
schema: {
type: "object",
properties: {
input: { type: "string" },
},
},
handler: async (args) => {
return { result: `Processed: ${args.input}` };
},
});
// List tools dynamically
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: Array.from(toolRegistry.values()).map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.schema,
})),
};
});
// Call tools dynamically
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const tool = toolRegistry.get(request.params.name);
if (!tool) {
throw new Error(`Unknown tool: ${request.params.name}`);
}
const result = await tool.handler(request.params.arguments);
return {
content: [{ type: "text", text: JSON.stringify(result) }],
};
});typescript
// TypeScript - 从配置注册工具
interface ToolConfig {
name: string;
description: string;
schema: object;
handler: (args: any) => Promise<any>;
}
const toolRegistry = new Map<string, ToolConfig>();
function registerTool(config: ToolConfig) {
toolRegistry.set(config.name, config);
}
// 注册自定义工具
registerTool({
name: "custom_tool",
description: "Dynamically registered tool",
schema: {
type: "object",
properties: {
input: { type: "string" },
},
},
handler: async (args) => {
return { result: `Processed: ${args.input}` };
},
});
// 动态列出工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: Array.from(toolRegistry.values()).map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.schema,
})),
};
});
// 动态调用工具
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const tool = toolRegistry.get(request.params.name);
if (!tool) {
throw new Error(`Unknown tool: ${request.params.name}`);
}
const result = await tool.handler(request.params.arguments);
return {
content: [{ type: "text", text: JSON.stringify(result) }],
};
});Transport Types
传输类型
1. STDIO (Local Execution)
1. STDIO(本地执行)
Default for Claude Desktop integration. Server runs as subprocess.
json
// Claude Desktop config
{
"mcpServers": {
"my-server": {
"command": "node",
"args": ["/path/to/server/build/index.js"],
"env": {
"API_KEY": "your-api-key"
}
}
}
}Claude Desktop集成的默认方式。服务器作为子进程运行。
json
// Claude Desktop配置
{
"mcpServers": {
"my-server": {
"command": "node",
"args": ["/path/to/server/build/index.js"],
"env": {
"API_KEY": "your-api-key"
}
}
}
}2. SSE (Server-Sent Events)
2. SSE(服务器推送事件)
For long-running servers with HTTP transport.
typescript
// TypeScript SSE server
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";
const app = express();
const server = new Server(/* ... */);
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/messages", res);
await server.connect(transport);
});
app.post("/messages", async (req, res) => {
// Handle incoming messages
});
app.listen(3000, () => {
console.log("MCP server listening on http://localhost:3000");
});json
// Claude Desktop config for SSE
{
"mcpServers": {
"remote-server": {
"url": "http://localhost:3000/sse"
}
}
}适用于长期运行的HTTP传输服务器。
typescript
// TypeScript SSE服务器
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";
const app = express();
const server = new Server(/* ... */);
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/messages", res);
await server.connect(transport);
});
app.post("/messages", async (req, res) => {
// 处理传入消息
});
app.listen(3000, () => {
console.log("MCP server listening on http://localhost:3000");
});json
// Claude Desktop的SSE配置
{
"mcpServers": {
"remote-server": {
"url": "http://localhost:3000/sse"
}
}
}3. HTTP (Streamable HTTP)
3. HTTP(可流式传输的HTTP)
For REST-style MCP servers.
python
undefined适用于REST风格的MCP服务器。
python
undefinedPython HTTP server with FastAPI
基于FastAPI的Python HTTP服务器
from fastapi import FastAPI
from mcp.server.fastapi import create_fastapi_app
app = FastAPI()
mcp_app = Server("http-server")
from fastapi import FastAPI
from mcp.server.fastapi import create_fastapi_app
app = FastAPI()
mcp_app = Server("http-server")
Define tools/resources/prompts as usual
像往常一样定义工具/资源/提示词
...
...
Mount MCP routes
挂载MCP路由
app.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
undefinedapp.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
undefinedDebugging MCP Servers
调试MCP服务器
1. Server Logs
1. 服务器日志
typescript
// TypeScript - Console.error for logs (STDOUT is for MCP protocol)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
console.error(`Tool called: ${request.params.name}`);
console.error(`Arguments: ${JSON.stringify(request.params.arguments)}`);
// ...
});python
undefinedtypescript
// TypeScript - 使用console.error记录日志(STDOUT用于MCP协议)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
console.error(`Tool called: ${request.params.name}`);
console.error(`Arguments: ${JSON.stringify(request.params.arguments)}`);
// ...
});python
undefinedPython - Use logging module
Python - 使用logging模块
import logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log")
logger = logging.getLogger(name)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
logger.debug(f"Tool called: {name}")
logger.debug(f"Arguments: {arguments}")
# ...
undefinedimport logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log")
logger = logging.getLogger(name)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
logger.debug(f"Tool called: {name}")
logger.debug(f"Arguments: {arguments}")
// ...
undefined2. MCP Inspector
2. MCP Inspector
bash
undefinedbash
undefinedInstall MCP Inspector (browser-based debugging tool)
安装MCP Inspector(基于浏览器的调试工具)
npm install -g @modelcontextprotocol/inspector
npm install -g @modelcontextprotocol/inspector
Run inspector with your server
运行Inspector并连接你的服务器
mcp-inspector node /path/to/server/build/index.js
mcp-inspector node /path/to/server/build/index.js
Opens browser at http://localhost:6274
在浏览器中打开http://localhost:6274
- Test tools manually
- 手动测试工具
- See request/response payloads
- 查看请求/响应负载
- Debug JSON schema validation
- 调试JSON schema验证
undefinedundefined3. Claude Desktop Logs
3. Claude Desktop日志
bash
undefinedbash
undefinedmacOS
macOS
tail -f ~/Library/Logs/Claude/mcp*.log
tail -f ~/Library/Logs/Claude/mcp*.log
Windows
Windows
Check %APPDATA%/Claude/logs/
查看%APPDATA%/Claude/logs/目录
undefinedundefined4. Test Server Standalone
4. 单独测试服务器
typescript
// TypeScript - Add test harness
if (process.argv.includes("--test")) {
// Simulate tool call
const testRequest = {
params: {
name: "read_file",
arguments: { path: "./test.txt" },
},
};
server
.setRequestHandler(CallToolRequestSchema, async (request) => {
// ... your handler
})
.then((handler) => handler(testRequest))
.then((result) => console.log(JSON.stringify(result, null, 2)))
.catch((error) => console.error(error));
} else {
// Normal server startup
main();
}bash
undefinedtypescript
// TypeScript - 添加测试工具
if (process.argv.includes("--test")) {
// 模拟工具调用
const testRequest = {
params: {
name: "read_file",
arguments: { path: "./test.txt" },
},
};
server
.setRequestHandler(CallToolRequestSchema, async (request) => {
// ... 你的处理器逻辑
})
.then((handler) => handler(testRequest))
.then((result) => console.log(JSON.stringify(result, null, 2)))
.catch((error) => console.error(error));
} else {
// 正常启动服务器
main();
}bash
undefinedTest without Claude Desktop
无需Claude Desktop即可测试
node build/index.js --test
undefinednode build/index.js --test
undefinedReal-World Server Examples
真实场景服务器示例
1. GitHub Integration Server
1. GitHub集成服务器
typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";
const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "list_repos",
description: "List repositories for a user/org",
inputSchema: {
type: "object",
properties: {
owner: { type: "string", description: "Username or org name" },
},
required: ["owner"],
},
},
{
name: "create_issue",
description: "Create a GitHub issue",
inputSchema: {
type: "object",
properties: {
owner: { type: "string" },
repo: { type: "string" },
title: { type: "string" },
body: { type: "string" },
},
required: ["owner", "repo", "title"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "list_repos") {
const { data } = await octokit.repos.listForUser({
username: args.owner as string,
});
const repos = data.map((r) => ({
name: r.name,
description: r.description,
stars: r.stargazers_count,
}));
return {
content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
};
}
if (name === "create_issue") {
const { data } = await octokit.issues.create({
owner: args.owner as string,
repo: args.repo as string,
title: args.title as string,
body: args.body as string,
});
return {
content: [
{ type: "text", text: `Issue created: ${data.html_url}` },
],
};
}
});typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";
const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "list_repos",
description: "List repositories for a user/org",
inputSchema: {
type: "object",
properties: {
owner: { type: "string", description: "Username or org name" },
},
required: ["owner"],
},
},
{
name: "create_issue",
description: "Create a GitHub issue",
inputSchema: {
type: "object",
properties: {
owner: { type: "string" },
repo: { type: "string" },
title: { type: "string" },
body: { type: "string" },
},
required: ["owner", "repo", "title"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "list_repos") {
const { data } = await octokit.repos.listForUser({
username: args.owner as string,
});
const repos = data.map((r) => ({
name: r.name,
description: r.description,
stars: r.stargazers_count,
}));
return {
content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
};
}
if (name === "create_issue") {
const { data } = await octokit.issues.create({
owner: args.owner as string,
repo: args.repo as string,
title: args.title as string,
body: args.body as string,
});
return {
content: [
{ type: "text", text: `Issue created: ${data.html_url}` },
],
};
}
});2. Database Query Server
2. 数据库查询服务器
python
undefinedpython
undefineddatabase-server/server.py
database-server/server.py
import asyncpg
import json
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool():
return await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password"
)
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="Execute SQL query (SELECT only)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL query"},
},
"required": ["sql"],
},
),
Tool(
name="list_tables",
description="List all tables in database",
inputSchema={"type": "object", "properties": {}},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
pool = await get_db_pool()
if name == "query":
sql = arguments["sql"]
# Security: Only allow SELECT
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries allowed")
async with pool.acquire() as conn:
rows = await conn.fetch(sql)
result = [dict(row) for row in rows]
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "list_tables":
async with pool.acquire() as conn:
tables = await conn.fetch("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
""")
result = [row["table_name"] for row in tables]
return [TextContent(type="text", text=json.dumps(result, indent=2))]undefinedimport asyncpg
import json
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool():
return await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password"
)
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="Execute SQL query (SELECT only)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL query"},
},
"required": ["sql"],
},
),
Tool(
name="list_tables",
description="List all tables in database",
inputSchema={"type": "object", "properties": {}},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
pool = await get_db_pool()
if name == "query":
sql = arguments["sql"]
# 安全:仅允许SELECT查询
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries allowed")
async with pool.acquire() as conn:
rows = await conn.fetch(sql)
result = [dict(row) for row in rows]
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "list_tables":
async with pool.acquire() as conn:
tables = await conn.fetch("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
""")
result = [row["table_name"] for row in tables]
return [TextContent(type="text", text=json.dumps(result, indent=2))]undefined3. Web Scraper Server
3. 网页抓取服务器
typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "scrape_url",
description: "Scrape content from a URL",
inputSchema: {
type: "object",
properties: {
url: { type: "string", description: "URL to scrape" },
selector: {
type: "string",
description: "CSS selector for content",
},
},
required: ["url"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "scrape_url") {
const url = request.params.arguments.url as string;
const selector = request.params.arguments.selector as string;
const response = await axios.get(url);
const $ = cheerio.load(response.data);
let content: string;
if (selector) {
content = $(selector).text();
} else {
content = $("body").text();
}
return {
content: [
{
type: "text",
text: content.trim(),
},
],
};
}
});typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "scrape_url",
description: "Scrape content from a URL",
inputSchema: {
type: "object",
properties: {
url: { type: "string", description: "URL to scrape" },
selector: {
type: "string",
description: "CSS selector for content",
},
},
required: ["url"],
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === "scrape_url") {
const url = request.params.arguments.url as string;
const selector = request.params.arguments.selector as string;
const response = await axios.get(url);
const $ = cheerio.load(response.data);
let content: string;
if (selector) {
content = $(selector).text();
} else {
content = $("body").text();
}
return {
content: [
{
type: "text",
text: content.trim(),
},
],
};
}
});Best Practices
最佳实践
1. Tool Design
1. 工具设计
✅ DO:
- Use clear, descriptive tool names (not
read_file)rf - Provide detailed descriptions for LLM understanding
- Define comprehensive JSON schemas with descriptions
- Return structured data (JSON) when possible
- Handle errors gracefully with helpful messages
❌ DON'T:
- Create overly broad tools (split complex operations)
- Return massive payloads (paginate large datasets)
- Use ambiguous parameter names
- Assume LLM knows your domain-specific terminology
✅ 推荐做法:
- 使用清晰、描述性的工具名称(如而非
read_file)rf - 提供详细描述,帮助LLM理解工具用途
- 定义带描述信息的完整JSON schema
- 尽可能返回结构化数据(JSON格式)
- 优雅处理错误并返回有用的错误信息
❌ 不推荐做法:
- 创建过于宽泛的工具(拆分复杂操作)
- 返回过大的负载(对大数据集进行分页)
- 使用模糊的参数名称
- 假设LLM了解你的领域特定术语
2. Security
2. 安全
Critical Rules:
- Validate ALL inputs (type, range, format)
- Sanitize file paths (prevent directory traversal)
- Use allowlists for commands/operations
- Never expose sensitive credentials in responses
- Implement rate limiting for expensive operations
- Use read-only access by default
typescript
// Example: Path validation
function validatePath(inputPath: string): string {
const normalized = path.normalize(inputPath);
const allowed = path.resolve("/safe/directory");
if (!normalized.startsWith(allowed)) {
throw new Error("Path outside allowed directory");
}
return normalized;
}关键规则:
- 验证所有输入(类型、范围、格式)
- 清理文件路径(防止目录遍历攻击)
- 对命令/操作使用白名单
- 绝不在响应中暴露敏感凭证
- 对高开销操作实现速率限制
- 默认使用只读访问权限
typescript
// 示例:路径验证
function validatePath(inputPath: string): string {
const normalized = path.normalize(inputPath);
const allowed = path.resolve("/safe/directory");
if (!normalized.startsWith(allowed)) {
throw new Error("Path outside allowed directory");
}
return normalized;
}3. Performance
3. 性能
- Cache expensive operations
- Stream large responses when possible
- Use pagination for list operations
- Set reasonable timeouts
- Implement request queuing for rate-limited APIs
- 缓存高开销操作的结果
- 尽可能流式返回大响应
- 对列表操作使用分页
- 设置合理的超时时间
- 对受速率限制的API实现请求排队
4. Testing
4. 测试
bash
undefinedbash
undefinedUse MCP Inspector for manual testing
使用MCP Inspector进行手动测试
mcp-inspector node build/index.js
mcp-inspector node build/index.js
Unit test tool handlers
单元测试工具处理器
npm test
npm test
Integration test with Claude Desktop
与Claude Desktop进行集成测试
1. Add to config
1. 添加到配置文件
2. Restart Claude
2. 重启Claude
3. Test in conversation
3. 在对话中测试
undefinedundefined5. Documentation
5. 文档
- Document all tools in code comments
- Provide example usage in descriptions
- Include error scenarios in documentation
- Maintain a CHANGELOG for server updates
- 在代码注释中记录所有工具
- 在描述中提供示例用法
- 在文档中包含错误场景
- 为服务器更新维护CHANGELOG
Common Pitfalls
常见陷阱
❌ Writing to STDOUT (breaks MCP protocol):
typescript
// WRONG
console.log("Debug message"); // STDOUT is for MCP protocol
// CORRECT
console.error("Debug message"); // STDERR for logs❌ Not handling errors:
typescript
// WRONG - unhandled promise rejection
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const data = await riskyOperation(); // Can throw
return { content: [{ type: "text", text: data }] };
});
// CORRECT
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
const data = await riskyOperation();
return { content: [{ type: "text", text: data }] };
} catch (error) {
return {
content: [{ type: "text", text: `Error: ${error.message}` }],
isError: true,
};
}
});❌ Blocking operations:
python
undefined❌ 向STDOUT输出内容(破坏MCP协议):
typescript
// 错误写法
console.log("Debug message"); // STDOUT用于MCP协议通信
// 正确写法
console.error("Debug message"); // STDERR用于日志输出❌ 不处理错误:
typescript
// 错误写法 - 未处理的Promise拒绝
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const data = await riskyOperation(); // 可能抛出错误
return { content: [{ type: "text", text: data }] };
});
// 正确写法
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
const data = await riskyOperation();
return { content: [{ type: "text", text: data }] };
} catch (error) {
return {
content: [{ type: "text", text: `Error: ${error.message}` }],
isError: true,
};
}
});❌ 阻塞操作:
python
undefinedWRONG - synchronous file read blocks event loop
错误写法 - 同步文件读取阻塞事件循环
@app.call_tool()
async def call_tool(name: str, arguments: dict):
with open("large_file.txt", "r") as f: # Blocks!
content = f.read()
return [TextContent(type="text", text=content)]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
with open("large_file.txt", "r") as f: # 阻塞!
content = f.read()
return [TextContent(type="text", text=content)]
CORRECT - async file I/O
正确写法 - 异步文件I/O
@app.call_tool()
async def call_tool(name: str, arguments: dict):
async with aiofiles.open("large_file.txt", "r") as f:
content = await f.read()
return [TextContent(type="text", text=content)]
❌ **Missing required fields in schema**:
```typescript
// WRONG - missing "required" field
{
name: "search",
inputSchema: {
type: "object",
properties: {
query: { type: "string" }
}
// Missing: required: ["query"]
}
}@app.call_tool()
async def call_tool(name: str, arguments: dict):
async with aiofiles.open("large_file.txt", "r") as f:
content = await f.read()
return [TextContent(type="text", text=content)]
❌ **Schema中缺少必填字段**:
```typescript
// 错误写法 - 缺少"required"字段
{
name: "search",
inputSchema: {
type: "object",
properties: {
query: { type: "string" }
}
// 缺少: required: ["query"]
}
}Resources
资源
- Official Docs: https://modelcontextprotocol.io
- TypeScript SDK: https://github.com/modelcontextprotocol/typescript-sdk
- Python SDK: https://github.com/modelcontextprotocol/python-sdk
- Server Examples: https://github.com/modelcontextprotocol/servers
- MCP Inspector: https://github.com/modelcontextprotocol/inspector
- Specification: https://spec.modelcontextprotocol.io
- 官方文档: https://modelcontextprotocol.io
- TypeScript SDK: https://github.com/modelcontextprotocol/typescript-sdk
- Python SDK: https://github.com/modelcontextprotocol/python-sdk
- 服务器示例: https://github.com/modelcontextprotocol/servers
- MCP Inspector: https://github.com/modelcontextprotocol/inspector
- 规范: https://spec.modelcontextprotocol.io
Related Skills
相关技能
When building MCP servers, consider these complementary skills:
- typescript-core: TypeScript type safety, tsconfig optimization, and advanced patterns
- asyncio: Python async patterns for MCP servers with async/await
- openrouter: Alternative LLM API integration for multi-model support
构建MCP服务器时,可考虑以下互补技能:
- typescript-core: TypeScript类型安全、tsconfig优化和高级模式
- asyncio: 用于MCP服务器的Python异步模式(async/await)
- openrouter: 替代LLM API集成,支持多模型
Quick TypeScript MCP Patterns (Inlined for Standalone Use)
快速TypeScript MCP模式(独立使用)
typescript
// Type-safe MCP server with TypeScript
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
// Define tool schemas with runtime validation
const SearchSchema = z.object({
query: z.string().min(1).max(500),
limit: z.number().int().min(1).max(100).default(10),
});
type SearchArgs = z.infer<typeof SearchSchema>;
class TypeSafeMCPServer {
private server: Server;
constructor() {
this.server = new Server(
{
name: 'typed-search-server',
version: '1.0.0',
},
{
capabilities: {
tools: {},
},
}
);
this.setupToolHandlers();
}
private setupToolHandlers() {
// List tools with full type inference
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'search',
description: 'Search with type-safe parameters',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', minLength: 1, maxLength: 500 },
limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
},
required: ['query'],
},
},
],
}));
// Type-safe tool execution
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === 'search') {
// Runtime validation with Zod
const args = SearchSchema.parse(request.params.arguments);
// Type-safe implementation
const results = await this.performSearch(args);
return {
content: [{ type: 'text', text: JSON.stringify(results) }],
};
}
throw new Error(`Unknown tool: ${request.params.name}`);
});
}
private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
// Implementation with full type safety
// args.query is string, args.limit is number
return [];
}
async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
}
}
// Start server
const server = new TypeSafeMCPServer();
server.run();typescript
// 基于TypeScript的类型安全MCP服务器
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
// 使用运行时验证定义工具Schema
const SearchSchema = z.object({
query: z.string().min(1).max(500),
limit: z.number().int().min(1).max(100).default(10),
});
type SearchArgs = z.infer<typeof SearchSchema>;
class TypeSafeMCPServer {
private server: Server;
constructor() {
this.server = new Server(
{
name: 'typed-search-server',
version: '1.0.0',
},
{
capabilities: {
tools: {},
},
}
);
this.setupToolHandlers();
}
private setupToolHandlers() {
// 利用完整类型推断列出工具
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'search',
description: 'Search with type-safe parameters',
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', minLength: 1, maxLength: 500 },
limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
},
required: ['query'],
},
},
],
}));
// 类型安全的工具执行
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === 'search') {
// 使用Zod进行运行时验证
const args = SearchSchema.parse(request.params.arguments);
// 类型安全的实现
const results = await this.performSearch(args);
return {
content: [{ type: 'text', text: JSON.stringify(results) }],
};
}
throw new Error(`Unknown tool: ${request.params.name}`);
});
}
private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
// 完整类型安全的实现
// args.query是string类型,args.limit是number类型
return [];
}
async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
}
}
// 启动服务器
const server = new TypeSafeMCPServer();
server.run();Quick Python Async MCP Patterns (Inlined for Standalone Use)
快速Python异步MCP模式(独立使用)
python
undefinedpython
// 基于Python的异步MCP服务器
import asyncio
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field
// 使用Pydantic定义类型安全的参数模型
class SearchArgs(BaseModel):
query: str = Field(..., min_length=1, max_length=500)
limit: int = Field(10, ge=1, le=100)
class AsyncMCPServer:
def __init__(self):
self.server = Server("async-search-server")
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="search",
description="Search with async processing",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "minLength": 1, "maxLength": 500},
"limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
},
"required": ["query"],
},
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
if name == "search":
// 使用Pydantic验证参数
args = SearchArgs(**arguments)
// 异步实现
results = await self.perform_search(args)
return [
TextContent(
type="text",
text=str(results)
)
]
raise ValueError(f"Unknown tool: {name}")
async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
"""异步搜索实现"""
// 模拟异步I/O
await asyncio.sleep(0.1)
// 使用已验证的参数(args.query是str类型,args.limit是int类型)
return [
{"title": f"Result for {args.query}", "url": "https://example.com"}
]
async def main():
"""运行异步MCP服务器"""
logging.basicConfig(level=logging.INFO)
server = AsyncMCPServer()
// 使用stdio传输连接Claude Desktop
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
server.server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())Async MCP server with Python
快速多语言MCP模式(独立使用)
import asyncio
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field
TypeScript vs Python 对比:
| 特性 | TypeScript | Python |
|---|---|---|
| 类型安全 | 编译时+运行时(Zod) | 仅运行时(Pydantic) |
| 性能 | 启动更快,Node.js开销 | 启动较慢,CPU任务表现更好 |
| 异步支持 | 原生async/await,事件循环 | asyncio,适合I/O密集型任务 |
| 生态系统 | npm包,前端工具 | 数据科学、ML库 |
| 最佳适用场景 | Web API、实时工具 | 数据处理、ML集成 |
两种语言的通用模式:
-
输入验证
- TypeScript: Zod schema
- Python: Pydantic模型
-
错误处理
- 两者:使用try/catch捕获特定错误类型
- 在MCP响应中返回错误内容
-
资源管理
- TypeScript: async/await结合try/finally
- Python: 异步上下文管理器
-
测试
- TypeScript: Vitest/Jest结合模拟传输
- Python: pytest结合pytest-asyncio
选择实现语言:
typescript
// TypeScript - 最适合:
// - 文件系统操作
// - 网页抓取/HTTP请求
// - JSON/API处理
// - 实时数据流
// Python - 最适合:
// - 数据分析(pandas、numpy)
// - 机器学习(scikit-learn、torch)
// - 数据库ETL操作
// - 科学计算[完整的TypeScript、Python异步和OpenRouter模式可在相关技能中获取(若一起部署)]
Type-safe argument models with Pydantic
本地MCP集成模式(你的代码库)
—
.mcp.json布局(项目范围)
class SearchArgs(BaseModel):
query: str = Field(..., min_length=1, max_length=500)
limit: int = Field(10, ge=1, le=100)
class AsyncMCPServer:
def init(self):
self.server = Server("async-search-server")
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="search",
description="Search with async processing",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "minLength": 1, "maxLength": 500},
"limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
},
"required": ["query"],
},
)
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
if name == "search":
# Validate with Pydantic
args = SearchArgs(**arguments)
# Async implementation
results = await self.perform_search(args)
return [
TextContent(
type="text",
text=str(results)
)
]
raise ValueError(f"Unknown tool: {name}")
async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
"""Async search implementation"""
# Simulate async I/O
await asyncio.sleep(0.1)
# Use validated args (args.query is str, args.limit is int)
return [
{"title": f"Result for {args.query}", "url": "https://example.com"}
]async def main():
"""Run async MCP server"""
logging.basicConfig(level=logging.INFO)
server = AsyncMCPServer()
# Use stdio transport for Claude Desktop
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
server.server.create_initialization_options()
)if name == "main":
asyncio.run(main())
undefinedjson
{
"mcpServers": {
"kuzu-memory": {
"type": "stdio",
"command": "kuzu-memory",
"args": ["mcp"]
},
"mcp-vector-search": {
"type": "stdio",
"command": "uv",
"args": ["run", "mcp-vector-search", "mcp"],
"env": {
"MCP_ENABLE_FILE_WATCHING": "true"
}
}
}
}Quick Multi-Language MCP Patterns (Inlined for Standalone Use)
CLI约定
TypeScript vs Python Trade-offs:
| Feature | TypeScript | Python |
|---|---|---|
| Type Safety | Compile-time + runtime (Zod) | Runtime only (Pydantic) |
| Performance | Faster startup, Node.js overhead | Slower startup, better for CPU tasks |
| Async Support | Native async/await, event loop | asyncio, great for I/O |
| Ecosystem | npm packages, frontend tools | Data science, ML libraries |
| Best For | Web APIs, real-time tools | Data processing, ML integration |
Common Patterns Across Both:
-
Input Validation
- TypeScript: Zod schemas
- Python: Pydantic models
-
Error Handling
- Both: Try/catch with specific error types
- Return error content in MCP response
-
Resource Management
- TypeScript: async/await with try/finally
- Python: async context managers
-
Testing
- TypeScript: Vitest/Jest with mock transport
- Python: pytest with pytest-asyncio
Choosing Implementation Language:
typescript
// TypeScript - Best for:
// - File system operations
// - Web scraping/HTTP requests
// - JSON/API manipulation
// - Real-time data streams
// Python - Best for:
// - Data analysis (pandas, numpy)
// - Machine learning (scikit-learn, torch)
// - Database ETL operations
// - Scientific computing[Full TypeScript, Python async, and OpenRouter patterns available in respective skills if deployed together]
- 为stdio服务器提供子命令。
mcp - 使用命令进行端到端初始化和集成(如mcp-vector-search、kuzu-memory)。
setup - 使用/
install命令针对特定客户端(如mcp-ticketer)。uninstall - 提供命令验证依赖。
doctor
Local MCP Integration Patterns (Your Repos)
环境变量约定
.mcp.json Layout (Project-Scoped)
—
json
{
"mcpServers": {
"kuzu-memory": {
"type": "stdio",
"command": "kuzu-memory",
"args": ["mcp"]
},
"mcp-vector-search": {
"type": "stdio",
"command": "uv",
"args": ["run", "mcp-vector-search", "mcp"],
"env": {
"MCP_ENABLE_FILE_WATCHING": "true"
}
}
}
}- 文件监听:
MCP_ENABLE_FILE_WATCHING=true - Kuzu内存: ,
KUZU_MEMORY_PROJECT_ROOTKUZU_MEMORY_DB - Ticketer适配器: ,
MCP_TICKETER_ADAPTER,GITHUB_TOKEN,GITHUB_OWNERGITHUB_REPO
CLI Conventions
常见服务器名称
- Provide a subcommand for stdio servers.
mcp - Use for end-to-end init + integration (mcp-vector-search, kuzu-memory).
setup - Use /
installto target specific clients (mcp-ticketer).uninstall - Provide commands to validate dependencies.
doctor
kuzu-memorymcp-vector-searchmcp-ticketermcp-skillsetmcp-browserEnvironment Conventions
总结
- File watching:
MCP_ENABLE_FILE_WATCHING=true - Kuzu memory: ,
KUZU_MEMORY_PROJECT_ROOTKUZU_MEMORY_DB - Ticketer adapters: ,
MCP_TICKETER_ADAPTER,GITHUB_TOKEN,GITHUB_OWNERGITHUB_REPO
- MCP 支持借助工具、资源和提示词开发原生AI服务器
- 工具 是LLM可调用的函数(读取文件、查询API、执行操作)
- 资源 提供只读数据访问(文件、数据库、文档)
- 提示词 是适用于常见任务的带参数可复用模板
- 提供 SDK 支持TypeScript和Python,具备完整异步能力
- 通过JSON配置实现与 Claude Desktop 的集成(STDIO传输)
- 使用 MCP Inspector、服务器日志和单独测试进行调试
- 安全 至关重要:验证输入、清理路径、使用白名单
- 最佳实践:清晰命名、完整Schema、错误处理、性能优化
Common Server Names
—
kuzu-memorymcp-vector-searchmcp-ticketermcp-skillsetmcp-browser—
Summary
—
- MCP enables AI-native server development with tools, resources, and prompts
- Tools are functions LLMs can call (read files, query APIs, run operations)
- Resources provide read-only data access (files, databases, documentation)
- Prompts are reusable templates with arguments for common tasks
- SDKs available for TypeScript and Python with full async support
- Claude Desktop integration via JSON config (STDIO transport)
- Debugging with MCP Inspector, server logs, and standalone testing
- Security critical: validate inputs, sanitize paths, use allowlists
- Best practices: Clear naming, comprehensive schemas, error handling, performance optimization
—