fastmcp
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastMCP - Build MCP Servers in Python
FastMCP - 在Python中构建MCP服务器
FastMCP is a Python framework for building Model Context Protocol (MCP) servers that expose tools, resources, and prompts to Large Language Models like Claude. This skill provides production-tested patterns, error prevention, and deployment strategies for building robust MCP servers.
FastMCP是一个Python框架,用于构建模型上下文协议(MCP)服务器,向Claude等大语言模型(LLM)暴露工具、资源和提示词。本技能提供经过生产验证的模式、错误预防方案和部署策略,帮助你构建稳健的MCP服务器。
Quick Start
快速开始
Installation
安装
bash
pip install fastmcpbash
pip install fastmcpor
或
uv pip install fastmcp
undefineduv pip install fastmcp
undefinedMinimal Server
最简服务器
python
from fastmcp import FastMCPpython
from fastmcp import FastMCPMUST be at module level for FastMCP Cloud
必须在模块级别定义,以适配FastMCP Cloud
mcp = FastMCP("My Server")
@mcp.tool()
async def hello(name: str) -> str:
"""Say hello to someone."""
return f"Hello, {name}!"
if name == "main":
mcp.run()
**Run it:**
```bashmcp = FastMCP("My Server")
@mcp.tool()
async def hello(name: str) -> str:
"""向某人打招呼。"""
return f"Hello, {name}!"
if name == "main":
mcp.run()
**运行方式:**
```bashLocal development
本地开发
python server.py
python server.py
With FastMCP CLI
使用FastMCP CLI
fastmcp dev server.py
fastmcp dev server.py
HTTP mode
HTTP模式
python server.py --transport http --port 8000
undefinedpython server.py --transport http --port 8000
undefinedCore Concepts
核心概念
1. Tools
1. 工具
Tools are functions that LLMs can call to perform actions:
python
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
"""Perform mathematical operations.
Args:
operation: add, subtract, multiply, or divide
a: First number
b: Second number
Returns:
Result of the operation
"""
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else None
}
return operations.get(operation, lambda x, y: None)(a, b)Best Practices:
- Clear, descriptive function names
- Comprehensive docstrings (LLMs read these!)
- Strong type hints (Pydantic validates automatically)
- Return structured data (dicts/lists)
- Handle errors gracefully
Sync vs Async:
python
undefined工具是LLM可以调用以执行操作的函数:
python
@mcp.tool()
def calculate(operation: str, a: float, b: float) -> float:
"""执行数学运算。
参数:
operation: 加、减、乘或除
a: 第一个数字
b: 第二个数字
返回:
运算结果
"""
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else None
}
return operations.get(operation, lambda x, y: None)(a, b)最佳实践:
- 清晰、描述性的函数名称
- 全面的文档字符串(LLM会读取这些内容!)
- 强类型提示(Pydantic会自动验证)
- 返回结构化数据(字典/列表)
- 优雅处理错误
同步 vs 异步:
python
undefinedSync tool (for non-blocking operations)
同步工具(用于非阻塞操作)
@mcp.tool()
def sync_tool(param: str) -> dict:
return {"result": param.upper()}
@mcp.tool()
def sync_tool(param: str) -> dict:
return {"result": param.upper()}
Async tool (for I/O operations, API calls)
异步工具(用于I/O操作、API调用)
@mcp.tool()
async def async_tool(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
undefined@mcp.tool()
async def async_tool(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
undefined2. Resources
2. 资源
Resources expose static or dynamic data to LLMs:
python
undefined资源向LLM暴露静态或动态数据:
python
undefinedStatic resource
静态资源
@mcp.resource("data://config")
def get_config() -> dict:
"""Provide application configuration."""
return {
"version": "1.0.0",
"features": ["auth", "api", "cache"]
}
@mcp.resource("data://config")
def get_config() -> dict:
"""提供应用配置。"""
return {
"version": "1.0.0",
"features": ["auth", "api", "cache"]
}
Dynamic resource
动态资源
@mcp.resource("info://status")
async def server_status() -> dict:
"""Get current server status."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"api_configured": bool(os.getenv("API_KEY"))
}
**Resource URI Schemes:**
- `data://` - Generic data
- `file://` - File resources
- `resource://` - General resources
- `info://` - Information/metadata
- `api://` - API endpoints
- Custom schemes allowed@mcp.resource("info://status")
async def server_status() -> dict:
"""获取当前服务器状态。"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"api_configured": bool(os.getenv("API_KEY"))
}
**资源URI方案:**
- `data://` - 通用数据
- `file://` - 文件资源
- `resource://` - 通用资源
- `info://` - 信息/元数据
- `api://` - API端点
- 允许自定义方案3. Resource Templates
3. 资源模板
Dynamic resources with parameters in the URI:
python
undefinedURI中包含参数的动态资源:
python
undefinedSingle parameter
单个参数
@mcp.resource("user://{user_id}/profile")
async def get_user_profile(user_id: str) -> dict:
"""Get user profile by ID."""
user = await fetch_user_from_db(user_id)
return {
"id": user_id,
"name": user.name,
"email": user.email
}
@mcp.resource("user://{user_id}/profile")
async def get_user_profile(user_id: str) -> dict:
"""根据ID获取用户资料。"""
user = await fetch_user_from_db(user_id)
return {
"id": user_id,
"name": user.name,
"email": user.email
}
Multiple parameters
多个参数
@mcp.resource("org://{org_id}/team/{team_id}/members")
async def get_team_members(org_id: str, team_id: str) -> list:
"""Get team members with org context."""
return await db.query(
"SELECT * FROM members WHERE org_id = ? AND team_id = ?",
[org_id, team_id]
)
**Critical:** Parameter names must match exactly between URI template and function signature.@mcp.resource("org://{org_id}/team/{team_id}/members")
async def get_team_members(org_id: str, team_id: str) -> list:
"""获取组织上下文下的团队成员。"""
return await db.query(
"SELECT * FROM members WHERE org_id = ? AND team_id = ?",
[org_id, team_id]
)
**关键注意事项:** URI模板中的参数名称必须与函数签名中的参数名称完全匹配。4. Prompts
4. 提示词
Pre-configured prompts for LLMs:
python
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
"""Generate analysis prompt."""
return f"""
Analyze {topic} considering:
1. Current state
2. Challenges
3. Opportunities
4. Recommendations
Use available tools to gather data.
"""
@mcp.prompt("help")
def help_prompt() -> str:
"""Generate help text for server."""
return """
Welcome to My Server!
Available tools:
- search: Search for items
- process: Process data
Available resources:
- info://status: Server status
"""为LLM预配置的提示词:
python
@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
"""生成分析提示词。"""
return f"""
分析{topic}时请考虑:
1. 当前状态
2. 挑战
3. 机遇
4. 建议
使用可用工具收集数据。
"""
@mcp.prompt("help")
def help_prompt() -> str:
"""生成服务器帮助文本。"""
return """
欢迎使用My Server!
可用工具:
- search: 搜索项目
- process: 处理数据
可用资源:
- info://status: 服务器状态
"""Context Features
上下文特性
FastMCP provides advanced features through context injection:
FastMCP通过上下文注入提供高级功能:
1. Elicitation (User Input)
1. 启发式交互(用户输入)
Request user input during tool execution:
python
from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
"""Perform action with user confirmation."""
# Request confirmation from user
confirmed = await context.request_elicitation(
prompt=f"Confirm {action}? (yes/no)",
response_type=str
)
if confirmed.lower() == "yes":
result = await perform_action(action)
return {"status": "completed", "action": action}
else:
return {"status": "cancelled", "action": action}在工具执行期间请求用户输入:
python
from fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
"""在用户确认后执行操作。"""
# 向用户请求确认
confirmed = await context.request_elicitation(
prompt=f"确认执行{action}?(是/否)",
response_type=str
)
if confirmed.lower() == "yes":
result = await perform_action(action)
return {"status": "completed", "action": action}
else:
return {"status": "cancelled", "action": action}2. Progress Tracking
2. 进度跟踪
Report progress for long-running operations:
python
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
"""Import data with progress updates."""
data = await read_file(file_path)
total = len(data)
imported = []
for i, item in enumerate(data):
# Report progress
await context.report_progress(
progress=i + 1,
total=total,
message=f"Importing item {i + 1}/{total}"
)
result = await import_item(item)
imported.append(result)
return {"imported": len(imported), "total": total}为长时间运行的操作报告进度:
python
@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
"""导入数据并更新进度。"""
data = await read_file(file_path)
total = len(data)
imported = []
for i, item in enumerate(data):
# 报告进度
await context.report_progress(
progress=i + 1,
total=total,
message=f"正在导入第{i + 1}/{total}项"
)
result = await import_item(item)
imported.append(result)
return {"imported": len(imported), "total": total}3. Sampling (LLM Integration)
3. 采样(LLM集成)
Request LLM completions from within tools:
python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
"""Enhance text using LLM."""
response = await context.request_sampling(
messages=[{
"role": "system",
"content": "You are a professional copywriter."
}, {
"role": "user",
"content": f"Enhance this text: {text}"
}],
temperature=0.7,
max_tokens=500
)
return response["content"]在工具内部请求LLM补全:
python
@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
"""使用LLM优化文本。"""
response = await context.request_sampling(
messages=[{
"role": "system",
"content": "你是一名专业文案。"
}, {
"role": "user",
"content": f"优化以下文本:{text}"
}],
temperature=0.7,
max_tokens=500
)
return response["content"]Storage Backends
存储后端
FastMCP supports pluggable storage backends built on the library. Storage backends enable persistent state for OAuth tokens, response caching, and client-side token storage.
py-key-value-aioFastMCP支持基于库的可插拔存储后端。存储后端为OAuth令牌、响应缓存和客户端令牌存储提供持久化状态。
py-key-value-aioAvailable Backends
可用后端
Memory Store (Default):
- Ephemeral storage (lost on restart)
- Fast, no configuration needed
- Good for development
Disk Store:
- Persistent storage on local filesystem
- Encrypted by default with
FernetEncryptionWrapper - Platform-aware defaults (Mac/Windows use disk, Linux uses memory)
Redis Store:
- Distributed storage for production
- Supports multi-instance deployments
- Ideal for response caching across servers
Other Supported:
- DynamoDB (AWS)
- MongoDB
- Elasticsearch
- Memcached
- RocksDB
- Valkey
内存存储(默认):
- 临时存储(重启后丢失)
- 速度快,无需配置
- 适合开发环境
磁盘存储:
- 本地文件系统上的持久化存储
- 默认使用加密
FernetEncryptionWrapper - 感知平台的默认配置(Mac/Windows使用磁盘,Linux使用内存)
Redis存储:
- 生产环境的分布式存储
- 支持多实例部署
- 适合跨服务器的响应缓存
其他支持的后端:
- DynamoDB(AWS)
- MongoDB
- Elasticsearch
- Memcached
- RocksDB
- Valkey
Basic Usage
基础用法
python
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import ospython
from fastmcp import FastMCP
from key_value.stores import MemoryStore, DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import osMemory storage (default)
内存存储(默认)
mcp = FastMCP("My Server")
mcp = FastMCP("My Server")
Disk storage (persistent)
磁盘存储(持久化)
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/app/data/storage")
)
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/app/data/storage")
)
Redis storage (production)
Redis存储(生产环境)
from key_value.stores import RedisStore
mcp = FastMCP(
"My Server",
storage=RedisStore(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD")
)
)
undefinedfrom key_value.stores import RedisStore
mcp = FastMCP(
"My Server",
storage=RedisStore(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
password=os.getenv("REDIS_PASSWORD")
)
)
undefinedEncrypted Storage
加密存储
Storage backends support automatic encryption:
python
from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStore存储后端支持自动加密:
python
from cryptography.fernet import Fernet
from key_value.encryption import FernetEncryptionWrapper
from key_value.stores import DiskStoreGenerate encryption key (store in environment!)
生成加密密钥(请存储在环境变量中!)
key = Fernet.generate_key()
key = Fernet.generate_key()
Use encrypted storage
使用加密存储
encrypted_storage = FernetEncryptionWrapper(
key_value=DiskStore(path="/app/data/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
mcp = FastMCP("My Server", storage=encrypted_storage)
undefinedencrypted_storage = FernetEncryptionWrapper(
key_value=DiskStore(path="/app/data/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
mcp = FastMCP("My Server", storage=encrypted_storage)
undefinedOAuth Token Storage
OAuth令牌存储
Storage backends automatically persist OAuth tokens:
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet存储后端会自动持久化OAuth令牌:
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import FernetProduction OAuth with encrypted Redis storage
生产环境OAuth + 加密Redis存储
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://provider.com/oauth/authorize",
upstream_token_endpoint="https://provider.com/oauth/token",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
mcp = FastMCP("OAuth Server", auth=auth)
undefinedauth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://provider.com/oauth/authorize",
upstream_token_endpoint="https://provider.com/oauth/token",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
mcp = FastMCP("OAuth Server", auth=auth)
undefinedPlatform-Aware Defaults
平台感知默认配置
FastMCP automatically chooses storage based on platform:
- Mac/Windows: Disk storage (persistent)
- Linux: Memory storage (ephemeral)
- Override: Set parameter explicitly
storage
python
undefinedFastMCP会根据平台自动选择存储:
- Mac/Windows:磁盘存储(持久化)
- Linux:内存存储(临时)
- 覆盖默认:显式设置参数
storage
python
undefinedExplicitly use disk storage on Linux
在Linux上显式使用磁盘存储
from key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/var/lib/mcp/storage")
)
undefinedfrom key_value.stores import DiskStore
mcp = FastMCP(
"My Server",
storage=DiskStore(path="/var/lib/mcp/storage")
)
undefinedServer Lifespans
服务器生命周期
Server lifespans provide initialization and cleanup hooks that run once per server instance (NOT per client session). This is critical for managing database connections, API clients, and other resources.
⚠️ Breaking Change in v2.13.0: Lifespan behavior changed from per-session to per-server-instance.
服务器生命周期提供初始化和清理钩子,每个服务器实例运行一次(而非每个客户端会话)。这对于管理数据库连接、API客户端和其他资源至关重要。
⚠️ v2.13.0中的重大变更:生命周期行为从每个会话变为每个服务器实例。
Basic Pattern
基础模式
python
from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass
@dataclass
class AppContext:
"""Shared application state."""
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""
Initialize resources on startup, cleanup on shutdown.
Runs ONCE per server instance, NOT per client session.
"""
# Startup: Initialize resources
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
print("Server initialized")
try:
# Yield context to tools
yield AppContext(db=db, api_client=api_client)
finally:
# Shutdown: Cleanup resources
await db.disconnect()
await api_client.aclose()
print("Server shutdown complete")python
from fastmcp import FastMCP
from contextlib import asynccontextmanager
from typing import AsyncIterator
from dataclasses import dataclass
@dataclass
class AppContext:
"""共享应用状态。"""
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""
启动时初始化资源,关闭时清理资源。
每个服务器实例运行一次,而非每个客户端会话。
"""
# 启动:初始化资源
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
print("服务器已初始化")
try:
# 将上下文传递给工具
yield AppContext(db=db, api_client=api_client)
finally:
# 关闭:清理资源
await db.disconnect()
await api_client.aclose()
print("服务器关闭完成")Create server with lifespan
创建带生命周期的服务器
mcp = FastMCP("My Server", lifespan=app_lifespan)
mcp = FastMCP("My Server", lifespan=app_lifespan)
Access context in tools
在工具中访问上下文
from fastmcp import Context
@mcp.tool()
async def query_database(sql: str, context: Context) -> list:
"""Query database using shared connection."""
# Access lifespan context
app_context: AppContext = context.fastmcp_context.lifespan_context
return await app_context.db.query(sql)
@mcp.tool()
async def api_request(endpoint: str, context: Context) -> dict:
"""Make API request using shared client."""
app_context: AppContext = context.fastmcp_context.lifespan_context
response = await app_context.api_client.get(endpoint)
return response.json()
undefinedfrom fastmcp import Context
@mcp.tool()
async def query_database(sql: str, context: Context) -> list:
"""使用共享连接查询数据库。"""
# 访问生命周期上下文
app_context: AppContext = context.fastmcp_context.lifespan_context
return await app_context.db.query(sql)
@mcp.tool()
async def api_request(endpoint: str, context: Context) -> dict:
"""使用共享客户端发起API请求。"""
app_context: AppContext = context.fastmcp_context.lifespan_context
response = await app_context.api_client.get(endpoint)
return response.json()
undefinedASGI Integration
ASGI集成
When using FastMCP with ASGI apps (FastAPI, Starlette), you must pass the lifespan explicitly:
python
from fastapi import FastAPI
from fastmcp import FastMCP当将FastMCP与ASGI应用(FastAPI、Starlette)一起使用时,必须显式传递生命周期:
python
from fastapi import FastAPI
from fastmcp import FastMCPFastMCP lifespan
FastMCP生命周期
@asynccontextmanager
async def mcp_lifespan(server: FastMCP):
print("MCP server starting")
yield
print("MCP server stopping")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)
@asynccontextmanager
async def mcp_lifespan(server: FastMCP):
print("MCP服务器启动")
yield
print("MCP服务器停止")
mcp = FastMCP("My Server", lifespan=mcp_lifespan)
FastAPI app MUST include MCP lifespan
FastAPI应用必须包含MCP生命周期
app = FastAPI(lifespan=mcp.lifespan)
app = FastAPI(lifespan=mcp.lifespan)
Add routes
添加路由
@app.get("/")
def root():
return {"message": "Hello World"}
**❌ WRONG**: Not passing lifespan to parent app
```python
app = FastAPI() # MCP lifespan won't run!✅ CORRECT: Pass MCP lifespan to parent app
python
app = FastAPI(lifespan=mcp.lifespan)@app.get("/")
def root():
return {"message": "Hello World"}
**❌ 错误做法**:未将生命周期传递给父应用
```python
app = FastAPI() # MCP生命周期不会运行!✅ 正确做法:将MCP生命周期传递给父应用
python
app = FastAPI(lifespan=mcp.lifespan)State Management
状态管理
Store and retrieve state during server lifetime:
python
from fastmcp import Context
@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
"""Store configuration value."""
context.fastmcp_context.set_state(key, value)
return {"status": "saved", "key": key}
@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
"""Retrieve configuration value."""
value = context.fastmcp_context.get_state(key, default=None)
if value is None:
return {"error": f"Key '{key}' not found"}
return {"key": key, "value": value}在服务器运行期间存储和检索状态:
python
from fastmcp import Context
@mcp.tool()
async def set_config(key: str, value: str, context: Context) -> dict:
"""存储配置值。"""
context.fastmcp_context.set_state(key, value)
return {"status": "saved", "key": key}
@mcp.tool()
async def get_config(key: str, context: Context) -> dict:
"""检索配置值。"""
value = context.fastmcp_context.get_state(key, default=None)
if value is None:
return {"error": f"未找到密钥'{key}'"}
return {"key": key, "value": value}Middleware System
中间件系统
FastMCP provides an MCP-native middleware system for cross-cutting functionality like logging, rate limiting, caching, and error handling.
FastMCP提供原生MCP中间件系统,用于实现日志记录、速率限制、缓存和错误处理等横切功能。
Built-in Middleware (8 Types)
内置中间件(8种类型)
- TimingMiddleware - Performance monitoring
- ResponseCachingMiddleware - TTL-based caching with pluggable storage
- LoggingMiddleware - Human-readable and JSON-structured logging
- RateLimitingMiddleware - Token bucket and sliding window algorithms
- ErrorHandlingMiddleware - Consistent error management
- ToolInjectionMiddleware - Dynamic tool injection
- PromptToolMiddleware - Tool-based prompt access for limited clients
- ResourceToolMiddleware - Tool-based resource access for limited clients
- TimingMiddleware - 性能监控
- ResponseCachingMiddleware - 基于TTL的缓存,支持可插拔存储
- LoggingMiddleware - 人类可读和JSON结构化日志
- RateLimitingMiddleware - 令牌桶和滑动窗口算法
- ErrorHandlingMiddleware - 一致的错误管理
- ToolInjectionMiddleware - 动态工具注入
- PromptToolMiddleware - 为受限客户端提供基于工具的提示词访问
- ResourceToolMiddleware - 为受限客户端提供基于工具的资源访问
Basic Usage
基础用法
python
from fastmcp import FastMCP
from fastmcp.middleware import (
TimingMiddleware,
LoggingMiddleware,
RateLimitingMiddleware,
ResponseCachingMiddleware,
ErrorHandlingMiddleware
)
mcp = FastMCP("My Server")python
from fastmcp import FastMCP
from fastmcp.middleware import (
TimingMiddleware,
LoggingMiddleware,
RateLimitingMiddleware,
ResponseCachingMiddleware,
ErrorHandlingMiddleware
)
mcp = FastMCP("My Server")Add middleware (order matters!)
添加中间件(顺序很重要!)
mcp.add_middleware(ErrorHandlingMiddleware())
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(
max_requests=100,
window_seconds=60,
algorithm="token_bucket"
))
mcp.add_middleware(ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost")
))
undefinedmcp.add_middleware(ErrorHandlingMiddleware())
mcp.add_middleware(TimingMiddleware())
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(
max_requests=100,
window_seconds=60,
algorithm="token_bucket"
))
mcp.add_middleware(ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost")
))
undefinedMiddleware Execution Order
中间件执行顺序
Middleware executes in order added:
Request Flow:
→ ErrorHandlingMiddleware (catches errors)
→ TimingMiddleware (starts timer)
→ LoggingMiddleware (logs request)
→ RateLimitingMiddleware (checks rate limit)
→ ResponseCachingMiddleware (checks cache)
→ Tool/Resource Handler
← ResponseCachingMiddleware (stores in cache)
← RateLimitingMiddleware
← LoggingMiddleware (logs response)
← TimingMiddleware (stops timer, logs duration)
← ErrorHandlingMiddleware (returns error if any)中间件按添加顺序执行:
请求流程:
→ ErrorHandlingMiddleware(捕获错误)
→ TimingMiddleware(启动计时器)
→ LoggingMiddleware(记录请求)
→ RateLimitingMiddleware(检查速率限制)
→ ResponseCachingMiddleware(检查缓存)
→ 工具/资源处理器
← ResponseCachingMiddleware(存储到缓存)
← RateLimitingMiddleware
← LoggingMiddleware(记录响应)
← TimingMiddleware(停止计时器,记录耗时)
← ErrorHandlingMiddleware(如有错误则返回)Custom Middleware
自定义中间件
Create custom middleware using hooks:
python
from fastmcp.middleware import BaseMiddleware
from fastmcp import Context
class AccessControlMiddleware(BaseMiddleware):
"""Check authorization before tool execution."""
def __init__(self, allowed_users: list[str]):
self.allowed_users = allowed_users
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Hook runs before tool execution."""
# Get user from context (from auth)
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"User '{user}' not authorized")
# Continue to tool
return await self.next(tool_name, arguments, context)使用钩子创建自定义中间件:
python
from fastmcp.middleware import BaseMiddleware
from fastmcp import Context
class AccessControlMiddleware(BaseMiddleware):
"""在工具执行前检查授权。"""
def __init__(self, allowed_users: list[str]):
self.allowed_users = allowed_users
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""工具执行前运行的钩子。"""
# 从上下文中获取用户(来自认证)
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"用户'{user}'未被授权")
# 继续执行工具
return await self.next(tool_name, arguments, context)Add to server
添加到服务器
mcp.add_middleware(AccessControlMiddleware(
allowed_users=["alice", "bob", "charlie"]
))
undefinedmcp.add_middleware(AccessControlMiddleware(
allowed_users=["alice", "bob", "charlie"]
))
undefinedHook Hierarchy
钩子层次
Middleware hooks from most general to most specific:
- - All messages (requests and notifications)
on_message - /
on_request- By message typeon_notification - ,
on_call_tool,on_read_resource- Operation-specificon_get_prompt - ,
on_list_tools,on_list_resources,on_list_prompts- List operationson_list_resource_templates
python
class ComprehensiveMiddleware(BaseMiddleware):
async def on_message(self, message: dict, context: Context):
"""Runs for ALL messages."""
print(f"Message: {message['method']}")
return await self.next(message, context)
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""Runs only for tool calls."""
print(f"Tool: {tool_name}")
return await self.next(tool_name, arguments, context)
async def on_read_resource(self, uri: str, context: Context):
"""Runs only for resource reads."""
print(f"Resource: {uri}")
return await self.next(uri, context)中间件钩子从最通用到最具体:
- - 所有消息(请求和通知)
on_message - /
on_request- 按消息类型on_notification - ,
on_call_tool,on_read_resource- 特定操作on_get_prompt - ,
on_list_tools,on_list_resources,on_list_prompts- 列表操作on_list_resource_templates
python
class ComprehensiveMiddleware(BaseMiddleware):
async def on_message(self, message: dict, context: Context):
"""对所有消息运行。"""
print(f"消息: {message['method']}")
return await self.next(message, context)
async def on_call_tool(self, tool_name: str, arguments: dict, context: Context):
"""仅对工具调用运行。"""
print(f"工具: {tool_name}")
return await self.next(tool_name, arguments, context)
async def on_read_resource(self, uri: str, context: Context):
"""仅对资源读取运行。"""
print(f"资源: {uri}")
return await self.next(uri, context)Response Caching Middleware
响应缓存中间件
Improve performance by caching expensive operations:
python
from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStore通过缓存昂贵的操作来提高性能:
python
from fastmcp.middleware import ResponseCachingMiddleware
from key_value.stores import RedisStoreCache responses for 5 minutes
缓存响应5分钟
cache_middleware = ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost"), # Shared across instances
cache_tools=True, # Cache tool calls
cache_resources=True, # Cache resource reads
cache_prompts=False # Don't cache prompts
)
mcp.add_middleware(cache_middleware)
cache_middleware = ResponseCachingMiddleware(
ttl_seconds=300,
storage=RedisStore(host="localhost"), # 跨实例共享
cache_tools=True, # 缓存工具调用
cache_resources=True, # 缓存资源读取
cache_prompts=False # 不缓存提示词
)
mcp.add_middleware(cache_middleware)
Tools/resources are automatically cached
工具/资源会被自动缓存
@mcp.tool()
async def expensive_computation(data: str) -> dict:
"""This will be cached for 5 minutes."""
import time
time.sleep(5) # Expensive operation
return {"result": process(data)}
undefined@mcp.tool()
async def expensive_computation(data: str) -> dict:
"""此操作将被缓存5分钟。"""
import time
time.sleep(5) # 昂贵操作
return {"result": process(data)}
undefinedServer Composition
服务器组合
Organize tools, resources, and prompts into modular components using server composition.
使用服务器组合将工具、资源和提示词组织成模块化组件。
Two Strategies
两种策略
1. - Static Snapshot:
import_server()- One-time copy of components at import time
- Changes to subserver don't propagate
- Fast (no runtime delegation)
- Use for: Bundling finalized components
2. - Dynamic Link:
mount()- Live runtime link to subserver
- Changes to subserver immediately visible
- Runtime delegation (slower)
- Use for: Modular runtime composition
1. - 静态快照:
import_server()- 导入时一次性复制组件
- 子服务器的更改不会传播
- 速度快(无运行时委托)
- 适用场景:打包已完成的组件
2. - 动态链接:
mount()- 与子服务器的实时运行时链接
- 子服务器的更改立即可见
- 运行时委托(速度较慢)
- 适用场景:模块化运行时组合
Import Server (Static)
导入服务器(静态)
python
from fastmcp import FastMCPpython
from fastmcp import FastMCPSubserver with tools
带工具的子服务器
api_server = FastMCP("API Server")
@api_server.tool()
def api_tool():
return "API result"
@api_server.resource("api://status")
def api_status():
return {"status": "ok"}
api_server = FastMCP("API Server")
@api_server.tool()
def api_tool():
return "API result"
@api_server.resource("api://status")
def api_status():
return {"status": "ok"}
Main server imports components
主服务器导入组件
main_server = FastMCP("Main Server")
main_server = FastMCP("Main Server")
Import all components from subserver
从子服务器导入所有组件
main_server.import_server(api_server)
main_server.import_server(api_server)
Now main_server has api_tool and api://status
现在main_server拥有api_tool和api://status
Changes to api_server won't affect main_server
对api_server的更改不会影响main_server
undefinedundefinedMount Server (Dynamic)
挂载服务器(动态)
python
from fastmcp import FastMCPpython
from fastmcp import FastMCPCreate servers
创建服务器
api_server = FastMCP("API Server")
db_server = FastMCP("DB Server")
@api_server.tool()
def fetch_data():
return "API data"
@db_server.tool()
def query_db():
return "DB result"
api_server = FastMCP("API Server")
db_server = FastMCP("DB Server")
@api_server.tool()
def fetch_data():
return "API data"
@db_server.tool()
def query_db():
return "DB result"
Main server mounts subservers
主服务器挂载子服务器
main_server = FastMCP("Main Server")
main_server = FastMCP("Main Server")
Mount with prefix
带前缀挂载
main_server.mount(api_server, prefix="api")
main_server.mount(db_server, prefix="db")
main_server.mount(api_server, prefix="api")
main_server.mount(db_server, prefix="db")
Tools are namespaced:
工具会被命名空间化:
- api.fetch_data
- api.fetch_data
- db.query_db
- db.query_db
Resources are prefixed:
资源会被添加前缀:
- resource://api/path/to/resource
- resource://api/path/to/resource
- resource://db/path/to/resource
- resource://db/path/to/resource
undefinedundefinedMounting Modes
挂载模式
Direct Mounting (Default):
python
undefined直接挂载(默认):
python
undefinedIn-memory access, subserver runs in same process
内存中访问,子服务器在同一进程中运行
main_server.mount(subserver, prefix="sub")
**Proxy Mounting**:
```pythonmain_server.mount(subserver, prefix="sub")
**代理挂载:**
```pythonTreats subserver as separate entity with own lifecycle
将子服务器视为独立实体,拥有自己的生命周期
main_server.mount(
subserver,
prefix="sub",
mode="proxy"
)
undefinedmain_server.mount(
subserver,
prefix="sub",
mode="proxy"
)
undefinedTag Filtering
标签过滤
Filter components when importing/mounting:
python
undefined在导入/挂载时过滤组件:
python
undefinedTag subserver components
为子服务器组件添加标签
@api_server.tool(tags=["public"])
def public_api():
return "Public"
@api_server.tool(tags=["admin"])
def admin_api():
return "Admin only"
@api_server.tool(tags=["public"])
def public_api():
return "Public"
@api_server.tool(tags=["admin"])
def admin_api():
return "Admin only"
Import only public tools
仅导入公共工具
main_server.import_server(
api_server,
include_tags=["public"]
)
main_server.import_server(
api_server,
include_tags=["public"]
)
Or exclude admin tools
或排除管理员工具
main_server.import_server(
api_server,
exclude_tags=["admin"]
)
main_server.import_server(
api_server,
exclude_tags=["admin"]
)
Tag filtering is recursive with mount()
标签过滤在mount()中是递归的
main_server.mount(
api_server,
prefix="api",
include_tags=["public"]
)
undefinedmain_server.mount(
api_server,
prefix="api",
include_tags=["public"]
)
undefinedResource Prefix Formats
资源前缀格式
Path Format (Default since v2.4.0):
resource://prefix/path/to/resourceProtocol Format (Legacy):
prefix+resource://path/to/resourceConfigure format:
python
main_server.mount(
subserver,
prefix="api",
resource_prefix_format="path" # or "protocol"
)路径格式(v2.4.0起默认):
resource://prefix/path/to/resource协议格式(旧版):
prefix+resource://path/to/resource配置格式:
python
main_server.mount(
subserver,
prefix="api",
resource_prefix_format="path" # 或 "protocol"
)OAuth Proxy & Authentication
OAuth代理与认证
FastMCP provides comprehensive authentication support for HTTP-based transports, including an OAuth Proxy for providers that don't support Dynamic Client Registration (DCR).
FastMCP为基于HTTP的传输提供全面的认证支持,包括OAuth代理,用于不支持动态客户端注册(DCR)的提供商。
Four Authentication Patterns
四种认证模式
- Token Validation (/
TokenVerifier) - Validate external tokensJWTVerifier - External Identity Providers () - OAuth 2.0/OIDC with DCR
RemoteAuthProvider - OAuth Proxy () - Bridge to traditional OAuth providers
OAuthProxy - Full OAuth () - Complete authorization server
OAuthProvider
- 令牌验证 (/
TokenVerifier) - 验证外部令牌JWTVerifier - 外部身份提供商 () - 支持DCR的OAuth 2.0/OIDC
RemoteAuthProvider - OAuth代理 () - 桥接到传统OAuth提供商
OAuthProxy - 完整OAuth () - 完整的授权服务器
OAuthProvider
Pattern 1: Token Validation
模式1:令牌验证
Validate tokens issued by external systems:
python
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifier验证外部系统颁发的令牌:
python
from fastmcp import FastMCP
from fastmcp.auth import JWTVerifierJWT verification
JWT验证
auth = JWTVerifier(
issuer="https://auth.example.com",
audience="my-mcp-server",
public_key=os.getenv("JWT_PUBLIC_KEY")
)
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool()
async def secure_operation(context: Context) -> dict:
"""Only accessible with valid JWT."""
# Token validated automatically
user = context.fastmcp_context.get_state("user_id")
return {"user": user, "status": "authorized"}
undefinedauth = JWTVerifier(
issuer="https://auth.example.com",
audience="my-mcp-server",
public_key=os.getenv("JWT_PUBLIC_KEY")
)
mcp = FastMCP("Secure Server", auth=auth)
@mcp.tool()
async def secure_operation(context: Context) -> dict:
"""仅可通过有效JWT访问。"""
# 令牌会自动验证
user = context.fastmcp_context.get_state("user_id")
return {"user": user, "status": "authorized"}
undefinedPattern 2: External Identity Providers
模式2:外部身份提供商
Use OAuth 2.0/OIDC providers with Dynamic Client Registration:
python
from fastmcp.auth import RemoteAuthProvider
auth = RemoteAuthProvider(
issuer="https://auth.example.com",
# Provider must support DCR
)
mcp = FastMCP("OAuth Server", auth=auth)使用支持动态客户端注册的OAuth 2.0/OIDC提供商:
python
from fastmcp.auth import RemoteAuthProvider
auth = RemoteAuthProvider(
issuer="https://auth.example.com",
# 提供商必须支持DCR
)
mcp = FastMCP("OAuth Server", auth=auth)Pattern 3: OAuth Proxy (Recommended for Production)
模式3:OAuth代理(生产环境推荐)
Bridge to OAuth providers without DCR support (GitHub, Google, Azure, AWS, Discord, Facebook, etc.):
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
auth = OAuthProxy(
# JWT signing for issued tokens
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
# Encrypted storage for upstream tokens
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
# Upstream OAuth provider
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
# Scopes
upstream_scope="read:user user:email",
# Security: Enable consent screen (prevents confused deputy attacks)
enable_consent_screen=True
)
mcp = FastMCP("GitHub Auth Server", auth=auth)桥接到不支持DCR的OAuth提供商(GitHub、Google、Azure、AWS、Discord、Facebook等):
python
from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
import os
auth = OAuthProxy(
# 颁发令牌的JWT签名密钥
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
# 上游令牌的加密存储
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
# 上游OAuth提供商
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
# 权限范围
upstream_scope="read:user user:email",
# 安全:启用同意屏幕(防止混淆代理攻击)
enable_consent_screen=True
)
mcp = FastMCP("GitHub Auth Server", auth=auth)OAuth Proxy Features
OAuth代理特性
Token Factory Pattern:
- Proxy issues its own JWTs (not forwarding upstream tokens)
- Upstream tokens stored encrypted
- Proxy tokens can have custom claims
Consent Screens:
- Prevents authorization bypass attacks
- Shows user what permissions are being granted
- Required for security compliance
PKCE Support:
- End-to-end validation from client to upstream
- Protects against authorization code interception
RFC 7662 Token Introspection:
- Validate tokens with upstream provider
- Check revocation status
令牌工厂模式:
- 代理颁发自己的JWT(不转发上游令牌)
- 上游令牌加密存储
- 代理令牌可包含自定义声明
同意屏幕:
- 防止授权绕过攻击
- 向用户显示授予的权限
- 安全合规要求
PKCE支持:
- 从客户端到上游的端到端验证
- 防止授权码拦截
RFC 7662令牌 introspection:
- 与上游提供商验证令牌
- 检查吊销状态
Pattern 4: Full OAuth Provider
模式4:完整OAuth提供商
Run complete authorization server:
python
from fastmcp.auth import OAuthProvider
auth = OAuthProvider(
issuer="https://my-auth-server.com",
client_storage=RedisStore(host="localhost"),
# Full OAuth 2.0 server implementation
)
mcp = FastMCP("Auth Server", auth=auth)运行完整的授权服务器:
python
from fastmcp.auth import OAuthProvider
auth = OAuthProvider(
issuer="https://my-auth-server.com",
client_storage=RedisStore(host="localhost"),
# 完整的OAuth 2.0服务器实现
)
mcp = FastMCP("Auth Server", auth=auth)Environment-Based Configuration
基于环境的配置
Auto-detect auth from environment:
bash
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'python
undefined从环境变量自动检测认证配置:
bash
export FASTMCP_SERVER_AUTH='{"type": "oauth_proxy", "upstream_authorization_endpoint": "...", ...}'python
undefinedAutomatically configures from FASTMCP_SERVER_AUTH
自动从FASTMCP_SERVER_AUTH配置
mcp = FastMCP("Auto Auth Server")
undefinedmcp = FastMCP("Auto Auth Server")
undefinedSupported OAuth Providers
支持的OAuth提供商
- GitHub:
https://github.com/login/oauth/authorize - Google:
https://accounts.google.com/o/oauth2/v2/auth - Azure:
https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize - AWS Cognito:
https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize - Discord:
https://discord.com/api/oauth2/authorize - Facebook:
https://www.facebook.com/v12.0/dialog/oauth - WorkOS: Enterprise identity
- AuthKit: Authentication toolkit
- Descope: Auth platform
- Scalekit: Enterprise SSO
- GitHub:
https://github.com/login/oauth/authorize - Google:
https://accounts.google.com/o/oauth2/v2/auth - Azure:
https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize - AWS Cognito:
https://{domain}.auth.{region}.amazoncognito.com/oauth2/authorize - Discord:
https://discord.com/api/oauth2/authorize - Facebook:
https://www.facebook.com/v12.0/dialog/oauth - WorkOS:企业身份认证
- AuthKit:认证工具包
- Descope:认证平台
- Scalekit:企业SSO
Icons Support
图标支持
Add visual representations to servers, tools, resources, and prompts for better UX in MCP clients.
为服务器、工具、资源和提示词添加视觉表示,以提升MCP客户端的用户体验。
Server-Level Icons
服务器级图标
python
from fastmcp import FastMCP, Icon
mcp = FastMCP(
name="Weather Service",
website_url="https://weather.example.com",
icons=[
Icon(
url="https://example.com/icon-small.png",
size="small"
),
Icon(
url="https://example.com/icon-large.png",
size="large"
)
]
)python
from fastmcp import FastMCP, Icon
mcp = FastMCP(
name="Weather Service",
website_url="https://weather.example.com",
icons=[
Icon(
url="https://example.com/icon-small.png",
size="small"
),
Icon(
url="https://example.com/icon-large.png",
size="large"
)
]
)Component-Level Icons
组件级图标
python
from fastmcp import Icon
@mcp.tool(icons=[
Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
"""Analyze data with visual icon."""
return {"result": "analyzed"}
@mcp.resource(
"user://{user_id}/profile",
icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
"""User profile with icon."""
return {"id": user_id, "name": "Alice"}
@mcp.prompt(
"analyze",
icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
"""Analysis prompt with icon."""
return f"Analyze {topic}"python
from fastmcp import Icon
@mcp.tool(icons=[
Icon(url="https://example.com/tool-icon.png")
])
async def analyze_data(data: str) -> dict:
"""带视觉图标的数据分析工具。"""
return {"result": "analyzed"}
@mcp.resource(
"user://{user_id}/profile",
icons=[Icon(url="https://example.com/user-icon.png")]
)
async def get_user(user_id: str) -> dict:
"""带图标的用户资料。"""
return {"id": user_id, "name": "Alice"}
@mcp.prompt(
"analyze",
icons=[Icon(url="https://example.com/prompt-icon.png")]
)
def analysis_prompt(topic: str) -> str:
"""带图标的分析提示词。"""
return f"分析{topic}"Data URI Support
Data URI支持
Embed images directly (useful for self-contained deployments):
python
from fastmcp import Icon, Image直接嵌入图像(适用于独立部署):
python
from fastmcp import Icon, ImageConvert local file to data URI
将本地文件转换为data URI
icon = Icon.from_file("/path/to/icon.png", size="medium")
icon = Icon.from_file("/path/to/icon.png", size="medium")
Or use Image utility
或使用Image工具
image_data_uri = Image.to_data_uri("/path/to/icon.png")
icon = Icon(url=image_data_uri, size="medium")
image_data_uri = Image.to_data_uri("/path/to/icon.png")
icon = Icon(url=image_data_uri, size="medium")
Use in server
在服务器中使用
mcp = FastMCP(
"My Server",
icons=[icon]
)
undefinedmcp = FastMCP(
"My Server",
icons=[icon]
)
undefinedMultiple Sizes
多尺寸支持
Provide different sizes for different contexts:
python
mcp = FastMCP(
"Responsive Server",
icons=[
Icon(url="icon-16.png", size="small"), # 16x16
Icon(url="icon-32.png", size="medium"), # 32x32
Icon(url="icon-64.png", size="large"), # 64x64
]
)为不同场景提供不同尺寸的图标:
python
mcp = FastMCP(
"Responsive Server",
icons=[
Icon(url="icon-16.png", size="small"), # 16x16
Icon(url="icon-32.png", size="medium"), # 32x32
Icon(url="icon-64.png", size="large"), # 64x64
]
)API Integration
API集成
FastMCP provides multiple patterns for API integration:
FastMCP提供多种API集成模式:
Pattern 1: Manual API Integration
模式1:手动API集成
python
import httpx
import ospython
import httpx
import osCreate reusable client
创建可重用客户端
client = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
@mcp.tool()
async def fetch_data(endpoint: str) -> dict:
"""Fetch data from API."""
try:
response = await client.get(endpoint)
response.raise_for_status()
return {"success": True, "data": response.json()}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}
undefinedclient = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=30.0
)
@mcp.tool()
async def fetch_data(endpoint: str) -> dict:
"""从API获取数据。"""
try:
response = await client.get(endpoint)
response.raise_for_status()
return {"success": True, "data": response.json()}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP {e.response.status_code}"}
except Exception as e:
return {"error": str(e)}
undefinedPattern 2: OpenAPI/Swagger Auto-Generation
模式2:OpenAPI/Swagger自动生成
python
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpxpython
from fastmcp import FastMCP
from fastmcp.server.openapi import RouteMap, MCPType
import httpxLoad OpenAPI spec
加载OpenAPI规范
spec = httpx.get("https://api.example.com/openapi.json").json()
spec = httpx.get("https://api.example.com/openapi.json").json()
Create authenticated client
创建带认证的客户端
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0
)
client = httpx.AsyncClient(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0
)
Auto-generate MCP server from OpenAPI
从OpenAPI自动生成MCP服务器
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Server",
route_maps=[
# GET with parameters → Resource Templates
RouteMap(
methods=["GET"],
pattern=r".{.}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# GET without parameters → Resources
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
# POST/PUT/DELETE → Tools
RouteMap(
methods=["POST", "PUT", "DELETE"],
mcp_type=MCPType.TOOL
),
]
)
mcp = FastMCP.from_openapi(
openapi_spec=spec,
client=client,
name="API Server",
route_maps=[
# 带参数的GET → 资源模板
RouteMap(
methods=["GET"],
pattern=r".{.}.*",
mcp_type=MCPType.RESOURCE_TEMPLATE
),
# 无参数的GET → 资源
RouteMap(
methods=["GET"],
mcp_type=MCPType.RESOURCE
),
# POST/PUT/DELETE → 工具
RouteMap(
methods=["POST", "PUT", "DELETE"],
mcp_type=MCPType.TOOL
),
]
)
Optionally add custom tools
可选:添加自定义工具
@mcp.tool()
async def custom_operation(data: dict) -> dict:
"""Custom tool on top of generated ones."""
return process_data(data)
undefined@mcp.tool()
async def custom_operation(data: dict) -> dict:
"""在自动生成的基础上添加自定义工具。"""
return process_data(data)
undefinedPattern 3: FastAPI Conversion
模式3:FastAPI转换
python
from fastapi import FastAPI
from fastmcp import FastMCPpython
from fastapi import FastAPI
from fastmcp import FastMCPExisting FastAPI app
现有FastAPI应用
app = FastAPI()
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"id": item_id, "name": "Item"}
app = FastAPI()
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"id": item_id, "name": "Item"}
Convert to MCP server
转换为MCP服务器
mcp = FastMCP.from_fastapi(
app=app,
httpx_client_kwargs={
"headers": {"Authorization": "Bearer token"}
}
)
undefinedmcp = FastMCP.from_fastapi(
app=app,
httpx_client_kwargs={
"headers": {"Authorization": "Bearer token"}
}
)
undefinedCloud Deployment (FastMCP Cloud)
云部署(FastMCP Cloud)
Critical Requirements
关键要求
❗️ IMPORTANT: These requirements are mandatory for FastMCP Cloud:
- Module-level server object named ,
mcp, orserverapp - PyPI dependencies only in requirements.txt
- Public GitHub repository (or accessible to FastMCP Cloud)
- Environment variables for configuration
❗️ 重要提示: 这些要求是FastMCP Cloud的强制要求:
- 模块级服务器对象,命名为、
mcp或serverapp - 仅使用PyPI依赖,在requirements.txt中声明
- 公开GitHub仓库(或FastMCP Cloud可访问的仓库)
- 使用环境变量进行配置
Cloud-Ready Server Pattern
云就绪服务器模式
python
undefinedpython
undefinedserver.py
server.py
from fastmcp import FastMCP
import os
from fastmcp import FastMCP
import os
✅ CORRECT: Module-level server object
✅ 正确做法:模块级服务器对象
mcp = FastMCP(
name="production-server"
)
mcp = FastMCP(
name="production-server"
)
✅ Use environment variables
✅ 使用环境变量
API_KEY = os.getenv("API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool()
async def production_tool(data: str) -> dict:
"""Production-ready tool."""
if not API_KEY:
return {"error": "API_KEY not configured"}
# Your implementation
return {"status": "success", "data": data}API_KEY = os.getenv("API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
@mcp.tool()
async def production_tool(data: str) -> dict:
"""生产就绪工具。"""
if not API_KEY:
return {"error": "API_KEY未配置"}
# 你的实现
return {"status": "success", "data": data}✅ Optional: for local testing
✅ 可选:用于本地测试
if name == "main":
mcp.run()
undefinedif name == "main":
mcp.run()
undefinedCommon Cloud Deployment Errors
常见云部署错误
❌ WRONG: Function-wrapped server
python
def create_server():
mcp = FastMCP("my-server")
return mcp
if __name__ == "__main__":
server = create_server() # Too late for cloud!
server.run()✅ CORRECT: Factory with module export
python
def create_server() -> FastMCP:
mcp = FastMCP("my-server")
# Complex setup logic
return mcp❌ 错误做法:函数包裹的服务器
python
def create_server():
mcp = FastMCP("my-server")
return mcp
if __name__ == "__main__":
server = create_server() # 对云部署来说太晚了!
server.run()✅ 正确做法:带模块导出的工厂函数
python
def create_server() -> FastMCP:
mcp = FastMCP("my-server")
# 复杂的设置逻辑
return mcpExport at module level
在模块级别导出
mcp = create_server()
if name == "main":
mcp.run()
undefinedmcp = create_server()
if name == "main":
mcp.run()
undefinedDeployment Steps
部署步骤
- Prepare Repository:
bash
git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main-
Deploy on FastMCP Cloud:
- Visit https://fastmcp.cloud
- Sign in with GitHub
- Click "Create Project"
- Select your repository
- Configure:
- Server Name: Your project name
- Entrypoint:
server.py - Environment Variables: Add any needed
-
Access Your Server:
- URL:
https://your-project.fastmcp.app/mcp - Automatic deployment on push to main
- PR preview deployments
- URL:
- 准备仓库:
bash
git init
git add .
git commit -m "Initial MCP server"
gh repo create my-mcp-server --public
git push -u origin main-
在FastMCP Cloud上部署:
- 访问https://fastmcp.cloud
- 使用GitHub登录
- 点击“Create Project”
- 选择你的仓库
- 配置:
- Server Name: 你的项目名称
- Entrypoint:
server.py - Environment Variables: 添加所需的环境变量
-
访问你的服务器:
- URL:
https://your-project.fastmcp.app/mcp - 推送到main分支时自动部署
- PR预览部署
- URL:
Client Configuration
客户端配置
Claude Desktop
Claude Desktop
Add to :
claude_desktop_config.jsonjson
{
"mcpServers": {
"my-server": {
"url": "https://your-project.fastmcp.app/mcp",
"transport": "http"
}
}
}添加到:
claude_desktop_config.jsonjson
{
"mcpServers": {
"my-server": {
"url": "https://your-project.fastmcp.app/mcp",
"transport": "http"
}
}
}Local Development
本地开发
json
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/absolute/path/to/server.py"],
"env": {
"API_KEY": "your-key",
"DATABASE_URL": "your-db-url"
}
}
}
}json
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/absolute/path/to/server.py"],
"env": {
"API_KEY": "your-key",
"DATABASE_URL": "your-db-url"
}
}
}
}Claude Code CLI
Claude Code CLI
json
{
"mcpServers": {
"my-server": {
"command": "uv",
"args": ["run", "python", "/absolute/path/to/server.py"]
}
}
}json
{
"mcpServers": {
"my-server": {
"command": "uv",
"args": ["run", "python", "/absolute/path/to/server.py"]
}
}
}25 Common Errors (With Solutions)
25种常见错误(及解决方案)
Error 1: Missing Server Object
错误1:缺少服务器对象
Error:
RuntimeError: No server object found at module levelCause: Server object not exported at module level (FastMCP Cloud requirement)
Solution:
python
undefined错误信息:
RuntimeError: No server object found at module level原因: 未在模块级别导出服务器对象(FastMCP Cloud要求)
解决方案:
python
undefined❌ WRONG
❌ 错误做法
def create_server():
return FastMCP("server")
def create_server():
return FastMCP("server")
✅ CORRECT
✅ 正确做法
mcp = FastMCP("server") # At module level
**Source:** FastMCP Cloud documentation, deployment failures
---mcp = FastMCP("server") # 在模块级别定义
**来源:** FastMCP Cloud文档、部署失败案例
---Error 2: Async/Await Confusion
错误2:Async/Await混淆
Error:
RuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expressionCause: Mixing sync/async incorrectly
Solution:
python
undefined错误信息:
RuntimeError: no running event loop
TypeError: object coroutine can't be used in 'await' expression原因: 错误地混合同步/异步代码
解决方案:
python
undefined❌ WRONG: Sync function calling async
❌ 错误做法:同步函数调用异步代码
@mcp.tool()
def bad_tool():
result = await async_function() # Error!
@mcp.tool()
def bad_tool():
result = await async_function() # 错误!
✅ CORRECT: Async tool
✅ 正确做法:异步工具
@mcp.tool()
async def good_tool():
result = await async_function()
return result
@mcp.tool()
async def good_tool():
result = await async_function()
return result
✅ CORRECT: Sync tool with sync code
✅ 正确做法:带同步代码的同步工具
@mcp.tool()
def sync_tool():
return "Hello"
**Source:** GitHub issues #156, #203
---@mcp.tool()
def sync_tool():
return "Hello"
**来源:** GitHub issues #156, #203
---Error 3: Context Not Injected
错误3:上下文未注入
Error:
TypeError: missing 1 required positional argument: 'context'Cause: Missing type annotation for context parameter
ContextSolution:
python
from fastmcp import Context错误信息:
TypeError: missing 1 required positional argument: 'context'原因: 上下文参数缺少类型注解
Context解决方案:
python
from fastmcp import Context❌ WRONG: No type hint
❌ 错误做法:无类型提示
@mcp.tool()
async def bad_tool(context): # Missing type!
await context.report_progress(...)
@mcp.tool()
async def bad_tool(context): # 缺少类型!
await context.report_progress(...)
✅ CORRECT: Proper type hint
✅ 正确做法:正确的类型提示
@mcp.tool()
async def good_tool(context: Context):
await context.report_progress(0, 100, "Starting")
**Source:** FastMCP v2 migration guide
---@mcp.tool()
async def good_tool(context: Context):
await context.report_progress(0, 100, "Starting")
**来源:** FastMCP v2迁移指南
---Error 4: Resource URI Syntax
错误4:资源URI语法错误
Error:
ValueError: Invalid resource URI: missing schemeCause: Resource URI missing scheme prefix
Solution:
python
undefined错误信息:
ValueError: Invalid resource URI: missing scheme原因: 资源URI缺少方案前缀
解决方案:
python
undefined❌ WRONG: Missing scheme
❌ 错误做法:缺少方案
@mcp.resource("config")
def get_config(): pass
@mcp.resource("config")
def get_config(): pass
✅ CORRECT: Include scheme
✅ 正确做法:包含方案
@mcp.resource("data://config")
def get_config(): pass
@mcp.resource("data://config")
def get_config(): pass
✅ Valid schemes
✅ 有效的方案
@mcp.resource("file://config.json")
@mcp.resource("api://status")
@mcp.resource("info://health")
**Source:** MCP Protocol specification
---@mcp.resource("file://config.json")
@mcp.resource("api://status")
@mcp.resource("info://health")
**来源:** MCP协议规范
---Error 5: Resource Template Parameter Mismatch
错误5:资源模板参数不匹配
Error:
TypeError: get_user() missing 1 required positional argument: 'user_id'Cause: Function parameter names don't match URI template
Solution:
python
undefined错误信息:
TypeError: get_user() missing 1 required positional argument: 'user_id'原因: 函数参数名称与URI模板不匹配
解决方案:
python
undefined❌ WRONG: Parameter name mismatch
❌ 错误做法:参数名称不匹配
@mcp.resource("user://{user_id}/profile")
def get_user(id: str): # Wrong name!
pass
@mcp.resource("user://{user_id}/profile")
def get_user(id: str): # 名称错误!
pass
✅ CORRECT: Matching names
✅ 正确做法:名称匹配
@mcp.resource("user://{user_id}/profile")
def get_user(user_id: str): # Matches {user_id}
return {"id": user_id}
**Source:** FastMCP patterns documentation
---@mcp.resource("user://{user_id}/profile")
def get_user(user_id: str): # 与{user_id}匹配
return {"id": user_id}
**来源:** FastMCP模式文档
---Error 6: Pydantic Validation Error
错误6:Pydantic验证错误
Error:
ValidationError: value is not a valid integerCause: Type hints don't match provided data
Solution:
python
from pydantic import BaseModel, Field错误信息:
ValidationError: value is not a valid integer原因: 类型提示与提供的数据不匹配
解决方案:
python
from pydantic import BaseModel, Field✅ Use Pydantic models for complex validation
✅ 对复杂验证使用Pydantic模型
class SearchParams(BaseModel):
query: str = Field(min_length=1, max_length=100)
limit: int = Field(default=10, ge=1, le=100)
@mcp.tool()
async def search(params: SearchParams) -> dict:
# Validation automatic
return await perform_search(params.query, params.limit)
**Source:** Pydantic documentation, FastMCP examples
---class SearchParams(BaseModel):
query: str = Field(min_length=1, max_length=100)
limit: int = Field(default=10, ge=1, le=100)
@mcp.tool()
async def search(params: SearchParams) -> dict:
# 自动验证
return await perform_search(params.query, params.limit)
**来源:** Pydantic文档、FastMCP示例
---Error 7: Transport/Protocol Mismatch
错误7:传输/协议不匹配
Error:
ConnectionError: Server using different transportCause: Client and server using incompatible transports
Solution:
python
undefined错误信息:
ConnectionError: Server using different transport原因: 客户端和服务器使用不兼容的传输方式
解决方案:
python
undefinedServer using stdio (default)
服务器使用stdio(默认)
mcp.run() # or mcp.run(transport="stdio")
mcp.run() # 或 mcp.run(transport="stdio")
Client configuration must match
客户端配置必须匹配
{
"command": "python",
"args": ["server.py"]
}
{
"command": "python",
"args": ["server.py"]
}
OR for HTTP:
或者使用HTTP:
mcp.run(transport="http", port=8000)
mcp.run(transport="http", port=8000)
Client:
客户端:
{
"url": "http://localhost:8000/mcp",
"transport": "http"
}
**Source:** MCP transport specification
---{
"url": "http://localhost:8000/mcp",
"transport": "http"
}
**来源:** MCP传输规范
---Error 8: Import Errors (Editable Package)
错误8:导入错误(可编辑包)
Error:
ModuleNotFoundError: No module named 'my_package'Cause: Package not properly installed in editable mode
Solution:
bash
undefined错误信息:
ModuleNotFoundError: No module named 'my_package'原因: 包未以可编辑模式正确安装
解决方案:
bash
undefined✅ Install in editable mode
✅ 以可编辑模式安装
pip install -e .
pip install -e .
✅ Or use absolute imports
✅ 或使用绝对导入
from src.tools import my_tool
from src.tools import my_tool
✅ Or add to PYTHONPATH
✅ 或添加到PYTHONPATH
export PYTHONPATH="${PYTHONPATH}:/path/to/project"
**Source:** Python packaging documentation
---export PYTHONPATH="${PYTHONPATH}:/path/to/project"
**来源:** Python打包文档
---Error 9: Deprecation Warnings
错误9:弃用警告
Error:
DeprecationWarning: 'mcp.settings' is deprecated, use global Settings insteadCause: Using old FastMCP v1 API
Solution:
python
undefined错误信息:
DeprecationWarning: 'mcp.settings' is deprecated, use global Settings instead原因: 使用旧版FastMCP v1 API
解决方案:
python
undefined❌ OLD: FastMCP v1
❌ 旧版:FastMCP v1
from fastmcp import FastMCP
mcp = FastMCP()
api_key = mcp.settings.get("API_KEY")
from fastmcp import FastMCP
mcp = FastMCP()
api_key = mcp.settings.get("API_KEY")
✅ NEW: FastMCP v2
✅ 新版:FastMCP v2
import os
api_key = os.getenv("API_KEY")
**Source:** FastMCP v2 migration guide
---import os
api_key = os.getenv("API_KEY")
**来源:** FastMCP v2迁移指南
---Error 10: Port Already in Use
错误10:端口已被占用
Error:
OSError: [Errno 48] Address already in useCause: Port 8000 already occupied
Solution:
bash
undefined错误信息:
OSError: [Errno 48] Address already in use原因: 端口8000已被占用
解决方案:
bash
undefined✅ Use different port
✅ 使用不同端口
python server.py --transport http --port 8001
python server.py --transport http --port 8001
✅ Or kill process on port
✅ 或杀死端口上的进程
lsof -ti:8000 | xargs kill -9
**Source:** Common networking issue
---lsof -ti:8000 | xargs kill -9
**来源:** 常见网络问题
---Error 11: Schema Generation Failures
错误11:架构生成失败
Error:
TypeError: Object of type 'ndarray' is not JSON serializableCause: Unsupported type hints (NumPy arrays, custom classes)
Solution:
python
undefined错误信息:
TypeError: Object of type 'ndarray' is not JSON serializable原因: 使用了不支持的类型提示(NumPy数组、自定义类)
解决方案:
python
undefined❌ WRONG: NumPy array
❌ 错误做法:NumPy数组
import numpy as np
@mcp.tool()
def bad_tool() -> np.ndarray: # Not JSON serializable
return np.array([1, 2, 3])
import numpy as np
@mcp.tool()
def bad_tool() -> np.ndarray: # 不可JSON序列化
return np.array([1, 2, 3])
✅ CORRECT: Use JSON-compatible types
✅ 正确做法:使用兼容JSON的类型
@mcp.tool()
def good_tool() -> list[float]:
return [1.0, 2.0, 3.0]
@mcp.tool()
def good_tool() -> list[float]:
return [1.0, 2.0, 3.0]
✅ Or convert to dict
✅ 或转换为字典
@mcp.tool()
def array_tool() -> dict:
data = np.array([1, 2, 3])
return {"values": data.tolist()}
**Source:** JSON serialization requirements
---@mcp.tool()
def array_tool() -> dict:
data = np.array([1, 2, 3])
return {"values": data.tolist()}
**来源:** JSON序列化要求
---Error 12: JSON Serialization
错误12:JSON序列化错误
Error:
TypeError: Object of type 'datetime' is not JSON serializableCause: Returning non-JSON-serializable objects
Solution:
python
from datetime import datetime错误信息:
TypeError: Object of type 'datetime' is not JSON serializable原因: 返回了不可JSON序列化的对象
解决方案:
python
from datetime import datetime❌ WRONG: Return datetime object
❌ 错误做法:返回datetime对象
@mcp.tool()
def bad_tool() -> dict:
return {"timestamp": datetime.now()} # Not serializable
@mcp.tool()
def bad_tool() -> dict:
return {"timestamp": datetime.now()} # 不可序列化
✅ CORRECT: Convert to string
✅ 正确做法:转换为字符串
@mcp.tool()
def good_tool() -> dict:
return {"timestamp": datetime.now().isoformat()}
@mcp.tool()
def good_tool() -> dict:
return {"timestamp": datetime.now().isoformat()}
✅ Use helper function
✅ 使用辅助函数
def make_serializable(obj):
"""Convert object to JSON-serializable format."""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, bytes):
return obj.decode('utf-8')
# Add more conversions as needed
return obj
**Source:** JSON specification
---def make_serializable(obj):
"""将对象转换为可JSON序列化的格式。"""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, bytes):
return obj.decode('utf-8')
# 根据需要添加更多转换
return obj
**来源:** JSON规范
---Error 13: Circular Import Errors
错误13:循环导入错误
Error:
ImportError: cannot import name 'X' from partially initialized moduleCause: Modules import from each other creating circular dependency (common in cloud deployment)
Solution:
python
undefined错误信息:
ImportError: cannot import name 'X' from partially initialized module原因: 模块之间互相导入,形成循环依赖(云部署中常见)
解决方案:
python
undefined❌ WRONG: Factory function in init.py
❌ 错误做法:init.py中的工厂函数
shared/init.py
shared/init.py
_client = None
def get_api_client():
from .api_client import APIClient # Circular!
return APIClient()
_client = None
def get_api_client():
from .api_client import APIClient # 循环!
return APIClient()
shared/monitoring.py
shared/monitoring.py
from . import get_api_client # Creates circle
from . import get_api_client # 形成循环
✅ CORRECT: Direct imports
✅ 正确做法:直接导入
shared/init.py
shared/init.py
from .api_client import APIClient
from .cache import CacheManager
from .api_client import APIClient
from .cache import CacheManager
shared/monitoring.py
shared/monitoring.py
from .api_client import APIClient
client = APIClient() # Create directly
from .api_client import APIClient
client = APIClient() # 直接创建
✅ ALTERNATIVE: Lazy import
✅ 替代方案:延迟导入
shared/monitoring.py
shared/monitoring.py
def get_client():
from .api_client import APIClient
return APIClient()
**Source:** Production cloud deployment errors, Python import system
---def get_client():
from .api_client import APIClient
return APIClient()
**来源:** 生产环境云部署错误、Python导入系统
---Error 14: Python Version Compatibility
错误14:Python版本兼容性
Error:
DeprecationWarning: datetime.utcnow() is deprecatedCause: Using deprecated Python 3.12+ methods
Solution:
python
undefined错误信息:
DeprecationWarning: datetime.utcnow() is deprecated原因: 使用了Python 3.12+中已弃用的方法
解决方案:
python
undefined❌ DEPRECATED (Python 3.12+)
❌ 已弃用(Python 3.12+)
from datetime import datetime
timestamp = datetime.utcnow()
from datetime import datetime
timestamp = datetime.utcnow()
✅ CORRECT: Future-proof
✅ 正确做法:面向未来
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc)
**Source:** Python 3.12 release notes
---from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc)
**来源:** Python 3.12发布说明
---Error 15: Import-Time Execution
错误15:导入时执行
Error:
RuntimeError: Event loop is closedCause: Creating async resources at module import time
Solution:
python
undefined错误信息:
RuntimeError: Event loop is closed原因: 在模块导入时创建异步资源
解决方案:
python
undefined❌ WRONG: Module-level async execution
❌ 错误做法:模块级异步执行
import asyncpg
connection = asyncpg.connect('postgresql://...') # Runs at import!
import asyncpg
connection = asyncpg.connect('postgresql://...') # 导入时运行!
✅ CORRECT: Lazy initialization
✅ 正确做法:延迟初始化
import asyncpg
class Database:
connection = None
@classmethod
async def connect(cls):
if cls.connection is None:
cls.connection = await asyncpg.connect('postgresql://...')
return cls.connectionimport asyncpg
class Database:
connection = None
@classmethod
async def connect(cls):
if cls.connection is None:
cls.connection = await asyncpg.connect('postgresql://...')
return cls.connectionUsage: connection happens when needed, not at import
使用方式:在需要时连接,而非导入时
@mcp.tool()
async def get_users():
conn = await Database.connect()
return await conn.fetch("SELECT * FROM users")
**Source:** Async event loop management, cloud deployment requirements
---@mcp.tool()
async def get_users():
conn = await Database.connect()
return await conn.fetch("SELECT * FROM users")
**来源:** 异步事件循环管理、云部署要求
---Error 16: Storage Backend Not Configured
错误16:存储后端未配置
Error:
RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instancesCause: Using default memory storage in production without persistence
Solution:
python
undefined错误信息:
RuntimeError: OAuth tokens lost on restart
ValueError: Cache not persisting across server instances原因: 生产环境中使用默认内存存储,无持久化
解决方案:
python
undefined❌ WRONG: Memory storage in production
❌ 错误做法:生产环境使用内存存储
mcp = FastMCP("Production Server") # Tokens lost on restart!
mcp = FastMCP("Production Server") # 重启后令牌丢失!
✅ CORRECT: Use disk or Redis storage
✅ 正确做法:使用磁盘或Redis存储
from key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
from key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
Disk storage (single instance)
磁盘存储(单实例)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=DiskStore(path="/var/lib/mcp/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=DiskStore(path="/var/lib/mcp/storage"),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)
Redis storage (multi-instance)
Redis存储(多实例)
mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)
**Source:** FastMCP v2.13.0 storage backends documentation
---mcp = FastMCP(
"Production Server",
storage=FernetEncryptionWrapper(
key_value=RedisStore(
host=os.getenv("REDIS_HOST"),
password=os.getenv("REDIS_PASSWORD")
),
fernet=Fernet(os.getenv("STORAGE_ENCRYPTION_KEY"))
)
)
**来源:** FastMCP v2.13.0存储后端文档
---Error 17: Lifespan Not Passed to ASGI App
错误17:生命周期未传递给ASGI应用
Error:
RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not runningCause: Using FastMCP with FastAPI/Starlette without passing lifespan
Solution:
python
from fastapi import FastAPI
from fastmcp import FastMCP错误信息:
RuntimeError: Database connection never initialized
Warning: MCP lifespan hooks not running原因: 将FastMCP与FastAPI/Starlette一起使用时未传递生命周期
解决方案:
python
from fastapi import FastAPI
from fastmcp import FastMCP❌ WRONG: Lifespan not passed
❌ 错误做法:未传递生命周期
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI() # MCP lifespan won't run!
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI() # MCP生命周期不会运行!
✅ CORRECT: Pass MCP lifespan to parent app
✅ 正确做法:将MCP生命周期传递给父应用
mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI(lifespan=mcp.lifespan)
**Source:** FastMCP v2.13.0 breaking changes, ASGI integration guide
---mcp = FastMCP("My Server", lifespan=my_lifespan)
app = FastAPI(lifespan=mcp.lifespan)
**来源:** FastMCP v2.13.0重大变更、ASGI集成指南
---Error 18: Middleware Execution Order Error
错误18:中间件执行顺序错误
Error:
RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middlewareCause: Incorrect middleware ordering (order matters!)
Solution:
python
undefined错误信息:
RuntimeError: Rate limit not checked before caching
AttributeError: Context state not available in middleware原因: 中间件顺序不正确(顺序很重要!)
解决方案:
python
undefined❌ WRONG: Cache before rate limiting
❌ 错误做法:缓存先于速率限制
mcp.add_middleware(ResponseCachingMiddleware())
mcp.add_middleware(RateLimitingMiddleware()) # Too late!
mcp.add_middleware(ResponseCachingMiddleware())
mcp.add_middleware(RateLimitingMiddleware()) # 太晚了!
✅ CORRECT: Rate limit before cache
✅ 正确做法:速率限制先于缓存
mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors
mcp.add_middleware(TimingMiddleware()) # Second: time requests
mcp.add_middleware(LoggingMiddleware()) # Third: log
mcp.add_middleware(RateLimitingMiddleware()) # Fourth: check limits
mcp.add_middleware(ResponseCachingMiddleware()) # Last: cache
**Source:** FastMCP middleware documentation, best practices
---mcp.add_middleware(ErrorHandlingMiddleware()) # 第一个:捕获错误
mcp.add_middleware(TimingMiddleware()) # 第二个:计时
mcp.add_middleware(LoggingMiddleware()) # 第三个:日志
mcp.add_middleware(RateLimitingMiddleware()) # 第四个:检查速率限制
mcp.add_middleware(ResponseCachingMiddleware()) # 最后一个:缓存
**来源:** FastMCP中间件文档、最佳实践
---Error 19: Circular Middleware Dependencies
错误19:循环中间件依赖
Error:
RecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detectedCause: Middleware calling incorrectly or circular dependencies
self.next()Solution:
python
undefined错误信息:
RecursionError: maximum recursion depth exceeded
RuntimeError: Middleware loop detected原因: 中间件错误调用或存在循环依赖
self.next()解决方案:
python
undefined❌ WRONG: Not calling next() or calling incorrectly
❌ 错误做法:未调用next()或调用方式错误
class BadMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Forgot to call next()!
return {"error": "blocked"}
class BadMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# 忘记调用next()!
return {"error": "blocked"}
✅ CORRECT: Always call next() to continue chain
✅ 正确做法:始终调用next()以继续链
class GoodMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# Do preprocessing
print(f"Before: {tool_name}")
# MUST call next() to continue
result = await self.next(tool_name, arguments, context)
# Do postprocessing
print(f"After: {tool_name}")
return result
**Source:** FastMCP middleware system documentation
---class GoodMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
# 预处理
print(f"之前: {tool_name}")
# 必须调用next()以继续
result = await self.next(tool_name, arguments, context)
# 后处理
print(f"之后: {tool_name}")
return result
**来源:** FastMCP中间件系统文档
---Error 20: Import vs Mount Confusion
错误20:导入与挂载混淆
Error:
RuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacingCause: Using when was needed (or vice versa)
import_server()mount()Solution:
python
undefined错误信息:
RuntimeError: Subserver changes not reflected
ValueError: Unexpected tool namespacing原因: 需要时使用了(反之亦然)
mount()import_server()解决方案:
python
undefined❌ WRONG: Using import when you want dynamic updates
❌ 错误做法:需要动态更新时使用导入
main_server.import_server(subserver)
main_server.import_server(subserver)
Later: changes to subserver won't appear in main_server
后续:子服务器的更改不会在主服务器中显示
✅ CORRECT: Use mount() for dynamic composition
✅ 正确做法:使用mount()进行动态组合
main_server.mount(subserver, prefix="sub")
main_server.mount(subserver, prefix="sub")
Changes to subserver are immediately visible
子服务器的更改立即可见
❌ WRONG: Using mount when you want static bundle
❌ 错误做法:需要静态打包时使用挂载
main_server.mount(third_party_server, prefix="vendor")
main_server.mount(third_party_server, prefix="vendor")
Runtime overhead for static components
静态组件的运行时开销
✅ CORRECT: Use import_server() for static bundles
✅ 正确做法:使用import_server()进行静态打包
main_server.import_server(third_party_server)
main_server.import_server(third_party_server)
One-time copy, no runtime delegation
一次性复制,无运行时委托
**Source:** FastMCP server composition patterns
---
**来源:** FastMCP服务器组合模式
---Error 21: Resource Prefix Format Mismatch
错误21:资源前缀格式不匹配
Error:
ValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI formatCause: Using wrong resource prefix format (path vs protocol)
Solution:
python
undefined错误信息:
ValueError: Resource not found: resource://api/users
ValueError: Unexpected resource URI format原因: 使用了错误的资源前缀格式(路径vs协议)
解决方案:
python
undefinedPath format (default since v2.4.0)
路径格式(v2.4.0起默认)
main_server.mount(api_server, prefix="api")
main_server.mount(api_server, prefix="api")
Resources: resource://api/users
资源:resource://api/users
❌ WRONG: Expecting protocol format
❌ 错误做法:期望协议格式
resource://api+users (doesn't exist)
resource://api+users(不存在)
✅ CORRECT: Use path format
✅ 正确做法:使用路径格式
uri = "resource://api/users"
uri = "resource://api/users"
OR explicitly set protocol format (legacy)
或显式设置协议格式(旧版)
main_server.mount(
api_server,
prefix="api",
resource_prefix_format="protocol"
)
main_server.mount(
api_server,
prefix="api",
resource_prefix_format="protocol"
)
Resources: api+resource://users
资源:api+resource://users
**Source:** FastMCP v2.4.0+ resource prefix changes
---
**来源:** FastMCP v2.4.0+资源前缀变更
---Error 22: OAuth Proxy Without Consent Screen
错误22:OAuth代理未启用同意屏幕
Error:
SecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vectorCause: OAuth Proxy configured without consent screen (security vulnerability)
Solution:
python
undefined错误信息:
SecurityWarning: Authorization bypass possible
RuntimeError: Confused deputy attack vector原因: OAuth代理配置时未启用同意屏幕(安全漏洞)
解决方案:
python
undefined❌ WRONG: No consent screen (security risk!)
❌ 错误做法:无同意屏幕(安全风险!)
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: enable_consent_screen
)
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# 缺少:enable_consent_screen
)
✅ CORRECT: Enable consent screen
✅ 正确做法:启用同意屏幕
auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
enable_consent_screen=True # Prevents bypass attacks
)
**Source:** FastMCP v2.13.0 OAuth security enhancements, RFC 7662
---auth = OAuthProxy(
jwt_signing_key=os.getenv("JWT_KEY"),
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
enable_consent_screen=True # 防止绕过攻击
)
**来源:** FastMCP v2.13.0 OAuth安全增强、RFC 7662
---Error 23: Missing JWT Signing Key in Production
错误23:生产环境缺少JWT签名密钥
Error:
ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing keyCause: OAuth Proxy missing in production
jwt_signing_keySolution:
python
undefined错误信息:
ValueError: JWT signing key required for OAuth Proxy
RuntimeError: Cannot issue tokens without signing key原因: OAuth代理在生产环境中缺少
jwt_signing_key解决方案:
python
undefined❌ WRONG: No JWT signing key
❌ 错误做法:无JWT签名密钥
auth = OAuthProxy(
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# Missing: jwt_signing_key
)
auth = OAuthProxy(
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
# 缺少:jwt_signing_key
)
✅ CORRECT: Provide signing key from environment
✅ 正确做法:从环境变量提供签名密钥
import secrets
import secrets
Generate once (in setup):
生成一次(设置时):
signing_key = secrets.token_urlsafe(32)
signing_key = secrets.token_urlsafe(32)
Store in: FASTMCP_JWT_SIGNING_KEY environment variable
存储在:FASTMCP_JWT_SIGNING_KEY环境变量中
auth = OAuthProxy(
jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"],
client_storage=encrypted_storage,
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
**Source:** OAuth Proxy production requirements
---auth = OAuthProxy(
jwt_signing_key=os.environ["FASTMCP_JWT_SIGNING_KEY"],
client_storage=encrypted_storage,
upstream_authorization_endpoint="...",
upstream_token_endpoint="...",
upstream_client_id=os.getenv("OAUTH_CLIENT_ID"),
upstream_client_secret=os.getenv("OAUTH_CLIENT_SECRET")
)
**来源:** OAuth代理生产环境要求
---Error 24: Icon Data URI Format Error
错误24:图标Data URI格式错误
Error:
ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URICause: Incorrectly formatted data URI for icons
Solution:
python
from fastmcp import Icon, Image错误信息:
ValueError: Invalid data URI format
TypeError: Icon URL must be string or data URI原因: 图标Data URI格式不正确
解决方案:
python
from fastmcp import Icon, Image❌ WRONG: Invalid data URI
❌ 错误做法:无效Data URI
icon = Icon(url="base64,iVBORw0KG...") # Missing data:image/png;
icon = Icon(url="base64,iVBORw0KG...") # 缺少data:image/png;
✅ CORRECT: Use Image utility
✅ 正确做法:使用Image工具
icon = Icon.from_file("/path/to/icon.png", size="medium")
icon = Icon.from_file("/path/to/icon.png", size="medium")
✅ CORRECT: Manual data URI
✅ 正确做法:手动生成Data URI
import base64
with open("/path/to/icon.png", "rb") as f:
image_data = base64.b64encode(f.read()).decode()
data_uri = f"data:image/png;base64,{image_data}"
icon = Icon(url=data_uri, size="medium")
**Source:** FastMCP icons documentation, Data URI specification
---import base64
with open("/path/to/icon.png", "rb") as f:
image_data = base64.b64encode(f.read()).decode()
data_uri = f"data:image/png;base64,{image_data}"
icon = Icon(url=data_uri, size="medium")
**来源:** FastMCP图标文档、Data URI规范
---Error 25: Lifespan Behavior Change (v2.13.0)
错误25:生命周期行为变更(v2.13.0)
Error:
Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple timesCause: Expecting v2.12 lifespan behavior (per-session) in v2.13.0+ (per-server)
Solution:
python
undefined错误信息:
Warning: Lifespan runs per-server, not per-session
RuntimeError: Resources initialized multiple times原因: 在v2.13.0+中期望v2.12的生命周期行为(每个会话),但v2.13.0+是每个服务器实例
解决方案:
python
undefinedv2.12.0 and earlier: Lifespan ran per client session
v2.12.0及更早版本:生命周期每个客户端会话运行一次
v2.13.0+: Lifespan runs once per server instance
v2.13.0+:生命周期每个服务器实例运行一次
✅ CORRECT: v2.13.0+ pattern (per-server)
✅ 正确做法:v2.13.0+模式(每个服务器实例)
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Runs ONCE when server starts, not per client session."""
db = await Database.connect()
print("Server starting - runs once")
try:
yield {"db": db}
finally:
await db.disconnect()
print("Server stopping - runs once")mcp = FastMCP("My Server", lifespan=app_lifespan)
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""服务器启动时运行一次,而非每个客户端会话。"""
db = await Database.connect()
print("服务器启动 - 运行一次")
try:
yield {"db": db}
finally:
await db.disconnect()
print("服务器停止 - 运行一次")mcp = FastMCP("My Server", lifespan=app_lifespan)
For per-session logic, use middleware instead:
如需每个会话的逻辑,请改用中间件:
class SessionMiddleware(BaseMiddleware):
async def on_message(self, message, context):
# Runs per client message
session_id = context.fastmcp_context.get_state("session_id")
if not session_id:
session_id = str(uuid.uuid4())
context.fastmcp_context.set_state("session_id", session_id)
return await self.next(message, context)
**Source:** FastMCP v2.13.0 release notes, breaking changes documentation
---class SessionMiddleware(BaseMiddleware):
async def on_message(self, message, context):
# 每个客户端消息运行一次
session_id = context.fastmcp_context.get_state("session_id")
if not session_id:
session_id = str(uuid.uuid4())
context.fastmcp_context.set_state("session_id", session_id)
return await self.next(message, context)
**来源:** FastMCP v2.13.0发布说明、重大变更文档
---Production Patterns
生产模式
Pattern 1: Self-Contained Utils Module
模式1:独立工具模块
Best practice for maintaining all utilities in one place:
python
undefined将所有工具维护在一个位置的最佳实践:
python
undefinedsrc/utils.py - Single file with all utilities
src/utils.py - 包含所有工具的单个文件
import os
from typing import Dict, Any
from datetime import datetime
class Config:
"""Application configuration."""
SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
SERVER_VERSION = "1.0.0"
API_BASE_URL = os.getenv("API_BASE_URL")
API_KEY = os.getenv("API_KEY")
CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]:
"""Format successful response."""
return {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
"""Format error response."""
return {
"success": False,
"error": error,
"code": code,
"timestamp": datetime.now().isoformat()
}
import os
from typing import Dict, Any
from datetime import datetime
class Config:
"""应用配置。"""
SERVER_NAME = os.getenv("SERVER_NAME", "FastMCP Server")
SERVER_VERSION = "1.0.0"
API_BASE_URL = os.getenv("API_BASE_URL")
API_KEY = os.getenv("API_KEY")
CACHE_TTL = int(os.getenv("CACHE_TTL", "300"))
def format_success(data: Any, message: str = "Success") -> Dict[str, Any]:
"""格式化成功响应。"""
return {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
def format_error(error: str, code: str = "ERROR") -> Dict[str, Any]:
"""格式化错误响应。"""
return {
"success": False,
"error": error,
"code": code,
"timestamp": datetime.now().isoformat()
}
Usage in tools
在工具中使用
from .utils import format_success, format_error, Config
@mcp.tool()
async def process_data(data: dict) -> dict:
try:
result = await process(data)
return format_success(result)
except Exception as e:
return format_error(str(e))
undefinedfrom .utils import format_success, format_error, Config
@mcp.tool()
async def process_data(data: dict) -> dict:
try:
result = await process(data)
return format_success(result)
except Exception as e:
return format_error(str(e))
undefinedPattern 2: Connection Pooling
模式2:连接池
Efficient resource management:
python
import httpx
from typing import Optional
class APIClient:
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=5)
)
return cls._instance
@classmethod
async def cleanup(cls):
if cls._instance:
await cls._instance.aclose()
cls._instance = None
@mcp.tool()
async def api_request(endpoint: str) -> dict:
"""Make API request with managed client."""
client = await APIClient.get_client()
response = await client.get(endpoint)
return response.json()高效的资源管理:
python
import httpx
from typing import Optional
class APIClient:
_instance: Optional[httpx.AsyncClient] = None
@classmethod
async def get_client(cls) -> httpx.AsyncClient:
if cls._instance is None:
cls._instance = httpx.AsyncClient(
base_url=os.getenv("API_BASE_URL"),
headers={"Authorization": f"Bearer {os.getenv('API_KEY')}"},
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=5)
)
return cls._instance
@classmethod
async def cleanup(cls):
if cls._instance:
await cls._instance.aclose()
cls._instance = None
@mcp.tool()
async def api_request(endpoint: str) -> dict:
"""使用托管客户端发起API请求。"""
client = await APIClient.get_client()
response = await client.get(endpoint)
return response.json()Pattern 3: Error Handling with Retry
模式3:带重试的错误处理
Resilient API calls:
python
import asyncio
from typing import Callable, TypeVar
T = TypeVar('T')
async def retry_with_backoff(
func: Callable[[], T],
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
) -> T:
"""Retry function with exponential backoff."""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
"""API call with automatic retry."""
async def make_call():
async with httpx.AsyncClient() as client:
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_call)
return {"success": True, "data": data}
except Exception as e:
return {"error": f"Failed after retries: {e}"}弹性API调用:
python
import asyncio
from typing import Callable, TypeVar
T = TypeVar('T')
async def retry_with_backoff(
func: Callable[[], T],
max_retries: int = 3,
initial_delay: float = 1.0,
exponential_base: float = 2.0
) -> T:
"""带指数退避的重试函数。"""
delay = initial_delay
last_exception = None
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
await asyncio.sleep(delay)
delay *= exponential_base
raise last_exception
@mcp.tool()
async def resilient_api_call(endpoint: str) -> dict:
"""带自动重试的API调用。"""
async def make_call():
async with httpx.AsyncClient() as client:
response = await client.get(endpoint)
response.raise_for_status()
return response.json()
try:
data = await retry_with_backoff(make_call)
return {"success": True, "data": data}
except Exception as e:
return {"error": f"重试后失败: {e}"}Pattern 4: Time-Based Caching
模式4:基于时间的缓存
Reduce API load:
python
import time
from typing import Any, Optional
class TimeBasedCache:
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache = {}
self.timestamps = {}
def get(self, key: str) -> Optional[Any]:
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
self.cache[key] = value
self.timestamps[key] = time.time()
cache = TimeBasedCache(ttl=300)
@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
"""Fetch with caching."""
cache_key = f"resource:{resource_id}"
cached_data = cache.get(cache_key)
if cached_data:
return {"data": cached_data, "from_cache": True}
data = await fetch_from_api(resource_id)
cache.set(cache_key, data)
return {"data": data, "from_cache": False}减少API负载:
python
import time
from typing import Any, Optional
class TimeBasedCache:
def __init__(self, ttl: int = 300):
self.ttl = ttl
self.cache = {}
self.timestamps = {}
def get(self, key: str) -> Optional[Any]:
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
self.cache[key] = value
self.timestamps[key] = time.time()
cache = TimeBasedCache(ttl=300)
@mcp.tool()
async def cached_fetch(resource_id: str) -> dict:
"""带缓存的获取操作。"""
cache_key = f"resource:{resource_id}"
cached_data = cache.get(cache_key)
if cached_data:
return {"data": cached_data, "from_cache": True}
data = await fetch_from_api(resource_id)
cache.set(cache_key, data)
return {"data": data, "from_cache": False}Testing
测试
Unit Testing Tools
工具单元测试
python
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client
@pytest.fixture
def test_server():
"""Create test server instance."""
mcp = FastMCP("test-server")
@mcp.tool()
async def test_tool(param: str) -> str:
return f"Result: {param}"
return mcp
@pytest.mark.asyncio
async def test_tool_execution(test_server):
"""Test tool execution."""
async with create_test_client(test_server) as client:
result = await client.call_tool("test_tool", {"param": "test"})
assert result.data == "Result: test"python
import pytest
from fastmcp import FastMCP
from fastmcp.testing import create_test_client
@pytest.fixture
def test_server():
"""创建测试服务器实例。"""
mcp = FastMCP("test-server")
@mcp.tool()
async def test_tool(param: str) -> str:
return f"Result: {param}"
return mcp
@pytest.mark.asyncio
async def test_tool_execution(test_server):
"""测试工具执行。"""
async with create_test_client(test_server) as client:
result = await client.call_tool("test_tool", {"param": "test"})
assert result.data == "Result: test"Integration Testing
集成测试
python
import asyncio
from fastmcp import Client
async def test_server():
"""Test all server functionality."""
async with Client("server.py") as client:
# Test tools
tools = await client.list_tools()
print(f"Tools: {len(tools)}")
for tool in tools:
try:
result = await client.call_tool(tool.name, {})
print(f"✓ {tool.name}: {result}")
except Exception as e:
print(f"✗ {tool.name}: {e}")
# Test resources
resources = await client.list_resources()
for resource in resources:
try:
data = await client.read_resource(resource.uri)
print(f"✓ {resource.uri}")
except Exception as e:
print(f"✗ {resource.uri}: {e}")
if __name__ == "__main__":
asyncio.run(test_server())python
import asyncio
from fastmcp import Client
async def test_server():
"""测试服务器所有功能。"""
async with Client("server.py") as client:
# 测试工具
tools = await client.list_tools()
print(f"工具数量: {len(tools)}")
for tool in tools:
try:
result = await client.call_tool(tool.name, {})
print(f"✓ {tool.name}: {result}")
except Exception as e:
print(f"✗ {tool.name}: {e}")
# 测试资源
resources = await client.list_resources()
for resource in resources:
try:
data = await client.read_resource(resource.uri)
print(f"✓ {resource.uri}")
except Exception as e:
print(f"✗ {resource.uri}: {e}")
if __name__ == "__main__":
asyncio.run(test_server())CLI Commands
CLI命令
Development:
bash
undefined开发:
bash
undefinedRun with inspector (recommended)
使用检查器运行(推荐)
fastmcp dev server.py
fastmcp dev server.py
Run normally
正常运行
fastmcp run server.py
fastmcp run server.py
Inspect server without running
检查服务器而不运行
fastmcp inspect server.py
**Installation:**
```bashfastmcp inspect server.py
**安装:**
```bashInstall to Claude Desktop
安装到Claude Desktop
fastmcp install server.py
fastmcp install server.py
Install with custom name
使用自定义名称安装
fastmcp install server.py --name "My Server"
**Debugging:**
```bashfastmcp install server.py --name "My Server"
**调试:**
```bashEnable debug logging
启用调试日志
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev server.py
Run with HTTP transport
使用HTTP传输运行
fastmcp run server.py --transport http --port 8000
undefinedfastmcp run server.py --transport http --port 8000
undefinedBest Practices
最佳实践
1. Server Structure
1. 服务器结构
python
from fastmcp import FastMCP
import os
def create_server() -> FastMCP:
"""Factory function for complex setup."""
mcp = FastMCP("Server Name")
# Configure server
setup_tools(mcp)
setup_resources(mcp)
return mcp
def setup_tools(mcp: FastMCP):
"""Register all tools."""
@mcp.tool()
def example_tool():
pass
def setup_resources(mcp: FastMCP):
"""Register all resources."""
@mcp.resource("data://config")
def get_config():
return {"version": "1.0.0"}python
from fastmcp import FastMCP
import os
def create_server() -> FastMCP:
"""复杂设置的工厂函数。"""
mcp = FastMCP("Server Name")
# 配置服务器
setup_tools(mcp)
setup_resources(mcp)
return mcp
def setup_tools(mcp: FastMCP):
"""注册所有工具。"""
@mcp.tool()
def example_tool():
pass
def setup_resources(mcp: FastMCP):
"""注册所有资源。"""
@mcp.resource("data://config")
def get_config():
return {"version": "1.0.0"}Export at module level
在模块级别导出
mcp = create_server()
if name == "main":
mcp.run()
undefinedmcp = create_server()
if name == "main":
mcp.run()
undefined2. Environment Configuration
2. 环境配置
python
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
API_KEY = os.getenv("API_KEY", "")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
@classmethod
def validate(cls):
if not cls.API_KEY:
raise ValueError("API_KEY is required")
return Truepython
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
API_KEY = os.getenv("API_KEY", "")
BASE_URL = os.getenv("BASE_URL", "https://api.example.com")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
@classmethod
def validate(cls):
if not cls.API_KEY:
raise ValueError("API_KEY是必填项")
return TrueValidate on startup
启动时验证
Config.validate()
undefinedConfig.validate()
undefined3. Documentation
3. 文档
python
@mcp.tool()
def complex_tool(
query: str,
filters: dict = None,
limit: int = 10
) -> dict:
"""
Search with advanced filtering.
Args:
query: Search query string
filters: Optional filters dict with keys:
- category: Filter by category
- date_from: Start date (ISO format)
- date_to: End date (ISO format)
limit: Maximum results (1-100)
Returns:
Dict with 'results' list and 'total' count
Examples:
>>> complex_tool("python", {"category": "tutorial"}, 5)
{'results': [...], 'total': 5}
"""
passpython
@mcp.tool()
def complex_tool(
query: str,
filters: dict = None,
limit: int = 10
) -> dict:
"""
使用高级过滤进行搜索。
参数:
query: 搜索查询字符串
filters: 可选过滤字典,包含以下键:
- category: 按类别过滤
- date_from: 开始日期(ISO格式)
- date_to: 结束日期(ISO格式)
limit: 最大结果数(1-100)
返回:
包含'results'列表和'total'计数的字典
示例:
>>> complex_tool("python", {"category": "tutorial"}, 5)
{'results': [...], 'total': 5}
"""
pass4. Health Checks
4. 健康检查
python
@mcp.resource("health://status")
async def health_check() -> dict:
"""Comprehensive health check."""
checks = {}
# Check API connectivity
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{BASE_URL}/health", timeout=5)
checks["api"] = response.status_code == 200
except:
checks["api"] = False
# Check database
try:
checks["database"] = await check_db_connection()
except:
checks["database"] = False
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"timestamp": datetime.now().isoformat(),
"checks": checks
}python
@mcp.resource("health://status")
async def health_check() -> dict:
"""全面健康检查。"""
checks = {}
# 检查API连通性
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{BASE_URL}/health", timeout=5)
checks["api"] = response.status_code == 200
except:
checks["api"] = False
# 检查数据库
try:
checks["database"] = await check_db_connection()
except:
checks["database"] = False
all_healthy = all(checks.values())
return {
"status": "healthy" if all_healthy else "degraded",
"timestamp": datetime.now().isoformat(),
"checks": checks
}Project Structure
项目结构
Simple Server
简单服务器
my-mcp-server/
├── server.py # Main server file
├── requirements.txt # Dependencies
├── .env # Environment variables (git-ignored)
├── .gitignore # Git ignore file
└── README.md # Documentationmy-mcp-server/
├── server.py # 主服务器文件
├── requirements.txt # 依赖
├── .env # 环境变量(Git忽略)
├── .gitignore # Git忽略文件
└── README.md # 文档Production Server
生产环境服务器
my-mcp-server/
├── src/
│ ├── server.py # Main entry point
│ ├── utils.py # Shared utilities
│ ├── tools/ # Tool modules
│ │ ├── __init__.py
│ │ ├── api_tools.py
│ │ └── data_tools.py
│ ├── resources/ # Resource definitions
│ │ ├── __init__.py
│ │ └── static.py
│ └── prompts/ # Prompt templates
│ ├── __init__.py
│ └── templates.py
├── tests/
│ ├── test_tools.py
│ └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.mdmy-mcp-server/
├── src/
│ ├── server.py # 主入口
│ ├── utils.py # 共享工具
│ ├── tools/ # 工具模块
│ │ ├── __init__.py
│ │ ├── api_tools.py
│ │ └── data_tools.py
│ ├── resources/ # 资源定义
│ │ ├── __init__.py
│ │ └── static.py
│ └── prompts/ # 提示词模板
│ ├── __init__.py
│ └── templates.py
├── tests/
│ ├── test_tools.py
│ └── test_resources.py
├── requirements.txt
├── pyproject.toml
├── .env
├── .gitignore
└── README.mdReferences
参考资料
Official Documentation:
- FastMCP: https://github.com/jlowin/fastmcp
- FastMCP Cloud: https://fastmcp.cloud
- MCP Protocol: https://modelcontextprotocol.io
- Context7 Docs:
/jlowin/fastmcp
Related Skills:
- - OpenAI integration
openai-api - - Claude API
claude-api - - Deploy MCP as Worker
cloudflare-worker-base
Package Versions:
- fastmcp >= 2.13.0
- Python >= 3.10
- httpx (recommended for async API calls)
- pydantic (for validation)
- py-key-value-aio (for storage backends)
- cryptography (for encrypted storage)
官方文档:
- FastMCP: https://github.com/jlowin/fastmcp
- FastMCP Cloud: https://fastmcp.cloud
- MCP协议: https://modelcontextprotocol.io
- Context7文档:
/jlowin/fastmcp
相关技能:
- - OpenAI集成
openai-api - - Claude API
claude-api - - 将MCP部署为Worker
cloudflare-worker-base
包版本:
- fastmcp >= 2.13.0
- Python >= 3.10
- httpx(推荐用于异步API调用)
- pydantic(用于验证)
- py-key-value-aio(用于存储后端)
- cryptography(用于加密存储)
Summary
总结
FastMCP enables rapid development of production-ready MCP servers with advanced features for storage, authentication, middleware, and composition. Key takeaways:
- Always export server at module level for FastMCP Cloud compatibility
- Use persistent storage backends (Disk/Redis) in production for OAuth tokens and caching
- Configure server lifespans for proper resource management (DB connections, API clients)
- Add middleware strategically - order matters! (errors → timing → logging → rate limiting → caching)
- Choose composition wisely - for static bundles,
import_server()for dynamic compositionmount() - Secure OAuth properly - Enable consent screens, encrypt token storage, use JWT signing keys
- Use async/await properly - don't block the event loop
- Handle errors gracefully with structured responses and ErrorHandlingMiddleware
- Avoid circular imports especially with factory functions
- Test locally before deploying using
fastmcp dev - Use environment variables for all configuration (never hardcode secrets)
- Document thoroughly - LLMs read your docstrings
- Follow production patterns for self-contained, maintainable code
- Leverage OpenAPI for instant API integration
- Monitor with health checks and middleware for production reliability
Production Readiness:
- Storage: Encrypted persistence for OAuth tokens and response caching
- Authentication: 4 auth patterns (Token Validation, Remote OAuth, OAuth Proxy, Full OAuth)
- Middleware: 8 built-in types for logging, rate limiting, caching, error handling
- Composition: Modular server architecture with import/mount strategies
- Security: Consent screens, PKCE, RFC 7662 token introspection, encrypted storage
- Performance: Response caching, connection pooling, timing middleware
This skill prevents 25+ common errors and provides 90-95% token savings compared to manual implementation.
FastMCP支持快速开发生产就绪的MCP服务器,提供存储、认证、中间件和组合等高级功能。关键要点:
- 始终在模块级别导出服务器,以兼容FastMCP Cloud
- 生产环境使用持久化存储后端(磁盘/Redis)存储OAuth令牌和缓存
- 配置服务器生命周期,以正确管理资源(数据库连接、API客户端)
- 战略性添加中间件 - 顺序很重要!(错误处理 → 计时 → 日志 → 速率限制 → 缓存)
- 明智选择组合方式 - 用于静态打包,
import_server()用于动态组合mount() - 正确配置OAuth - 启用同意屏幕,加密令牌存储,使用JWT签名密钥
- 正确使用async/await - 不要阻塞事件循环
- 优雅处理错误,使用结构化响应和ErrorHandlingMiddleware
- 避免循环导入,尤其是工厂函数
- 部署前本地测试,使用
fastmcp dev - 所有配置使用环境变量(永远不要硬编码密钥)
- 彻底文档化 - LLM会读取你的文档字符串
- 遵循生产模式,编写独立、可维护的代码
- 利用OpenAPI实现即时API集成
- 使用健康检查和中间件监控生产环境可靠性
生产就绪特性:
- 存储:OAuth令牌和响应缓存的加密持久化
- 认证:4种认证模式(令牌验证、远程OAuth、OAuth代理、完整OAuth)
- 中间件:8种内置类型,用于日志、速率限制、缓存、错误处理
- 组合:模块化服务器架构,支持导入/挂载策略
- 安全:同意屏幕、PKCE、RFC 7662令牌 introspection、加密存储
- 性能:响应缓存、连接池、计时中间件
本技能可防止25+种常见错误,与手动实现相比可节省90-95%的令牌消耗。