mcp

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

MCP (Model Context Protocol) - AI-Native Server Development

MCP(Model Context Protocol,模型上下文协议)- 原生AI服务器开发

Overview

概述

Model Context Protocol (MCP) is an open standard for connecting AI assistants to external data sources and tools. Build servers that expose tools (functions LLMs can call), resources (data LLMs can read), and prompts (templates LLMs can use).
Key Concepts:
  • Tools: Functions LLMs can execute (read files, query APIs, run commands)
  • Resources: Data sources LLMs can access (files, databases, APIs)
  • Prompts: Reusable templates with arguments for common tasks
  • Client-Server: MCP servers expose capabilities, clients (like Claude Desktop) consume them
  • Transport: STDIO (local), SSE (Server-Sent Events), HTTP (network)
Official SDKs:
  • TypeScript:
    @modelcontextprotocol/sdk
  • Python:
    mcp
Installation:
bash
undefined
Model Context Protocol(MCP)是用于将AI助手连接到外部数据源和工具的开放标准。构建可对外暴露工具(LLM可调用的函数)、资源(LLM可读取的数据)和提示词(LLM可使用的模板)的服务器。
核心概念:
  • 工具:LLM可执行的函数(读取文件、查询API、运行命令)
  • 资源:LLM可访问的数据源(文件、数据库、API)
  • 提示词:带参数的可复用模板,适用于常见任务
  • 客户端-服务器架构:MCP服务器对外提供能力,客户端(如Claude Desktop)调用这些能力
  • 传输方式:STDIO(本地)、SSE(Server-Sent Events,服务器推送事件)、HTTP(网络)
官方SDK:
  • TypeScript:
    @modelcontextprotocol/sdk
  • Python:
    mcp
安装方式:
bash
undefined

TypeScript server

TypeScript服务器

npx @modelcontextprotocol/create-server@latest my-server cd my-server && npm install
npx @modelcontextprotocol/create-server@latest my-server cd my-server && npm install

Python server

Python服务器

pip install mcp
pip install mcp

Or use uv (recommended)

推荐使用uv

uv pip install mcp
undefined
uv pip install mcp
undefined

Quick Start - TypeScript Server

快速开始 - TypeScript服务器

1. Create Server with CLI

1. 使用CLI创建服务器

bash
undefined
bash
undefined

Interactive setup

交互式设置

npx @modelcontextprotocol/create-server@latest my-filesystem-server
npx @modelcontextprotocol/create-server@latest my-filesystem-server

Options prompt:

选项提示:

- Server name: my-filesystem-server

- 服务器名称: my-filesystem-server

- Language: TypeScript

- 开发语言: TypeScript

- Include example tools: Yes

- 包含示例工具: 是

undefined
undefined

2. Define Tools

2. 定义工具

typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

const server = new Server(
  {
    name: "filesystem-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "Read contents of a file",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Path to the file to read",
            },
          },
          required: ["path"],
        },
      },
      {
        name: "list_directory",
        description: "List contents of a directory",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Directory path to list",
            },
          },
          required: ["path"],
        },
      },
    ],
  };
});

// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;
      const content = await fs.readFile(filePath, "utf-8");
      return {
        content: [{ type: "text", text: content }],
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;
      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const listing = entries
        .map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
        .join("\n");
      return {
        content: [{ type: "text", text: listing }],
      };
    }

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
});

// Start server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP server running on stdio");
}

main();
typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";

const server = new Server(
  {
    name: "filesystem-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// 列出可用工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "Read contents of a file",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Path to the file to read",
            },
          },
          required: ["path"],
        },
      },
      {
        name: "list_directory",
        description: "List contents of a directory",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "Directory path to list",
            },
          },
          required: ["path"],
        },
      },
    ],
  };
});

// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;
      const content = await fs.readFile(filePath, "utf-8");
      return {
        content: [{ type: "text", text: content }],
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;
      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const listing = entries
        .map((entry) => `${entry.isDirectory() ? "📁" : "📄"} ${entry.name}`)
        .join("\n");
      return {
        content: [{ type: "text", text: listing }],
      };
    }

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
});

// 启动服务器
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP server running on stdio");
}

main();

3. Configure Claude Desktop

3. 配置Claude Desktop

json
// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
// %APPDATA%/Claude/claude_desktop_config.json (Windows)
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
    }
  }
}
json
// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
// %APPDATA%/Claude/claude_desktop_config.json (Windows)
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/absolute/path/to/my-filesystem-server/build/index.js"]
    }
  }
}

4. Build and Test

4. 构建并测试

bash
undefined
bash
undefined

Build TypeScript

构建TypeScript代码

npm run build
npm run build

Restart Claude Desktop (Cmd+Q and reopen)

重启Claude Desktop(Cmd+Q后重新打开)

Server appears in 🔌 menu

服务器会出现在🔌菜单中

undefined
undefined

Quick Start - Python Server

快速开始 - Python服务器

1. Create Server

1. 创建服务器

python
undefined
python
undefined

server.py

server.py

import asyncio from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent import json import os
import asyncio from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent import json import os

Create server instance

创建服务器实例

app = Server("filesystem-server")
app = Server("filesystem-server")

Define tools

定义工具

@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="read_file", description="Read contents of a file", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path to read"} }, "required": ["path"], }, ), Tool( name="list_directory", description="List directory contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "Directory path"} }, "required": ["path"], }, ), ]
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="read_file", description="Read contents of a file", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path to read"} }, "required": ["path"], }, ), Tool( name="list_directory", description="List directory contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "Directory path"} }, "required": ["path"], }, ), ]

Handle tool calls

处理工具调用

@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "read_file": file_path = arguments["path"] with open(file_path, "r") as f: content = f.read() return [TextContent(type="text", text=content)]
elif name == "list_directory":
    dir_path = arguments["path"]
    entries = os.listdir(dir_path)
    listing = "\n".join(
        f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
        for e in entries
    )
    return [TextContent(type="text", text=listing)]

else:
    raise ValueError(f"Unknown tool: {name}")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "read_file": file_path = arguments["path"] with open(file_path, "r") as f: content = f.read() return [TextContent(type="text", text=content)]
elif name == "list_directory":
    dir_path = arguments["path"]
    entries = os.listdir(dir_path)
    listing = "\n".join(
        f"{'📁' if os.path.isdir(os.path.join(dir_path, e)) else '📄'} {e}"
        for e in entries
    )
    return [TextContent(type="text", text=listing)]

else:
    raise ValueError(f"Unknown tool: {name}")

Start server

启动服务器

async def main(): async with stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main": asyncio.run(main())
undefined
async def main(): async with stdio_server() as (read_stream, write_stream): await app.run(read_stream, write_stream, app.create_initialization_options())
if name == "main": asyncio.run(main())
undefined

2. Configure Claude Desktop

2. 配置Claude Desktop

json
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"]
    }
  }
}
json
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"]
    }
  }
}

3. Test

3. 测试

bash
undefined
bash
undefined

Test server standalone

单独测试服务器

python server.py
python server.py

Restart Claude Desktop

重启Claude Desktop

Tools appear in 🔌 menu

工具会出现在🔌菜单中

undefined
undefined

Resources - Exposing Data to LLMs

资源 - 向LLM暴露数据

Resources provide read-only access to data sources.
资源提供对数据源的只读访问权限。

TypeScript Resource Example

TypeScript资源示例

typescript
import {
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// List available resources
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "file:///docs/readme.md",
        name: "README",
        description: "Project README documentation",
        mimeType: "text/markdown",
      },
      {
        uri: "file:///config/settings.json",
        name: "Settings",
        description: "Application settings",
        mimeType: "application/json",
      },
    ],
  };
});

// Handle resource reads
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;

  if (uri.startsWith("file://")) {
    const filePath = uri.replace("file://", "");
    const content = await fs.readFile(filePath, "utf-8");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: content,
        },
      ],
    };
  }

  throw new Error(`Unknown resource: ${uri}`);
});
typescript
import {
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// 列出可用资源
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "file:///docs/readme.md",
        name: "README",
        description: "Project README documentation",
        mimeType: "text/markdown",
      },
      {
        uri: "file:///config/settings.json",
        name: "Settings",
        description: "Application settings",
        mimeType: "application/json",
      },
    ],
  };
});

// 处理资源读取请求
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;

  if (uri.startsWith("file://")) {
    const filePath = uri.replace("file://", "");
    const content = await fs.readFile(filePath, "utf-8");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: content,
        },
      ],
    };
  }

  throw new Error(`Unknown resource: ${uri}`);
});

Python Resource Example

Python资源示例

python
from mcp.types import Resource, ResourceContent

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="file:///docs/readme.md",
            name="README",
            description="Project README",
            mimeType="text/markdown",
        ),
        Resource(
            uri="file:///config/settings.json",
            name="Settings",
            description="App settings",
            mimeType="application/json",
        ),
    ]

@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
    if uri.startswith("file://"):
        file_path = uri.replace("file://", "")
        with open(file_path, "r") as f:
            content = f.read()
        return ResourceContent(uri=uri, mimeType="text/plain", text=content)

    raise ValueError(f"Unknown resource: {uri}")
python
from mcp.types import Resource, ResourceContent

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="file:///docs/readme.md",
            name="README",
            description="Project README",
            mimeType="text/markdown",
        ),
        Resource(
            uri="file:///config/settings.json",
            name="Settings",
            description="App settings",
            mimeType="application/json",
        ),
    ]

@app.read_resource()
async def read_resource(uri: str) -> ResourceContent:
    if uri.startswith("file://"):
        file_path = uri.replace("file://", "")
        with open(file_path, "r") as f:
            content = f.read()
        return ResourceContent(uri=uri, mimeType="text/plain", text=content)

    raise ValueError(f"Unknown resource: {uri}")

Prompts - Reusable Templates

提示词 - 可复用模板

Prompts are templates that LLMs can use with arguments.
提示词是LLM可使用的带参数模板。

TypeScript Prompt Example

TypeScript提示词示例

typescript
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// List prompts
server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [
      {
        name: "code_review",
        description: "Review code for best practices",
        arguments: [
          {
            name: "language",
            description: "Programming language",
            required: true,
          },
          {
            name: "code",
            description: "Code to review",
            required: true,
          },
        ],
      },
    ],
  };
});

// Handle prompt requests
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const language = args?.language || "unknown";
    const code = args?.code || "";

    return {
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `Review this ${language} code for best practices, security issues, and improvements:\n\n\`\`\`${language}\n${code}\n\`\`\``,
          },
        },
      ],
    };
  }

  throw new Error(`Unknown prompt: ${name}`);
});
typescript
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

// 列出提示词
server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [
      {
        name: "code_review",
        description: "Review code for best practices",
        arguments: [
          {
            name: "language",
            description: "Programming language",
            required: true,
          },
          {
            name: "code",
            description: "Code to review",
            required: true,
          },
        ],
      },
    ],
  };
});

// 处理提示词请求
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "code_review") {
    const language = args?.language || "unknown";
    const code = args?.code || "";

    return {
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `Review this ${language} code for best practices, security issues, and improvements:\n\n\`\`\`${language}\n${code}\n\`\`\``,
          },
        },
      ],
    };
  }

  throw new Error(`Unknown prompt: ${name}`);
});

Python Prompt Example

Python提示词示例

python
from mcp.types import Prompt, PromptMessage, PromptArgument

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        Prompt(
            name="code_review",
            description="Review code for best practices",
            arguments=[
                PromptArgument(
                    name="language", description="Programming language", required=True
                ),
                PromptArgument(name="code", description="Code to review", required=True),
            ],
        )
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    if name == "code_review":
        language = arguments.get("language", "unknown")
        code = arguments.get("code", "")

        return [
            PromptMessage(
                role="user",
                content={
                    "type": "text",
                    "text": f"Review this {language} code:\n\n```{language}\n{code}\n```",
                },
            )
        ]

    raise ValueError(f"Unknown prompt: {name}")
python
from mcp.types import Prompt, PromptMessage, PromptArgument

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        Prompt(
            name="code_review",
            description="Review code for best practices",
            arguments=[
                PromptArgument(
                    name="language", description="Programming language", required=True
                ),
                PromptArgument(name="code", description="Code to review", required=True),
            ],
        )
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict) -> list[PromptMessage]:
    if name == "code_review":
        language = arguments.get("language", "unknown")
        code = arguments.get("code", "")

        return [
            PromptMessage(
                role="user",
                content={
                    "type": "text",
                    "text": f"Review this {language} code:\n\n```{language}\n{code}\n```",
                },
            )
        ]

    raise ValueError(f"Unknown prompt: {name}")

Advanced Patterns

高级模式

1. Error Handling

1. 错误处理

typescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const { name, arguments: args } = request.params;

    if (name === "risky_operation") {
      // Validate inputs
      if (!args.required_param) {
        throw new Error("Missing required parameter: required_param");
      }

      // Perform operation with proper error handling
      const result = await performRiskyOperation(args.required_param);

      return {
        content: [{ type: "text", text: JSON.stringify(result) }],
      };
    }
  } catch (error) {
    // Return error to LLM with helpful message
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
});
python
undefined
typescript
// TypeScript
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const { name, arguments: args } = request.params;

    if (name === "risky_operation") {
      // 验证输入
      if (!args.required_param) {
        throw new Error("Missing required parameter: required_param");
      }

      // 执行操作并妥善处理错误
      const result = await performRiskyOperation(args.required_param);

      return {
        content: [{ type: "text", text: JSON.stringify(result) }],
      };
    }
  } catch (error) {
    // 向LLM返回包含有用信息的错误
    return {
      content: [
        {
          type: "text",
          text: `Error: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
});
python
undefined

Python

Python

@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: try: if name == "risky_operation": if "required_param" not in arguments: raise ValueError("Missing required parameter: required_param")
        result = await perform_risky_operation(arguments["required_param"])
        return [TextContent(type="text", text=json.dumps(result))]

except Exception as e:
    return [TextContent(type="text", text=f"Error: {str(e)}", isError=True)]
undefined
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: try: if name == "risky_operation": if "required_param" not in arguments: raise ValueError("Missing required parameter: required_param")
        result = await perform_risky_operation(arguments["required_param"])
        return [TextContent(type="text", text=json.dumps(result))]

except Exception as e:
    return [TextContent(type="text", text=f"Error: {str(e)}", isError=True)]
undefined

2. Async Operations

2. 异步操作

typescript
// TypeScript - Async API calls
import axios from "axios";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "fetch_api_data") {
    const url = args.url as string;
    const response = await axios.get(url);

    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(response.data, null, 2),
        },
      ],
    };
  }
});
python
undefined
typescript
// TypeScript - 异步API调用
import axios from "axios";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "fetch_api_data") {
    const url = args.url as string;
    const response = await axios.get(url);

    return {
      content: [
        {
          type: "text",
          text: JSON.stringify(response.data, null, 2),
        },
      ],
    };
  }
});
python
undefined

Python - Async database queries

Python - 异步数据库查询

import aiosqlite
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "query_database": query = arguments["query"]
    async with aiosqlite.connect("database.db") as db:
        async with db.execute(query) as cursor:
            rows = await cursor.fetchall()
            result = json.dumps(rows)

    return [TextContent(type="text", text=result)]
undefined
import aiosqlite
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "query_database": query = arguments["query"]
    async with aiosqlite.connect("database.db") as db:
        async with db.execute(query) as cursor:
            rows = await cursor.fetchall()
            result = json.dumps(rows)

    return [TextContent(type="text", text=result)]
undefined

3. Environment Variables & Configuration

3. 环境变量与配置

typescript
// TypeScript - Load config from environment
import dotenv from "dotenv";
dotenv.config();

const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "api_call") {
    const response = await fetch(`${BASE_URL}/endpoint`, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });
    // ...
  }
});
python
undefined
typescript
// TypeScript - 从环境变量加载配置
import dotenv from "dotenv";
dotenv.config();

const API_KEY = process.env.API_KEY;
const BASE_URL = process.env.BASE_URL || "https://api.example.com";

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "api_call") {
    const response = await fetch(`${BASE_URL}/endpoint`, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });
    // ...
  }
});
python
undefined

Python - Load config from environment

Python - 从环境变量加载配置

import os from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "api_call": headers = {"Authorization": f"Bearer {API_KEY}"} async with aiohttp.ClientSession() as session: async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp: data = await resp.json() return [TextContent(type="text", text=json.dumps(data))]
undefined
import os from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("API_KEY") BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: if name == "api_call": headers = {"Authorization": f"Bearer {API_KEY}"} async with aiohttp.ClientSession() as session: async with session.get(f"{BASE_URL}/endpoint", headers=headers) as resp: data = await resp.json() return [TextContent(type="text", text=json.dumps(data))]
undefined

4. Streaming Responses (Large Data)

4. 流式响应(大数据)

typescript
// TypeScript - Stream large file contents
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "read_large_file") {
    const filePath = request.params.arguments.path as string;
    const stream = fs.createReadStream(filePath, { encoding: "utf-8" });

    let content = "";
    for await (const chunk of stream) {
      content += chunk;
      // Could yield chunks incrementally in future MCP versions
    }

    return {
      content: [{ type: "text", text: content }],
    };
  }
});
typescript
// TypeScript - 流式传输大文件内容
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "read_large_file") {
    const filePath = request.params.arguments.path as string;
    const stream = fs.createReadStream(filePath, { encoding: "utf-8" });

    let content = "";
    for await (const chunk of stream) {
      content += chunk;
      // 未来MCP版本可支持增量返回数据块
    }

    return {
      content: [{ type: "text", text: content }],
    };
  }
});

5. Dynamic Tool Registration

5. 动态工具注册

typescript
// TypeScript - Register tools from config
interface ToolConfig {
  name: string;
  description: string;
  schema: object;
  handler: (args: any) => Promise<any>;
}

const toolRegistry = new Map<string, ToolConfig>();

function registerTool(config: ToolConfig) {
  toolRegistry.set(config.name, config);
}

// Register custom tools
registerTool({
  name: "custom_tool",
  description: "Dynamically registered tool",
  schema: {
    type: "object",
    properties: {
      input: { type: "string" },
    },
  },
  handler: async (args) => {
    return { result: `Processed: ${args.input}` };
  },
});

// List tools dynamically
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: Array.from(toolRegistry.values()).map((tool) => ({
      name: tool.name,
      description: tool.description,
      inputSchema: tool.schema,
    })),
  };
});

// Call tools dynamically
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = toolRegistry.get(request.params.name);
  if (!tool) {
    throw new Error(`Unknown tool: ${request.params.name}`);
  }

  const result = await tool.handler(request.params.arguments);
  return {
    content: [{ type: "text", text: JSON.stringify(result) }],
  };
});
typescript
// TypeScript - 从配置注册工具
interface ToolConfig {
  name: string;
  description: string;
  schema: object;
  handler: (args: any) => Promise<any>;
}

const toolRegistry = new Map<string, ToolConfig>();

function registerTool(config: ToolConfig) {
  toolRegistry.set(config.name, config);
}

// 注册自定义工具
registerTool({
  name: "custom_tool",
  description: "Dynamically registered tool",
  schema: {
    type: "object",
    properties: {
      input: { type: "string" },
    },
  },
  handler: async (args) => {
    return { result: `Processed: ${args.input}` };
  },
});

// 动态列出工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: Array.from(toolRegistry.values()).map((tool) => ({
      name: tool.name,
      description: tool.description,
      inputSchema: tool.schema,
    })),
  };
});

// 动态调用工具
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = toolRegistry.get(request.params.name);
  if (!tool) {
    throw new Error(`Unknown tool: ${request.params.name}`);
  }

  const result = await tool.handler(request.params.arguments);
  return {
    content: [{ type: "text", text: JSON.stringify(result) }],
  };
});

Transport Types

传输类型

1. STDIO (Local Execution)

1. STDIO(本地执行)

Default for Claude Desktop integration. Server runs as subprocess.
json
// Claude Desktop config
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server/build/index.js"],
      "env": {
        "API_KEY": "your-api-key"
      }
    }
  }
}
Claude Desktop集成的默认方式。服务器作为子进程运行。
json
// Claude Desktop配置
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server/build/index.js"],
      "env": {
        "API_KEY": "your-api-key"
      }
    }
  }
}

2. SSE (Server-Sent Events)

2. SSE(服务器推送事件)

For long-running servers with HTTP transport.
typescript
// TypeScript SSE server
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";

const app = express();
const server = new Server(/* ... */);

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/messages", res);
  await server.connect(transport);
});

app.post("/messages", async (req, res) => {
  // Handle incoming messages
});

app.listen(3000, () => {
  console.log("MCP server listening on http://localhost:3000");
});
json
// Claude Desktop config for SSE
{
  "mcpServers": {
    "remote-server": {
      "url": "http://localhost:3000/sse"
    }
  }
}
适用于长期运行的HTTP传输服务器。
typescript
// TypeScript SSE服务器
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";

const app = express();
const server = new Server(/* ... */);

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/messages", res);
  await server.connect(transport);
});

app.post("/messages", async (req, res) => {
  // 处理传入消息
});

app.listen(3000, () => {
  console.log("MCP server listening on http://localhost:3000");
});
json
// Claude Desktop的SSE配置
{
  "mcpServers": {
    "remote-server": {
      "url": "http://localhost:3000/sse"
    }
  }
}

3. HTTP (Streamable HTTP)

3. HTTP(可流式传输的HTTP)

For REST-style MCP servers.
python
undefined
适用于REST风格的MCP服务器。
python
undefined

Python HTTP server with FastAPI

基于FastAPI的Python HTTP服务器

from fastapi import FastAPI from mcp.server.fastapi import create_fastapi_app
app = FastAPI() mcp_app = Server("http-server")
from fastapi import FastAPI from mcp.server.fastapi import create_fastapi_app
app = FastAPI() mcp_app = Server("http-server")

Define tools/resources/prompts as usual

像往常一样定义工具/资源/提示词

...

...

Mount MCP routes

挂载MCP路由

app.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
undefined
app.mount("/mcp", create_fastapi_app(mcp_app))
if name == "main": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
undefined

Debugging MCP Servers

调试MCP服务器

1. Server Logs

1. 服务器日志

typescript
// TypeScript - Console.error for logs (STDOUT is for MCP protocol)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  console.error(`Tool called: ${request.params.name}`);
  console.error(`Arguments: ${JSON.stringify(request.params.arguments)}`);
  // ...
});
python
undefined
typescript
// TypeScript - 使用console.error记录日志(STDOUT用于MCP协议)
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  console.error(`Tool called: ${request.params.name}`);
  console.error(`Arguments: ${JSON.stringify(request.params.arguments)}`);
  // ...
});
python
undefined

Python - Use logging module

Python - 使用logging模块

import logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log") logger = logging.getLogger(name)
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: logger.debug(f"Tool called: {name}") logger.debug(f"Arguments: {arguments}") # ...
undefined
import logging
logging.basicConfig(level=logging.DEBUG, filename="mcp-server.log") logger = logging.getLogger(name)
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: logger.debug(f"Tool called: {name}") logger.debug(f"Arguments: {arguments}") // ...
undefined

2. MCP Inspector

2. MCP Inspector

bash
undefined
bash
undefined

Install MCP Inspector (browser-based debugging tool)

安装MCP Inspector(基于浏览器的调试工具)

npm install -g @modelcontextprotocol/inspector
npm install -g @modelcontextprotocol/inspector

Run inspector with your server

运行Inspector并连接你的服务器

mcp-inspector node /path/to/server/build/index.js
mcp-inspector node /path/to/server/build/index.js

Opens browser at http://localhost:6274

在浏览器中打开http://localhost:6274

- Test tools manually

- 手动测试工具

- See request/response payloads

- 查看请求/响应负载

- Debug JSON schema validation

- 调试JSON schema验证

undefined
undefined

3. Claude Desktop Logs

3. Claude Desktop日志

bash
undefined
bash
undefined

macOS

macOS

tail -f ~/Library/Logs/Claude/mcp*.log
tail -f ~/Library/Logs/Claude/mcp*.log

Windows

Windows

Check %APPDATA%/Claude/logs/

查看%APPDATA%/Claude/logs/目录

undefined
undefined

4. Test Server Standalone

4. 单独测试服务器

typescript
// TypeScript - Add test harness
if (process.argv.includes("--test")) {
  // Simulate tool call
  const testRequest = {
    params: {
      name: "read_file",
      arguments: { path: "./test.txt" },
    },
  };

  server
    .setRequestHandler(CallToolRequestSchema, async (request) => {
      // ... your handler
    })
    .then((handler) => handler(testRequest))
    .then((result) => console.log(JSON.stringify(result, null, 2)))
    .catch((error) => console.error(error));
} else {
  // Normal server startup
  main();
}
bash
undefined
typescript
// TypeScript - 添加测试工具
if (process.argv.includes("--test")) {
  // 模拟工具调用
  const testRequest = {
    params: {
      name: "read_file",
      arguments: { path: "./test.txt" },
    },
  };

  server
    .setRequestHandler(CallToolRequestSchema, async (request) => {
      // ... 你的处理器逻辑
    })
    .then((handler) => handler(testRequest))
    .then((result) => console.log(JSON.stringify(result, null, 2)))
    .catch((error) => console.error(error));
} else {
  // 正常启动服务器
  main();
}
bash
undefined

Test without Claude Desktop

无需Claude Desktop即可测试

node build/index.js --test
undefined
node build/index.js --test
undefined

Real-World Server Examples

真实场景服务器示例

1. GitHub Integration Server

1. GitHub集成服务器

typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";

const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "list_repos",
        description: "List repositories for a user/org",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string", description: "Username or org name" },
          },
          required: ["owner"],
        },
      },
      {
        name: "create_issue",
        description: "Create a GitHub issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            title: { type: "string" },
            body: { type: "string" },
          },
          required: ["owner", "repo", "title"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "list_repos") {
    const { data } = await octokit.repos.listForUser({
      username: args.owner as string,
    });
    const repos = data.map((r) => ({
      name: r.name,
      description: r.description,
      stars: r.stargazers_count,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
    };
  }

  if (name === "create_issue") {
    const { data } = await octokit.issues.create({
      owner: args.owner as string,
      repo: args.repo as string,
      title: args.title as string,
      body: args.body as string,
    });
    return {
      content: [
        { type: "text", text: `Issue created: ${data.html_url}` },
      ],
    };
  }
});
typescript
// github-server/src/index.ts
import { Octokit } from "@octokit/rest";

const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "list_repos",
        description: "List repositories for a user/org",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string", description: "Username or org name" },
          },
          required: ["owner"],
        },
      },
      {
        name: "create_issue",
        description: "Create a GitHub issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            title: { type: "string" },
            body: { type: "string" },
          },
          required: ["owner", "repo", "title"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "list_repos") {
    const { data } = await octokit.repos.listForUser({
      username: args.owner as string,
    });
    const repos = data.map((r) => ({
      name: r.name,
      description: r.description,
      stars: r.stargazers_count,
    }));
    return {
      content: [{ type: "text", text: JSON.stringify(repos, null, 2) }],
    };
  }

  if (name === "create_issue") {
    const { data } = await octokit.issues.create({
      owner: args.owner as string,
      repo: args.repo as string,
      title: args.title as string,
      body: args.body as string,
    });
    return {
      content: [
        { type: "text", text: `Issue created: ${data.html_url}` },
      ],
    };
  }
});

2. Database Query Server

2. 数据库查询服务器

python
undefined
python
undefined

database-server/server.py

database-server/server.py

import asyncpg import json from mcp.server import Server from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool(): return await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password" )
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="query", description="Execute SQL query (SELECT only)", inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL query"}, }, "required": ["sql"], }, ), Tool( name="list_tables", description="List all tables in database", inputSchema={"type": "object", "properties": {}}, ), ]
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: pool = await get_db_pool()
if name == "query":
    sql = arguments["sql"]
    # Security: Only allow SELECT
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries allowed")

    async with pool.acquire() as conn:
        rows = await conn.fetch(sql)
        result = [dict(row) for row in rows]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]

elif name == "list_tables":
    async with pool.acquire() as conn:
        tables = await conn.fetch("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        result = [row["table_name"] for row in tables]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]
undefined
import asyncpg import json from mcp.server import Server from mcp.types import Tool, TextContent
app = Server("database-server")
async def get_db_pool(): return await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password" )
@app.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="query", description="Execute SQL query (SELECT only)", inputSchema={ "type": "object", "properties": { "sql": {"type": "string", "description": "SQL query"}, }, "required": ["sql"], }, ), Tool( name="list_tables", description="List all tables in database", inputSchema={"type": "object", "properties": {}}, ), ]
@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: pool = await get_db_pool()
if name == "query":
    sql = arguments["sql"]
    # 安全:仅允许SELECT查询
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries allowed")

    async with pool.acquire() as conn:
        rows = await conn.fetch(sql)
        result = [dict(row) for row in rows]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]

elif name == "list_tables":
    async with pool.acquire() as conn:
        tables = await conn.fetch("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        result = [row["table_name"] for row in tables]

    return [TextContent(type="text", text=json.dumps(result, indent=2))]
undefined

3. Web Scraper Server

3. 网页抓取服务器

typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "scrape_url",
        description: "Scrape content from a URL",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string", description: "URL to scrape" },
            selector: {
              type: "string",
              description: "CSS selector for content",
            },
          },
          required: ["url"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "scrape_url") {
    const url = request.params.arguments.url as string;
    const selector = request.params.arguments.selector as string;

    const response = await axios.get(url);
    const $ = cheerio.load(response.data);

    let content: string;
    if (selector) {
      content = $(selector).text();
    } else {
      content = $("body").text();
    }

    return {
      content: [
        {
          type: "text",
          text: content.trim(),
        },
      ],
    };
  }
});
typescript
// scraper-server/src/index.ts
import * as cheerio from "cheerio";
import axios from "axios";

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "scrape_url",
        description: "Scrape content from a URL",
        inputSchema: {
          type: "object",
          properties: {
            url: { type: "string", description: "URL to scrape" },
            selector: {
              type: "string",
              description: "CSS selector for content",
            },
          },
          required: ["url"],
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "scrape_url") {
    const url = request.params.arguments.url as string;
    const selector = request.params.arguments.selector as string;

    const response = await axios.get(url);
    const $ = cheerio.load(response.data);

    let content: string;
    if (selector) {
      content = $(selector).text();
    } else {
      content = $("body").text();
    }

    return {
      content: [
        {
          type: "text",
          text: content.trim(),
        },
      ],
    };
  }
});

Best Practices

最佳实践

1. Tool Design

1. 工具设计

✅ DO:
  • Use clear, descriptive tool names (
    read_file
    not
    rf
    )
  • Provide detailed descriptions for LLM understanding
  • Define comprehensive JSON schemas with descriptions
  • Return structured data (JSON) when possible
  • Handle errors gracefully with helpful messages
❌ DON'T:
  • Create overly broad tools (split complex operations)
  • Return massive payloads (paginate large datasets)
  • Use ambiguous parameter names
  • Assume LLM knows your domain-specific terminology
✅ 推荐做法:
  • 使用清晰、描述性的工具名称(如
    read_file
    而非
    rf
  • 提供详细描述,帮助LLM理解工具用途
  • 定义带描述信息的完整JSON schema
  • 尽可能返回结构化数据(JSON格式)
  • 优雅处理错误并返回有用的错误信息
❌ 不推荐做法:
  • 创建过于宽泛的工具(拆分复杂操作)
  • 返回过大的负载(对大数据集进行分页)
  • 使用模糊的参数名称
  • 假设LLM了解你的领域特定术语

2. Security

2. 安全

Critical Rules:
  • Validate ALL inputs (type, range, format)
  • Sanitize file paths (prevent directory traversal)
  • Use allowlists for commands/operations
  • Never expose sensitive credentials in responses
  • Implement rate limiting for expensive operations
  • Use read-only access by default
typescript
// Example: Path validation
function validatePath(inputPath: string): string {
  const normalized = path.normalize(inputPath);
  const allowed = path.resolve("/safe/directory");

  if (!normalized.startsWith(allowed)) {
    throw new Error("Path outside allowed directory");
  }

  return normalized;
}
关键规则:
  • 验证所有输入(类型、范围、格式)
  • 清理文件路径(防止目录遍历攻击)
  • 对命令/操作使用白名单
  • 绝不在响应中暴露敏感凭证
  • 对高开销操作实现速率限制
  • 默认使用只读访问权限
typescript
// 示例:路径验证
function validatePath(inputPath: string): string {
  const normalized = path.normalize(inputPath);
  const allowed = path.resolve("/safe/directory");

  if (!normalized.startsWith(allowed)) {
    throw new Error("Path outside allowed directory");
  }

  return normalized;
}

3. Performance

3. 性能

  • Cache expensive operations
  • Stream large responses when possible
  • Use pagination for list operations
  • Set reasonable timeouts
  • Implement request queuing for rate-limited APIs
  • 缓存高开销操作的结果
  • 尽可能流式返回大响应
  • 对列表操作使用分页
  • 设置合理的超时时间
  • 对受速率限制的API实现请求排队

4. Testing

4. 测试

bash
undefined
bash
undefined

Use MCP Inspector for manual testing

使用MCP Inspector进行手动测试

mcp-inspector node build/index.js
mcp-inspector node build/index.js

Unit test tool handlers

单元测试工具处理器

npm test
npm test

Integration test with Claude Desktop

与Claude Desktop进行集成测试

1. Add to config

1. 添加到配置文件

2. Restart Claude

2. 重启Claude

3. Test in conversation

3. 在对话中测试

undefined
undefined

5. Documentation

5. 文档

  • Document all tools in code comments
  • Provide example usage in descriptions
  • Include error scenarios in documentation
  • Maintain a CHANGELOG for server updates
  • 在代码注释中记录所有工具
  • 在描述中提供示例用法
  • 在文档中包含错误场景
  • 为服务器更新维护CHANGELOG

Common Pitfalls

常见陷阱

Writing to STDOUT (breaks MCP protocol):
typescript
// WRONG
console.log("Debug message");  // STDOUT is for MCP protocol

// CORRECT
console.error("Debug message");  // STDERR for logs
Not handling errors:
typescript
// WRONG - unhandled promise rejection
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const data = await riskyOperation();  // Can throw
  return { content: [{ type: "text", text: data }] };
});

// CORRECT
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const data = await riskyOperation();
    return { content: [{ type: "text", text: data }] };
  } catch (error) {
    return {
      content: [{ type: "text", text: `Error: ${error.message}` }],
      isError: true,
    };
  }
});
Blocking operations:
python
undefined
向STDOUT输出内容(破坏MCP协议):
typescript
// 错误写法
console.log("Debug message");  // STDOUT用于MCP协议通信

// 正确写法
console.error("Debug message");  // STDERR用于日志输出
不处理错误:
typescript
// 错误写法 - 未处理的Promise拒绝
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const data = await riskyOperation();  // 可能抛出错误
  return { content: [{ type: "text", text: data }] };
});

// 正确写法
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const data = await riskyOperation();
    return { content: [{ type: "text", text: data }] };
  } catch (error) {
    return {
      content: [{ type: "text", text: `Error: ${error.message}` }],
      isError: true,
    };
  }
});
阻塞操作:
python
undefined

WRONG - synchronous file read blocks event loop

错误写法 - 同步文件读取阻塞事件循环

@app.call_tool() async def call_tool(name: str, arguments: dict): with open("large_file.txt", "r") as f: # Blocks! content = f.read() return [TextContent(type="text", text=content)]
@app.call_tool() async def call_tool(name: str, arguments: dict): with open("large_file.txt", "r") as f: # 阻塞! content = f.read() return [TextContent(type="text", text=content)]

CORRECT - async file I/O

正确写法 - 异步文件I/O

@app.call_tool() async def call_tool(name: str, arguments: dict): async with aiofiles.open("large_file.txt", "r") as f: content = await f.read() return [TextContent(type="text", text=content)]

❌ **Missing required fields in schema**:
```typescript
// WRONG - missing "required" field
{
  name: "search",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string" }
    }
    // Missing: required: ["query"]
  }
}
@app.call_tool() async def call_tool(name: str, arguments: dict): async with aiofiles.open("large_file.txt", "r") as f: content = await f.read() return [TextContent(type="text", text=content)]

❌ **Schema中缺少必填字段**:
```typescript
// 错误写法 - 缺少"required"字段
{
  name: "search",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string" }
    }
    // 缺少: required: ["query"]
  }
}

Resources

资源

Related Skills

相关技能

When building MCP servers, consider these complementary skills:
  • typescript-core: TypeScript type safety, tsconfig optimization, and advanced patterns
  • asyncio: Python async patterns for MCP servers with async/await
  • openrouter: Alternative LLM API integration for multi-model support
构建MCP服务器时,可考虑以下互补技能:
  • typescript-core: TypeScript类型安全、tsconfig优化和高级模式
  • asyncio: 用于MCP服务器的Python异步模式(async/await)
  • openrouter: 替代LLM API集成,支持多模型

Quick TypeScript MCP Patterns (Inlined for Standalone Use)

快速TypeScript MCP模式(独立使用)

typescript
// Type-safe MCP server with TypeScript
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';

// Define tool schemas with runtime validation
const SearchSchema = z.object({
  query: z.string().min(1).max(500),
  limit: z.number().int().min(1).max(100).default(10),
});

type SearchArgs = z.infer<typeof SearchSchema>;

class TypeSafeMCPServer {
  private server: Server;

  constructor() {
    this.server = new Server(
      {
        name: 'typed-search-server',
        version: '1.0.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.setupToolHandlers();
  }

  private setupToolHandlers() {
    // List tools with full type inference
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'search',
          description: 'Search with type-safe parameters',
          inputSchema: {
            type: 'object',
            properties: {
              query: { type: 'string', minLength: 1, maxLength: 500 },
              limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
            },
            required: ['query'],
          },
        },
      ],
    }));

    // Type-safe tool execution
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      if (request.params.name === 'search') {
        // Runtime validation with Zod
        const args = SearchSchema.parse(request.params.arguments);

        // Type-safe implementation
        const results = await this.performSearch(args);

        return {
          content: [{ type: 'text', text: JSON.stringify(results) }],
        };
      }

      throw new Error(`Unknown tool: ${request.params.name}`);
    });
  }

  private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
    // Implementation with full type safety
    // args.query is string, args.limit is number
    return [];
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// Start server
const server = new TypeSafeMCPServer();
server.run();
typescript
// 基于TypeScript的类型安全MCP服务器
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';

// 使用运行时验证定义工具Schema
const SearchSchema = z.object({
  query: z.string().min(1).max(500),
  limit: z.number().int().min(1).max(100).default(10),
});

type SearchArgs = z.infer<typeof SearchSchema>;

class TypeSafeMCPServer {
  private server: Server;

  constructor() {
    this.server = new Server(
      {
        name: 'typed-search-server',
        version: '1.0.0',
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.setupToolHandlers();
  }

  private setupToolHandlers() {
    // 利用完整类型推断列出工具
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: 'search',
          description: 'Search with type-safe parameters',
          inputSchema: {
            type: 'object',
            properties: {
              query: { type: 'string', minLength: 1, maxLength: 500 },
              limit: { type: 'number', minimum: 1, maximum: 100, default: 10 },
            },
            required: ['query'],
          },
        },
      ],
    }));

    // 类型安全的工具执行
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      if (request.params.name === 'search') {
        // 使用Zod进行运行时验证
        const args = SearchSchema.parse(request.params.arguments);

        // 类型安全的实现
        const results = await this.performSearch(args);

        return {
          content: [{ type: 'text', text: JSON.stringify(results) }],
        };
      }

      throw new Error(`Unknown tool: ${request.params.name}`);
    });
  }

  private async performSearch(args: SearchArgs): Promise<Array<{ title: string; url: string }>> {
    // 完整类型安全的实现
    // args.query是string类型,args.limit是number类型
    return [];
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// 启动服务器
const server = new TypeSafeMCPServer();
server.run();

Quick Python Async MCP Patterns (Inlined for Standalone Use)

快速Python异步MCP模式(独立使用)

python
undefined
python
// 基于Python的异步MCP服务器
import asyncio
import logging
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from pydantic import BaseModel, Field

// 使用Pydantic定义类型安全的参数模型
class SearchArgs(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    limit: int = Field(10, ge=1, le=100)

class AsyncMCPServer:
    def __init__(self):
        self.server = Server("async-search-server")
        self.setup_handlers()

    def setup_handlers(self):
        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                Tool(
                    name="search",
                    description="Search with async processing",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "minLength": 1, "maxLength": 500},
                            "limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
                        },
                        "required": ["query"],
                    },
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
            if name == "search":
                // 使用Pydantic验证参数
                args = SearchArgs(**arguments)

                // 异步实现
                results = await self.perform_search(args)

                return [
                    TextContent(
                        type="text",
                        text=str(results)
                    )
                ]

            raise ValueError(f"Unknown tool: {name}")

    async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
        """异步搜索实现"""
        // 模拟异步I/O
        await asyncio.sleep(0.1)

        // 使用已验证的参数(args.query是str类型,args.limit是int类型)
        return [
            {"title": f"Result for {args.query}", "url": "https://example.com"}
        ]

async def main():
    """运行异步MCP服务器"""
    logging.basicConfig(level=logging.INFO)
    server = AsyncMCPServer()

    // 使用stdio传输连接Claude Desktop
    async with stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            server.server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Async MCP server with Python

快速多语言MCP模式(独立使用)

import asyncio import logging from typing import Any from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from pydantic import BaseModel, Field
TypeScript vs Python 对比:
特性TypeScriptPython
类型安全编译时+运行时(Zod)仅运行时(Pydantic)
性能启动更快,Node.js开销启动较慢,CPU任务表现更好
异步支持原生async/await,事件循环asyncio,适合I/O密集型任务
生态系统npm包,前端工具数据科学、ML库
最佳适用场景Web API、实时工具数据处理、ML集成
两种语言的通用模式:
  1. 输入验证
    • TypeScript: Zod schema
    • Python: Pydantic模型
  2. 错误处理
    • 两者:使用try/catch捕获特定错误类型
    • 在MCP响应中返回错误内容
  3. 资源管理
    • TypeScript: async/await结合try/finally
    • Python: 异步上下文管理器
  4. 测试
    • TypeScript: Vitest/Jest结合模拟传输
    • Python: pytest结合pytest-asyncio
选择实现语言:
typescript
// TypeScript - 最适合:
// - 文件系统操作
// - 网页抓取/HTTP请求
// - JSON/API处理
// - 实时数据流

// Python - 最适合:
// - 数据分析(pandas、numpy)
// - 机器学习(scikit-learn、torch)
// - 数据库ETL操作
// - 科学计算
[完整的TypeScript、Python异步和OpenRouter模式可在相关技能中获取(若一起部署)]

Type-safe argument models with Pydantic

本地MCP集成模式(你的代码库)

.mcp.json布局(项目范围)

class SearchArgs(BaseModel): query: str = Field(..., min_length=1, max_length=500) limit: int = Field(10, ge=1, le=100)
class AsyncMCPServer: def init(self): self.server = Server("async-search-server") self.setup_handlers()
def setup_handlers(self):
    @self.server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name="search",
                description="Search with async processing",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "minLength": 1, "maxLength": 500},
                        "limit": {"type": "number", "minimum": 1, "maximum": 100, "default": 10},
                    },
                    "required": ["query"],
                },
            )
        ]

    @self.server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        if name == "search":
            # Validate with Pydantic
            args = SearchArgs(**arguments)

            # Async implementation
            results = await self.perform_search(args)

            return [
                TextContent(
                    type="text",
                    text=str(results)
                )
            ]

        raise ValueError(f"Unknown tool: {name}")

async def perform_search(self, args: SearchArgs) -> list[dict[str, str]]:
    """Async search implementation"""
    # Simulate async I/O
    await asyncio.sleep(0.1)

    # Use validated args (args.query is str, args.limit is int)
    return [
        {"title": f"Result for {args.query}", "url": "https://example.com"}
    ]
async def main(): """Run async MCP server""" logging.basicConfig(level=logging.INFO) server = AsyncMCPServer()
# Use stdio transport for Claude Desktop
async with stdio_server() as (read_stream, write_stream):
    await server.server.run(
        read_stream,
        write_stream,
        server.server.create_initialization_options()
    )
if name == "main": asyncio.run(main())
undefined
json
{
  "mcpServers": {
    "kuzu-memory": {
      "type": "stdio",
      "command": "kuzu-memory",
      "args": ["mcp"]
    },
    "mcp-vector-search": {
      "type": "stdio",
      "command": "uv",
      "args": ["run", "mcp-vector-search", "mcp"],
      "env": {
        "MCP_ENABLE_FILE_WATCHING": "true"
      }
    }
  }
}

Quick Multi-Language MCP Patterns (Inlined for Standalone Use)

CLI约定

TypeScript vs Python Trade-offs:
FeatureTypeScriptPython
Type SafetyCompile-time + runtime (Zod)Runtime only (Pydantic)
PerformanceFaster startup, Node.js overheadSlower startup, better for CPU tasks
Async SupportNative async/await, event loopasyncio, great for I/O
Ecosystemnpm packages, frontend toolsData science, ML libraries
Best ForWeb APIs, real-time toolsData processing, ML integration
Common Patterns Across Both:
  1. Input Validation
    • TypeScript: Zod schemas
    • Python: Pydantic models
  2. Error Handling
    • Both: Try/catch with specific error types
    • Return error content in MCP response
  3. Resource Management
    • TypeScript: async/await with try/finally
    • Python: async context managers
  4. Testing
    • TypeScript: Vitest/Jest with mock transport
    • Python: pytest with pytest-asyncio
Choosing Implementation Language:
typescript
// TypeScript - Best for:
// - File system operations
// - Web scraping/HTTP requests
// - JSON/API manipulation
// - Real-time data streams

// Python - Best for:
// - Data analysis (pandas, numpy)
// - Machine learning (scikit-learn, torch)
// - Database ETL operations
// - Scientific computing
[Full TypeScript, Python async, and OpenRouter patterns available in respective skills if deployed together]
  • 为stdio服务器提供
    mcp
    子命令。
  • 使用
    setup
    命令进行端到端初始化和集成(如mcp-vector-search、kuzu-memory)。
  • 使用
    install
    /
    uninstall
    命令针对特定客户端(如mcp-ticketer)。
  • 提供
    doctor
    命令验证依赖。

Local MCP Integration Patterns (Your Repos)

环境变量约定

.mcp.json Layout (Project-Scoped)

json
{
  "mcpServers": {
    "kuzu-memory": {
      "type": "stdio",
      "command": "kuzu-memory",
      "args": ["mcp"]
    },
    "mcp-vector-search": {
      "type": "stdio",
      "command": "uv",
      "args": ["run", "mcp-vector-search", "mcp"],
      "env": {
        "MCP_ENABLE_FILE_WATCHING": "true"
      }
    }
  }
}
  • 文件监听:
    MCP_ENABLE_FILE_WATCHING=true
  • Kuzu内存:
    KUZU_MEMORY_PROJECT_ROOT
    ,
    KUZU_MEMORY_DB
  • Ticketer适配器:
    MCP_TICKETER_ADAPTER
    ,
    GITHUB_TOKEN
    ,
    GITHUB_OWNER
    ,
    GITHUB_REPO

CLI Conventions

常见服务器名称

  • Provide a
    mcp
    subcommand for stdio servers.
  • Use
    setup
    for end-to-end init + integration (mcp-vector-search, kuzu-memory).
  • Use
    install
    /
    uninstall
    to target specific clients (mcp-ticketer).
  • Provide
    doctor
    commands to validate dependencies.
kuzu-memory
,
mcp-vector-search
,
mcp-ticketer
,
mcp-skillset
,
mcp-browser

Environment Conventions

总结

  • File watching:
    MCP_ENABLE_FILE_WATCHING=true
  • Kuzu memory:
    KUZU_MEMORY_PROJECT_ROOT
    ,
    KUZU_MEMORY_DB
  • Ticketer adapters:
    MCP_TICKETER_ADAPTER
    ,
    GITHUB_TOKEN
    ,
    GITHUB_OWNER
    ,
    GITHUB_REPO
  • MCP 支持借助工具、资源和提示词开发原生AI服务器
  • 工具 是LLM可调用的函数(读取文件、查询API、执行操作)
  • 资源 提供只读数据访问(文件、数据库、文档)
  • 提示词 是适用于常见任务的带参数可复用模板
  • 提供 SDK 支持TypeScript和Python,具备完整异步能力
  • 通过JSON配置实现与 Claude Desktop 的集成(STDIO传输)
  • 使用 MCP Inspector、服务器日志和单独测试进行调试
  • 安全 至关重要:验证输入、清理路径、使用白名单
  • 最佳实践:清晰命名、完整Schema、错误处理、性能优化

Common Server Names

kuzu-memory
,
mcp-vector-search
,
mcp-ticketer
,
mcp-skillset
,
mcp-browser

Summary

  • MCP enables AI-native server development with tools, resources, and prompts
  • Tools are functions LLMs can call (read files, query APIs, run operations)
  • Resources provide read-only data access (files, databases, documentation)
  • Prompts are reusable templates with arguments for common tasks
  • SDKs available for TypeScript and Python with full async support
  • Claude Desktop integration via JSON config (STDIO transport)
  • Debugging with MCP Inspector, server logs, and standalone testing
  • Security critical: validate inputs, sanitize paths, use allowlists
  • Best practices: Clear naming, comprehensive schemas, error handling, performance optimization