building-mcp-servers

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Building MCP Servers

构建MCP服务器

Reference guide for Model Context Protocol server development (January 2026). Covers TypeScript, Python, and C# implementations with Streamable HTTP transport.
Model Context Protocol(模型上下文协议)服务器开发参考指南(2026年1月)。涵盖TypeScript、Python和C#三种语言的实现,以及Streamable HTTP传输的配置。

Quick Reference

快速参考

LanguagePackageVersionTransport
TypeScript
@modelcontextprotocol/sdk
1.25.1
StreamableHTTPServerTransport
Python
mcp
1.25.0
transport="streamable-http"
C#
ModelContextProtocol.AspNetCore
0.6.0-preview
.WithHttpTransport()
语言版本传输方式
TypeScript
@modelcontextprotocol/sdk
1.25.1
StreamableHTTPServerTransport
Python
mcp
1.25.0
transport="streamable-http"
C#
ModelContextProtocol.AspNetCore
0.6.0-preview
.WithHttpTransport()

Transport Status

传输方式状态

TransportStatusUse Case
stdioSupportedLocal/CLI (Claude Desktop, Cursor)
Streamable HTTPRecommendedRemote servers, production
SSEDeprecatedLegacy only
传输方式状态使用场景
stdio已支持本地/CLI(Claude Desktop、Cursor)
Streamable HTTP推荐使用远程服务器、生产环境
SSE已废弃仅用于遗留系统

Streamable HTTP Transport

Streamable HTTP传输

Single endpoint replaces dual SSE endpoints. Supports stateful sessions or stateless (serverless) mode.
单端点替代了原有的双SSE端点。支持有状态会话或无状态(Serverless)模式。

Protocol Flow

协议流程

Client                              Server
  |------ POST /mcp ---------------->|  (JSON-RPC messages)
  |<----- JSON or SSE response ------|
  |------ GET /mcp ----------------->|  (Optional: server-initiated)
  |<----- SSE stream ----------------|
Client                              Server
  |------ POST /mcp ---------------->|  (JSON-RPC messages)
  |<----- JSON or SSE response ------|
  |------ GET /mcp ----------------->|  (Optional: server-initiated)
  |<----- SSE stream ----------------|

Required Headers

必填请求头

http
POST /mcp HTTP/1.1
Content-Type: application/json
Accept: application/json, text/event-stream
Mcp-Session-Id: <session-id>  # After initialization
http
POST /mcp HTTP/1.1
Content-Type: application/json
Accept: application/json, text/event-stream
Mcp-Session-Id: <session-id>  # After initialization

TypeScript Implementation

TypeScript实现

Installation

安装

bash
npm install @modelcontextprotocol/sdk zod express
npm install -D @types/express
bash
npm install @modelcontextprotocol/sdk zod express
npm install -D @types/express

Basic Server

基础服务器

typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import express from "express";

const app = express();
app.use(express.json());

const server = new McpServer({
  name: "my-server",
  version: "1.0.0"
});

// Tool
server.registerTool("add", {
  description: "Add two numbers",
  inputSchema: { a: z.number(), b: z.number() }
}, async ({ a, b }) => ({
  content: [{ type: "text", text: String(a + b) }]
}));

// Resource
server.registerResource(
  "config",
  "config://app",
  { description: "App configuration" },
  async (uri) => ({
    contents: [{ uri: uri.href, text: JSON.stringify({ env: "prod" }) }]
  })
);

// Prompt
server.registerPrompt("review", {
  description: "Code review",
  argsSchema: { code: z.string() }
}, ({ code }) => ({
  messages: [{ role: "user", content: { type: "text", text: `Review:\n${code}` } }]
}));

// Transport
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => crypto.randomUUID()
});

await server.connect(transport);

app.all("/mcp", async (req, res) => {
  await transport.handleRequest(req, res);
});

app.listen(3000);
Note: Top-level
await
requires Node.js with ES modules (
"type": "module"
in package.json or
.mjs
extension).
typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { z } from "zod";
import express from "express";

const app = express();
app.use(express.json());

const server = new McpServer({
  name: "my-server",
  version: "1.0.0"
});

// Tool
server.registerTool("add", {
  description: "Add two numbers",
  inputSchema: { a: z.number(), b: z.number() }
}, async ({ a, b }) => ({
  content: [{ type: "text", text: String(a + b) }]
}));

// Resource
server.registerResource(
  "config",
  "config://app",
  { description: "App configuration" },
  async (uri) => ({
    contents: [{ uri: uri.href, text: JSON.stringify({ env: "prod" }) }]
  })
);

// Prompt
server.registerPrompt("review", {
  description: "Code review",
  argsSchema: { code: z.string() }
}, ({ code }) => ({
  messages: [{ role: "user", content: { type: "text", text: `Review:\n${code}` } }]
}));

// Transport
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => crypto.randomUUID()
});

await server.connect(transport);

app.all("/mcp", async (req, res) => {
  await transport.handleRequest(req, res);
});

app.listen(3000);
注意: 顶层
await
需要Node.js启用ES模块(在package.json中设置
"type": "module"
或使用
.mjs
扩展名)。

Stateless Mode (Serverless)

无状态模式(Serverless)

typescript
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: undefined  // Disables sessions
});
typescript
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: undefined  // Disables sessions
});

Python Implementation

Python实现

Installation

安装

bash
pip install "mcp[cli]"
bash
pip install "mcp[cli]"

Basic Server (FastMCP)

基础服务器(FastMCP)

python
from mcp.server.fastmcp import FastMCP, Context
from typing import List

mcp = FastMCP("my-server")
python
from mcp.server.fastmcp import FastMCP, Context
from typing import List

mcp = FastMCP("my-server")

Tool

Tool

@mcp.tool() def add(a: int, b: int) -> int: """Add two numbers.""" return a + b
@mcp.tool() def add(a: int, b: int) -> int: """Add two numbers.""" return a + b

Async tool with progress

Async tool with progress

@mcp.tool() async def process_files(files: List[str], ctx: Context) -> str: """Process files with progress.""" for i, f in enumerate(files): await ctx.report_progress(i + 1, len(files)) return f"Processed {len(files)} files"
@mcp.tool() async def process_files(files: List[str], ctx: Context) -> str: """Process files with progress.""" for i, f in enumerate(files): await ctx.report_progress(i + 1, len(files)) return f"Processed {len(files)} files"

Resource

Resource

@mcp.resource("config://app") def get_config() -> dict: """App configuration.""" return {"env": "prod", "version": "1.0.0"}
@mcp.resource("config://app") def get_config() -> dict: """App configuration.""" return {"env": "prod", "version": "1.0.0"}

Dynamic resource

Dynamic resource

@mcp.resource("users://{user_id}/profile") def get_user(user_id: str) -> dict: """Get user profile.""" return {"id": user_id, "name": f"User {user_id}"}
@mcp.resource("users://{user_id}/profile") def get_user(user_id: str) -> dict: """Get user profile.""" return {"id": user_id, "name": f"User {user_id}"}

Prompt

Prompt

@mcp.prompt() def code_review(code: str, language: str = "python") -> str: """Code review prompt.""" return f"Review this {language} code:\n\n
{language}\n{code}\n
"
if name == "main": # Streamable HTTP mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp") # Or stdio: mcp.run()
undefined
@mcp.prompt() def code_review(code: str, language: str = "python") -> str: """Code review prompt.""" return f"Review this {language} code:\n\n
{language}\n{code}\n
"
if name == "main": # Streamable HTTP mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp") # Or stdio: mcp.run()
undefined

FastAPI Integration

FastAPI集成

python
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP

api = FastAPI()
mcp = FastMCP("api-tools")

@mcp.tool()
def query(sql: str) -> dict:
    return {"result": "data"}

api.mount("/mcp", mcp.streamable_http_app())
python
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP

api = FastAPI()
mcp = FastMCP("api-tools")

@mcp.tool()
def query(sql: str) -> dict:
    return {"result": "data"}

api.mount("/mcp", mcp.streamable_http_app())

Run: uvicorn app:api --port 8000

Run: uvicorn app:api --port 8000

undefined
undefined

C# Implementation

C#实现

Installation

安装

bash
dotnet add package ModelContextProtocol.AspNetCore --prerelease
bash
dotnet add package ModelContextProtocol.AspNetCore --prerelease

Basic Server

基础服务器

csharp
using System.ComponentModel;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.Server;

var builder = WebApplication.CreateBuilder(args);

builder.Services
    .AddMcpServer()
    .WithHttpTransport()
    .WithToolsFromAssembly()
    .WithPromptsFromAssembly()
    .WithResourcesFromAssembly();

var app = builder.Build();
app.MapMcp();  // Maps /, /sse, /messages
app.Run();

// Tools
[McpServerToolType]
public static class MyTools
{
    [McpServerTool, Description("Add two numbers")]
    public static int Add(
        [Description("First number")] int a,
        [Description("Second number")] int b) => a + b;

    [McpServerTool, Description("Get weather")]
    public static async Task<string> GetWeather(
        HttpClient http,  // Injected from DI
        [Description("City name")] string city,
        CancellationToken ct)
    {
        var data = await http.GetStringAsync($"https://api.weather.example/{city}", ct);
        return data;
    }
}

// Prompts
[McpServerPromptType]
public static class MyPrompts
{
    [McpServerPrompt, Description("Code review prompt")]
    public static ChatMessage CodeReview(
        [Description("Code to review")] string code) =>
        new(ChatRole.User, $"Review this code:\n\n{code}");
}

// Resources
[McpServerResourceType]
public static class MyResources
{
    [McpServerResource(Name = "config://app"), Description("App config")]
    public static string Config() => """{"env": "production"}""";

    [McpServerResource(UriTemplate = "docs://{topic}")]
    public static string GetDoc([Description("Topic")] string topic) =>
        $"Documentation for {topic}";
}
csharp
using System.ComponentModel;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.Server;

var builder = WebApplication.CreateBuilder(args);

builder.Services
    .AddMcpServer()
    .WithHttpTransport()
    .WithToolsFromAssembly()
    .WithPromptsFromAssembly()
    .WithResourcesFromAssembly();

var app = builder.Build();
app.MapMcp();  // Maps /, /sse, /messages
app.Run();

// Tools
[McpServerToolType]
public static class MyTools
{
    [McpServerTool, Description("Add two numbers")]
    public static int Add(
        [Description("First number")] int a,
        [Description("Second number")] int b) => a + b;

    [McpServerTool, Description("Get weather")]
    public static async Task<string> GetWeather(
        HttpClient http,  // Injected from DI
        [Description("City name")] string city,
        CancellationToken ct)
    {
        var data = await http.GetStringAsync($"https://api.weather.example/{city}", ct);
        return data;
    }
}

// Prompts
[McpServerPromptType]
public static class MyPrompts
{
    [McpServerPrompt, Description("Code review prompt")]
    public static ChatMessage CodeReview(
        [Description("Code to review")] string code) =>
        new(ChatRole.User, $"Review this code:\n\n{code}");
}

// Resources
[McpServerResourceType]
public static class MyResources
{
    [McpServerResource(Name = "config://app"), Description("App config")]
    public static string Config() => """{"env": "production"}""";

    [McpServerResource(UriTemplate = "docs://{topic}")]
    public static string GetDoc([Description("Topic")] string topic) =>
        $"Documentation for {topic}";
}

Stdio Transport (Local)

Stdio传输(本地)

csharp
var builder = Host.CreateApplicationBuilder(args);

builder.Logging.AddConsole(o => o.LogToStandardErrorThreshold = LogLevel.Trace);

builder.Services
    .AddMcpServer()
    .WithStdioServerTransport()
    .WithToolsFromAssembly();

await builder.Build().RunAsync();
csharp
var builder = Host.CreateApplicationBuilder(args);

builder.Logging.AddConsole(o => o.LogToStandardErrorThreshold = LogLevel.Trace);

builder.Services
    .AddMcpServer()
    .WithStdioServerTransport()
    .WithToolsFromAssembly();

await builder.Build().RunAsync();

Authentication (OAuth 2.1)

认证(OAuth 2.1)

MCP servers are OAuth Resource Servers (not Authorization Servers). Key requirements:
  • PKCE mandatory (S256 method)
  • Resource Indicators (RFC 8707) required
  • Bearer tokens in Authorization header
  • Audience validation on every request
MCP服务器是OAuth资源服务器(而非授权服务器)。核心要求:
  • 强制使用PKCE(S256方法)
  • 必须使用资源指示器(RFC 8707)
  • 在Authorization请求头中使用Bearer令牌
  • 对每个请求都验证受众(Audience)

Discovery Endpoints

发现端点

http
GET /.well-known/oauth-protected-resource  # Server metadata
GET /.well-known/oauth-authorization-server  # Auth server metadata
http
GET /.well-known/oauth-protected-resource  # Server metadata
GET /.well-known/oauth-authorization-server  # Auth server metadata

Server Response on 401

401错误时的服务器响应

http
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
  resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource"
http
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Bearer realm="mcp",
  resource_metadata="https://mcp.example.com/.well-known/oauth-protected-resource"

Token Validation (Python)

令牌验证(Python)

python
import jwt
from jwt import PyJWKClient

class TokenValidator:
    def __init__(self, jwks_uri: str, audience: str):
        self.jwks = PyJWKClient(jwks_uri)
        self.audience = audience

    def validate(self, token: str) -> dict:
        key = self.jwks.get_signing_key_from_jwt(token)
        return jwt.decode(
            token, key.key,
            algorithms=["RS256"],
            audience=self.audience,
            options={"require": ["exp", "aud", "iss"]}
        )
python
import jwt
from jwt import PyJWKClient

class TokenValidator:
    def __init__(self, jwks_uri: str, audience: str):
        self.jwks = PyJWKClient(jwks_uri)
        self.audience = audience

    def validate(self, token: str) -> dict:
        key = self.jwks.get_signing_key_from_jwt(token)
        return jwt.decode(
            token, key.key,
            algorithms=["RS256"],
            audience=self.audience,
            options={"require": ["exp", "aud", "iss"]}
        )

Protected Resource Metadata Response

受保护资源元数据响应

json
{
  "resource": "https://mcp.example.com",
  "authorization_servers": ["https://auth.example.com"],
  "scopes_supported": ["read", "write", "admin"],
  "bearer_methods_supported": ["header"]
}
json
{
  "resource": "https://mcp.example.com",
  "authorization_servers": ["https://auth.example.com"],
  "scopes_supported": ["read", "write", "admin"],
  "bearer_methods_supported": ["header"]
}

OAuth Middleware Integration (Python/Starlette)

OAuth中间件集成(Python/Starlette)

python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server.fastmcp import FastMCP

class BearerAuthMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, validator: TokenValidator):
        super().__init__(app)
        self.validator = validator

    async def dispatch(self, request, call_next):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith("Bearer "):
            return JSONResponse(
                {"error": "unauthorized"},
                status_code=401,
                headers={"WWW-Authenticate": 'Bearer realm="mcp"'}
            )
        try:
            token = auth.replace("Bearer ", "")
            request.state.auth = self.validator.validate(token)
        except Exception:
            return JSONResponse({"error": "invalid_token"}, status_code=401)
        return await call_next(request)
python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server.fastmcp import FastMCP

class BearerAuthMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, validator: TokenValidator):
        super().__init__(app)
        self.validator = validator

    async def dispatch(self, request, call_next):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith("Bearer "):
            return JSONResponse(
                {"error": "unauthorized"},
                status_code=401,
                headers={"WWW-Authenticate": 'Bearer realm="mcp"'}
            )
        try:
            token = auth.replace("Bearer ", "")
            request.state.auth = self.validator.validate(token)
        except Exception:
            return JSONResponse({"error": "invalid_token"}, status_code=401)
        return await call_next(request)

Setup

Setup

mcp = FastMCP("secure-server") validator = TokenValidator( jwks_uri="https://auth.example.com/.well-known/jwks.json", audience="https://mcp.example.com" )
app = Starlette( routes=[Mount("/mcp", app=mcp.streamable_http_app())], middleware=[Middleware(BearerAuthMiddleware, validator=validator)] )
undefined
mcp = FastMCP("secure-server") validator = TokenValidator( jwks_uri="https://auth.example.com/.well-known/jwks.json", audience="https://mcp.example.com" )
app = Starlette( routes=[Mount("/mcp", app=mcp.streamable_http_app())], middleware=[Middleware(BearerAuthMiddleware, validator=validator)] )
undefined

OAuth Middleware Integration (TypeScript/Express)

OAuth中间件集成(TypeScript/Express)

typescript
import jwt from "jsonwebtoken";
import jwksClient from "jwks-rsa";

const client = jwksClient({ jwksUri: "https://auth.example.com/.well-known/jwks.json" });

const authMiddleware = async (req, res, next) => {
  const auth = req.headers.authorization;
  if (!auth?.startsWith("Bearer ")) {
    return res.status(401).json({ error: "unauthorized" });
  }
  try {
    const token = auth.slice(7);
    const decoded = jwt.decode(token, { complete: true });
    const key = await client.getSigningKey(decoded.header.kid);
    req.auth = jwt.verify(token, key.getPublicKey(), {
      audience: "https://mcp.example.com",
      algorithms: ["RS256"]
    });
    next();
  } catch (err) {
    res.status(401).json({ error: "invalid_token" });
  }
};

app.use("/mcp", authMiddleware);
typescript
import jwt from "jsonwebtoken";
import jwksClient from "jwks-rsa";

const client = jwksClient({ jwksUri: "https://auth.example.com/.well-known/jwks.json" });

const authMiddleware = async (req, res, next) => {
  const auth = req.headers.authorization;
  if (!auth?.startsWith("Bearer ")) {
    return res.status(401).json({ error: "unauthorized" });
  }
  try {
    const token = auth.slice(7);
    const decoded = jwt.decode(token, { complete: true });
    const key = await client.getSigningKey(decoded.header.kid);
    req.auth = jwt.verify(token, key.getPublicKey(), {
      audience: "https://mcp.example.com",
      algorithms: ["RS256"]
    });
    next();
  } catch (err) {
    res.status(401).json({ error: "invalid_token" });
  }
};

app.use("/mcp", authMiddleware);

Authenticated Client Request

已认证的客户端请求

typescript
const transport = new StreamableHTTPClientTransport(
  new URL("https://mcp.example.com/mcp"),
  {
    requestInit: {
      headers: { Authorization: `Bearer ${accessToken}` }
    }
  }
);
typescript
const transport = new StreamableHTTPClientTransport(
  new URL("https://mcp.example.com/mcp"),
  {
    requestInit: {
      headers: { Authorization: `Bearer ${accessToken}` }
    }
  }
);

Security Anti-Patterns

安全反模式

Anti-PatternFix
Token passthrough to upstream APIsUse separate tokens for upstream calls
Missing audience validationAlways validate
aud
claim
Tokens in URLsUse Authorization header only
反模式修复方案
将令牌透传给上游API为上游调用使用独立的令牌
缺少受众验证始终验证
aud
声明
在URL中传递令牌仅使用Authorization请求头

Scope Enforcement in Tools

工具中的权限范围校验

Check scopes before executing sensitive operations:
TypeScript:
typescript
const authMiddleware = async (req, res, next) => {
  // ... token validation ...
  req.auth = { sub: decoded.sub, scopes: decoded.scope?.split(" ") || [] };
  next();
};

server.registerTool("delete_user", { /* ... */ }, async ({ userId }, { meta }) => {
  const scopes = meta?.auth?.scopes || [];
  if (!scopes.includes("admin:write")) {
    return { isError: true, content: [{ type: "text", text: "Insufficient scope" }] };
  }
  // ... perform deletion ...
});
Python: Use HTTP middleware to validate, then check in tools:
python
undefined
在执行敏感操作前检查权限范围:
TypeScript:
typescript
const authMiddleware = async (req, res, next) => {
  // ... token validation ...
  req.auth = { sub: decoded.sub, scopes: decoded.scope?.split(" ") || [] };
  next();
};

server.registerTool("delete_user", { /* ... */ }, async ({ userId }, { meta }) => {
  const scopes = meta?.auth?.scopes || [];
  if (!scopes.includes("admin:write")) {
    return { isError: true, content: [{ type: "text", text: "Insufficient scope" }] };
  }
  // ... perform deletion ...
});
Python: 使用HTTP中间件进行验证,然后在工具中检查:
python
undefined

With Starlette middleware (see OAuth Middleware Integration above)

With Starlette middleware (see OAuth Middleware Integration above)

Store validated claims in request.state, then check in tool:

Store validated claims in request.state, then check in tool:

@mcp.tool() def delete_user(user_id: str) -> str: """Delete user - requires admin:write scope.""" # Scope enforcement happens at HTTP layer via middleware # Tool assumes request already passed auth checks return f"Deleted user {user_id}"

> **Note:** For advanced middleware with per-tool auth context, use `fastmcp` package (`pip install fastmcp`) which provides `Middleware` class and `Context.get_state()`. The official `mcp` package provides FastMCP but with simpler middleware options.
@mcp.tool() def delete_user(user_id: str) -> str: """Delete user - requires admin:write scope.""" # Scope enforcement happens at HTTP layer via middleware # Tool assumes request already passed auth checks return f"Deleted user {user_id}"

> **注意:** 如需支持带每工具认证上下文的高级中间件,请使用`fastmcp`包(`pip install fastmcp`),它提供了`Middleware`类和`Context.get_state()`方法。官方`mcp`包提供FastMCP,但中间件选项较为简单。

Passing Auth Context to Tool Handlers

将认证上下文传递给工具处理器

TypeScript: Store auth on transport or use a request-scoped context:
typescript
// In middleware: attach to request
req.auth = { sub: decoded.sub, email: decoded.email };

// Pass to tool via server context or closure
const sessions = new Map();
app.all("/mcp", async (req, res) => {
  const transport = new StreamableHTTPServerTransport({ /* ... */ });
  sessions.set(transport.sessionId, { auth: req.auth });
  // Tools access via sessions.get(sessionId)
});
Python (with
fastmcp
package):
Use middleware and context state:
python
undefined
TypeScript: 将认证信息存储在传输层或使用请求作用域上下文:
typescript
// In middleware: attach to request
req.auth = { sub: decoded.sub, email: decoded.email };

// Pass to tool via server context or closure
const sessions = new Map();
app.all("/mcp", async (req, res) => {
  const transport = new StreamableHTTPServerTransport({ /* ... */ });
  sessions.set(transport.sessionId, { auth: req.auth });
  // Tools access via sessions.get(sessionId)
});
Python(使用
fastmcp
包):
使用中间件和上下文状态:
python
undefined

pip install fastmcp

pip install fastmcp

from fastmcp import FastMCP, Context from fastmcp.server.middleware import Middleware, MiddlewareContext
mcp = FastMCP("my-server")
class AuthMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): # Extract and validate token, then store in context context.fastmcp_context.set_state("user_id", "user_123") context.fastmcp_context.set_state("scopes", ["read", "write"]) return await call_next()
mcp.add_middleware(AuthMiddleware())
@mcp.tool async def get_my_profile(ctx: Context) -> dict: user_id = ctx.get_state("user_id") # Set by middleware return {"user_id": user_id, "profile": "..."}
undefined
from fastmcp import FastMCP, Context from fastmcp.server.middleware import Middleware, MiddlewareContext
mcp = FastMCP("my-server")
class AuthMiddleware(Middleware): async def on_call_tool(self, context: MiddlewareContext, call_next): # Extract and validate token, then store in context context.fastmcp_context.set_state("user_id", "user_123") context.fastmcp_context.set_state("scopes", ["read", "write"]) return await call_next()
mcp.add_middleware(AuthMiddleware())
@mcp.tool async def get_my_profile(ctx: Context) -> dict: user_id = ctx.get_state("user_id") # Set by middleware return {"user_id": user_id, "profile": "..."}
undefined

Token Refresh Handling

令牌刷新处理

Access tokens are short-lived (typically 1 hour). Strategies:
  1. Client-side refresh: Clients refresh tokens before expiration and reconnect
  2. Proactive server refresh: Background task refreshes tokens expiring soon
  3. On-demand refresh: Return 401, client refreshes and retries
Recommended pattern: Use short-lived access tokens (5-60 min) with refresh tokens. On 401:
typescript
// Client retry logic
async function callWithRefresh(tool: string, args: object) {
  try {
    return await client.callTool(tool, args);
  } catch (err) {
    if (err.status === 401) {
      accessToken = await refreshAccessToken(refreshToken);
      transport.updateHeaders({ Authorization: `Bearer ${accessToken}` });
      return await client.callTool(tool, args);
    }
    throw err;
  }
}
访问令牌的有效期较短(通常为1小时)。处理策略:
  1. 客户端侧刷新: 客户端在令牌过期前刷新令牌并重新连接
  2. 服务器主动刷新: 后台任务刷新即将过期的令牌
  3. 按需刷新: 返回401错误,客户端刷新令牌后重试
推荐模式: 使用短有效期的访问令牌(5-60分钟)搭配刷新令牌。当收到401错误时:
typescript
// Client retry logic
async function callWithRefresh(tool: string, args: object) {
  try {
    return await client.callTool(tool, args);
  } catch (err) {
    if (err.status === 401) {
      accessToken = await refreshAccessToken(refreshToken);
      transport.updateHeaders({ Authorization: `Bearer ${accessToken}` });
      return await client.callTool(tool, args);
    }
    throw err;
  }
}

Migration: SSE to Streamable HTTP

迁移:从SSE到Streamable HTTP

Server Changes

服务器端变更

typescript
// OLD (SSE) - Two endpoints
app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/sse/messages", res);
  await server.connect(transport);
});
app.post("/sse/messages", async (req, res) => { /* ... */ });

// NEW (Streamable HTTP) - Single endpoint
app.all("/mcp", async (req, res) => {
  const transport = new StreamableHTTPServerTransport();
  await server.connect(transport);
  await transport.handleRequest(req, res);
});
typescript
// OLD (SSE) - Two endpoints
app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/sse/messages", res);
  await server.connect(transport);
});
app.post("/sse/messages", async (req, res) => { /* ... */ });

// NEW (Streamable HTTP) - Single endpoint
app.all("/mcp", async (req, res) => {
  const transport = new StreamableHTTPServerTransport();
  await server.connect(transport);
  await transport.handleRequest(req, res);
});

Client Changes

客户端变更

typescript
// OLD
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));

// NEW
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));
typescript
// OLD
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));

// NEW
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));

Backward-Compatible Server

向后兼容的服务器

typescript
// Support both transports during migration
app.all("/mcp", /* new Streamable HTTP handler */);
app.get("/sse", /* legacy SSE handler */);
app.post("/sse/messages", /* legacy message handler */);
typescript
// Support both transports during migration
app.all("/mcp", /* new Streamable HTTP handler */);
app.get("/sse", /* legacy SSE handler */);
app.post("/sse/messages", /* legacy message handler */);

Client Configuration

客户端配置

Claude Desktop / Claude Code (stdio)

Claude Desktop / Claude Code(stdio)

json
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server.js"],
      "env": { "API_KEY": "secret" }
    }
  }
}
json
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/path/to/server.js"],
      "env": { "API_KEY": "secret" }
    }
  }
}

Remote Server (Streamable HTTP)

远程服务器(Streamable HTTP)

json
{
  "mcpServers": {
    "remote": {
      "type": "streamable-http",
      "url": "https://mcp.example.com/mcp"
    }
  }
}
json
{
  "mcpServers": {
    "remote": {
      "type": "streamable-http",
      "url": "https://mcp.example.com/mcp"
    }
  }
}

Error Handling

错误处理

Return tool errors (not protocol errors) for model self-correction:
TypeScript:
typescript
server.registerTool("query", { /* ... */ }, async ({ sql }) => {
  if (sql.includes("DROP")) {
    return {
      isError: true,
      content: [{ type: "text", text: "Destructive queries not allowed" }]
    };
  }
  // ...
});
Python: Raise exceptions in tools - they're caught and returned as errors:
python
@mcp.tool()
def query(sql: str) -> dict:
    if "DROP" in sql.upper():
        raise ValueError("Destructive queries not allowed")
    return {"result": "data"}
返回工具错误(而非协议错误)以便模型进行自我修正:
TypeScript:
typescript
server.registerTool("query", { /* ... */ }, async ({ sql }) => {
  if (sql.includes("DROP")) {
    return {
      isError: true,
      content: [{ type: "text", text: "Destructive queries not allowed" }]
    };
  }
  // ...
});
Python: 在工具中抛出异常 - 这些异常会被捕获并作为错误返回:
python
@mcp.tool()
def query(sql: str) -> dict:
    if "DROP" in sql.upper():
        raise ValueError("Destructive queries not allowed")
    return {"result": "data"}

Common Mistakes

常见错误

MistakeFix
Using SSE for new serversUse Streamable HTTP
Writing to stdout in stdio serversUse stderr for logs
Missing session ID after initAlways include
Mcp-Session-Id
header
Not validating OAuth audienceValidate token
aud
matches your server
Forwarding client tokens upstreamUse separate credentials for upstream APIs
错误修复方案
为新服务器使用SSE使用Streamable HTTP
在stdio服务器中写入stdout使用stderr输出日志
初始化后缺少会话ID始终包含
Mcp-Session-Id
请求头
未验证OAuth受众验证令牌的
aud
声明是否与你的服务器匹配
将客户端令牌转发给上游为上游API使用独立的凭据

Official Resources

官方资源

SDKs

SDK

Documentation

文档

Testing

测试

bash
npx @modelcontextprotocol/inspector node path/to/server.js
bash
npx @modelcontextprotocol/inspector node path/to/server.js