fastmcp

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastMCP - Build MCP Servers in Python

FastMCP - 在Python中构建MCP服务器

FastMCP is a Python framework for building Model Context Protocol (MCP) servers that expose tools, resources, and prompts to Large Language Models like Claude. This skill provides production-tested patterns, error prevention, and deployment strategies for building robust MCP servers.
FastMCP是一个Python框架,用于构建模型上下文协议(MCP)服务器,向Claude等大语言模型(LLM)暴露工具、资源和提示词。本技能提供经过生产验证的模式、错误预防方案和部署策略,帮助你构建稳健的MCP服务器。

Quick Start

快速开始

Installation

安装

bash
pip install fastmcp
bash
pip install fastmcp

or

uv pip install fastmcp
undefined
uv pip install fastmcp
undefined

Minimal Server

最简服务器

python
from fastmcp import FastMCP
python
from fastmcp import FastMCP

MUST be at module level for FastMCP Cloud

必须在模块级别定义,以适配FastMCP Cloud

mcp = FastMCP("My Server")
@mcp.tool() async def hello(name: str) -> str: """Say hello to someone.""" return f"Hello, {name}!"
if name == "main": mcp.run()

**Run it:**
```bash
mcp = FastMCP("My Server")
@mcp.tool() async def hello(name: str) -> str: """向某人打招呼。""" return f"Hello, {name}!"
if name == "main": mcp.run()

**运行方式:**
```bash

Local development

本地开发

python server.py
python server.py

With FastMCP CLI

使用FastMCP CLI

fastmcp dev server.py
fastmcp dev server.py

HTTP mode

HTTP模式

python server.py --transport http --port 8000
undefined
python server.py --transport http --port 8000
undefined

Core Concepts

核心概念

1. Tools

1. 工具

Tools are functions that LLMs can call to perform actions:
python
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
    """Perform mathematical operations.

    Args:
        operation: add, subtract, multiply, or divide
        a: First number
        b: Second number

    Returns:
        Result of the operation
    """
    operations = {
        "add": lambda x, y: x + y,
        "subtract": lambda x, y: x - y,
        "multiply": lambda x, y: x * y,
        "divide": lambda x, y: x / y if y != 0 else None
    }
    return operations.get(operation, lambda x, y: None)(a, b)
Best Practices:
  • Clear, descriptive function names
  • Comprehensive docstrings (LLMs read these!)
  • Strong type hints (Pydantic validates automatically)
  • Return structured data (dicts/lists)
  • Handle errors gracefully
Sync vs Async:
python
undefined
工具是LLM可以调用以执行操作的函数:
python
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
    """执行数学运算。

    参数:
        operation: 加、减、乘或除
        a: 第一个数字
        b: 第二个数字

    返回:
        运算结果
    """
    operations = {
        "add": lambda x, y: x + y,
        "subtract": lambda x, y: x - y,
        "multiply": lambda x, y: x * y,
        "divide": lambda x, y: x / y if y != 0 else None
    }
    return operations.get(operation, lambda x, y: None)(a, b)
最佳实践:
  • 清晰、描述性的函数名称
  • 全面的文档字符串(LLM会读取这些内容!)
  • 强类型提示(Pydantic会自动验证)
  • 返回结构化数据(字典/列表)
  • 优雅处理错误
同步 vs 异步:
python
undefined

Sync tool (for non-blocking operations)

同步工具(用于非阻塞操作)

@mcp.tool() def sync_tool(param: str) -> dict: return {"result": param.upper()}
@mcp.tool() def sync_tool(param: str) -> dict: return {"result": param.upper()}

Async tool (for I/O operations, API calls)

异步工具(用于I/O操作、API调用)

@mcp.tool() async def async_tool(url: str) -> dict: async with httpx.AsyncClient() as client: response = await client.get(url) return response.json()
undefined
@mcp.tool() async def async_tool(url: str) -> dict: async with httpx.AsyncClient() as client: response = await client.get(url) return response.json()
undefined

2. Resources

2. 资源

Resources expose static or dynamic data to LLMs:
python
undefined
资源向LLM暴露静态或动态数据:
python
undefined

Static resource

静态资源

@mcp.resource("data://config") def get_config() -> dict: """Provide application configuration.""" return { "version": "1.0.0", "features": ["auth", "api", "cache"] }
@mcp.resource("data://config") def get_config() -> dict: """提供应用配置。""" return { "version": "1.0.0", "features": ["auth", "api", "cache"] }

Dynamic resource

动态资源

@mcp.resource("info://status") async def server_status() -> dict: """Get current server status.""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "api_configured": bool(os.getenv("API_KEY")) }

**Resource URI Schemes:**
- `data://` - Generic data
- `file://` - File resources
- `resource://` - General resources
- `info://` - Information/metadata
- `api://` - API endpoints
- Custom schemes allowed
@mcp.resource("info://status") async def server_status() -> dict: """获取当前服务器状态。""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "api_configured": bool(os.getenv("API_KEY")) }

**资源URI方案:**
- `data://` - 通用数据
- `file://` - 文件资源
- `resource://` - 通用资源
- `info://` - 信息/元数据
- `api://` - API端点
- 允许自定义方案

3. Resource Templates

3. 资源模板

Dynamic resources with parameters in the URI:
python
undefined
URI中包含参数的动态资源:
python
undefined

Single parameter

单个参数

@mcp.resource("user://{user_id}/profile") async def get_user_profile(user_id: str) -> dict: """Get user profile by ID.""" user = await fetch_user_from_db(user_id) return { "id": user_id, "name": user.name, "email": user.email }
@mcp.resource("user://{user_id}/profile") async def get_user_profile(user_id: str) -> dict: """根据ID获取用户资料。""" user = await fetch_user_from_db(user_id) return { "id": user_id, "name": user.name, "email": user.email }

Multiple parameters

多个参数

@mcp.resource("org://{org_id}/team/{team_id}/members") async def get_team_members(org_id: str, team_id: str) -> list: """Get team members with org context.""" return await db.query( "SELECT * FROM members WHERE org_id = ? AND team_id = ?", [org_id, team_id] )

**Critical:** Parameter names must match exactly between URI template and function signature.
@mcp.resource("org://{org_id}/team/{team_id}/members") async def get_team_members(org_id: str, team_id: str) -> list: """获取组织上下文下的团队成员。""" return await db.query( "SELECT * FROM members WHERE org_id = ? AND team_id = ?", [org_id, team_id] )

**关键注意事项:** URI模板中的参数名称必须与函数签名中的参数名称完全匹配。

4. Prompts

4. 提示词

Pre-configured prompts for LLMs:
python
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
    """Generate analysis prompt."""
    return f"""
    Analyze {topic} considering:
    1. Current state
    2. Challenges
    3. Opportunities
    4. Recommendations

    Use available tools to gather data.
    """

@mcp.prompt("help")
def help_prompt() -> str:
    """Generate help text for server."""
    return """
    Welcome to My Server!

    Available tools:
    - search: Search for items
    - process: Process data

    Available resources:
    - info://status: Server status
    """
为LLM预配置的提示词:
python
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
    """生成分析提示词。"""
    return f"""
    分析{topic}时请考虑:
    1. 当前状态
    2. 挑战
    3. 机遇
    4. 建议

    使用可用工具收集数据。
    """

@mcp.prompt("help")
def help_prompt() -> str:
    """生成服务器帮助文本。"""
    return """
    欢迎使用My Server!

    可用工具:
    - search: 搜索项目
    - process: 处理数据

    可用资源:
    - info://status: 服务器状态
    """

Context Features

上下文特性

FastMCP provides advanced features through context injection:
FastMCP通过上下文注入提供高级功能:

1. Elicitation (User Input)

1. 启发式交互(用户输入)

Request user input during tool execution:
python
from fastmcp import Context

@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
    """Perform action with user confirmation."""
    # Request confirmation from user
    confirmed = await context.request_elicitation(
        prompt=f"Confirm {action}? (yes/no)",
        response_type=str
    )

    if confirmed.lower() == "yes":
        result = await perform_action(action)
        return {"status": "completed", "action": action}
    else:
        return {"status": "cancelled", "action": action}
在工具执行期间请求用户输入:
python
from fastmcp import Context

@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
    """在用户确认后执行操作。"""
    # 向用户请求确认
    confirmed = await context.request_elicitation(
        prompt=f"确认执行{action}?(是/否)",
        response_type=str
    )

    if confirmed.lower() == "yes":
        result = await perform_action(action)
        return {"status": "completed", "action": action}
    else:
        return {"status": "cancelled", "action": action}

2. Progress Tracking

2. 进度跟踪

Report progress for long-running operations:
python
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
    """Import data with progress updates."""
    data = await read_file(file_path)
    total = len(data)

    imported = []
    for i, item in enumerate(data):
        # Report progress
        await context.report_progress(
            progress=i + 1,
            total=total,
            message=f"Importing item {i + 1}/{total}"
        )

        result = await import_item(item)
        imported.append(result)

    return {"imported": len(imported), "total": total}
为长时间运行的操作报告进度:
python
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
    """导入数据并更新进度。"""
    data = await read_file(file_path)
    total = len(data)

    imported = []
    for i, item in enumerate(data):
        # 报告进度
        await context.report_progress(
            progress=i + 1,
            total=total,
            message=f"正在导入第{i + 1}/{total}项"
        )

        result = await import_item(item)
        imported.append(result)

    return {"imported": len(imported), "total": total}

3. Sampling (LLM Integration)

3. 采样(LLM集成)

Request LLM completions from within tools:
python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
    """Enhance text using LLM."""
    response = await context.request_sampling(
        messages=[{
            "role": "system",
            "content": "You are a professional copywriter."
        }, {
            "role": "user",
            "content": f"Enhance this text: {text}"
        }],
        temperature=0.7,
        max_tokens=500
    )

    return response["content"]
在工具内部请求LLM补全:
python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
    """使用LLM优化文本。"""
    response = await context.request_sampling(
        messages=[{
            "role": "system",
            "content": "你是一名专业文案。"
        }, {
            "role": "user",
            "content": f"优化以下文本:{text}"
        }],
        temperature=0.7,
        max_tokens=500
    )

    return response["content"]

Storage Backends

存储后端

FastMCP supports pluggable storage backends built on the
py-key-value-aio
library. Storage backends enable persistent state for OAuth tokens, response caching, and client-side token storage.
FastMCP支持基于
py-key-value-aio
库的可插拔存储后端。存储后端为OAuth令牌、响应缓存和客户端令牌存储提供持久化状态。

Available Backends

可用后端

Memory Store (Default):
  • Ephemeral storage (lost on restart)
  • Fast, no configuration needed
  • Good for development
Disk Store:
  • Persistent storage on local filesystem
  • Encrypted by default with
    FernetEncryptionWrapper
  • Platform-aware defaults (Mac/Windows use disk, Linux uses memory)
Redis Store:
  • Distributed storage for production
  • Supports multi-instance deployments
  • Ideal for response caching across servers
Other Supported:
  • DynamoDB (AWS)
  • MongoDB
  • Elasticsearch
  • Memcached
  • RocksDB
  • Valkey
内存存储(默认):
  • 临时存储(重启后丢失)
  • 速度快,无需配置
  • 适合开发环境
磁盘存储:
  • 本地文件系统上的持久化存储
  • 默认使用
    FernetEncryptionWrapper
    加密
  • 感知平台的默认配置(Mac/Windows使用磁盘,Linux使用内存)
Redis存储:
  • 生产环境的分布式存储
  • 支持多实例部署
  • 适合跨服务器的响应缓存
其他支持的后端:
  • DynamoDB(AWS)
  • MongoDB
  • Elasticsearch
  • Memcached
  • RocksDB
  • Valkey

Basic Usage

基础用法

python
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
python
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os

Memory storage (default)

内存存储(默认)

mcp = FastMCP("My Server")
mcp = FastMCP("My Server")

Disk storage (persistent)

磁盘存储(持久化)

from key_value.stores import DiskStore
mcp = FastMCP( "My Server", storage=DiskStore(path="/app/data/storage") )
from key_value.stores import DiskStore
mcp = FastMCP( "My Server", storage=DiskStore(path="/app/data/storage") )

Redis storage (production)

Redis存储(生产环境)

from key_value.stores import RedisStore
mcp = FastMCP( "My Server", storage=RedisStore( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", "6379")), password=os.getenv("REDIS_PASSWORD") ) )
undefined
from key_value.stores import RedisStore
mcp = FastMCP( "My Server", storage=RedisStore( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", "6379")), password=os.getenv("REDIS_PASSWORD") ) )
undefined

Encrypted Storage

加密存储

Storage backends support automatic encryption:
python
from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStore
存储后端支持自动加密:
python
from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStore

Generate encryption key (store in environment!)

生成加密密钥(请存储在环境变量中!)

key = Fernet.generate_key()

key = Fernet.generate_key()

Use encrypted storage

使用加密存储

encrypted_storage = FernetEncryptionWrapper( key_value=DiskStore(path="/app/data/storage"), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) )
mcp = FastMCP("My Server", storage=encrypted_storage)
undefined
encrypted_storage = FernetEncryptionWrapper( key_value=DiskStore(path="/app/data/storage"), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) )
mcp = FastMCP("My Server", storage=encrypted_storage)
undefined

OAuth Token Storage

OAuth令牌存储

Storage backends automatically persist OAuth tokens:
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
存储后端会自动持久化OAuth令牌:
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet

Production OAuth with encrypted Redis storage

生产环境OAuth + 加密Redis存储

auth = OAuthProxy( jwt_signing_key=os.environ["JWT_SIGNING_KEY"], client_storage=FernetEncryptionWrapper( key_value=RedisStore( host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD") ), fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"]) ), upstream_authorization_endpoint="https://provider.com/oauth/authorize", upstream_token_endpoint="https://provider.com/oauth/token", upstream_client_id=os.getenv("OAUTH_CLIENT_ID"), upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET") )
mcp = FastMCP("OAuth Server", auth=auth)
undefined
auth = OAuthProxy( jwt_signing_key=os.environ["JWT_SIGNING_KEY"], client_storage=FernetEncryptionWrapper( key_value=RedisStore( host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD") ), fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"]) ), upstream_authorization_endpoint="https://provider.com/oauth/authorize", upstream_token_endpoint="https://provider.com/oauth/token", upstream_client_id=os.getenv("OAUTH_CLIENT_ID"), upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET") )
mcp = FastMCP("OAuth Server", auth=auth)
undefined

Platform-Aware Defaults

平台感知默认配置

FastMCP automatically chooses storage based on platform:
  • Mac/Windows: Disk storage (persistent)
  • Linux: Memory storage (ephemeral)
  • Override: Set
    storage
    parameter explicitly
python
undefined
FastMCP会根据平台自动选择存储:
  • Mac/Windows:磁盘存储(持久化)
  • Linux:内存存储(临时)
  • 覆盖默认:显式设置
    storage
    参数
python
undefined

Explicitly use disk storage on Linux

在Linux上显式使用磁盘存储

from key_value.stores import DiskStore
mcp = FastMCP( "My Server", storage=DiskStore(path="/var/lib/mcp/storage") )
undefined
from key_value.stores import DiskStore
mcp = FastMCP( "My Server", storage=DiskStore(path="/var/lib/mcp/storage") )
undefined

Server Lifespans

服务器生命周期

Server lifespans provide initialization and cleanup hooks that run once per server instance (NOT per client session). This is critical for managing database connections, API clients, and other resources.
⚠️ Breaking Change in v2.13.0: Lifespan behavior changed from per-session to per-server-instance.
服务器生命周期提供初始化和清理钩子,每个服务器实例运行一次(而非每个客户端会话)。这对于管理数据库连接、API客户端和其他资源至关重要。
⚠️ v2.13.0中的重大变更:生命周期行为从每个会话变为每个服务器实例。

Basic Pattern

基础模式

python
from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass

@dataclass
class AppContext:
    """Shared application state."""
    db: Database
    api_client: httpx.AsyncClient

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Initialize resources on startup, cleanup on shutdown.
    Runs ONCE per server instance, NOT per client session.
    """
    # Startup: Initialize resources
    db = await Database.connect(os.getenv("DATABASE_URL"))
    api_client = httpx.AsyncClient(
        base_url=os.getenv("API_BASE_URL"),
        headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
        timeout=30.0
    )

    print("Server initialized")

    try:
        # Yield context to tools
        yield AppContext(db=db, api_client=api_client)
    finally:
        # Shutdown: Cleanup resources
        await db.disconnect()
        await api_client.aclose()
        print("Server shutdown complete")
python
from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass

@dataclass
class AppContext:
    """共享应用状态。"""
    db: Database
    api_client: httpx.AsyncClient

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    启动时初始化资源,关闭时清理资源。
    每个服务器实例运行一次,而非每个客户端会话。
    """
    # 启动:初始化资源
    db = await Database.connect(os.getenv("DATABASE_URL"))
    api_client = httpx.AsyncClient(
        base_url=os.getenv("API_BASE_URL"),
        headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
        timeout=30.0
    )

    print("服务器已初始化")

    try:
        # 将上下文传递给工具
        yield AppContext(db=db, api_client=api_client)
    finally:
        # 关闭:清理资源
        await db.disconnect()
        await api_client.aclose()
        print("服务器关闭完成")

Create server with lifespan

创建带生命周期的服务器

mcp = FastMCP("My Server", lifespan=app_lifespan)
mcp = FastMCP("My Server", lifespan=app_lifespan)

Access context in tools

在工具中访问上下文

from fastmcp import Context
@mcp.tool() async def query_database(sql: str, context: Context) -> list: """Query database using shared connection.""" # Access lifespan context app_context: AppContext = context.fastmcp_context.lifespan_context return await app_context.db.query(sql)
@mcp.tool() async def api_request(endpoint: str, context: Context) -> dict: """Make API request using shared client.""" app_context: AppContext = context.fastmcp_context.lifespan_context response = await app_context.api_client.get(endpoint) return response.json()
undefined
from fastmcp import Context
@mcp.tool() async def query_database(sql: str, context: Context) -> list: """使用共享连接查询数据库。""" # 访问生命周期上下文 app_context: AppContext = context.fastmcp_context.lifespan_context return await app_context.db.query(sql)
@mcp.tool() async def api_request(endpoint: str, context: Context) -> dict: """使用共享客户端发起API请求。""" app_context: AppContext = context.fastmcp_context.lifespan_context response = await app_context.api_client.get(endpoint) return response.json()
undefined

ASGI Integration

ASGI集成

When using FastMCP with ASGI apps (FastAPI, Starlette), you must pass the lifespan explicitly:
python
from fastapi import FastAPI
from fastmcp import FastMCP
当将FastMCP与ASGI应用(FastAPI、Starlette)一起使用时,必须显式传递生命周期:
python
from fastapi import FastAPI
from fastmcp import FastMCP

FastMCP lifespan

FastMCP生命周期

@asynccontextmanager async def mcp_lifespan(server: FastMCP): print("MCP server starting") yield print("MCP server stopping")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)
@asynccontextmanager async def mcp_lifespan(server: FastMCP): print("MCP服务器启动") yield print("MCP服务器停止")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)

FastAPI app MUST include MCP lifespan

FastAPI应用必须包含MCP生命周期

app = FastAPI(lifespan=mcp.lifespan)
app = FastAPI(lifespan=mcp.lifespan)

Add routes

添加路由

@app.get("/") def root(): return {"message": "Hello World"}

**❌ WRONG**: Not passing lifespan to parent app
```python
app = FastAPI()  # MCP lifespan won't run!
✅ CORRECT: Pass MCP lifespan to parent app
python
app = FastAPI(lifespan=mcp.lifespan)
@app.get("/") def root(): return {"message": "Hello World"}

**❌ 错误做法**:未将生命周期传递给父应用
```python
app = FastAPI()  # MCP生命周期不会运行!
✅ 正确做法:将MCP生命周期传递给父应用
python
app = FastAPI(lifespan=mcp.lifespan)

State Management

状态管理

Store and retrieve state during server lifetime:
python
from fastmcp import Context

@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
    """Store configuration value."""
    context.fastmcp_context.set_state(key, value)
    return {"status": "saved", "key": key}

@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
    """Retrieve configuration value."""
    value = context.fastmcp_context.get_state(key, default=None)
    if value is None:
        return {"error": f"Key '{key}' not found"}
    return {"key": key, "value": value}
在服务器运行期间存储和检索状态:
python
from fastmcp import Context

@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
    """存储配置值。"""
    context.fastmcp_context.set_state(key, value)
    return {"status": "saved", "key": key}

@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
    """检索配置值。"""
    value = context.fastmcp_context.get_state(key, default=None)
    if value is None:
        return {"error": f"未找到密钥'{key}'"}
    return {"key": key, "value": value}

Middleware System

中间件系统

FastMCP provides an MCP-native middleware system for cross-cutting functionality like logging, rate limiting, caching, and error handling.
FastMCP提供原生MCP中间件系统,用于实现日志记录、速率限制、缓存和错误处理等横切功能。

Built-in Middleware (8 Types)

内置中间件(8种类型)

  1. TimingMiddleware - Performance monitoring
  2. ResponseCachingMiddleware - TTL-based caching with pluggable storage
  3. LoggingMiddleware - Human-readable and JSON-structured logging
  4. RateLimitingMiddleware - Token bucket and sliding window algorithms
  5. ErrorHandlingMiddleware - Consistent error management
  6. ToolInjectionMiddleware - Dynamic tool injection
  7. PromptToolMiddleware - Tool-based prompt access for limited clients
  8. ResourceToolMiddleware - Tool-based resource access for limited clients
  1. TimingMiddleware - 性能监控
  2. ResponseCachingMiddleware - 基于TTL的缓存,支持可插拔存储
  3. LoggingMiddleware - 人类可读和JSON结构化日志
  4. RateLimitingMiddleware - 令牌桶和滑动窗口算法
  5. ErrorHandlingMiddleware - 一致的错误管理
  6. ToolInjectionMiddleware - 动态工具注入
  7. PromptToolMiddleware - 为受限客户端提供基于工具的提示词访问
  8. ResourceToolMiddleware - 为受限客户端提供基于工具的资源访问

Basic Usage

基础用法

python
from fastmcp import FastMCP
from fastmcp.middleware import (
    TimingMiddleware,
    LoggingMiddleware,
    RateLimitingMiddleware,
    ResponseCachingMiddleware,
    ErrorHandlingMiddleware
)

mcp = FastMCP("My Server")
python
from fastmcp import FastMCP
from fastmcp.middleware import (
    TimingMiddleware,
    LoggingMiddleware,
    RateLimitingMiddleware,
    ResponseCachingMiddleware,
    ErrorHandlingMiddleware
)

mcp = FastMCP("My Server")

Add middleware (order matters!)

添加中间件(顺序很重要!)

mcp.add_middleware(ErrorHandlingMiddleware()) mcp.add_middleware(TimingMiddleware()) mcp.add_middleware(LoggingMiddleware(level="INFO")) mcp.add_middleware(RateLimitingMiddleware( max_requests=100, window_seconds=60, algorithm="token_bucket" )) mcp.add_middleware(ResponseCachingMiddleware( ttl_seconds=300, storage=RedisStore(host="localhost") ))
undefined
mcp.add_middleware(ErrorHandlingMiddleware()) mcp.add_middleware(TimingMiddleware()) mcp.add_middleware(LoggingMiddleware(level="INFO")) mcp.add_middleware(RateLimitingMiddleware( max_requests=100, window_seconds=60, algorithm="token_bucket" )) mcp.add_middleware(ResponseCachingMiddleware( ttl_seconds=300, storage=RedisStore(host="localhost") ))
undefined

Middleware Execution Order

中间件执行顺序

Middleware executes in order added:
Request Flow:
  → ErrorHandlingMiddleware (catches errors)
    → TimingMiddleware (starts timer)
      → LoggingMiddleware (logs request)
        → RateLimitingMiddleware (checks rate limit)
          → ResponseCachingMiddleware (checks cache)
            → Tool/Resource Handler
          ← ResponseCachingMiddleware (stores in cache)
        ← RateLimitingMiddleware
      ← LoggingMiddleware (logs response)
    ← TimingMiddleware (stops timer, logs duration)
  ← ErrorHandlingMiddleware (returns error if any)
中间件按添加顺序执行:
请求流程:
  → ErrorHandlingMiddleware(捕获错误)
    → TimingMiddleware(启动计时器)
      → LoggingMiddleware(记录请求)
        → RateLimitingMiddleware(检查速率限制)
          → ResponseCachingMiddleware(检查缓存)
            → 工具/资源处理器
          ← ResponseCachingMiddleware(存储到缓存)
        ← RateLimitingMiddleware
      ← LoggingMiddleware(记录响应)
    ← TimingMiddleware(停止计时器,记录耗时)
  ← ErrorHandlingMiddleware(如有错误则返回)

Custom Middleware

自定义中间件

Create custom middleware using hooks:
python
from fastmcp.middleware import BaseMiddleware
from fastmcp import Context

class AccessControlMiddleware(BaseMiddleware):
    """Check authorization before tool execution."""

    def __init__(self, allowed_users: list[str]):
        self.allowed_users = allowed_users

    async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
        """Hook runs before tool execution."""
        # Get user from context (from auth)
        user = context.fastmcp_context.get_state("user_id")

        if user not in self.allowed_users:
            raise PermissionError(f"User '{user}' not authorized")

        # Continue to tool
        return await self.next(tool_name, arguments, context)
使用钩子创建自定义中间件:
python
from fastmcp.middleware import BaseMiddleware
from fastmcp import Context

class AccessControlMiddleware(BaseMiddleware):
    """在工具执行前检查授权。"""

    def __init__(self, allowed_users: list[str]):
        self.allowed_users = allowed_users

    async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
        """工具执行前运行的钩子。"""
        # 从上下文中获取用户(来自认证)
        user = context.fastmcp_context.get_state("user_id")

        if user not in self.allowed_users:
            raise PermissionError(f"用户'{user}'未被授权")

        # 继续执行工具
        return await self.next(tool_name, arguments, context)

Add to server

添加到服务器

mcp.add_middleware(AccessControlMiddleware( allowed_users=["alice", "bob", "charlie"] ))
undefined
mcp.add_middleware(AccessControlMiddleware( allowed_users=["alice", "bob", "charlie"] ))
undefined

Hook Hierarchy

钩子层次

Middleware hooks from most general to most specific:
  1. on_message
    - All messages (requests and notifications)
  2. on_request
    /
    on_notification
    - By message type
  3. on_call_tool
    ,
    on_read_resource
    ,
    on_get_prompt
    - Operation-specific
  4. on_list_tools
    ,
    on_list_resources
    ,
    on_list_prompts
    ,
    on_list_resource_templates
    - List operations
python
class ComprehensiveMiddleware(BaseMiddleware):
    async def on_message(self, message: dict, context: Context):
        """Runs for ALL messages."""
        print(f"Message: {message['method']}")
        return await self.next(message, context)

    async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
        """Runs only for tool calls."""
        print(f"Tool: {tool_name}")
        return await self.next(tool_name, arguments, context)

    async def on_read_resource(self, uri: str, context: Context):
        """Runs only for resource reads."""
        print(f"Resource: {uri}")
        return await self.next(uri, context)
中间件钩子从最通用到最具体:
  1. on_message
    - 所有消息(请求和通知)
  2. on_request
    /
    on_notification
    - 按消息类型
  3. on_call_tool
    ,
    on_read_resource
    ,
    on_get_prompt
    - 特定操作
  4. on_list_tools
    ,
    on_list_resources
    ,
    on_list_prompts
    ,
    on_list_resource_templates
    - 列表操作
python
class ComprehensiveMiddleware(BaseMiddleware):
    async def on_message(self, message: dict, context: Context):
        """对所有消息运行。"""
        print(f"消息: {message['method']}")
        return await self.next(message, context)

    async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
        """仅对工具调用运行。"""
        print(f"工具: {tool_name}")
        return await self.next(tool_name, arguments, context)

    async def on_read_resource(self, uri: str, context: Context):
        """仅对资源读取运行。"""
        print(f"资源: {uri}")
        return await self.next(uri, context)

Response Caching Middleware

响应缓存中间件

Improve performance by caching expensive operations:
python
from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStore
通过缓存昂贵的操作来提高性能:
python
from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStore

Cache responses for 5 minutes

缓存响应5分钟

cache_middleware = ResponseCachingMiddleware( ttl_seconds=300, storage=RedisStore(host="localhost"), # Shared across instances cache_tools=True, # Cache tool calls cache_resources=True, # Cache resource reads cache_prompts=False # Don't cache prompts )
mcp.add_middleware(cache_middleware)
cache_middleware = ResponseCachingMiddleware( ttl_seconds=300, storage=RedisStore(host="localhost"), # 跨实例共享 cache_tools=True, # 缓存工具调用 cache_resources=True, # 缓存资源读取 cache_prompts=False # 不缓存提示词 )
mcp.add_middleware(cache_middleware)

Tools/resources are automatically cached

工具/资源会被自动缓存

@mcp.tool() async def expensive_computation(data: str) -> dict: """This will be cached for 5 minutes.""" import time time.sleep(5) # Expensive operation return {"result": process(data)}
undefined
@mcp.tool() async def expensive_computation(data: str) -> dict: """此操作将被缓存5分钟。""" import time time.sleep(5) # 昂贵操作 return {"result": process(data)}
undefined

Server Composition

服务器组合

Organize tools, resources, and prompts into modular components using server composition.
使用服务器组合将工具、资源和提示词组织成模块化组件。

Two Strategies

两种策略

1.
import_server()
- Static Snapshot
:
  • One-time copy of components at import time
  • Changes to subserver don't propagate
  • Fast (no runtime delegation)
  • Use for: Bundling finalized components
2.
mount()
- Dynamic Link
:
  • Live runtime link to subserver
  • Changes to subserver immediately visible
  • Runtime delegation (slower)
  • Use for: Modular runtime composition
1.
import_server()
- 静态快照
  • 导入时一次性复制组件
  • 子服务器的更改不会传播
  • 速度快(无运行时委托)
  • 适用场景:打包已完成的组件
2.
mount()
- 动态链接
  • 与子服务器的实时运行时链接
  • 子服务器的更改立即可见
  • 运行时委托(速度较慢)
  • 适用场景:模块化运行时组合

Import Server (Static)

导入服务器(静态)

python
from fastmcp import FastMCP
python
from fastmcp import FastMCP

Subserver with tools

带工具的子服务器

api_server = FastMCP("API Server")
@api_server.tool() def api_tool(): return "API result"
@api_server.resource("api://status") def api_status(): return {"status": "ok"}
api_server = FastMCP("API Server")
@api_server.tool() def api_tool(): return "API result"
@api_server.resource("api://status") def api_status(): return {"status": "ok"}

Main server imports components

主服务器导入组件

main_server = FastMCP("Main Server")
main_server = FastMCP("Main Server")

Import all components from subserver

从子服务器导入所有组件

main_server.import_server(api_server)
main_server.import_server(api_server)

Now main_server has api_tool and api://status

现在main_server拥有api_tool和api://status

Changes to api_server won't affect main_server

对api_server的更改不会影响main_server

undefined
undefined

Mount Server (Dynamic)

挂载服务器(动态)

python
from fastmcp import FastMCP
python
from fastmcp import FastMCP

Create servers

创建服务器

api_server = FastMCP("API Server") db_server = FastMCP("DB Server")
@api_server.tool() def fetch_data(): return "API data"
@db_server.tool() def query_db(): return "DB result"
api_server = FastMCP("API Server") db_server = FastMCP("DB Server")
@api_server.tool() def fetch_data(): return "API data"
@db_server.tool() def query_db(): return "DB result"

Main server mounts subservers

主服务器挂载子服务器

main_server = FastMCP("Main Server")
main_server = FastMCP("Main Server")

Mount with prefix

带前缀挂载

main_server.mount(api_server, prefix="api") main_server.mount(db_server, prefix="db")
main_server.mount(api_server, prefix="api") main_server.mount(db_server, prefix="db")

Tools are namespaced:

工具会被命名空间化:

- api.fetch_data

- api.fetch_data

- db.query_db

- db.query_db

Resources are prefixed:

资源会被添加前缀:

- resource://api/path/to/resource

- resource://api/path/to/resource

- resource://db/path/to/resource

- resource://db/path/to/resource

undefined
undefined

Mounting Modes

挂载模式

Direct Mounting (Default):
python
undefined
直接挂载(默认):
python
undefined

In-memory access, subserver runs in same process

内存中访问,子服务器在同一进程中运行

main_server.mount(subserver, prefix="sub")

**Proxy Mounting**:
```python
main_server.mount(subserver, prefix="sub")

**代理挂载:**
```python

Treats subserver as separate entity with own lifecycle

将子服务器视为独立实体,拥有自己的生命周期

main_server.mount( subserver, prefix="sub", mode="proxy" )
undefined
main_server.mount( subserver, prefix="sub", mode="proxy" )
undefined

Tag Filtering

标签过滤

Filter components when importing/mounting:
python
undefined
在导入/挂载时过滤组件:
python
undefined

Tag subserver components

为子服务器组件添加标签

@api_server.tool(tags=["public"]) def public_api(): return "Public"
@api_server.tool(tags=["admin"]) def admin_api(): return "Admin only"
@api_server.tool(tags=["public"]) def public_api(): return "Public"
@api_server.tool(tags=["admin"]) def admin_api(): return "Admin only"

Import only public tools

仅导入公共工具

main_server.import_server( api_server, include_tags=["public"] )
main_server.import_server( api_server, include_tags=["public"] )

Or exclude admin tools

或排除管理员工具

main_server.import_server( api_server, exclude_tags=["admin"] )
main_server.import_server( api_server, exclude_tags=["admin"] )

Tag filtering is recursive with mount()

标签过滤在mount()中是递归的

main_server.mount( api_server, prefix="api", include_tags=["public"] )
undefined
main_server.mount( api_server, prefix="api", include_tags=["public"] )
undefined

Resource Prefix Formats

资源前缀格式

Path Format (Default since v2.4.0):
resource://prefix/path/to/resource
Protocol Format (Legacy):
prefix+resource://path/to/resource
Configure format:
python
main_server.mount(
    subserver,
    prefix="api",
    resource_prefix_format="path"  # or "protocol"
)
路径格式(v2.4.0起默认):
resource://prefix/path/to/resource
协议格式(旧版):
prefix+resource://path/to/resource
配置格式:
python
main_server.mount(
    subserver,
    prefix="api",
    resource_prefix_format="path"  # 或 "protocol"
)

OAuth Proxy & Authentication

OAuth代理与认证

FastMCP provides comprehensive authentication support for HTTP-based transports, including an OAuth Proxy for providers that don't support Dynamic Client Registration (DCR).
FastMCP为基于HTTP的传输提供全面的认证支持,包括OAuth代理,用于不支持动态客户端注册(DCR)的提供商。

Four Authentication Patterns

四种认证模式

  1. Token Validation (
    TokenVerifier
    /
    JWTVerifier
    ) - Validate external tokens
  2. External Identity Providers (
    RemoteAuthProvider
    ) - OAuth 2.0/OIDC with DCR
  3. OAuth Proxy (
    OAuthProxy
    ) - Bridge to traditional OAuth providers
  4. Full OAuth (
    OAuthProvider
    ) - Complete authorization server
  1. 令牌验证 (
    TokenVerifier
    /
    JWTVerifier
    ) - 验证外部令牌
  2. 外部身份提供商 (
    RemoteAuthProvider
    ) - 支持DCR的OAuth 2.0/OIDC
  3. OAuth代理 (
    OAuthProxy
    ) - 桥接到传统OAuth提供商
  4. 完整OAuth (
    OAuthProvider
    ) - 完整的授权服务器

Pattern 1: Token Validation

模式1:令牌验证

Validate tokens issued by external systems:
python
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifier
验证外部系统颁发的令牌:
python
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifier

JWT verification

JWT验证

auth = JWTVerifier( issuer="https://auth.example.com", audience="my-mcp-server", public_key=os.getenv("JWT_PUBLIC_KEY") )
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool() async def secure_operation(context: Context) -> dict: """Only accessible with valid JWT.""" # Token validated automatically user = context.fastmcp_context.get_state("user_id") return {"user": user, "status": "authorized"}
undefined
auth = JWTVerifier( issuer="https://auth.example.com", audience="my-mcp-server", public_key=os.getenv("JWT_PUBLIC_KEY") )
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool() async def secure_operation(context: Context) -> dict: """仅可通过有效JWT访问。""" # 令牌会自动验证 user = context.fastmcp_context.get_state("user_id") return {"user": user, "status": "authorized"}
undefined

Pattern 2: External Identity Providers

模式2:外部身份提供商

Use OAuth 2.0/OIDC providers with Dynamic Client Registration:
python
from fastmcp.auth import RemoteAuthProvider

auth = RemoteAuthProvider(
    issuer="https://auth.example.com",
    # Provider must support DCR
)

mcp = FastMCP("OAuth Server", auth=auth)
使用支持动态客户端注册的OAuth 2.0/OIDC提供商:
python
from fastmcp.auth import RemoteAuthProvider

auth = RemoteAuthProvider(
    issuer="https://auth.example.com",
    # 提供商必须支持DCR
)

mcp = FastMCP("OAuth Server", auth=auth)

Pattern 3: OAuth Proxy (Recommended for Production)

模式3:OAuth代理(生产环境推荐)

Bridge to OAuth providers without DCR support (GitHub, Google, Azure, AWS, Discord, Facebook, etc.):
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os

auth = OAuthProxy(
    # JWT signing for issued tokens
    jwt_signing_key=os.environ["JWT_SIGNING_KEY"],

    # Encrypted storage for upstream tokens
    client_storage=FernetEncryptionWrapper(
        key_value=RedisStore(
            host=os.getenv("REDIS_HOST"),
            password=os.getenv("REDIS_PASSWORD")
        ),
        fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
    ),

    # Upstream OAuth provider
    upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
    upstream_token_endpoint="https://github.com/login/oauth/access_token",
    upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
    upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),

    # Scopes
    upstream_scope="read:user user:email",

    # Security: Enable consent screen (prevents confused deputy attacks)
    enable_consent_screen=True
)

mcp = FastMCP("GitHub Auth Server", auth=auth)
桥接到不支持DCR的OAuth提供商(GitHub、Google、Azure、AWS、Discord、Facebook等):
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os

auth = OAuthProxy(
    # 颁发令牌的JWT签名密钥
    jwt_signing_key=os.environ["JWT_SIGNING_KEY"],

    # 上游令牌的加密存储
    client_storage=FernetEncryptionWrapper(
        key_value=RedisStore(
            host=os.getenv("REDIS_HOST"),
            password=os.getenv("REDIS_PASSWORD")
        ),
        fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
    ),

    # 上游OAuth提供商
    upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
    upstream_token_endpoint="https://github.com/login/oauth/access_token",
    upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
    upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),

    # 权限范围
    upstream_scope="read:user user:email",

    # 安全:启用同意屏幕(防止混淆代理攻击)
    enable_consent_screen=True
)

mcp = FastMCP("GitHub Auth Server", auth=auth)

OAuth Proxy Features

OAuth代理特性

Token Factory Pattern:
  • Proxy issues its own JWTs (not forwarding upstream tokens)
  • Upstream tokens stored encrypted
  • Proxy tokens can have custom claims
Consent Screens:
  • Prevents authorization bypass attacks
  • Shows user what permissions are being granted
  • Required for security compliance
PKCE Support:
  • End-to-end validation from client to upstream
  • Protects against authorization code interception
RFC 7662 Token Introspection:
  • Validate tokens with upstream provider
  • Check revocation status
令牌工厂模式
  • 代理颁发自己的JWT(不转发上游令牌)
  • 上游令牌加密存储
  • 代理令牌可包含自定义声明
同意屏幕
  • 防止授权绕过攻击
  • 向用户显示授予的权限
  • 安全合规要求
PKCE支持
  • 从客户端到上游的端到端验证
  • 防止授权码拦截
RFC 7662令牌 introspection
  • 与上游提供商验证令牌
  • 检查吊销状态

Pattern 4: Full OAuth Provider

模式4:完整OAuth提供商

Run complete authorization server:
python
from fastmcp.auth import OAuthProvider

auth = OAuthProvider(
    issuer="https://my-auth-server.com",
    client_storage=RedisStore(host="localhost"),
    # Full OAuth 2.0 server implementation
)

mcp = FastMCP("Auth Server", auth=auth)
运行完整的授权服务器:
python
from fastmcp.auth import OAuthProvider

auth = OAuthProvider(
    issuer="https://my-auth-server.com",
    client_storage=RedisStore(host="localhost"),
    # 完整的OAuth 2.0服务器实现
)

mcp = FastMCP("Auth Server", auth=auth)

Environment-Based Configuration

基于环境的配置

Auto-detect auth from environment:
bash
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'
python
undefined
从环境变量自动检测认证配置:
bash
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'
python
undefined

Automatically configures from FASTMCP_SERVER_AUTH

自动从FASTMCP_SERVER_AUTH配置

mcp = FastMCP("Auto Auth Server")
undefined
mcp = FastMCP("Auto Auth Server")
undefined

Supported OAuth Providers

支持的OAuth提供商

  • GitHub:
    https://github.com/login/oauth/authorize
  • Google:
    https://accounts.google.com/o/oauth2/v2/auth
  • Azure:
    https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize
  • AWS Cognito:
    https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize
  • Discord:
    https://discord.com/api/oauth2/authorize
  • Facebook:
    https://www.facebook.com/v12.0/dialog/oauth
  • WorkOS: Enterprise identity
  • AuthKit: Authentication toolkit
  • Descope: Auth platform
  • Scalekit: Enterprise SSO
  • GitHub
    https://github.com/login/oauth/authorize
  • Google
    https://accounts.google.com/o/oauth2/v2/auth
  • Azure
    https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize
  • AWS Cognito
    https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize
  • Discord
    https://discord.com/api/oauth2/authorize
  • Facebook
    https://www.facebook.com/v12.0/dialog/oauth
  • WorkOS:企业身份认证
  • AuthKit:认证工具包
  • Descope:认证平台
  • Scalekit:企业SSO

Icons Support

图标支持

Add visual representations to servers, tools, resources, and prompts for better UX in MCP clients.
为服务器、工具、资源和提示词添加视觉表示,以提升MCP客户端的用户体验。

Server-Level Icons

服务器级图标

python
from fastmcp import FastMCP, Icon

mcp = FastMCP(
    name="Weather Service",
    website_url="https://weather.example.com",
    icons=[
        Icon(
            url="https://example.com/icon-small.png",
            size="small"
        ),
        Icon(
            url="https://example.com/icon-large.png",
            size="large"
        )
    ]
)
python
from fastmcp import FastMCP, Icon

mcp = FastMCP(
    name="Weather Service",
    website_url="https://weather.example.com",
    icons=[
        Icon(
            url="https://example.com/icon-small.png",
            size="small"
        ),
        Icon(
            url="https://example.com/icon-large.png",
            size="large"
        )
    ]
)

Component-Level Icons

组件级图标

python
from fastmcp import Icon

@mcp.tool(icons=[
    Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
    """Analyze data with visual icon."""
    return {"result": "analyzed"}

@mcp.resource(
    "user://{user_id}/profile",
    icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
    """User profile with icon."""
    return {"id": user_id, "name": "Alice"}

@mcp.prompt(
    "analyze",
    icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
    """Analysis prompt with icon."""
    return f"Analyze {topic}"
python
from fastmcp import Icon

@mcp.tool(icons=[
    Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
    """带视觉图标的数据分析工具。"""
    return {"result": "analyzed"}

@mcp.resource(
    "user://{user_id}/profile",
    icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
    """带图标的用户资料。"""
    return {"id": user_id, "name": "Alice"}

@mcp.prompt(
    "analyze",
    icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
    """带图标的分析提示词。"""
    return f"分析{topic}"

Data URI Support

Data URI支持

Embed images directly (useful for self-contained deployments):
python
from fastmcp import Icon, Image
直接嵌入图像(适用于独立部署):
python
from fastmcp import Icon, Image

Convert local file to data URI

将本地文件转换为data URI

icon = Icon.from_file("/path/to/icon.png", size="medium")
icon = Icon.from_file("/path/to/icon.png", size="medium")

Or use Image utility

或使用Image工具

image_data_uri = Image.to_data_uri("/path/to/icon.png") icon = Icon(url=image_data_uri, size="medium")
image_data_uri = Image.to_data_uri("/path/to/icon.png") icon = Icon(url=image_data_uri, size="medium")

Use in server

在服务器中使用

mcp = FastMCP( "My Server", icons=[icon] )
undefined
mcp = FastMCP( "My Server", icons=[icon] )
undefined

Multiple Sizes

多尺寸支持

Provide different sizes for different contexts:
python
mcp = FastMCP(
    "Responsive Server",
    icons=[
        Icon(url="icon-16.png", size="small"),    # 16x16
        Icon(url="icon-32.png", size="medium"),   # 32x32
        Icon(url="icon-64.png", size="large"),    # 64x64
    ]
)
为不同场景提供不同尺寸的图标:
python
mcp = FastMCP(
    "Responsive Server",
    icons=[
        Icon(url="icon-16.png", size="small"),    # 16x16
        Icon(url="icon-32.png", size="medium"),   # 32x32
        Icon(url="icon-64.png", size="large"),    # 64x64
    ]
)

API Integration

API集成

FastMCP provides multiple patterns for API integration:
FastMCP提供多种API集成模式:

Pattern 1: Manual API Integration

模式1:手动API集成

python
import httpx
import os
python
import httpx
import os

Create reusable client

创建可重用客户端

client = httpx.AsyncClient( base_url=os.getenv("API_BASE_URL"), headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"}, timeout=30.0 )
@mcp.tool() async def fetch_data(endpoint: str) -> dict: """Fetch data from API.""" try: response = await client.get(endpoint) response.raise_for_status() return {"success": True, "data": response.json()} except httpx.HTTPStatusError as e: return {"error": f"HTTP {e.response.status_code}"} except Exception as e: return {"error": str(e)}
undefined
client = httpx.AsyncClient( base_url=os.getenv("API_BASE_URL"), headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"}, timeout=30.0 )
@mcp.tool() async def fetch_data(endpoint: str) -> dict: """从API获取数据。""" try: response = await client.get(endpoint) response.raise_for_status() return {"success": True, "data": response.json()} except httpx.HTTPStatusError as e: return {"error": f"HTTP {e.response.status_code}"} except Exception as e: return {"error": str(e)}
undefined

Pattern 2: OpenAPI/Swagger Auto-Generation

模式2:OpenAPI/Swagger自动生成

python
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx
python
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpx

Load OpenAPI spec

加载OpenAPI规范

Create authenticated client

创建带认证的客户端

client = httpx.AsyncClient( base_url="https://api.example.com", headers={"Authorization": f"Bearer {API_TOKEN}"}, timeout=30.0 )
client = httpx.AsyncClient( base_url="https://api.example.com", headers={"Authorization": f"Bearer {API_TOKEN}"}, timeout=30.0 )

Auto-generate MCP server from OpenAPI

从OpenAPI自动生成MCP服务器

mcp = FastMCP.from_openapi( openapi_spec=spec, client=client, name="API Server", route_maps=[ # GET with parameters → Resource Templates RouteMap( methods=["GET"], pattern=r".{.}.*", mcp_type=MCPType.RESOURCE_TEMPLATE ), # GET without parameters → Resources RouteMap( methods=["GET"], mcp_type=MCPType.RESOURCE ), # POST/PUT/DELETE → Tools RouteMap( methods=["POST", "PUT", "DELETE"], mcp_type=MCPType.TOOL ), ] )
mcp = FastMCP.from_openapi( openapi_spec=spec, client=client, name="API Server", route_maps=[ # 带参数的GET → 资源模板 RouteMap( methods=["GET"], pattern=r".{.}.*", mcp_type=MCPType.RESOURCE_TEMPLATE ), # 无参数的GET → 资源 RouteMap( methods=["GET"], mcp_type=MCPType.RESOURCE ), # POST/PUT/DELETE → 工具 RouteMap( methods=["POST", "PUT", "DELETE"], mcp_type=MCPType.TOOL ), ] )

Optionally add custom tools

可选:添加自定义工具

@mcp.tool() async def custom_operation(data: dict) -> dict: """Custom tool on top of generated ones.""" return process_data(data)
undefined
@mcp.tool() async def custom_operation(data: dict) -> dict: """在自动生成的基础上添加自定义工具。""" return process_data(data)
undefined

Pattern 3: FastAPI Conversion

模式3:FastAPI转换

python
from fastapi import FastAPI
from fastmcp import FastMCP
python
from fastapi import FastAPI
from fastmcp import FastMCP

Existing FastAPI app

现有FastAPI应用

app = FastAPI()
@app.get("/items/{item_id}") def get_item(item_id: int): return {"id": item_id, "name": "Item"}
app = FastAPI()
@app.get("/items/{item_id}") def get_item(item_id: int): return {"id": item_id, "name": "Item"}

Convert to MCP server

转换为MCP服务器

mcp = FastMCP.from_fastapi( app=app, httpx_client_kwargs={ "headers": {"Authorization": "Bearer token"} } )
undefined
mcp = FastMCP.from_fastapi( app=app, httpx_client_kwargs={ "headers": {"Authorization": "Bearer token"} } )
undefined

Cloud Deployment (FastMCP Cloud)

云部署(FastMCP Cloud)

Critical Requirements

关键要求

❗️ IMPORTANT: These requirements are mandatory for FastMCP Cloud:
  1. Module-level server object named
    mcp
    ,
    server
    , or
    app
  2. PyPI dependencies only in requirements.txt
  3. Public GitHub repository (or accessible to FastMCP Cloud)
  4. Environment variables for configuration
❗️ 重要提示: 这些要求是FastMCP Cloud的强制要求:
  1. 模块级服务器对象,命名为
    mcp
    server
    app
  2. 仅使用PyPI依赖,在requirements.txt中声明
  3. 公开GitHub仓库(或FastMCP Cloud可访问的仓库)
  4. 使用环境变量进行配置

Cloud-Ready Server Pattern

云就绪服务器模式

python
undefined
python
undefined

server.py

server.py

from fastmcp import FastMCP import os
from fastmcp import FastMCP import os

✅ CORRECT: Module-level server object

✅ 正确做法:模块级服务器对象

mcp = FastMCP( name="production-server" )
mcp = FastMCP( name="production-server" )

✅ Use environment variables

✅ 使用环境变量

API_KEY = os.getenv("API_KEY") DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool() async def production_tool(data: str) -> dict: """Production-ready tool.""" if not API_KEY: return {"error": "API_KEY not configured"}
# Your implementation
return {"status": "success", "data": data}
API_KEY = os.getenv("API_KEY") DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool() async def production_tool(data: str) -> dict: """生产就绪工具。""" if not API_KEY: return {"error": "API_KEY未配置"}
# 你的实现
return {"status": "success", "data": data}

✅ Optional: for local testing

✅ 可选:用于本地测试

if name == "main": mcp.run()
undefined
if name == "main": mcp.run()
undefined

Common Cloud Deployment Errors

常见云部署错误

❌ WRONG: Function-wrapped server
python
def create_server():
    mcp = FastMCP("my-server")
    return mcp

if __name__ == "__main__":
    server = create_server()  # Too late for cloud!
    server.run()
✅ CORRECT: Factory with module export
python
def create_server() -> FastMCP:
    mcp = FastMCP("my-server")
    # Complex setup logic
    return mcp
❌ 错误做法:函数包裹的服务器
python
def create_server():
    mcp = FastMCP("my-server")
    return mcp

if __name__ == "__main__":
    server = create_server()  # 对云部署来说太晚了!
    server.run()
✅ 正确做法:带模块导出的工厂函数
python
def create_server() -> FastMCP:
    mcp = FastMCP("my-server")
    # 复杂的设置逻辑
    return mcp

Export at module level

在模块级别导出

mcp = create_server()
if name == "main": mcp.run()
undefined
mcp = create_server()
if name == "main": mcp.run()
undefined

Deployment Steps

部署步骤

  1. Prepare Repository:
bash
git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main
  1. Deploy on FastMCP Cloud:
    • Visit https://fastmcp.cloud
    • Sign in with GitHub
    • Click "Create Project"
    • Select your repository
    • Configure:
      • Server Name: Your project name
      • Entrypoint:
        server.py
      • Environment Variables: Add any needed
  2. Access Your Server:
    • URL:
      https://your-project.fastmcp.app/mcp
    • Automatic deployment on push to main
    • PR preview deployments
  1. 准备仓库:
bash
git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main
  1. 在FastMCP Cloud上部署:
    • 访问https://fastmcp.cloud
    • 使用GitHub登录
    • 点击“Create Project”
    • 选择你的仓库
    • 配置:
      • Server Name: 你的项目名称
      • Entrypoint:
        server.py
      • Environment Variables: 添加所需的环境变量
  2. 访问你的服务器:
    • URL:
      https://your-project.fastmcp.app/mcp
    • 推送到main分支时自动部署
    • PR预览部署

Client Configuration

客户端配置

Claude Desktop

Claude Desktop

Add to
claude_desktop_config.json
:
json
{
  "mcpServers": {
    "my-server": {
      "url": "https://your-project.fastmcp.app/mcp",
      "transport": "http"
    }
  }
}
添加到
claude_desktop_config.json
json
{
  "mcpServers": {
    "my-server": {
      "url": "https://your-project.fastmcp.app/mcp",
      "transport": "http"
    }
  }
}

Local Development

本地开发

json
{
  "mcpServers": {
    "my-server": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"],
      "env": {
        "API_KEY": "your-key",
        "DATABASE_URL": "your-db-url"
      }
    }
  }
}
json
{
  "mcpServers": {
    "my-server": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"],
      "env": {
        "API_KEY": "your-key",
        "DATABASE_URL": "your-db-url"
      }
    }
  }
}

Claude Code CLI

Claude Code CLI

json
{
  "mcpServers": {
    "my-server": {
      "command": "uv",
      "args": ["run", "python", "/absolute/path/to/server.py"]
    }
  }
}
json
{
  "mcpServers": {
    "my-server": {
      "command": "uv",
      "args": ["run", "python", "/absolute/path/to/server.py"]
    }
  }
}

25 Common Errors (With Solutions)

25种常见错误(及解决方案)

Error 1: Missing Server Object

错误1:缺少服务器对象

Error:
RuntimeError: No server object found at module level
Cause: Server object not exported at module level (FastMCP Cloud requirement)
Solution:
python
undefined
错误信息:
RuntimeError: No server object found at module level
原因: 未在模块级别导出服务器对象(FastMCP Cloud要求)
解决方案:
python
undefined

❌ WRONG

❌ 错误做法

def create_server(): return FastMCP("server")
def create_server(): return FastMCP("server")

✅ CORRECT

✅ 正确做法

mcp = FastMCP("server") # At module level

**Source:** FastMCP Cloud documentation, deployment failures

---
mcp = FastMCP("server") # 在模块级别定义

**来源:** FastMCP Cloud文档、部署失败案例

---

Error 2: Async/Await Confusion

错误2:Async/Await混淆

Error:
RuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expression
Cause: Mixing sync/async incorrectly
Solution:
python
undefined
错误信息:
RuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expression
原因: 错误地混合同步/异步代码
解决方案:
python
undefined

❌ WRONG: Sync function calling async

❌ 错误做法:同步函数调用异步代码

@mcp.tool() def bad_tool(): result = await async_function() # Error!
@mcp.tool() def bad_tool(): result = await async_function() # 错误!

✅ CORRECT: Async tool

✅ 正确做法:异步工具

@mcp.tool() async def good_tool(): result = await async_function() return result
@mcp.tool() async def good_tool(): result = await async_function() return result

✅ CORRECT: Sync tool with sync code

✅ 正确做法:带同步代码的同步工具

@mcp.tool() def sync_tool(): return "Hello"

**Source:** GitHub issues #156, #203

---
@mcp.tool() def sync_tool(): return "Hello"

**来源:** GitHub issues #156, #203

---

Error 3: Context Not Injected

错误3:上下文未注入

Error:
TypeError: missing 1 required positional argument: 'context'
Cause: Missing
Context
type annotation for context parameter
Solution:
python
from fastmcp import Context
错误信息:
TypeError: missing 1 required positional argument: 'context'
原因: 上下文参数缺少
Context
类型注解
解决方案:
python
from fastmcp import Context

❌ WRONG: No type hint

❌ 错误做法:无类型提示

@mcp.tool() async def bad_tool(context): # Missing type! await context.report_progress(...)
@mcp.tool() async def bad_tool(context): # 缺少类型! await context.report_progress(...)

✅ CORRECT: Proper type hint

✅ 正确做法:正确的类型提示

@mcp.tool() async def good_tool(context: Context): await context.report_progress(0, 100, "Starting")

**Source:** FastMCP v2 migration guide

---
@mcp.tool() async def good_tool(context: Context): await context.report_progress(0, 100, "Starting")

**来源:** FastMCP v2迁移指南

---

Error 4: Resource URI Syntax

错误4:资源URI语法错误

Error:
ValueError: Invalid resource URI: missing scheme
Cause: Resource URI missing scheme prefix
Solution:
python
undefined
错误信息:
ValueError: Invalid resource URI: missing scheme
原因: 资源URI缺少方案前缀
解决方案:
python
undefined

❌ WRONG: Missing scheme

❌ 错误做法:缺少方案

@mcp.resource("config") def get_config(): pass
@mcp.resource("config") def get_config(): pass

✅ CORRECT: Include scheme

✅ 正确做法:包含方案

@mcp.resource("data://config") def get_config(): pass
@mcp.resource("data://config") def get_config(): pass

✅ Valid schemes

✅ 有效的方案

@mcp.resource("file://config.json") @mcp.resource("api://status") @mcp.resource("info://health")

**Source:** MCP Protocol specification

---
@mcp.resource("file://config.json") @mcp.resource("api://status") @mcp.resource("info://health")

**来源:** MCP协议规范

---

Error 5: Resource Template Parameter Mismatch

错误5:资源模板参数不匹配

Error:
TypeError: get_user() missing 1 required positional argument: 'user_id'
Cause: Function parameter names don't match URI template
Solution:
python
undefined
错误信息:
TypeError: get_user() missing 1 required positional argument: 'user_id'
原因: 函数参数名称与URI模板不匹配
解决方案:
python
undefined

❌ WRONG: Parameter name mismatch

❌ 错误做法:参数名称不匹配

@mcp.resource("user://{user_id}/profile") def get_user(id: str): # Wrong name! pass
@mcp.resource("user://{user_id}/profile") def get_user(id: str): # 名称错误! pass

✅ CORRECT: Matching names

✅ 正确做法:名称匹配

@mcp.resource("user://{user_id}/profile") def get_user(user_id: str): # Matches {user_id} return {"id": user_id}

**Source:** FastMCP patterns documentation

---
@mcp.resource("user://{user_id}/profile") def get_user(user_id: str): # 与{user_id}匹配 return {"id": user_id}

**来源:** FastMCP模式文档

---

Error 6: Pydantic Validation Error

错误6:Pydantic验证错误

Error:
ValidationError: value is not a valid integer
Cause: Type hints don't match provided data
Solution:
python
from pydantic import BaseModel, Field
错误信息:
ValidationError: value is not a valid integer
原因: 类型提示与提供的数据不匹配
解决方案:
python
from pydantic import BaseModel, Field

✅ Use Pydantic models for complex validation

✅ 对复杂验证使用Pydantic模型

class SearchParams(BaseModel): query: str = Field(min_length=1, max_length=100) limit: int = Field(default=10, ge=1, le=100)
@mcp.tool() async def search(params: SearchParams) -> dict: # Validation automatic return await perform_search(params.query, params.limit)

**Source:** Pydantic documentation, FastMCP examples

---
class SearchParams(BaseModel): query: str = Field(min_length=1, max_length=100) limit: int = Field(default=10, ge=1, le=100)
@mcp.tool() async def search(params: SearchParams) -> dict: # 自动验证 return await perform_search(params.query, params.limit)

**来源:** Pydantic文档、FastMCP示例

---

Error 7: Transport/Protocol Mismatch

错误7:传输/协议不匹配

Error:
ConnectionError: Server using different transport
Cause: Client and server using incompatible transports
Solution:
python
undefined
错误信息:
ConnectionError: Server using different transport
原因: 客户端和服务器使用不兼容的传输方式
解决方案:
python
undefined

Server using stdio (default)

服务器使用stdio(默认)

mcp.run() # or mcp.run(transport="stdio")
mcp.run() # 或 mcp.run(transport="stdio")

Client configuration must match

客户端配置必须匹配

{ "command": "python", "args": ["server.py"] }
{ "command": "python", "args": ["server.py"] }

OR for HTTP:

或者使用HTTP:

mcp.run(transport="http", port=8000)
mcp.run(transport="http", port=8000)

Client:

客户端:

{ "url": "http://localhost:8000/mcp", "transport": "http" }

**Source:** MCP transport specification

---
{ "url": "http://localhost:8000/mcp", "transport": "http" }

**来源:** MCP传输规范

---

Error 8: Import Errors (Editable Package)

错误8:导入错误(可编辑包)

Error:
ModuleNotFoundError: No module named 'my_package'
Cause: Package not properly installed in editable mode
Solution:
bash
undefined
错误信息:
ModuleNotFoundError: No module named 'my_package'
原因: 包未以可编辑模式正确安装
解决方案:
bash
undefined

✅ Install in editable mode

✅ 以可编辑模式安装

pip install -e .
pip install -e .

✅ Or use absolute imports

✅ 或使用绝对导入

from src.tools import my_tool
from src.tools import my_tool

✅ Or add to PYTHONPATH

✅ 或添加到PYTHONPATH

export PYTHONPATH="${PYTHONPATH}:/path/to/project"

**Source:** Python packaging documentation

---
export PYTHONPATH="${PYTHONPATH}:/path/to/project"

**来源:** Python打包文档

---

Error 9: Deprecation Warnings

错误9:弃用警告

Error:
DeprecationWarning: 'mcp.settings' is deprecated, use global Settings instead
Cause: Using old FastMCP v1 API
Solution:
python
undefined
错误信息:
DeprecationWarning: 'mcp.settings' is deprecated, use global Settings instead
原因: 使用旧版FastMCP v1 API
解决方案:
python
undefined

❌ OLD: FastMCP v1

❌ 旧版:FastMCP v1

from fastmcp import FastMCP mcp = FastMCP() api_key = mcp.settings.get("API_KEY")
from fastmcp import FastMCP mcp = FastMCP() api_key = mcp.settings.get("API_KEY")

✅ NEW: FastMCP v2

✅ 新版:FastMCP v2

import os api_key = os.getenv("API_KEY")

**Source:** FastMCP v2 migration guide

---
import os api_key = os.getenv("API_KEY")

**来源:** FastMCP v2迁移指南

---

Error 10: Port Already in Use

错误10:端口已被占用

Error:
OSError: [Errno 48] Address already in use
Cause: Port 8000 already occupied
Solution:
bash
undefined
错误信息:
OSError: [Errno 48] Address already in use
原因: 端口8000已被占用
解决方案:
bash
undefined

✅ Use different port

✅ 使用不同端口

python server.py --transport http --port 8001
python server.py --transport http --port 8001

✅ Or kill process on port

✅ 或杀死端口上的进程

lsof -ti:8000 | xargs kill -9

**Source:** Common networking issue

---
lsof -ti:8000 | xargs kill -9

**来源:** 常见网络问题

---

Error 11: Schema Generation Failures

错误11:架构生成失败

Error:
TypeError: Object of type 'ndarray' is not JSON serializable
Cause: Unsupported type hints (NumPy arrays, custom classes)
Solution:
python
undefined
错误信息:
TypeError: Object of type 'ndarray' is not JSON serializable
原因: 使用了不支持的类型提示(NumPy数组、自定义类)
解决方案:
python
undefined

❌ WRONG: NumPy array

❌ 错误做法:NumPy数组

import numpy as np
@mcp.tool() def bad_tool() -> np.ndarray: # Not JSON serializable return np.array([1, 2, 3])
import numpy as np
@mcp.tool() def bad_tool() -> np.ndarray: # 不可JSON序列化 return np.array([1, 2, 3])

✅ CORRECT: Use JSON-compatible types

✅ 正确做法:使用兼容JSON的类型

@mcp.tool() def good_tool() -> list[float]: return [1.0, 2.0, 3.0]
@mcp.tool() def good_tool() -> list[float]: return [1.0, 2.0, 3.0]

✅ Or convert to dict

✅ 或转换为字典

@mcp.tool() def array_tool() -> dict: data = np.array([1, 2, 3]) return {"values": data.tolist()}

**Source:** JSON serialization requirements

---
@mcp.tool() def array_tool() -> dict: data = np.array([1, 2, 3]) return {"values": data.tolist()}

**来源:** JSON序列化要求

---

Error 12: JSON Serialization

错误12:JSON序列化错误

Error:
TypeError: Object of type 'datetime' is not JSON serializable
Cause: Returning non-JSON-serializable objects
Solution:
python
from datetime import datetime
错误信息:
TypeError: Object of type 'datetime' is not JSON serializable
原因: 返回了不可JSON序列化的对象
解决方案:
python
from datetime import datetime

❌ WRONG: Return datetime object

❌ 错误做法:返回datetime对象

@mcp.tool() def bad_tool() -> dict: return {"timestamp": datetime.now()} # Not serializable
@mcp.tool() def bad_tool() -> dict: return {"timestamp": datetime.now()} # 不可序列化

✅ CORRECT: Convert to string

✅ 正确做法:转换为字符串

@mcp.tool() def good_tool() -> dict: return {"timestamp": datetime.now().isoformat()}
@mcp.tool() def good_tool() -> dict: return {"timestamp": datetime.now().isoformat()}

✅ Use helper function

✅ 使用辅助函数

def make_serializable(obj): """Convert object to JSON-serializable format.""" if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, bytes): return obj.decode('utf-8') # Add more conversions as needed return obj

**Source:** JSON specification

---
def make_serializable(obj): """将对象转换为可JSON序列化的格式。""" if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, bytes): return obj.decode('utf-8') # 根据需要添加更多转换 return obj

**来源:** JSON规范

---

Error 13: Circular Import Errors

错误13:循环导入错误

Error:
ImportError: cannot import name 'X' from partially initialized module
Cause: Modules import from each other creating circular dependency (common in cloud deployment)
Solution:
python
undefined
错误信息:
ImportError: cannot import name 'X' from partially initialized module
原因: 模块之间互相导入,形成循环依赖(云部署中常见)
解决方案:
python
undefined

❌ WRONG: Factory function in init.py

❌ 错误做法:init.py中的工厂函数

shared/init.py

shared/init.py

_client = None def get_api_client(): from .api_client import APIClient # Circular! return APIClient()
_client = None def get_api_client(): from .api_client import APIClient # 循环! return APIClient()

shared/monitoring.py

shared/monitoring.py

from . import get_api_client # Creates circle
from . import get_api_client # 形成循环

✅ CORRECT: Direct imports

✅ 正确做法:直接导入

shared/init.py

shared/init.py

from .api_client import APIClient from .cache import CacheManager
from .api_client import APIClient from .cache import CacheManager

shared/monitoring.py

shared/monitoring.py

from .api_client import APIClient client = APIClient() # Create directly
from .api_client import APIClient client = APIClient() # 直接创建

✅ ALTERNATIVE: Lazy import

✅ 替代方案:延迟导入

shared/monitoring.py

shared/monitoring.py

def get_client(): from .api_client import APIClient return APIClient()

**Source:** Production cloud deployment errors, Python import system

---
def get_client(): from .api_client import APIClient return APIClient()

**来源:** 生产环境云部署错误、Python导入系统

---

Error 14: Python Version Compatibility

错误14:Python版本兼容性

Error:
DeprecationWarning: datetime.utcnow() is deprecated
Cause: Using deprecated Python 3.12+ methods
Solution:
python
undefined
错误信息:
DeprecationWarning: datetime.utcnow() is deprecated
原因: 使用了Python 3.12+中已弃用的方法
解决方案:
python
undefined

❌ DEPRECATED (Python 3.12+)

❌ 已弃用(Python 3.12+)

from datetime import datetime timestamp = datetime.utcnow()
from datetime import datetime timestamp = datetime.utcnow()

✅ CORRECT: Future-proof

✅ 正确做法:面向未来

from datetime import datetime, timezone timestamp = datetime.now(timezone.utc)

**Source:** Python 3.12 release notes

---
from datetime import datetime, timezone timestamp = datetime.now(timezone.utc)

**来源:** Python 3.12发布说明

---

Error 15: Import-Time Execution

错误15:导入时执行

Error:
RuntimeError: Event loop is closed
Cause: Creating async resources at module import time
Solution:
python
undefined
错误信息:
RuntimeError: Event loop is closed
原因: 在模块导入时创建异步资源
解决方案:
python
undefined

❌ WRONG: Module-level async execution

❌ 错误做法:模块级异步执行

import asyncpg connection = asyncpg.connect('postgresql://...') # Runs at import!
import asyncpg connection = asyncpg.connect('postgresql://...') # 导入时运行!

✅ CORRECT: Lazy initialization

✅ 正确做法:延迟初始化

import asyncpg
class Database: connection = None
@classmethod
async def connect(cls):
    if cls.connection is None:
        cls.connection = await asyncpg.connect('postgresql://...')
    return cls.connection
import asyncpg
class Database: connection = None
@classmethod
async def connect(cls):
    if cls.connection is None:
        cls.connection = await asyncpg.connect('postgresql://...')
    return cls.connection

Usage: connection happens when needed, not at import

使用方式:在需要时连接,而非导入时

@mcp.tool() async def get_users(): conn = await Database.connect() return await conn.fetch("SELECT * FROM users")

**Source:** Async event loop management, cloud deployment requirements

---
@mcp.tool() async def get_users(): conn = await Database.connect() return await conn.fetch("SELECT * FROM users")

**来源:** 异步事件循环管理、云部署要求

---

Error 16: Storage Backend Not Configured

错误16:存储后端未配置

Error:
RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instances
Cause: Using default memory storage in production without persistence
Solution:
python
undefined
错误信息:
RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instances
原因: 生产环境中使用默认内存存储,无持久化
解决方案:
python
undefined

❌ WRONG: Memory storage in production

❌ 错误做法:生产环境使用内存存储

mcp = FastMCP("Production Server") # Tokens lost on restart!
mcp = FastMCP("Production Server") # 重启后令牌丢失!

✅ CORRECT: Use disk or Redis storage

✅ 正确做法:使用磁盘或Redis存储

from key_value.stores import DiskStore, RedisStore from key_value.encryption import FernetEncryptionWrapper from cryptography.fernet import Fernet
from key_value.stores import DiskStore, RedisStore from key_value.encryption import FernetEncryptionWrapper from cryptography.fernet import Fernet

Disk storage (single instance)

磁盘存储(单实例)

mcp = FastMCP( "Production Server", storage=FernetEncryptionWrapper( key_value=DiskStore(path="/var/lib/mcp/storage"), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) ) )
mcp = FastMCP( "Production Server", storage=FernetEncryptionWrapper( key_value=DiskStore(path="/var/lib/mcp/storage"), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) ) )

Redis storage (multi-instance)

Redis存储(多实例)

mcp = FastMCP( "Production Server", storage=FernetEncryptionWrapper( key_value=RedisStore( host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD") ), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) ) )

**Source:** FastMCP v2.13.0 storage backends documentation

---
mcp = FastMCP( "Production Server", storage=FernetEncryptionWrapper( key_value=RedisStore( host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD") ), fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY")) ) )

**来源:** FastMCP v2.13.0存储后端文档

---

Error 17: Lifespan Not Passed to ASGI App

错误17:生命周期未传递给ASGI应用

Error:
RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not running
Cause: Using FastMCP with FastAPI/Starlette without passing lifespan
Solution:
python
from fastapi import FastAPI
from fastmcp import FastMCP
错误信息:
RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not running
原因: 将FastMCP与FastAPI/Starlette一起使用时未传递生命周期
解决方案:
python
from fastapi import FastAPI
from fastmcp import FastMCP

❌ WRONG: Lifespan not passed

❌ 错误做法:未传递生命周期

mcp = FastMCP("My Server", lifespan=my_lifespan) app = FastAPI() # MCP lifespan won't run!
mcp = FastMCP("My Server", lifespan=my_lifespan) app = FastAPI() # MCP生命周期不会运行!

✅ CORRECT: Pass MCP lifespan to parent app

✅ 正确做法:将MCP生命周期传递给父应用

mcp = FastMCP("My Server", lifespan=my_lifespan) app = FastAPI(lifespan=mcp.lifespan)

**Source:** FastMCP v2.13.0 breaking changes, ASGI integration guide

---
mcp = FastMCP("My Server", lifespan=my_lifespan) app = FastAPI(lifespan=mcp.lifespan)

**来源:** FastMCP v2.13.0重大变更、ASGI集成指南

---

Error 18: Middleware Execution Order Error

错误18:中间件执行顺序错误

Error:
RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middleware
Cause: Incorrect middleware ordering (order matters!)
Solution:
python
undefined
错误信息:
RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middleware
原因: 中间件顺序不正确(顺序很重要!)
解决方案:
python
undefined

❌ WRONG: Cache before rate limiting

❌ 错误做法:缓存先于速率限制

mcp.add_middleware(ResponseCachingMiddleware()) mcp.add_middleware(RateLimitingMiddleware()) # Too late!
mcp.add_middleware(ResponseCachingMiddleware()) mcp.add_middleware(RateLimitingMiddleware()) # 太晚了!

✅ CORRECT: Rate limit before cache

✅ 正确做法:速率限制先于缓存

mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors mcp.add_middleware(TimingMiddleware()) # Second: time requests mcp.add_middleware(LoggingMiddleware()) # Third: log mcp.add_middleware(RateLimitingMiddleware()) # Fourth: check limits mcp.add_middleware(ResponseCachingMiddleware()) # Last: cache

**Source:** FastMCP middleware documentation, best practices

---
mcp.add_middleware(ErrorHandlingMiddleware()) # 第一个:捕获错误 mcp.add_middleware(TimingMiddleware()) # 第二个:计时 mcp.add_middleware(LoggingMiddleware()) # 第三个:日志 mcp.add_middleware(RateLimitingMiddleware()) # 第四个:检查速率限制 mcp.add_middleware(ResponseCachingMiddleware()) # 最后一个:缓存

**来源:** FastMCP中间件文档、最佳实践

---

Error 19: Circular Middleware Dependencies

错误19:循环中间件依赖

Error:
RecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detected
Cause: Middleware calling
self.next()
incorrectly or circular dependencies
Solution:
python
undefined
错误信息:
RecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detected
原因: 中间件错误调用
self.next()
或存在循环依赖
解决方案:
python
undefined

❌ WRONG: Not calling next() or calling incorrectly

❌ 错误做法:未调用next()或调用方式错误

class BadMiddleware(BaseMiddleware): async def on_call_tool(self, tool_name, arguments, context): # Forgot to call next()! return {"error": "blocked"}
class BadMiddleware(BaseMiddleware): async def on_call_tool(self, tool_name, arguments, context): # 忘记调用next()! return {"error": "blocked"}

✅ CORRECT: Always call next() to continue chain

✅ 正确做法:始终调用next()以继续链

class GoodMiddleware(BaseMiddleware): async def on_call_tool(self, tool_name, arguments, context): # Do preprocessing print(f"Before: {tool_name}")
    # MUST call next() to continue
    result = await self.next(tool_name, arguments, context)

    # Do postprocessing
    print(f"After: {tool_name}")
    return result

**Source:** FastMCP middleware system documentation

---
class GoodMiddleware(BaseMiddleware): async def on_call_tool(self, tool_name, arguments, context): # 预处理 print(f"之前: {tool_name}")
    # 必须调用next()以继续
    result = await self.next(tool_name, arguments, context)

    # 后处理
    print(f"之后: {tool_name}")
    return result

**来源:** FastMCP中间件系统文档

---

Error 20: Import vs Mount Confusion

错误20:导入与挂载混淆

Error:
RuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacing
Cause: Using
import_server()
when
mount()
was needed (or vice versa)
Solution:
python
undefined
错误信息:
RuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacing
原因: 需要
mount()
时使用了
import_server()
(反之亦然)
解决方案:
python
undefined

❌ WRONG: Using import when you want dynamic updates

❌ 错误做法:需要动态更新时使用导入

main_server.import_server(subserver)
main_server.import_server(subserver)

Later: changes to subserver won't appear in main_server

后续:子服务器的更改不会在主服务器中显示

✅ CORRECT: Use mount() for dynamic composition

✅ 正确做法:使用mount()进行动态组合

main_server.mount(subserver, prefix="sub")
main_server.mount(subserver, prefix="sub")

Changes to subserver are immediately visible

子服务器的更改立即可见

❌ WRONG: Using mount when you want static bundle

❌ 错误做法:需要静态打包时使用挂载

main_server.mount(third_party_server, prefix="vendor")
main_server.mount(third_party_server, prefix="vendor")

Runtime overhead for static components

静态组件的运行时开销

✅ CORRECT: Use import_server() for static bundles

✅ 正确做法:使用import_server()进行静态打包

main_server.import_server(third_party_server)
main_server.import_server(third_party_server)

One-time copy, no runtime delegation

一次性复制,无运行时委托


**Source:** FastMCP server composition patterns

---

**来源:** FastMCP服务器组合模式

---

Error 21: Resource Prefix Format Mismatch

错误21:资源前缀格式不匹配

Error:
ValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI format
Cause: Using wrong resource prefix format (path vs protocol)
Solution:
python
undefined
错误信息:
ValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI format
原因: 使用了错误的资源前缀格式(路径vs协议)
解决方案:
python
undefined

Path format (default since v2.4.0)

路径格式(v2.4.0起默认)

main_server.mount(api_server, prefix="api")
main_server.mount(api_server, prefix="api")

Resources: resource://api/users

资源:resource://api/users

❌ WRONG: Expecting protocol format

❌ 错误做法:期望协议格式

resource://api+users (doesn't exist)

resource://api+users(不存在)

✅ CORRECT: Use path format

✅ 正确做法:使用路径格式

uri = "resource://api/users"
uri = "resource://api/users"

OR explicitly set protocol format (legacy)

或显式设置协议格式(旧版)

main_server.mount( api_server, prefix="api", resource_prefix_format="protocol" )
main_server.mount( api_server, prefix="api", resource_prefix_format="protocol" )

Resources: api+resource://users

资源:api+resource://users


**Source:** FastMCP v2.4.0+ resource prefix changes

---

**来源:** FastMCP v2.4.0+资源前缀变更

---

Error 22: OAuth Proxy Without Consent Screen

错误22:OAuth代理未启用同意屏幕

Error:
SecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vector
Cause: OAuth Proxy configured without consent screen (security vulnerability)
Solution:
python
undefined
错误信息:
SecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vector
原因: OAuth代理配置时未启用同意屏幕(安全漏洞)
解决方案:
python
undefined

❌ WRONG: No consent screen (security risk!)

❌ 错误做法:无同意屏幕(安全风险!)

auth = OAuthProxy( jwt_signing_key=os.getenv("JWT_KEY"), upstream_authorization_endpoint="...", upstream_token_endpoint="...", # Missing: enable_consent_screen )
auth = OAuthProxy( jwt_signing_key=os.getenv("JWT_KEY"), upstream_authorization_endpoint="...", upstream_token_endpoint="...", # 缺少:enable_consent_screen )

✅ CORRECT: Enable consent screen

✅ 正确做法:启用同意屏幕

auth = OAuthProxy( jwt_signing_key=os.getenv("JWT_KEY"), upstream_authorization_endpoint="...", upstream_token_endpoint="...", enable_consent_screen=True # Prevents bypass attacks )

**Source:** FastMCP v2.13.0 OAuth security enhancements, RFC 7662

---
auth = OAuthProxy( jwt_signing_key=os.getenv("JWT_KEY"), upstream_authorization_endpoint="...", upstream_token_endpoint="...", enable_consent_screen=True # 防止绕过攻击 )

**来源:** FastMCP v2.13.0 OAuth安全增强、RFC 7662

---

Error 23: Missing JWT Signing Key in Production

错误23:生产环境缺少JWT签名密钥

Error:
ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing key
Cause: OAuth Proxy missing
jwt_signing_key
in production
Solution:
python
undefined
错误信息:
ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing key
原因: OAuth代理在生产环境中缺少
jwt_signing_key
解决方案:
python
undefined

❌ WRONG: No JWT signing key

❌ 错误做法:无JWT签名密钥

auth = OAuthProxy( upstream_authorization_endpoint="...", upstream_token_endpoint="...", # Missing: jwt_signing_key )
auth = OAuthProxy( upstream_authorization_endpoint="...", upstream_token_endpoint="...", # 缺少:jwt_signing_key )

✅ CORRECT: Provide signing key from environment

✅ 正确做法:从环境变量提供签名密钥

import secrets
import secrets

Generate once (in setup):

生成一次(设置时):

signing_key = secrets.token_urlsafe(32)

signing_key = secrets.token_urlsafe(32)

Store in: FASTMCP_JWT_SIGNING_KEY environment variable

存储在:FASTMCP_JWT_SIGNING_KEY环境变量中

auth = OAuthProxy( jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"], client_storage=encrypted_storage, upstream_authorization_endpoint="...", upstream_token_endpoint="...", upstream_client_id=os.getenv("OAUTH_CLIENT_ID"), upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET") )

**Source:** OAuth Proxy production requirements

---
auth = OAuthProxy( jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"], client_storage=encrypted_storage, upstream_authorization_endpoint="...", upstream_token_endpoint="...", upstream_client_id=os.getenv("OAUTH_CLIENT_ID"), upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET") )

**来源:** OAuth代理生产环境要求

---

Error 24: Icon Data URI Format Error

错误24:图标Data URI格式错误

Error:
ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URI
Cause: Incorrectly formatted data URI for icons
Solution:
python
from fastmcp import Icon, Image
错误信息:
ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URI
原因: 图标Data URI格式不正确
解决方案:
python
from fastmcp import Icon, Image

❌ WRONG: Invalid data URI

❌ 错误做法:无效Data URI

icon = Icon(url="base64,iVBORw0KG...") # Missing data:image/png;
icon = Icon(url="base64,iVBORw0KG...") # 缺少data:image/png;

✅ CORRECT: Use Image utility

✅ 正确做法:使用Image工具

icon = Icon.from_file("/path/to/icon.png", size="medium")
icon = Icon.from_file("/path/to/icon.png", size="medium")

✅ CORRECT: Manual data URI

✅ 正确做法:手动生成Data URI

import base64
with open("/path/to/icon.png", "rb") as f: image_data = base64.b64encode(f.read()).decode() data_uri = f"data:image/png;base64,{image_data}" icon = Icon(url=data_uri, size="medium")

**Source:** FastMCP icons documentation, Data URI specification

---
import base64
with open("/path/to/icon.png", "rb") as f: image_data = base64.b64encode(f.read()).decode() data_uri = f"data:image/png;base64,{image_data}" icon = Icon(url=data_uri, size="medium")

**来源:** FastMCP图标文档、Data URI规范

---

Error 25: Lifespan Behavior Change (v2.13.0)

错误25:生命周期行为变更(v2.13.0)

Error:
Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple times
Cause: Expecting v2.12 lifespan behavior (per-session) in v2.13.0+ (per-server)
Solution:
python
undefined
错误信息:
Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple times
原因: 在v2.13.0+中期望v2.12的生命周期行为(每个会话),但v2.13.0+是每个服务器实例
解决方案:
python
undefined

v2.12.0 and earlier: Lifespan ran per client session

v2.12.0及更早版本:生命周期每个客户端会话运行一次

v2.13.0+: Lifespan runs once per server instance

v2.13.0+:生命周期每个服务器实例运行一次

✅ CORRECT: v2.13.0+ pattern (per-server)

✅ 正确做法:v2.13.0+模式(每个服务器实例)

@asynccontextmanager async def app_lifespan(server: FastMCP): """Runs ONCE when server starts, not per client session.""" db = await Database.connect() print("Server starting - runs once")
try:
    yield {"db": db}
finally:
    await db.disconnect()
    print("Server stopping - runs once")
mcp = FastMCP("My Server", lifespan=app_lifespan)
@asynccontextmanager async def app_lifespan(server: FastMCP): """服务器启动时运行一次,而非每个客户端会话。""" db = await Database.connect() print("服务器启动 - 运行一次")
try:
    yield {"db": db}
finally:
    await db.disconnect()
    print("服务器停止 - 运行一次")
mcp = FastMCP("My Server", lifespan=app_lifespan)

For per-session logic, use middleware instead:

如需每个会话的逻辑,请改用中间件:

class SessionMiddleware(BaseMiddleware): async def on_message(self, message, context): # Runs per client message session_id = context.fastmcp_context.get_state("session_id") if not session_id: session_id = str(uuid.uuid4()) context.fastmcp_context.set_state("session_id", session_id)
    return await self.next(message, context)

**Source:** FastMCP v2.13.0 release notes, breaking changes documentation

---
class SessionMiddleware(BaseMiddleware): async def on_message(self, message, context): # 每个客户端消息运行一次 session_id = context.fastmcp_context.get_state("session_id") if not session_id: session_id = str(uuid.uuid4()) context.fastmcp_context.set_state("session_id", session_id)
    return await self.next(message, context)

**来源:** FastMCP v2.13.0发布说明、重大变更文档

---

Production Patterns

生产模式

Pattern 1: Self-Contained Utils Module

模式1:独立工具模块

Best practice for maintaining all utilities in one place:
python
undefined
将所有工具维护在一个位置的最佳实践:
python
undefined

src/utils.py - Single file with all utilities

src/utils.py - 包含所有工具的单个文件

import os from typing import Dict, Any from datetime import datetime
class Config: """Application configuration.""" SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server") SERVER_VERSION = "1.0.0" API_BASE_URL = os.getenv("API_BASE_URL") API_KEY = os.getenv("API_KEY") CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]: """Format successful response.""" return { "success": True, "message": message, "data": data, "timestamp": datetime.now().isoformat() }
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]: """Format error response.""" return { "success": False, "error": error, "code": code, "timestamp": datetime.now().isoformat() }
import os from typing import Dict, Any from datetime import datetime
class Config: """应用配置。""" SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server") SERVER_VERSION = "1.0.0" API_BASE_URL = os.getenv("API_BASE_URL") API_KEY = os.getenv("API_KEY") CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]: """格式化成功响应。""" return { "success": True, "message": message, "data": data, "timestamp": datetime.now().isoformat() }
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]: """格式化错误响应。""" return { "success": False, "error": error, "code": code, "timestamp": datetime.now().isoformat() }

Usage in tools

在工具中使用

from .utils import format_success, format_error, Config
@mcp.tool() async def process_data(data: dict) -> dict: try: result = await process(data) return format_success(result) except Exception as e: return format_error(str(e))
undefined
from .utils import format_success, format_error, Config
@mcp.tool() async def process_data(data: dict) -> dict: try: result = await process(data) return format_success(result) except Exception as e: return format_error(str(e))
undefined

Pattern 2: Connection Pooling

模式2:连接池

Efficient resource management:
python
import httpx
from typing import Optional

class APIClient:
    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        if cls._instance is None:
            cls._instance = httpx.AsyncClient(
                base_url=os.getenv("API_BASE_URL"),
                headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(max_keepalive_connections=5)
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        if cls._instance:
            await cls._instance.aclose()
            cls._instance = None

@mcp.tool()
async def api_request(endpoint: str) -> dict:
    """Make API request with managed client."""
    client = await APIClient.get_client()
    response = await client.get(endpoint)
    return response.json()
高效的资源管理:
python
import httpx
from typing import Optional

class APIClient:
    _instance: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get_client(cls) -> httpx.AsyncClient:
        if cls._instance is None:
            cls._instance = httpx.AsyncClient(
                base_url=os.getenv("API_BASE_URL"),
                headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
                timeout=httpx.Timeout(30.0),
                limits=httpx.Limits(max_keepalive_connections=5)
            )
        return cls._instance

    @classmethod
    async def cleanup(cls):
        if cls._instance:
            await cls._instance.aclose()
            cls._instance = None

@mcp.tool()
async def api_request(endpoint: str) -> dict:
    """使用托管客户端发起API请求。"""
    client = await APIClient.get_client()
    response = await client.get(endpoint)
    return response.json()

Pattern 3: Error Handling with Retry

模式3:带重试的错误处理

Resilient API calls:
python
import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0
) -> T:
    """Retry function with exponential backoff."""
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= exponential_base

    raise last_exception

@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retry."""
    async def make_call():
        async with httpx.AsyncClient() as client:
            response = await client.get(endpoint)
            response.raise_for_status()
            return response.json()

    try:
        data = await retry_with_backoff(make_call)
        return {"success": True, "data": data}
    except Exception as e:
        return {"error": f"Failed after retries: {e}"}
弹性API调用:
python
import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0
) -> T:
    """带指数退避的重试函数。"""
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= exponential_base

    raise last_exception

@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
    """带自动重试的API调用。"""
    async def make_call():
        async with httpx.AsyncClient() as client:
            response = await client.get(endpoint)
            response.raise_for_status()
            return response.json()

    try:
        data = await retry_with_backoff(make_call)
        return {"success": True, "data": data}
    except Exception as e:
        return {"error": f"重试后失败: {e}"}

Pattern 4: Time-Based Caching

模式4:基于时间的缓存

Reduce API load:
python
import time
from typing import Any, Optional

class TimeBasedCache:
    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str) -> Optional[Any]:
        if key in self.cache:
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            else:
                del self.cache[key]
                del self.timestamps[key]
        return None

    def set(self, key: str, value: Any):
        self.cache[key] = value
        self.timestamps[key] = time.time()

cache = TimeBasedCache(ttl=300)

@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """Fetch with caching."""
    cache_key = f"resource:{resource_id}"

    cached_data = cache.get(cache_key)
    if cached_data:
        return {"data": cached_data, "from_cache": True}

    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)

    return {"data": data, "from_cache": False}
减少API负载:
python
import time
from typing import Any, Optional

class TimeBasedCache:
    def __init__(self, ttl: int = 300):
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}

    def get(self, key: str) -> Optional[Any]:
        if key in self.cache:
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            else:
                del self.cache[key]
                del self.timestamps[key]
        return None

    def set(self, key: str, value: Any):
        self.cache[key] = value
        self.timestamps[key] = time.time()

cache = TimeBasedCache(ttl=300)

@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
    """带缓存的获取操作。"""
    cache_key = f"resource:{resource_id}"

    cached_data = cache.get(cache_key)
    if cached_data:
        return {"data": cached_data, "from_cache": True}

    data = await fetch_from_api(resource_id)
    cache.set(cache_key, data)

    return {"data": data, "from_cache": False}

Testing

测试

Unit Testing Tools

工具单元测试

python
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client

@pytest.fixture
def test_server():
    """Create test server instance."""
    mcp = FastMCP("test-server")

    @mcp.tool()
    async def test_tool(param: str) -> str:
        return f"Result: {param}"

    return mcp

@pytest.mark.asyncio
async def test_tool_execution(test_server):
    """Test tool execution."""
    async with create_test_client(test_server) as client:
        result = await client.call_tool("test_tool", {"param": "test"})
        assert result.data == "Result: test"
python
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client

@pytest.fixture
def test_server():
    """创建测试服务器实例。"""
    mcp = FastMCP("test-server")

    @mcp.tool()
    async def test_tool(param: str) -> str:
        return f"Result: {param}"

    return mcp

@pytest.mark.asyncio
async def test_tool_execution(test_server):
    """测试工具执行。"""
    async with create_test_client(test_server) as client:
        result = await client.call_tool("test_tool", {"param": "test"})
        assert result.data == "Result: test"

Integration Testing

集成测试

python
import asyncio
from fastmcp import Client

async def test_server():
    """Test all server functionality."""
    async with Client("server.py") as client:
        # Test tools
        tools = await client.list_tools()
        print(f"Tools: {len(tools)}")

        for tool in tools:
            try:
                result = await client.call_tool(tool.name, {})
                print(f"✓ {tool.name}: {result}")
            except Exception as e:
                print(f"✗ {tool.name}: {e}")

        # Test resources
        resources = await client.list_resources()
        for resource in resources:
            try:
                data = await client.read_resource(resource.uri)
                print(f"✓ {resource.uri}")
            except Exception as e:
                print(f"✗ {resource.uri}: {e}")

if __name__ == "__main__":
    asyncio.run(test_server())
python
import asyncio
from fastmcp import Client

async def test_server():
    """测试服务器所有功能。"""
    async with Client("server.py") as client:
        # 测试工具
        tools = await client.list_tools()
        print(f"工具数量: {len(tools)}")

        for tool in tools:
            try:
                result = await client.call_tool(tool.name, {})
                print(f"✓ {tool.name}: {result}")
            except Exception as e:
                print(f"✗ {tool.name}: {e}")

        # 测试资源
        resources = await client.list_resources()
        for resource in resources:
            try:
                data = await client.read_resource(resource.uri)
                print(f"✓ {resource.uri}")
            except Exception as e:
                print(f"✗ {resource.uri}: {e}")

if __name__ == "__main__":
    asyncio.run(test_server())

CLI Commands

CLI命令

Development:
bash
undefined
开发:
bash
undefined

Run with inspector (recommended)

使用检查器运行(推荐)

fastmcp dev server.py
fastmcp dev server.py

Run normally

正常运行

fastmcp run server.py
fastmcp run server.py

Inspect server without running

检查服务器而不运行

fastmcp inspect server.py

**Installation:**
```bash
fastmcp inspect server.py

**安装:**
```bash

Install to Claude Desktop

安装到Claude Desktop

fastmcp install server.py
fastmcp install server.py

Install with custom name

使用自定义名称安装

fastmcp install server.py --name "My Server"

**Debugging:**
```bash
fastmcp install server.py --name "My Server"

**调试:**
```bash

Enable debug logging

启用调试日志

FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py

Run with HTTP transport

使用HTTP传输运行

fastmcp run server.py --transport http --port 8000
undefined
fastmcp run server.py --transport http --port 8000
undefined

Best Practices

最佳实践

1. Server Structure

1. 服务器结构

python
from fastmcp import FastMCP
import os

def create_server() -> FastMCP:
    """Factory function for complex setup."""
    mcp = FastMCP("Server Name")

    # Configure server
    setup_tools(mcp)
    setup_resources(mcp)

    return mcp

def setup_tools(mcp: FastMCP):
    """Register all tools."""
    @mcp.tool()
    def example_tool():
        pass

def setup_resources(mcp: FastMCP):
    """Register all resources."""
    @mcp.resource("data://config")
    def get_config():
        return {"version": "1.0.0"}
python
from fastmcp import FastMCP
import os

def create_server() -> FastMCP:
    """复杂设置的工厂函数。"""
    mcp = FastMCP("Server Name")

    # 配置服务器
    setup_tools(mcp)
    setup_resources(mcp)

    return mcp

def setup_tools(mcp: FastMCP):
    """注册所有工具。"""
    @mcp.tool()
    def example_tool():
        pass

def setup_resources(mcp: FastMCP):
    """注册所有资源。"""
    @mcp.resource("data://config")
    def get_config():
        return {"version": "1.0.0"}

Export at module level

在模块级别导出

mcp = create_server()
if name == "main": mcp.run()
undefined
mcp = create_server()
if name == "main": mcp.run()
undefined

2. Environment Configuration

2. 环境配置

python
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    API_KEY = os.getenv("API_KEY", "")
    BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
    DEBUG = os.getenv("DEBUG", "false").lower() == "true"

    @classmethod
    def validate(cls):
        if not cls.API_KEY:
            raise ValueError("API_KEY is required")
        return True
python
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    API_KEY = os.getenv("API_KEY", "")
    BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
    DEBUG = os.getenv("DEBUG", "false").lower() == "true"

    @classmethod
    def validate(cls):
        if not cls.API_KEY:
            raise ValueError("API_KEY是必填项")
        return True

Validate on startup

启动时验证

Config.validate()
undefined
Config.validate()
undefined

3. Documentation

3. 文档

python
@mcp.tool()
def complex_tool(
    query: str,
    filters: dict = None,
    limit: int = 10
) -> dict:
    """
    Search with advanced filtering.

    Args:
        query: Search query string
        filters: Optional filters dict with keys:
            - category: Filter by category
            - date_from: Start date (ISO format)
            - date_to: End date (ISO format)
        limit: Maximum results (1-100)

    Returns:
        Dict with 'results' list and 'total' count

    Examples:
        >>> complex_tool("python", {"category": "tutorial"}, 5)
        {'results': [...], 'total': 5}
    """
    pass
python
@mcp.tool()
def complex_tool(
    query: str,
    filters: dict = None,
    limit: int = 10
) -> dict:
    """
    使用高级过滤进行搜索。

    参数:
        query: 搜索查询字符串
        filters: 可选过滤字典,包含以下键:
            - category: 按类别过滤
            - date_from: 开始日期(ISO格式)
            - date_to: 结束日期(ISO格式)
        limit: 最大结果数(1-100)

    返回:
        包含'results'列表和'total'计数的字典

    示例:
        >>> complex_tool("python", {"category": "tutorial"}, 5)
        {'results': [...], 'total': 5}
    """
    pass

4. Health Checks

4. 健康检查

python
@mcp.resource("health://status")
async def health_check() -> dict:
    """Comprehensive health check."""
    checks = {}

    # Check API connectivity
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{BASE_URL}/health", timeout=5)
            checks["api"] = response.status_code == 200
    except:
        checks["api"] = False

    # Check database
    try:
        checks["database"] = await check_db_connection()
    except:
        checks["database"] = False

    all_healthy = all(checks.values())

    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks
    }
python
@mcp.resource("health://status")
async def health_check() -> dict:
    """全面健康检查。"""
    checks = {}

    # 检查API连通性
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{BASE_URL}/health", timeout=5)
            checks["api"] = response.status_code == 200
    except:
        checks["api"] = False

    # 检查数据库
    try:
        checks["database"] = await check_db_connection()
    except:
        checks["database"] = False

    all_healthy = all(checks.values())

    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.now().isoformat(),
        "checks": checks
    }

Project Structure

项目结构

Simple Server

简单服务器

my-mcp-server/
├── server.py          # Main server file
├── requirements.txt   # Dependencies
├── .env              # Environment variables (git-ignored)
├── .gitignore        # Git ignore file
└── README.md         # Documentation
my-mcp-server/
├── server.py          # 主服务器文件
├── requirements.txt   # 依赖
├── .env              # 环境变量(Git忽略)
├── .gitignore        # Git忽略文件
└── README.md         # 文档

Production Server

生产环境服务器

my-mcp-server/
├── src/
│   ├── server.py         # Main entry point
│   ├── utils.py          # Shared utilities
│   ├── tools/           # Tool modules
│   │   ├── __init__.py
│   │   ├── api_tools.py
│   │   └── data_tools.py
│   ├── resources/       # Resource definitions
│   │   ├── __init__.py
│   │   └── static.py
│   └── prompts/         # Prompt templates
│       ├── __init__.py
│       └── templates.py
├── tests/
│   ├── test_tools.py
│   └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.md
my-mcp-server/
├── src/
│   ├── server.py         # 主入口
│   ├── utils.py          # 共享工具
│   ├── tools/           # 工具模块
│   │   ├── __init__.py
│   │   ├── api_tools.py
│   │   └── data_tools.py
│   ├── resources/       # 资源定义
│   │   ├── __init__.py
│   │   └── static.py
│   └── prompts/         # 提示词模板
│       ├── __init__.py
│       └── templates.py
├── tests/
│   ├── test_tools.py
│   └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.md

References

参考资料

Official Documentation:
Related Skills:
  • openai-api
    - OpenAI integration
  • claude-api
    - Claude API
  • cloudflare-worker-base
    - Deploy MCP as Worker
Package Versions:
  • fastmcp >= 2.13.0
  • Python >= 3.10
  • httpx (recommended for async API calls)
  • pydantic (for validation)
  • py-key-value-aio (for storage backends)
  • cryptography (for encrypted storage)
官方文档:
相关技能:
  • openai-api
    - OpenAI集成
  • claude-api
    - Claude API
  • cloudflare-worker-base
    - 将MCP部署为Worker
包版本:
  • fastmcp >= 2.13.0
  • Python >= 3.10
  • httpx(推荐用于异步API调用)
  • pydantic(用于验证)
  • py-key-value-aio(用于存储后端)
  • cryptography(用于加密存储)

Summary

总结

FastMCP enables rapid development of production-ready MCP servers with advanced features for storage, authentication, middleware, and composition. Key takeaways:
  1. Always export server at module level for FastMCP Cloud compatibility
  2. Use persistent storage backends (Disk/Redis) in production for OAuth tokens and caching
  3. Configure server lifespans for proper resource management (DB connections, API clients)
  4. Add middleware strategically - order matters! (errors → timing → logging → rate limiting → caching)
  5. Choose composition wisely -
    import_server()
    for static bundles,
    mount()
    for dynamic composition
  6. Secure OAuth properly - Enable consent screens, encrypt token storage, use JWT signing keys
  7. Use async/await properly - don't block the event loop
  8. Handle errors gracefully with structured responses and ErrorHandlingMiddleware
  9. Avoid circular imports especially with factory functions
  10. Test locally before deploying using
    fastmcp dev
  11. Use environment variables for all configuration (never hardcode secrets)
  12. Document thoroughly - LLMs read your docstrings
  13. Follow production patterns for self-contained, maintainable code
  14. Leverage OpenAPI for instant API integration
  15. Monitor with health checks and middleware for production reliability
Production Readiness:
  • Storage: Encrypted persistence for OAuth tokens and response caching
  • Authentication: 4 auth patterns (Token Validation, Remote OAuth, OAuth Proxy, Full OAuth)
  • Middleware: 8 built-in types for logging, rate limiting, caching, error handling
  • Composition: Modular server architecture with import/mount strategies
  • Security: Consent screens, PKCE, RFC 7662 token introspection, encrypted storage
  • Performance: Response caching, connection pooling, timing middleware
This skill prevents 25+ common errors and provides 90-95% token savings compared to manual implementation.
FastMCP支持快速开发生产就绪的MCP服务器,提供存储、认证、中间件和组合等高级功能。关键要点:
  1. 始终在模块级别导出服务器,以兼容FastMCP Cloud
  2. 生产环境使用持久化存储后端(磁盘/Redis)存储OAuth令牌和缓存
  3. 配置服务器生命周期,以正确管理资源(数据库连接、API客户端)
  4. 战略性添加中间件 - 顺序很重要!(错误处理 → 计时 → 日志 → 速率限制 → 缓存)
  5. 明智选择组合方式 -
    import_server()
    用于静态打包,
    mount()
    用于动态组合
  6. 正确配置OAuth - 启用同意屏幕,加密令牌存储,使用JWT签名密钥
  7. 正确使用async/await - 不要阻塞事件循环
  8. 优雅处理错误,使用结构化响应和ErrorHandlingMiddleware
  9. 避免循环导入,尤其是工厂函数
  10. 部署前本地测试,使用
    fastmcp dev
  11. 所有配置使用环境变量(永远不要硬编码密钥)
  12. 彻底文档化 - LLM会读取你的文档字符串
  13. 遵循生产模式,编写独立、可维护的代码
  14. 利用OpenAPI实现即时API集成
  15. 使用健康检查和中间件监控生产环境可靠性
生产就绪特性:
  • 存储:OAuth令牌和响应缓存的加密持久化
  • 认证:4种认证模式(令牌验证、远程OAuth、OAuth代理、完整OAuth)
  • 中间件:8种内置类型,用于日志、速率限制、缓存、错误处理
  • 组合:模块化服务器架构,支持导入/挂载策略
  • 安全:同意屏幕、PKCE、RFC 7662令牌 introspection、加密存储
  • 性能:响应缓存、连接池、计时中间件
本技能可防止25+种常见错误,与手动实现相比可节省90-95%的令牌消耗。