mcp-protocol-builder

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

MCP (Model Context Protocol) - AI-Native Server Development

MCP (Model Context Protocol) - AI原生服务器开发

Overview

概述

Model Context Protocol (MCP) is an open standard for connecting AI assistants to external data sources and tools. Build servers that expose tools (functions LLMs can call), resources (data LLMs can read), and prompts (templates LLMs can use).
Key Concepts:
  • Tools: Functions LLMs can execute (read files, query APIs, run commands)
  • Resources: Data sources LLMs can access (files, databases, APIs)
  • Prompts: Reusable templates with arguments for common tasks
  • Client-Server: MCP servers expose capabilities, clients (like Claude Desktop) consume them
  • Transport: STDIO (local), SSE (Server-Sent Events), HTTP (network)
Official SDKs:
  • TypeScript:
    @modelcontextprotocol/sdk
  • Python:
    mcp
Installation:
bash
undefined
Model Context Protocol(MCP)是一个用于将AI助手连接到外部数据源和工具的开放标准。构建可暴露工具(大语言模型可调用的函数)、资源(大语言模型可读取的数据)和提示词(大语言模型可使用的模板)的服务器。
核心概念:
  • 工具: 大语言模型可执行的函数(读取文件、查询API、运行命令)
  • 资源: 大语言模型可访问的数据源(文件、数据库、API)
  • 提示词: 带参数的可复用模板,适用于常见任务
  • 客户端-服务器架构: MCP服务器对外暴露能力,客户端(如Claude Desktop)调用这些能力
  • 传输方式: STDIO(本地)、SSE(Server-Sent Events)、HTTP(网络)
官方SDK:
  • TypeScript:
    @modelcontextprotocol/sdk
  • Python:
    mcp
安装方式:
bash
undefined

TypeScript server

TypeScript服务器

npx @modelcontextprotocol/create-server@latest my-server cd my-server && npm install
npx @modelcontextprotocol/create-server@latest my-server cd my-server && npm install

Python server

Python服务器

pip install mcp
pip install mcp

Or use uv (recommended)

推荐使用uv

uv pip install mcp
undefined
uv pip install mcp
undefined

Quick Start - TypeScript Server

快速开始 - TypeScript服务器

1. Create Server with CLI

1. 使用CLI创建服务器

bash
undefined
bash
undefined

Interactive setup

交互式设置

npx @modelcontextprotocol/create-server@latest my-filesystem-server
npx @modelcontextprotocol/create-server@latest my-filesystem-server

Options prompt:

选项提示:

- Server name: my-filesystem-server

- 服务器名称: my-filesystem-server

- Language: TypeScript

- 开发语言: TypeScript

- Include example tools: Yes

- 包含示例工具: 是

undefined
undefined

2. Define Tools

2. 定义工具

typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

const server = new Server(
  {
    name: "filesystem-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "Read contents of a file",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Path to the file to read",
            },
          },
          required: ["path"],
        },
      },
      {
        name: "list_directory",
        description: "List contents of a directory",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Directory path to list",
            },
          },
          required: ["path"],
        },
      },
    ],
  };
});

// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;
      const content = await fs.readFile(filePath, "utf-8");
      return {
        content: [{ type: "text", text: content }],
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;
      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const listing = entries
        .map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
        .join("\n");
      return {
        content: [{ type: "text", text: listing }],
      };
    }

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
});

// Start server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP server running on stdio");
}

main();
typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

const server = new Server(
  {
    name: "filesystem-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 列出可用工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "读取文件内容",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "要读取的文件路径",
            },
          },
          required: ["path"],
        },
      },
      {
        name: "list_directory",
        description: "列出目录内容",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "要列出的目录路径",
            },
          },
          required: ["path"],
        },
      },
    ],
  };
});

// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;
      const content = await fs.readFile(filePath, "utf-8");
      return {
        content: [{ type: "text", text: content }],
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;
      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const listing = entries
        .map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
        .join("\n");
      return {
        content: [{ type: "text", text: listing }],
      };
    }

    default:
      throw new Error(`未知工具: ${name}`);
  }
});

// 启动服务器
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("文件系统MCP服务器通过stdio运行中");
}

main();

3. Configure Claude Desktop

3. 配置Claude Desktop

json
// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
// %APPDATA%/Claude/claude_desktop_config.json (Windows)
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
    }
  }
}
json
// macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
// Windows: %APPDATA%/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
    }
  }
}

4. Build and Test

4. 构建并测试

bash
undefined
bash
undefined

Build TypeScript

构建TypeScript项目

npm run build
npm run build

Restart Claude Desktop (Cmd+Q and reopen)

重启Claude Desktop(Cmd+Q后重新打开)

Server appears in 🔌 menu

服务器会出现在🔌菜单中

undefined
undefined

Quick Start - Python Server

快速开始 - Python服务器

1. Create Server

1. 创建服务器

python
undefined
python
undefined

server.py

server.py

import asyncio from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent import json import os
import asyncio from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent import json import os

Create server instance

创建服务器实例

app = Server("filesystem-server")
app = Server("filesystem-server")

Define tools

定义工具

@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="read_file", description="Read contents of a file", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path to read"} }, "required": ["path"], }, ), Tool( name="list_directory", description="List directory contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "Directory path"} }, "required": ["path"], }, ), ]
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="read_file", description="读取文件内容", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "要读取的文件路径"} }, "required": ["path"], }, ), Tool( name="list_directory", description="列出目录内容", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "目录路径"} }, "required": ["path"], }, ), ]

Handle tool calls

处理工具调用

@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "read_file": file_path = arguments["path"] with open(file_path, "r") as f: content = f.read() return [TextContent(type="text", text=content)]
elif name == "list_directory":
    dir_path = arguments["path"]
    entries = os.listdir(dir_path)
    listing = "\n".join(
        f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
        for e in entries
    )
    return [TextContent(type="text", text=listing)]

else:
    raise ValueError(f"Unknown tool: {name}")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "read_file": file_path = arguments["path"] with open(file_path, "r") as f: content = f.read() return [TextContent(type="text", text=content)]
elif name == "list_directory":
    dir_path = arguments["path"]
    entries = os.listdir(dir_path)
    listing = "\n".join(
        f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
        for e in entries
    )
    return [TextContent(type="text", text=listing)]

else:
    raise ValueError(f"未知工具: {name}")

Start server

启动服务器

async def main(): async with stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main": asyncio.run(main())
undefined
async def main(): async with stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main": asyncio.run(main())
undefined

2. Configure Claude Desktop

2. 配置Claude Desktop

json
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"]
    }
  }
}
json
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"]
    }
  }
}

3. Test

3. 测试

bash
undefined
bash
undefined

Test server standalone

单独测试服务器

python server.py
python server.py

Restart Claude Desktop

重启Claude Desktop

Tools appear in 🔌 menu

工具会出现在🔌菜单中

undefined
undefined

Resources - Exposing Data to LLMs

资源 - 向大语言模型暴露数据

Resources provide read-only access to data sources.
资源提供对数据源的只读访问权限。

TypeScript Resource Example

TypeScript资源示例

typescript
import {
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// List available resources
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "file:///docs/readme.md",
        name: "README",
        description: "Project README documentation",
        mimeType: "text/markdown",
      },
      {
        uri: "file:///config/settings.json",
        name: "Settings",
        description: "Application settings",
        mimeType: "application/json",
      },
    ],
  };
});

// Handle resource reads
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;

  if (uri.startsWith("file://")) {
    const filePath = uri.replace("file://", "");
    const content = await fs.readFile(filePath, "utf-8");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: content,
        },
      ],
    };
  }

  throw new Error(`Unknown resource: ${uri}`);
});
typescript
import {
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// 列出可用资源
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "file:///docs/readme.md",
        name: "README",
        description: "项目README文档",
        mimeType: "text/markdown",
      },
      {
        uri: "file:///config/settings.json",
        name: "Settings",
        description: "应用设置",
        mimeType: "application/json",
      },
    ],
  };
});

// 处理资源读取请求
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;

  if (uri.startsWith("file://")) {
    const filePath = uri.replace("file://", "");
    const content = await fs.readFile(filePath, "utf-8");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: content,
        },
      ],
    };
  }

  throw new Error(`未知资源: ${uri}`);
});

Python Resource Example

Python资源示例

python
from mcp.types import Resource, ResourceContent

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="file:///docs/readme.md",
            name="README",
            description="Project README",
            mimeType="text/markdown",
        ),
        Resource(
            uri="file:///config/settings.json",
            name="Settings",
            description="App settings",
            mimeType="application/json",
        ),
    ]

@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
    if uri.startswith("file://"):
        file_path = uri.replace("file://", "")
        with open(file_path, "r") as f:
            content = f.read()
        return ResourceContent(uri=uri, mimeType="text/plain", text=content)

    raise ValueError(f"Unknown resource: {uri}")
python
from mcp.types import Resource, ResourceContent

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="file:///docs/readme.md",
            name="README",
            description="项目README文档",
            mimeType="text/markdown",
        ),
        Resource(
            uri="file:///config/settings.json",
            name="Settings",
            description="应用设置",
            mimeType="application/json",
        ),
    ]

@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
    if uri.startswith("file://"):
        file_path = uri.replace("file://", "")
        with open(file_path, "r") as f:
            content = f.read()
        return ResourceContent(uri=uri, mimeType="text/plain", text=content)

    raise ValueError(f"未知资源: {uri}")

Prompts - Reusable Templates

提示词 - 可复用模板

Prompts are templates that LLMs can use with arguments.
提示词是大语言模型可带参数使用的模板。

TypeScript Prompt Example

TypeScript提示词示例

typescript
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// List prompts
server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [
      {
        name: "code_review",
        description: "Review code for best practices",
        arguments: [
          {
            name: "language",
            description: "Programming language",
            required: true,
          },
          {
            name: "code",
            description: "Code to review",
            required: true,
          },
        ],
      },
    ],
  };
});

// Handle prompt requests
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const language = args?.language || "unknown";
    const code = args?.code || "";

    return {
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `Review this ${language} code for best practices, security issues, and improvements:\n\n\`\`\`${language}\n${code}\n\`\`\``,
          },
        },
      ],
    };
  }

  throw new Error(`Unknown prompt: ${name}`);
});
typescript
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// 列出提示词
server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [
      {
        name: "code_review",
        description: "针对最佳实践评审代码",
        arguments: [
          {
            name: "language",
            description: "编程语言",
            required: true,
          },
          {
            name: "code",
            description: "要评审的代码",
            required: true,
          },
        ],
      },
    ],
  };
});

// 处理提示词请求
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const language = args?.language || "unknown";
    const code = args?.code || "";

    return {
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `评审以下${language}代码的最佳实践、安全问题及改进方向:\n\n\`\`\`${language}\n${code}\n\`\`\``,
          },
        },
      ],
    };
  }

  throw new Error(`未知提示词: ${name}`);
});

Python Prompt Example

Python提示词示例

python
from mcp.types import Prompt, PromptMessage, PromptArgument

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        Prompt(
            name="code_review",
            description="Review code for best practices",
            arguments=[
                PromptArgument(
                    name="language", description="Programming language", required=True
                ),
                PromptArgument(name="code", description="Code to review", required=True),
            ],
        )
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    if name == "code_review":
        language = arguments.get("language", "unknown")
        code = arguments.get("code", "")

        return [
            PromptMessage(
                role="user",
                content={
                    "type": "text",
                    "text": f"Review this {language} code:\n\n```{language}\n{code}\n```",
                },
            )
        ]

    raise ValueError(f"Unknown prompt: {name}")
python
from mcp.types import Prompt, PromptMessage, PromptArgument

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        Prompt(
            name="code_review",
            description="针对最佳实践评审代码",
            arguments=[
                PromptArgument(
                    name="language", description="编程语言", required=True
                ),
                PromptArgument(name="code", description="要评审的代码", required=True),
            ],
        )
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    if name == "code_review":
        language = arguments.get("language", "unknown")
        code = arguments.get("code", "")

        return [
            PromptMessage(
                role="user",
                content={
                    "type": "text",
                    "text": f"评审以下{language}代码:\n\n```{language}\n{code}\n```",
                },
            )
        ]

    raise ValueError(f"未知提示词: {name}")

Advanced Patterns

进阶模式

1. Error Handling

1. 错误处理

typescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const { name, arguments: args } = request.params;

    if (name === "risky_operation") {
      // Validate inputs
      if (!args.required_param) {
        throw new Error("Missing required parameter: required_param");
      }

      // Perform operation with proper error handling
      const result = await performRiskyOperation(args.required_param);

      return {
        content: [{ type: "text", text: JSON.stringify(result) }],
      };
    }
  } catch (error) {
    // Return error to LLM with helpful message
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
});
python
undefined
typescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const { name, arguments: args } = request.params;

    if (name === "risky_operation") {
      // 验证输入
      if (!args.required_param) {
        throw new Error("缺少必填参数: required_param");
      }

      // 执行操作并妥善处理错误
      const result = await performRiskyOperation(args.required_param);

      return {
        content: [{ type: "text", text: JSON.stringify(result) }],
      };
    }
  } catch (error) {
    // 返回带有用信息的错误给大语言模型
    return {
      content: [
        {
          type: "text",
          text: `错误: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
});
python
undefined

Python

Python

@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: try: if name == "risky_operation": if "required_param" not in arguments: raise ValueError("Missing required parameter: required_param")
        result = await perform_risky_operation(arguments["required_param"])
        return [TextContent(type="text", text=json.dumps(result))]

except Exception as e:
    return [TextContent(type="text", text=f"Error: {str(e)}", isError=True)]
undefined
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: try: if name == "risky_operation": if "required_param" not in arguments: raise ValueError("缺少必填参数: required_param")
        result = await perform_risky_operation(arguments["required_param"])
        return [TextContent(type="text", text=json.dumps(result))]

except Exception as e:
    return [TextContent(type="text", text=f"错误: {str(e)}", isError=True)]
undefined

2. Async Operations

2. 异步操作

typescript
// TypeScript - Async API calls
import axios from "axios";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "fetch_api_data") {
    const url = args.url as string;
    const response = await axios.get(url);

    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(response.data, null, 2),
        },
      ],
    };
  }
});
python
undefined
typescript
// TypeScript - 异步API调用
import axios from "axios";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "fetch_api_data") {
    const url = args.url as string;
    const response = await axios.get(url);

    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(response.data, null, 2),
        },
      ],
    };
  }
});
python
undefined

Python - Async database queries

Python - 异步数据库查询

import aiosqlite
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "query_database": query = arguments["query"]
    async with aiosqlite.connect("database.db") as db:
        async with db.execute(query) as cursor:
            rows = await cursor.fetchall()
            result = json.dumps(rows)

    return [TextContent(type="text", text=result)]
undefined
import aiosqlite
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "query_database": query = arguments["query"]
    async with aiosqlite.connect("database.db") as db:
        async with db.execute(query) as cursor:
            rows = await cursor.fetchall()
            result = json.dumps(rows)

    return [TextContent(type="text", text=result)]
undefined

3. Environment Variables & Configuration

3. 环境变量与配置

typescript
// TypeScript - Load config from environment
import dotenv from "dotenv";
dotenv.config();

const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "api_call") {
    const response = await fetch(`${BASE_URL}/endpoint`, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });
    // ...
  }
});
python
undefined
typescript
// TypeScript - 从环境变量加载配置
import dotenv from "dotenv";
dotenv.config();

const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "api_call") {
    const response = await fetch(`${BASE_URL}/endpoint`, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });
    // ...
  }
});
python
undefined

Python - Load config from environment

Python - 从环境变量加载配置

import os from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "api_call": headers = {"Authorization": f"Bearer {API_KEY}"} async with aiohttp.ClientSession() as session: async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp: data = await resp.json() return [TextContent(type="text", text=json.dumps(data))]
undefined
import os from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "api_call": headers = {"Authorization": f"Bearer {API_KEY}"} async with aiohttp.ClientSession() as session: async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp: data = await resp.json() return [TextContent(type="text", text=json.dumps(data))]
undefined

4. Streaming Responses (Large Data)

4. 流式响应(大数据)

typescript
// TypeScript - Stream large file contents
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "read_large_file") {
    const filePath = request.params.arguments.path as string;
    const stream = fs.createReadStream(filePath, { encoding: "utf-8" });

    let content = "";
    for await (const chunk of stream) {
      content += chunk;
      // Could yield chunks incrementally in future MCP versions
    }

    return {
      content: [{ type: "text", text: content }],
    };
  }
});
typescript
// TypeScript - 流式传输大文件内容
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "read_large_file") {
    const filePath = request.params.arguments.path as string;
    const stream = fs.createReadStream(filePath, { encoding: "utf-8" });

    let content = "";
    for await (const chunk of stream) {
      content += chunk;
      // 未来MCP版本可支持增量返回数据块
    }

    return {
      content: [{ type: "text", text: content }],
    };
  }
});

5. Dynamic Tool Registration

5. 动态工具注册

typescript
// TypeScript - Register tools from config
interface ToolConfig {
  name: string;
  description: string;
  schema: object;
  handler: (args: any) => Promise<any>;
}

const toolRegistry = new Map<string, ToolConfig>();

function registerTool(config: ToolConfig) {
  toolRegistry.set(config.name, config);
}

// Register custom tools
registerTool({
  name: "custom_tool",
  description: "Dynamically registered tool",
  schema: {
    type: "object",
    properties: {
      input: { type: "string" },
    },
  },
  handler: async (args) => {
    return { result: `Processed: ${args.input}` };
  },
});

// List tools dynamically
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: Array.from(toolRegistry.values()).map((tool) => ({
      name: tool.name,
      description: tool.description,
      inputSchema: tool.schema,
    })),
  };
});

// Call tools dynamically
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = toolRegistry.get(request.params.name);
  if (!tool) {
    throw new Error(`Unknown tool: ${request.params.name}`);
  }

  const result = await tool.handler(request.params.arguments);
  return {
    content: [{ type: "text", text: JSON.stringify(result) }],
  };
});
typescript
// TypeScript - 从配置文件注册工具
interface ToolConfig {
  name: string;
  description: string;
  schema: object;
  handler: (args: any) => Promise<any>;
}

const toolRegistry = new Map<string, ToolConfig>();

function registerTool(config: ToolConfig) {
  toolRegistry.set(config.name, config);
}

// 注册自定义工具
registerTool({
  name: "custom_tool",
  description: "动态注册的工具",
  schema: {
    type: "object",
    properties: {
      input: { type: "string" },
    },
  },
  handler: async (args) => {
    return { result: `已处理: ${args.input}` };
  },
});

// 动态列出工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: Array.from(toolRegistry.values()).map((tool) => ({
      name: tool.name,
      description: tool.description,
      inputSchema: tool.schema,
    })),
  };
});

// 动态调用工具
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = toolRegistry.get(request.params.name);
  if (!tool) {
    throw new Error(`未知工具: ${request.params.name}`);
  }

  const result = await tool.handler(request.params.arguments);
  return {
    content: [{ type: "text", text: JSON.stringify(result) }],
  };
});

Transport Types

传输类型

1. STDIO (Local Execution)

1. STDIO(本地执行)

Default for Claude Desktop integration. Server runs as subprocess.
json
// Claude Desktop config
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server/build/index.js"],
      "env": {
        "API_KEY": "your-api-key"
      }
    }
  }
}
Claude Desktop集成的默认方式。服务器作为子进程运行。
json
// Claude Desktop配置
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server/build/index.js"],
      "env": {
        "API_KEY": "your-api-key"
      }
    }
  }
}

2. SSE (Server-Sent Events)

2. SSE(Server-Sent Events)

For long-running servers with HTTP transport.
typescript
// TypeScript SSE server
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";

const app = express();
const server = new Server(/* ... */);

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/messages", res);
  await server.connect(transport);
});

app.post("/messages", async (req, res) => {
  // Handle incoming messages
});

app.listen(3000, () => {
  console.log("MCP server listening on http://localhost:3000");
});
json
// Claude Desktop config for SSE
{
  "mcpServers": {
    "remote-server": {
      "url": "http://localhost:3000/sse"
    }
  }
}
适用于基于HTTP传输的长期运行服务器。
typescript
// TypeScript SSE服务器
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";

const app = express();
const server = new Server(/* ... */);

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/messages", res);
  await server.connect(transport);
});

app.post("/messages", async (req, res) => {
  // 处理传入消息
});

app.listen(3000, () => {
  console.log("MCP服务器运行在http://localhost:3000");
});
json
// Claude Desktop的SSE配置
{
  "mcpServers": {
    "remote-server": {
      "url": "http://localhost:3000/sse"
    }
  }
}

3. HTTP (Streamable HTTP)

3. HTTP(可流式传输的HTTP)

For REST-style MCP servers.
python
undefined
适用于REST风格的MCP服务器。
python
undefined

Python HTTP server with FastAPI

基于FastAPI的Python HTTP服务器

from fastapi import FastAPI from mcp.server.fastapi import create_fastapi_app
app = FastAPI() mcp_app = Server("http-server")
from fastapi import FastAPI from mcp.server.fastapi import create_fastapi_app
app = FastAPI() mcp_app = Server("http-server")

Define tools/resources/prompts as usual

像往常一样定义工具/资源/提示词

...

...

Mount MCP routes

挂载MCP路由

app.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
undefined
app.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
undefined

Debugging MCP Servers

调试MCP服务器

1. Server Logs

1. 服务器日志

typescript
// TypeScript - Console.error for logs (STDOUT is for MCP protocol)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  console.error(`Tool called: ${request.params.name}`);
  console.error(`Arguments: ${JSON.stringify(request.params.arguments)}`);
  // ...
});
python
undefined
typescript
// TypeScript - 使用console.error记录日志(STDOUT用于MCP协议)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  console.error(`工具被调用: ${request.params.name}`);
  console.error(`参数: ${JSON.stringify(request.params.arguments)}`);
  // ...
});
python
undefined

Python - Use logging module

Python - 使用logging模块

import logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log") logger = logging.getLogger(name)
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: logger.debug(f"Tool called: {name}") logger.debug(f"Arguments: {arguments}") # ...
undefined
import logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log") logger = logging.getLogger(name)
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: logger.debug(f"工具被调用: {name}") logger.debug(f"参数: {arguments}") // ...
undefined

2. MCP Inspector

2. MCP Inspector

bash
undefined
bash
undefined

Install MCP Inspector (browser-based debugging tool)

安装MCP Inspector(基于浏览器的调试工具)

npm install -g @modelcontextprotocol/inspector
npm install -g @modelcontextprotocol/inspector

Run inspector with your server

运行Inspector并连接你的服务器

mcp-inspector node /path/to/server/build/index.js
mcp-inspector node /path/to/server/build/index.js

Opens browser at http://localhost:6274

在浏览器中打开http://localhost:6274

- Test tools manually

- 手动测试工具

- See request/response payloads

- 查看请求/响应负载

- Debug JSON schema validation

- 调试JSON Schema验证

undefined
undefined

3. Claude Desktop Logs

3. Claude Desktop日志

bash
undefined
bash
undefined

macOS

macOS

tail -f ~/Library/Logs/Claude/mcp*.log
tail -f ~/Library/Logs/Claude/mcp*.log

Windows

Windows

Check %APPDATA%/Claude/logs/

查看%APPDATA%/Claude/logs/目录

undefined
undefined

4. Test Server Standalone

4. 单独测试服务器

typescript
// TypeScript - Add test harness
if (process.argv.includes("--test")) {
  // Simulate tool call
  const testRequest = {
    params: {
      name: "read_file",
      arguments: { path: "./test.txt" },
    },
  };

  server
    .setRequestHandler(CallToolRequestSchema, async (request) => {
      // ... your handler
    })
    .then((handler) => handler(testRequest))
    .then((result) => console.log(JSON.stringify(result, null, 2)))
    .catch((error) => console.error(error));
} else {
  // Normal server startup
  main();
}
bash
undefined
typescript
// TypeScript - 添加测试工具
if (process.argv.includes("--test")) {
  // 模拟工具调用
  const testRequest = {
    params: {
      name: "read_file",
      arguments: { path: "./test.txt" },
    },
  };

  server
    .setRequestHandler(CallToolRequestSchema, async (request) => {
      // ... 你的处理逻辑
    })
    .then((handler) => handler(testRequest))
    .then((result) => console.log(JSON.stringify(result, null, 2)))
    .catch((error) => console.error(error));
} else {
  // 正常启动服务器
  main();
}
bash
undefined

Test without Claude Desktop

无需Claude Desktop即可测试

node build/index.js --test
undefined
node build/index.js --test
undefined

Real-World Server Examples

真实世界服务器示例

1. GitHub Integration Server

1. GitHub集成服务器

typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";

const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "list_repos",
        description: "List repositories for a user/org",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string", description: "Username or org name" },
          },
          required: ["owner"],
        },
      },
      {
        name: "create_issue",
        description: "Create a GitHub issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            title: { type: "string" },
            body: { type: "string" },
          },
          required: ["owner", "repo", "title"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "list_repos") {
    const { data } = await octokit.repos.listForUser({
      username: args.owner as string,
    });
    const repos = data.map((r) => ({
      name: r.name,
      description: r.description,
      stars: r.stargazers_count,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
    };
  }

  if (name === "create_issue") {
    const { data } = await octokit.issues.create({
      owner: args.owner as string,
      repo: args.repo as string,
      title: args.title as string,
      body: args.body as string,
    });
    return {
      content: [
        { type: "text", text: `Issue created: ${data.html_url}` },
      ],
    };
  }
});
typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";

const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "list_repos",
        description: "列出用户/组织的仓库",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string", description: "用户名或组织名" },
          },
          required: ["owner"],
        },
      },
      {
        name: "create_issue",
        description: "创建GitHub Issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            title: { type: "string" },
            body: { type: "string" },
          },
          required: ["owner", "repo", "title"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "list_repos") {
    const { data } = await octokit.repos.listForUser({
      username: args.owner as string,
    });
    const repos = data.map((r) => ({
      name: r.name,
      description: r.description,
      stars: r.stargazers_count,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
    };
  }

  if (name === "create_issue") {
    const { data } = await octokit.issues.create({
      owner: args.owner as string,
      repo: args.repo as string,
      title: args.title as string,
      body: args.body as string,
    });
    return {
      content: [
        { type: "text", text: `Issue已创建: ${data.html_url}` },
      ],
    };
  }
});

2. Database Query Server

2. 数据库查询服务器

python
undefined
python
undefined

database-server/server.py

database-server/server.py

import asyncpg import json from mcp.server import Server from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool(): return await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password" )
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="query", description="Execute SQL query (SELECT only)", inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL query"}, }, "required": ["sql"], }, ), Tool( name="list_tables", description="List all tables in database", inputSchema={"type": "object", "properties": {}}, ), ]
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: pool = await get_db_pool()
if name == "query":
    sql = arguments["sql"]
    # Security: Only allow SELECT
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries allowed")

    async with pool.acquire() as conn:
        rows = await conn.fetch(sql)
        result = [dict(row) for row in rows]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]

elif name == "list_tables":
    async with pool.acquire() as conn:
        tables = await conn.fetch("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        result = [row["table_name"] for row in tables]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]
undefined
import asyncpg import json from mcp.server import Server from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool(): return await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password" )
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="query", description="执行SQL查询(仅支持SELECT)", inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL查询语句"}, }, "required": ["sql"], }, ), Tool( name="list_tables", description="列出数据库中的所有表", inputSchema={"type": "object", "properties": {}}, ), ]
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: pool = await get_db_pool()
if name == "query":
    sql = arguments["sql"]
    # 安全限制: 仅允许SELECT语句
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("仅允许SELECT查询")

    async with pool.acquire() as conn:
        rows = await conn.fetch(sql)
        result = [dict(row) for row in rows]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]

elif name == "list_tables":
    async with pool.acquire() as conn:
        tables = await conn.fetch("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        result = [row["table_name"] for row in tables]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]
undefined

3. Web Scraper Server

3. 网页爬虫服务器

typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "scrape_url",
        description: "Scrape content from a URL",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string", description: "URL to scrape" },
            selector: {
              type: "string",
              description: "CSS selector for content",
            },
          },
          required: ["url"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "scrape_url") {
    const url = request.params.arguments.url as string;
    const selector = request.params.arguments.selector as string;

    const response = await axios.get(url);
    const $ = cheerio.load(response.data);

    let content: string;
    if (selector) {
      content = $(selector).text();
    } else {
      content = $("body").text();
    }

    return {
      content: [
        {
          type: "text",
          text: content.trim(),
        },
      ],
    };
  }
});
typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "scrape_url",
        description: "从URL抓取内容",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string", description: "要抓取的URL" },
            selector: {
              type: "string",
              description: "内容的CSS选择器",
            },
          },
          required: ["url"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "scrape_url") {
    const url = request.params.arguments.url as string;
    const selector = request.params.arguments.selector as string;

    const response = await axios.get(url);
    const $ = cheerio.load(response.data);

    let content: string;
    if (selector) {
      content = $(selector).text();
    } else {
      content = $("body").text();
    }

    return {
      content: [
        {
          type: "text",
          text: content.trim(),
        },
      ],
    };
  }
});

Best Practices

最佳实践

1. Tool Design

1. 工具设计

✅ DO:
  • Use clear, descriptive tool names (
    read_file
    not
    rf
    )
  • Provide detailed descriptions for LLM understanding
  • Define comprehensive JSON schemas with descriptions
  • Return structured data (JSON) when possible
  • Handle errors gracefully with helpful messages
❌ DON'T:
  • Create overly broad tools (split complex operations)
  • Return massive payloads (paginate large datasets)
  • Use ambiguous parameter names
  • Assume LLM knows your domain-specific terminology
✅ 推荐做法:
  • 使用清晰、描述性的工具名称(如
    read_file
    而非
    rf
  • 提供详细描述,帮助大语言模型理解用途
  • 定义带描述的完整JSON Schema
  • 尽可能返回结构化数据(JSON)
  • 优雅处理错误,返回有用的错误信息
❌ 不推荐做法:
  • 创建过于宽泛的工具(拆分复杂操作)
  • 返回过大的负载(对大数据集进行分页)
  • 使用模糊的参数名称
  • 假设大语言模型了解你的领域特定术语

2. Security

2. 安全

Critical Rules:
  • Validate ALL inputs (type, range, format)
  • Sanitize file paths (prevent directory traversal)
  • Use allowlists for commands/operations
  • Never expose sensitive credentials in responses
  • Implement rate limiting for expensive operations
  • Use read-only access by default
typescript
// Example: Path validation
function validatePath(inputPath: string): string {
  const normalized = path.normalize(inputPath);
  const allowed = path.resolve("/safe/directory");

  if (!normalized.startsWith(allowed)) {
    throw new Error("Path outside allowed directory");
  }

  return normalized;
}
关键规则:
  • 验证所有输入(类型、范围、格式)
  • 清理文件路径(防止目录遍历攻击)
  • 对命令/操作使用白名单
  • 绝不在响应中暴露敏感凭证
  • 对高开销操作实现速率限制
  • 默认使用只读访问权限
typescript
// 示例: 路径验证
function validatePath(inputPath: string): string {
  const normalized = path.normalize(inputPath);
  const allowed = path.resolve("/safe/directory");

  if (!normalized.startsWith(allowed)) {
    throw new Error("路径超出允许的目录范围");
  }

  return normalized;
}

3. Performance

3. 性能

  • Cache expensive operations
  • Stream large responses when possible
  • Use pagination for list operations
  • Set reasonable timeouts
  • Implement request queuing for rate-limited APIs
  • 缓存高开销操作
  • 尽可能流式传输大响应
  • 对列表操作使用分页
  • 设置合理的超时时间
  • 对速率受限的API实现请求排队

4. Testing

4. 测试

bash
undefined
bash
undefined

Use MCP Inspector for manual testing

使用MCP Inspector进行手动测试

mcp-inspector node build/index.js
mcp-inspector node build/index.js

Unit test tool handlers

对工具处理逻辑进行单元测试

npm test
npm test

Integration test with Claude Desktop

与Claude Desktop进行集成测试

1. Add to config

1. 添加到配置文件

2. Restart Claude

2. 重启Claude

3. Test in conversation

3. 在对话中测试

undefined
undefined

5. Documentation

5. 文档

  • Document all tools in code comments
  • Provide example usage in descriptions
  • Include error scenarios in documentation
  • Maintain a CHANGELOG for server updates
  • 在代码注释中记录所有工具
  • 在描述中提供示例用法
  • 在文档中包含错误场景
  • 为服务器更新维护CHANGELOG

Common Pitfalls

常见陷阱

Writing to STDOUT (breaks MCP protocol):
typescript
// WRONG
console.log("Debug message");  // STDOUT is for MCP protocol

// CORRECT
console.error("Debug message");  // STDERR for logs
Not handling errors:
typescript
// WRONG - unhandled promise rejection
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const data = await riskyOperation();  // Can throw
  return { content: [{ type: "text", text: data }] };
});

// CORRECT
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const data = await riskyOperation();
    return { content: [{ type: "text", text: data }] };
  } catch (error) {
    return {
      content: [{ type: "text", text: `Error: ${error.message}` }],
      isError: true,
    };
  }
});
Blocking operations:
python
undefined
向STDOUT输出内容(破坏MCP协议):
typescript
// 错误写法
console.log("调试信息");  // STDOUT用于MCP协议通信

// 正确写法
console.error("调试信息");  // STDERR用于日志输出
不处理错误:
typescript
// 错误写法 - 未处理的Promise拒绝
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const data = await riskyOperation();  // 可能抛出错误
  return { content: [{ type: "text", text: data }] };
});

// 正确写法
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const data = await riskyOperation();
    return { content: [{ type: "text", text: data }] };
  } catch (error) {
    return {
      content: [{ type: "text", text: `错误: ${error.message}` }],
      isError: true,
    };
  }
});
阻塞操作:
python
undefined

WRONG - synchronous file read blocks event loop

错误写法 - 同步文件读取阻塞事件循环

@app.call_tool() async def call_tool(name: str, arguments: dict): with open("large_file.txt", "r") as f: # Blocks! content = f.read() return [TextContent(type="text", text=content)]
@app.call_tool() async def call_tool(name: str, arguments: dict): with open("large_file.txt", "r") as f: # 阻塞! content = f.read() return [TextContent(type="text", text=content)]

CORRECT - async file I/O

正确写法 - 异步文件I/O

@app.call_tool() async def call_tool(name: str, arguments: dict): async with aiofiles.open("large_file.txt", "r") as f: content = await f.read() return [TextContent(type="text", text=content)]

❌ **Missing required fields in schema**:
```typescript
// WRONG - missing "required" field
{
  name: "search",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string" }
    }
    // Missing: required: ["query"]
  }
}
@app.call_tool() async def call_tool(name: str, arguments: dict): async with aiofiles.open("large_file.txt", "r") as f: content = await f.read() return [TextContent(type="text", text=content)]

❌ **Schema中缺少必填字段**:
```typescript
// 错误写法 - 缺少"required"字段
{
  name: "search",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string" }
    }
    // 缺失: required: ["query"]
  }
}

Resources

资源

Related Skills

相关技能

When building MCP servers, consider these complementary skills:
  • typescript-core: TypeScript type safety, tsconfig optimization, and advanced patterns
  • asyncio: Python async patterns for MCP servers with async/await
  • openrouter: Alternative LLM API integration for multi-model support
构建MCP服务器时,可考虑以下互补技能:
  • typescript-core: TypeScript类型安全、tsconfig优化及进阶模式
  • asyncio: Python异步模式,用于支持async/await的MCP服务器
  • openrouter: 替代LLM API集成,支持多模型

Quick TypeScript MCP Patterns (Inlined for Standalone Use)

快速TypeScript MCP模式(独立使用)

typescript
// Type-safe MCP server with TypeScript
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';

// Define tool schemas with runtime validation
const SearchSchema = z.object({
  query: z.string().min(1).max(500),
  limit: z.number().int().min(1).max(100).default(10),
});

type SearchArgs = z.infer<typeof SearchSchema>;

class TypeSafeMCPServer {
  private server: Server;

  constructor() {
    this.server = new Server(
      {
        name: 'typed-search-server',
        version: '1.0.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.setupToolHandlers();
  }

  private setupToolHandlers() {
    // List tools with full type inference
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'search',
          description: 'Search with type-safe parameters',
          inputSchema: {
            type: 'object',
            properties: {
              query: { type: 'string', minLength: 1, maxLength: 500 },
              limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
            },
            required: ['query'],
          },
        },
      ],
    }));

    // Type-safe tool execution
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      if (request.params.name === 'search') {
        // Runtime validation with Zod
        const args = SearchSchema.parse(request.params.arguments);

        // Type-safe implementation
        const results = await this.performSearch(args);

        return {
          content: [{ type: 'text', text: JSON.stringify(results) }],
        };
      }

      throw new Error(`Unknown tool: ${request.params.name}`);
    });
  }

  private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
    // Implementation with full type safety
    // args.query is string, args.limit is number
    return [];
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// Start server
const server = new TypeSafeMCPServer();
server.run();
typescript
// 基于TypeScript的类型安全MCP服务器
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';

// 使用运行时验证定义工具Schema
const SearchSchema = z.object({
  query: z.string().min(1).max(500),
  limit: z.number().int().min(1).max(100).default(10),
});

type SearchArgs = z.infer<typeof SearchSchema>;

class TypeSafeMCPServer {
  private server: Server;

  constructor() {
    this.server = new Server(
      {
        name: 'typed-search-server',
        version: '1.0.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.setupToolHandlers();
  }

  private setupToolHandlers() {
    // 带完整类型推断的工具列表
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'search',
          description: '使用类型安全参数进行搜索',
          inputSchema: {
            type: 'object',
            properties: {
              query: { type: 'string', minLength: 1, maxLength: 500 },
              limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
            },
            required: ['query'],
          },
        },
      ],
    }));

    // 类型安全的工具执行
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      if (request.params.name === 'search') {
        // 使用Zod进行运行时验证
        const args = SearchSchema.parse(request.params.arguments);

        // 类型安全的实现
        const results = await this.performSearch(args);

        return {
          content: [{ type: 'text', text: JSON.stringify(results) }],
        };
      }

      throw new Error(`未知工具: ${request.params.name}`);
    });
  }

  private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
    // 带完整类型安全的实现
    // args.query是string类型,args.limit是number类型
    return [];
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// 启动服务器
const server = new TypeSafeMCPServer();
server.run();

Quick Python Async MCP Patterns (Inlined for Standalone Use)

快速Python异步MCP模式(独立使用)

python
undefined
python
// 基于Python的异步MCP服务器
import asyncio
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field

// 使用Pydantic定义类型安全的参数模型
class SearchArgs(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    limit: int = Field(10, ge=1, le=100)

class AsyncMCPServer:
    def __init__(self):
        self.server = Server("async-search-server")
        self.setup_handlers()

    def setup_handlers(self):
        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                Tool(
                    name="search",
                    description="使用异步处理进行搜索",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "minLength": 1, "maxLength": 500},
                            "limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
                        },
                        "required": ["query"],
                    },
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
            if name == "search":
                // 使用Pydantic验证
                args = SearchArgs(**arguments)

                // 异步实现
                results = await self.perform_search(args)

                return [
                    TextContent(
                        type="text",
                        text=str(results)
                    )
                ]

            raise ValueError(f"未知工具: {name}")

    async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
        """异步搜索实现"""
        // 模拟异步I/O
        await asyncio.sleep(0.1)

        // 使用已验证的参数(args.query是str类型,args.limit是int类型)
        return [
            {"title": f"{args.query}的搜索结果", "url": "https://example.com"}
        ]

async def main():
    """运行异步MCP服务器"""
    logging.basicConfig(level=logging.INFO)
    server = AsyncMCPServer()

    // 使用stdio传输与Claude Desktop集成
    async with stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            server.server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Async MCP server with Python

快速多语言MCP模式(独立使用)

import asyncio import logging from typing import Any from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from pydantic import BaseModel, Field
TypeScript vs Python 权衡:
特性TypeScriptPython
类型安全编译时+运行时(Zod)仅运行时(Pydantic)
性能启动更快,Node.js有额外开销启动较慢,CPU任务表现更好
异步支持原生async/await,事件循环asyncio,适合I/O密集型任务
生态系统npm包,前端工具丰富数据科学、ML库完善
最佳适用场景Web API、实时工具数据处理、ML集成
两种语言的通用模式:
  1. 输入验证
    • TypeScript: Zod Schema
    • Python: Pydantic模型
  2. 错误处理
    • 两者都使用try/catch捕获特定错误类型
    • 在MCP响应中返回错误内容
  3. 资源管理
    • TypeScript: async/await搭配try/finally
    • Python: 异步上下文管理器
  4. 测试
    • TypeScript: Vitest/Jest搭配模拟传输层
    • Python: pytest搭配pytest-asyncio
选择实现语言:
typescript
// TypeScript - 最适合:
// - 文件系统操作
// - 网页抓取/HTTP请求
// - JSON/API处理
// - 实时数据流

// Python - 最适合:
// - 数据分析(pandas、numpy)
// - 机器学习(scikit-learn、torch)
// - 数据库ETL操作
// - 科学计算
[完整的TypeScript、Python异步及OpenRouter模式可在相关技能中获取(若一起部署)]

Type-safe argument models with Pydantic

本地MCP集成模式(你的仓库)

.mcp.json布局(项目范围)

class SearchArgs(BaseModel): query: str = Field(..., min_length=1, max_length=500) limit: int = Field(10, ge=1, le=100)
class AsyncMCPServer: def init(self): self.server = Server("async-search-server") self.setup_handlers()
def setup_handlers(self):
    @self.server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name="search",
                description="Search with async processing",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "minLength": 1, "maxLength": 500},
                        "limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
                    },
                    "required": ["query"],
                },
            )
        ]

    @self.server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        if name == "search":
            # Validate with Pydantic
            args = SearchArgs(**arguments)

            # Async implementation
            results = await self.perform_search(args)

            return [
                TextContent(
                    type="text",
                    text=str(results)
                )
            ]

        raise ValueError(f"Unknown tool: {name}")

async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
    """Async search implementation"""
    # Simulate async I/O
    await asyncio.sleep(0.1)

    # Use validated args (args.query is str, args.limit is int)
    return [
        {"title": f"Result for {args.query}", "url": "https://example.com"}
    ]
async def main(): """Run async MCP server""" logging.basicConfig(level=logging.INFO) server = AsyncMCPServer()
# Use stdio transport for Claude Desktop
async with stdio_server() as (read_stream, write_stream):
    await server.server.run(
        read_stream,
        write_stream,
        server.server.create_initialization_options()
    )
if name == "main": asyncio.run(main())
undefined
json
{
  "mcpServers": {
    "kuzu-memory": {
      "type": "stdio",
      "command": "kuzu-memory",
      "args": ["mcp"]
    },
    "mcp-vector-search": {
      "type": "stdio",
      "command": "uv",
      "args": ["run", "mcp-vector-search", "mcp"],
      "env": {
        "MCP_ENABLE_FILE_WATCHING": "true"
      }
    }
  }
}

Quick Multi-Language MCP Patterns (Inlined for Standalone Use)

CLI约定

TypeScript vs Python Trade-offs:
FeatureTypeScriptPython
Type SafetyCompile-time + runtime (Zod)Runtime only (Pydantic)
PerformanceFaster startup, Node.js overheadSlower startup, better for CPU tasks
Async SupportNative async/await, event loopasyncio, great for I/O
Ecosystemnpm packages, frontend toolsData science, ML libraries
Best ForWeb APIs, real-time toolsData processing, ML integration
Common Patterns Across Both:
  1. Input Validation
    • TypeScript: Zod schemas
    • Python: Pydantic models
  2. Error Handling
    • Both: Try/catch with specific error types
    • Return error content in MCP response
  3. Resource Management
    • TypeScript: async/await with try/finally
    • Python: async context managers
  4. Testing
    • TypeScript: Vitest/Jest with mock transport
    • Python: pytest with pytest-asyncio
Choosing Implementation Language:
typescript
// TypeScript - Best for:
// - File system operations
// - Web scraping/HTTP requests
// - JSON/API manipulation
// - Real-time data streams

// Python - Best for:
// - Data analysis (pandas, numpy)
// - Machine learning (scikit-learn, torch)
// - Database ETL operations
// - Scientific computing
[Full TypeScript, Python async, and OpenRouter patterns available in respective skills if deployed together]
  • 为stdio服务器提供
    mcp
    子命令。
  • 使用
    setup
    命令进行端到端初始化和集成(如mcp-vector-search、kuzu-memory)。
  • 使用
    install
    /
    uninstall
    命令针对特定客户端进行部署(如mcp-ticketer)。
  • 提供
    doctor
    命令验证依赖项。

Local MCP Integration Patterns (Your Repos)

环境变量约定

.mcp.json Layout (Project-Scoped)

json
{
  "mcpServers": {
    "kuzu-memory": {
      "type": "stdio",
      "command": "kuzu-memory",
      "args": ["mcp"]
    },
    "mcp-vector-search": {
      "type": "stdio",
      "command": "uv",
      "args": ["run", "mcp-vector-search", "mcp"],
      "env": {
        "MCP_ENABLE_FILE_WATCHING": "true"
      }
    }
  }
}
  • 文件监听:
    MCP_ENABLE_FILE_WATCHING=true
  • Kuzu内存:
    KUZU_MEMORY_PROJECT_ROOT
    ,
    KUZU_MEMORY_DB
  • Ticketer适配器:
    MCP_TICKETER_ADAPTER
    ,
    GITHUB_TOKEN
    ,
    GITHUB_OWNER
    ,
    GITHUB_REPO

CLI Conventions

常见服务器名称

  • Provide a
    mcp
    subcommand for stdio servers.
  • Use
    setup
    for end-to-end init + integration (mcp-vector-search, kuzu-memory).
  • Use
    install
    /
    uninstall
    to target specific clients (mcp-ticketer).
  • Provide
    doctor
    commands to validate dependencies.
kuzu-memory
,
mcp-vector-search
,
mcp-ticketer
,
mcp-skillset
,
mcp-browser

Environment Conventions

总结

  • File watching:
    MCP_ENABLE_FILE_WATCHING=true
  • Kuzu memory:
    KUZU_MEMORY_PROJECT_ROOT
    ,
    KUZU_MEMORY_DB
  • Ticketer adapters:
    MCP_TICKETER_ADAPTER
    ,
    GITHUB_TOKEN
    ,
    GITHUB_OWNER
    ,
    GITHUB_REPO
  • MCP 支持借助工具、资源和提示词构建AI原生服务器
  • 工具 是大语言模型可调用的函数(读取文件、查询API、执行操作)
  • 资源 提供只读数据访问(文件、数据库、文档)
  • 提示词 是带参数的可复用模板,适用于常见任务
  • SDK 支持TypeScript和Python,具备完整异步能力
  • Claude Desktop 通过JSON配置集成(STDIO传输)
  • 调试 可使用MCP Inspector、服务器日志和单独测试
  • 安全 至关重要: 验证输入、清理路径、使用白名单
  • 最佳实践: 清晰命名、完整Schema、错误处理、性能优化

Common Server Names

kuzu-memory
,
mcp-vector-search
,
mcp-ticketer
,
mcp-skillset
,
mcp-browser

Summary

  • MCP enables AI-native server development with tools, resources, and prompts
  • Tools are functions LLMs can call (read files, query APIs, run operations)
  • Resources provide read-only data access (files, databases, documentation)
  • Prompts are reusable templates with arguments for common tasks
  • SDKs available for TypeScript and Python with full async support
  • Claude Desktop integration via JSON config (STDIO transport)
  • Debugging with MCP Inspector, server logs, and standalone testing
  • Security critical: validate inputs, sanitize paths, use allowlists
  • Best practices: Clear naming, comprehensive schemas, error handling, performance optimization