Loading...
Loading...
Compare original and translation side by side
pip install fastmcppip install fastmcpundefinedundefinedfrom fastmcp import FastMCPfrom fastmcp import FastMCP
**Run it:**
```bash
**运行方式:**
```bashundefinedundefinedauth_route$refauth_route$refctx.sample()AnthropicSamplingHandlerctx.sample_step()SampleStepctx.sample()AnthropicSamplingHandlerctx.sample_step()SampleSteptask=TrueBearerAuthProviderJWTVerifierOAuthProxyContext.get_http_request()fastmcp.Imagefrom fastmcp.utilities import Imageenable_docketenable_tasksrun_streamable_http_async()sse_app()streamable_http_app()run_sse_async()dependenciesoutput_schema=FalseFASTMCP_SERVER_task=TrueBearerAuthProviderJWTVerifierOAuthProxyContext.get_http_request()fastmcp.Imagefrom fastmcp.utilities import Imageenable_docketenable_tasksrun_streamable_http_async()sse_app()streamable_http_app()run_sse_async()dependenciesoutput_schema=FalseFASTMCP_SERVER_FileSystemProviderSkillsProviderOpenAPIProviderProxyProviderfrom fastmcp import FastMCP
from fastmcp.providers import FileSystemProvider
mcp = FastMCP("server")
mcp.add_provider(FileSystemProvider(path="./tools", reload=True))FileSystemProviderSkillsProviderOpenAPIProviderProxyProviderfrom fastmcp import FastMCP
from fastmcp.providers import FileSystemProvider
mcp = FastMCP("server")
mcp.add_provider(FileSystemProvider(path="./tools", reload=True))ResourcesAsToolsPromptsAsToolsfrom fastmcp.transforms import Namespace, VersionFilter
mcp.add_transform(Namespace(prefix="api"))
mcp.add_transform(VersionFilter(min_version="2.0"))ResourcesAsToolsPromptsAsToolsfrom fastmcp.transforms import Namespace, VersionFilter
mcp.add_transform(Namespace(prefix="api"))
mcp.add_transform(VersionFilter(min_version="2.0"))@mcp.tool(version="2.0")
async def fetch_data(query: str) -> dict:
# Clients see highest version by default
# Can request specific version
return {"data": [...]}@mcp.tool(version="2.0")
async def fetch_data(query: str) -> dict:
# 客户端默认使用最高版本
# 也可以请求特定版本
return {"data": [...]}@mcp.tool()
async def set_preference(key: str, value: str, ctx: Context) -> dict:
await ctx.set_state(key, value) # Persists across session
return {"saved": True}
@mcp.tool()
async def get_preference(key: str, ctx: Context) -> dict:
value = await ctx.get_state(key, default=None)
return {"value": value}@mcp.tool()
async def set_preference(key: str, value: str, ctx: Context) -> dict:
await ctx.set_state(key, value) # 在会话中持久化
return {"saved": True}
@mcp.tool()
async def get_preference(key: str, ctx: Context) -> dict:
value = await ctx.get_state(key, default=None)
return {"value": value}--reload@tool(auth=require_scopes("admin"))--reload@tool(auth=require_scopes("admin"))undefinedundefined
**For most servers**, updating the import is all you need:
```python
**对于大多数服务器**,只需更新导入语句即可兼容:
```python
**See**: [Official Migration Guide](https://github.com/jlowin/fastmcp/blob/main/docs/development/upgrade-guide.mdx)
---
**参考**: [官方迁移指南](https://github.com/jlowin/fastmcp/blob/main/docs/development/upgrade-guide.mdx)
---@mcp.tool()
async def async_tool(url: str) -> dict: # Use async for I/O
async with httpx.AsyncClient() as client:
return (await client.get(url)).json()@mcp.tool()
async def async_tool(url: str) -> dict: # I/O操作使用async
async with httpx.AsyncClient() as client:
return (await client.get(url)).json()data://file://resource://info://api://@mcp.resource("user://{user_id}/profile") # Template with parameters
async def get_user(user_id: str) -> dict: # CRITICAL: param names must match
return await fetch_user_from_db(user_id)data://file://resource://info://api://@mcp.resource("user://{user_id}/profile") # 带参数的模板
async def get_user(user_id: str) -> dict: # 关键:参数名称必须与模板匹配
return await fetch_user_from_db(user_id)@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
return f"Analyze {topic} considering: state, challenges, opportunities, recommendations."@mcp.prompt("analyze")
def analyze_prompt(topic: str) -> str:
return f"分析{topic},需涵盖:现状、挑战、机遇、建议。"Contextfrom fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
confirmed = await context.request_elicitation(prompt=f"Confirm {action}?", response_type=str)
return {"status": "completed" if confirmed.lower() == "yes" else "cancelled"}@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
data = await read_file(file_path)
for i, item in enumerate(data):
await context.report_progress(i + 1, len(data), f"Importing {i + 1}/{len(data)}")
await import_item(item)
return {"imported": len(data)}@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
response = await context.request_sampling(
messages=[{"role": "user", "content": f"Enhance: {text}"}],
temperature=0.7
)
return response["content"]Contextfrom fastmcp import Context
@mcp.tool()
async def confirm_action(action: str, context: Context) -> dict:
confirmed = await context.request_elicitation(prompt=f"确认执行{action}?", response_type=str)
return {"status": "已完成" if confirmed.lower() == "yes" else "已取消"}@mcp.tool()
async def batch_import(file_path: str, context: Context) -> dict:
data = await read_file(file_path)
for i, item in enumerate(data):
await context.report_progress(i + 1, len(data), f"正在导入第{i + 1}/{len(data)}项")
await import_item(item)
return {"已导入": len(data)}@mcp.tool()
async def enhance_text(text: str, context: Context) -> str:
response = await context.request_sampling(
messages=[{"role": "user", "content": f"优化以下文本:{text}"}],
temperature=0.7
)
return response["content"]@mcp.tool(task=True) # Enable background task mode
async def analyze_large_dataset(dataset_id: str, context: Context) -> dict:
"""Analyze large dataset with progress tracking."""
data = await fetch_dataset(dataset_id)
for i, chunk in enumerate(data.chunks):
# Report progress to client
await context.report_progress(
current=i + 1,
total=len(data.chunks),
message=f"Processing chunk {i + 1}/{len(data.chunks)}"
)
await process_chunk(chunk)
return {"status": "complete", "records_processed": len(data)}pendingrunningcompletedfailedcancelledstatusMessagectx.report_progress()mcp>=1.10.0@mcp.tool(task=True) # 启用后台任务模式
async def analyze_large_dataset(dataset_id: str, context: Context) -> dict:
"""分析大型数据集并追踪进度。"""
data = await fetch_dataset(dataset_id)
for i, chunk in enumerate(data.chunks):
# 向客户端报告进度
await context.report_progress(
current=i + 1,
total=len(data.chunks),
message=f"正在处理第{i + 1}/{len(data.chunks)}个数据块"
)
await process_chunk(chunk)
return {"状态": "完成", "已处理记录数": len(data)}pendingrunningcompletedfailedcancelledctx.report_progress()statusMessagemcp>=1.10.0ctx.sample()from fastmcp import Context
from fastmcp.sampling import AnthropicSamplingHandlerctx.sample()from fastmcp import Context
from fastmcp.sampling import AnthropicSamplingHandler# Define tools available during sampling
research_tools = [
{
"name": "search_web",
"description": "Search the web for information",
"inputSchema": {"type": "object", "properties": {"query": {"type": "string"}}}
},
{
"name": "fetch_url",
"description": "Fetch content from a URL",
"inputSchema": {"type": "object", "properties": {"url": {"type": "string"}}}
}
]
# Sample with tools - LLM can call these tools during reasoning
result = await context.sample(
messages=[{"role": "user", "content": f"Research: {topic}"}],
tools=research_tools,
max_tokens=4096
)
return {"research": result.content, "tools_used": result.tool_calls}
**Single-Step Sampling:**
```python
@mcp.tool()
async def get_single_response(prompt: str, context: Context) -> dict:
"""Get a single LLM response without tool loop."""
# sample_step() returns SampleStep for inspection
step = await context.sample_step(
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
return {
"content": step.content,
"model": step.model,
"stop_reason": step.stop_reason
}AnthropicSamplingHandlerOpenAISamplingHandlerctx.sample()# 定义采样过程中可用的工具
research_tools = [
{
"name": "search_web",
"description": "在网络上搜索信息",
"inputSchema": {"type": "object", "properties": {"query": {"type": "string"}}}
},
{
"name": "fetch_url",
"description": "获取URL中的内容",
"inputSchema": {"type": "object", "properties": {"url": {"type": "string"}}}
}
]
# 带工具的采样 - LLM可以在推理过程中调用这些工具
result = await context.sample(
messages=[{"role": "user", "content": f"研究主题:{topic}"}],
tools=research_tools,
max_tokens=4096
)
return {"研究结果": result.content, "使用的工具": result.tool_calls}
**单步采样:**
```python
@mcp.tool()
async def get_single_response(prompt: str, context: Context) -> dict:
"""获取单次LLM响应,无工具循环。"""
# sample_step()返回SampleStep对象供检查
step = await context.sample_step(
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
return {
"内容": step.content,
"模型": step.model,
"停止原因": step.stop_reason
}AnthropicSamplingHandlerOpenAISamplingHandlerctx.sample()py-key-value-aioFernetEncryptionWrapperfrom key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernetpy-key-value-aioFernetEncryptionWrapperfrom key_value.stores import DiskStore, RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
**Platform Defaults:** Mac/Windows use Disk, Linux uses Memory. Override with `storage` parameter.
**平台默认值:** Mac/Windows使用磁盘存储,Linux使用内存存储。可通过`storage`参数覆盖默认设置。from contextlib import asynccontextmanager
from dataclasses import dataclass
@dataclass
class AppContext:
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Runs ONCE per server instance."""
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(base_url=os.getenv("API_BASE_URL"), timeout=30.0)
try:
yield AppContext(db=db, api_client=api_client)
finally:
await db.disconnect()
await api_client.aclose()
mcp = FastMCP("Server", lifespan=app_lifespan)from contextlib import asynccontextmanager
from dataclasses import dataclass
@dataclass
class AppContext:
db: Database
api_client: httpx.AsyncClient
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""每个服务器实例仅执行一次。"""
db = await Database.connect(os.getenv("DATABASE_URL"))
api_client = httpx.AsyncClient(base_url=os.getenv("API_BASE_URL"), timeout=30.0)
try:
yield AppContext(db=db, api_client=api_client)
finally:
await db.disconnect()
await api_client.aclose()
mcp = FastMCP("Server", lifespan=app_lifespan)
**ASGI Integration (FastAPI/Starlette):**
```python
mcp = FastMCP("Server", lifespan=mcp_lifespan)
app = FastAPI(lifespan=mcp.lifespan) # ✅ MUST pass lifespan!context.fastmcp_context.set_state(key, value) # Store
context.fastmcp_context.get_state(key, default=None) # Retrieve
**ASGI集成(FastAPI/Starlette):**
```python
mcp = FastMCP("Server", lifespan=mcp_lifespan)
app = FastAPI(lifespan=mcp.lifespan) # ✅ 必须传递lifespan!context.fastmcp_context.set_state(key, value) # 存储
context.fastmcp_context.get_state(key, default=None) # 检索Request Flow:
→ ErrorHandlingMiddleware (catches errors)
→ TimingMiddleware (starts timer)
→ LoggingMiddleware (logs request)
→ RateLimitingMiddleware (checks rate limit)
→ ResponseCachingMiddleware (checks cache)
→ Tool/Resource Handlerfrom fastmcp.middleware import ErrorHandlingMiddleware, TimingMiddleware, LoggingMiddleware
mcp.add_middleware(ErrorHandlingMiddleware()) # First: catch errors
mcp.add_middleware(TimingMiddleware()) # Second: time requests
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(max_requests=100, window_seconds=60))
mcp.add_middleware(ResponseCachingMiddleware(ttl_seconds=300, storage=RedisStore()))from fastmcp.middleware import BaseMiddleware
class AccessControlMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"User not authorized")
return await self.next(tool_name, arguments, context)on_messageon_requeston_notificationon_call_toolon_read_resourceon_get_prompton_list_*请求流程:
→ ErrorHandlingMiddleware(捕获错误)
→ TimingMiddleware(启动计时)
→ LoggingMiddleware(记录请求)
→ RateLimitingMiddleware(检查限流)
→ ResponseCachingMiddleware(检查缓存)
→ 工具/资源处理器from fastmcp.middleware import ErrorHandlingMiddleware, TimingMiddleware, LoggingMiddleware
mcp.add_middleware(ErrorHandlingMiddleware()) # 第一个:捕获错误
mcp.add_middleware(TimingMiddleware()) # 第二个:计时请求
mcp.add_middleware(LoggingMiddleware(level="INFO"))
mcp.add_middleware(RateLimitingMiddleware(max_requests=100, window_seconds=60))
mcp.add_middleware(ResponseCachingMiddleware(ttl_seconds=300, storage=RedisStore()))from fastmcp.middleware import BaseMiddleware
class AccessControlMiddleware(BaseMiddleware):
async def on_call_tool(self, tool_name, arguments, context):
user = context.fastmcp_context.get_state("user_id")
if user not in self.allowed_users:
raise PermissionError(f"用户未授权")
return await self.next(tool_name, arguments, context)on_messageon_requeston_notificationon_call_toolon_read_resourceon_get_prompton_list_*import_server()mount()undefinedimport_server()mount()undefined
**Tag Filtering:**
```python
@api_server.tool(tags=["public"])
def public_api(): pass
main_server.import_server(api_server, include_tags=["public"]) # Only public
main_server.mount(api_server, prefix="api", exclude_tags=["admin"]) # No adminresource://prefix/pathprefix+resource://pathmain_server.mount(subserver, prefix="api", resource_prefix_format="path")
**标签过滤:**
```python
@api_server.tool(tags=["public"])
def public_api(): pass
main_server.import_server(api_server, include_tags=["public"]) # 仅导入公共工具
main_server.mount(api_server, prefix="api", exclude_tags=["admin"]) # 排除管理员工具resource://prefix/pathprefix+resource://pathmain_server.mount(subserver, prefix="api", resource_prefix_format="path")JWTVerifierRemoteAuthProviderOAuthProxyOAuthProviderfrom fastmcp.auth import JWTVerifier
auth = JWTVerifier(issuer="https://auth.example.com", audience="my-server",
public_key=os.getenv("JWT_PUBLIC_KEY"))
mcp = FastMCP("Server", auth=auth)from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD")),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
enable_consent_screen=True # CRITICAL: Prevents confused deputy attacks
)
mcp = FastMCP("GitHub Auth", auth=auth)from fastmcp.auth import SupabaseProvider
auth = SupabaseProvider(
auth_route="/custom-auth", # Custom auth route (new in v2.14.2)
# ... other config
)JWTVerifierRemoteAuthProviderOAuthProxyOAuthProviderfrom fastmcp.auth import JWTVerifier
auth = JWTVerifier(issuer="https://auth.example.com", audience="my-server",
public_key=os.getenv("JWT_PUBLIC_KEY"))
mcp = FastMCP("Server", auth=auth)from fastmcp.auth import OAuthProxy
from key_value.stores import RedisStore
from key_value.encryption import FernetEncryptionWrapper
from cryptography.fernet import Fernet
auth = OAuthProxy(
jwt_signing_key=os.environ["JWT_SIGNING_KEY"],
client_storage=FernetEncryptionWrapper(
key_value=RedisStore(host=os.getenv("REDIS_HOST"), password=os.getenv("REDIS_PASSWORD")),
fernet=Fernet(os.environ["STORAGE_ENCRYPTION_KEY"])
),
upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
upstream_token_endpoint="https://github.com/login/oauth/access_token",
upstream_client_id=os.getenv("GITHUB_CLIENT_ID"),
upstream_client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
enable_consent_screen=True # 关键:防止混淆代理攻击
)
mcp = FastMCP("GitHub Auth", auth=auth)from fastmcp.auth import SupabaseProvider
auth = SupabaseProvider(
auth_route="/custom-auth", # 自定义认证路由(v2.14.2新增)
# ... 其他配置
)Icon(url, size)Icon.from_file()Image.to_data_uri()httpx.AsyncClientFastMCP.from_openapi(spec, client, route_maps)FastMCP.from_fastapi(app, httpx_client_kwargs)mcpserverappundefinedIcon(url, size)Icon.from_file()Image.to_data_uri()httpx.AsyncClientFastMCP.from_openapi(spec, client, route_maps)FastMCP.from_fastapi(app, httpx_client_kwargs)mcpserverappundefined
**Deployment:** https://fastmcp.cloud → Sign in → Create Project → Select repo → Deploy
**Client Config (Claude Desktop):**
```json
{"mcpServers": {"my-server": {"url": "https://project.fastmcp.app/mcp", "transport": "http"}}}
**部署流程:** https://fastmcp.cloud → 登录 → 创建项目 → 选择仓库 → 部署
**客户端配置(Claude桌面端):**
```json
{"mcpServers": {"my-server": {"url": "https://project.fastmcp.app/mcp", "transport": "http"}}}RuntimeError: No server object found at module levelmcp = FastMCP("server")RuntimeError: No server object found at module levelmcp = FastMCP("server")RuntimeError: no running event loopTypeError: object coroutine can't be used in 'await'async defawaitdefRuntimeError: no running event loopTypeError: object coroutine can't be used in 'await'awaitasync defdefTypeError: missing 1 required positional argument: 'context'Contextasync def tool(context: Context)TypeError: missing 1 required positional argument: 'context'Contextasync def tool(context: Context)ValueError: Invalid resource URI: missing scheme@mcp.resource("data://config")@mcp.resource("config")ValueError: Invalid resource URI: missing scheme@mcp.resource("data://config")@mcp.resource("config")TypeError: get_user() missing 1 required positional argument@mcp.resource("user://{user_id}/profile")def get_user(user_id: str)TypeError: get_user() missing 1 required positional argument@mcp.resource("user://{user_id}/profile")def get_user(user_id: str)ValidationError: value is not a valid integerclass Params(BaseModel): query: str = Field(min_length=1)ValidationError: value is not a valid integerclass Params(BaseModel): query: str = Field(min_length=1)ConnectionError: Server using different transportmcp.run(){"command": "python", "args": ["server.py"]}mcp.run(transport="http", port=8000){"url": "http://localhost:8000/mcp", "transport": "http"}ConnectionError: Server using different transportmcp.run(){"command": "python", "args": ["server.py"]}mcp.run(transport="http", port=8000){"url": "http://localhost:8000/mcp", "transport": "http"}ModuleNotFoundError: No module named 'my_package'pip install -e .export PYTHONPATH="/path/to/project"ModuleNotFoundError: No module named 'my_package'pip install -e .export PYTHONPATH="/path/to/project"DeprecationWarning: 'mcp.settings' is deprecatedos.getenv("API_KEY")mcp.settings.get("API_KEY")DeprecationWarning: 'mcp.settings' is deprecatedos.getenv("API_KEY")mcp.settings.get("API_KEY")OSError: [Errno 48] Address already in use--port 8001lsof -ti:8000 | xargs kill -9OSError: [Errno 48] Address already in use--port 8001lsof -ti:8000 | xargs kill -9TypeError: Object of type 'ndarray' is not JSON serializablelist[float]{"values": np_array.tolist()}undefinedTypeError: Object of type 'ndarray' is not JSON serializablelist[float]{"values": np_array.tolist()}undefined
**OutputSchema $ref Resolution (Fixed in v2.14.2)**:
- Root-level `$ref` in `outputSchema` wasn't being dereferenced ([GitHub Issue #2720](https://github.com/jlowin/fastmcp/issues/2720))
- Caused MCP spec non-compliance and client compatibility issues
- **Solution**: Upgrade to fastmcp>=2.14.2 (auto-dereferences $ref)
**OutputSchema $ref解析(v2.14.2已修复):**
- `outputSchema`中的根级别`$ref`未被正确解析([GitHub Issue #2720](https://github.com/jlowin/fastmcp/issues/2720))
- 导致MCP规范不兼容和客户端兼容性问题
- **解决方案**:升级到fastmcp>=2.14.2(自动解析$ref)TypeError: Object of type 'datetime' is not JSON serializabledatetime.now().isoformat().decode('utf-8')TypeError: Object of type 'datetime' is not JSON serializabledatetime.now().isoformat().decode('utf-8')ImportError: cannot import name 'X' from partially initialized module__init__.pyfrom .api_client import APIClientImportError: cannot import name 'X' from partially initialized module__init__.pyfrom .api_client import APIClientDeprecationWarning: datetime.utcnow() is deprecateddatetime.now(timezone.utc)datetime.utcnow()DeprecationWarning: datetime.utcnow() is deprecateddatetime.now(timezone.utc)datetime.utcnow()RuntimeError: Event loop is closedconnect()RuntimeError: Event loop is closedconnect()RuntimeError: OAuth tokens lost on restartValueError: Cache not persistingFernetEncryptionWrapperRuntimeError: OAuth tokens lost on restartValueError: Cache not persistingFernetEncryptionWrapperRuntimeError: Database connection never initializedWarning: MCP lifespan hooks not runningapp = FastAPI(lifespan=mcp.lifespan)RuntimeError: Database connection never initializedWarning: MCP lifespan hooks not runningapp = FastAPI(lifespan=mcp.lifespan)RuntimeError: Rate limit not checked before cachingRuntimeError: Rate limit not checked before cachingRecursionError: maximum recursion depth exceededself.next()result = await self.next(tool_name, arguments, context)RecursionError: maximum recursion depth exceededself.next()result = await self.next(tool_name, arguments, context)RuntimeError: Subserver changes not reflectedValueError: Unexpected tool namespacingimport_server()mount()import_server()mount()RuntimeError: Subserver changes not reflectedValueError: Unexpected tool namespacingmount()import_server()import_server()mount()ValueError: Resource not found: resource://api/usersresource://prefix/pathprefix+resource://pathresource_prefix_format="path"ValueError: Resource not found: resource://api/usersresource://prefix/pathprefix+resource://pathresource_prefix_format="path"SecurityWarning: Authorization bypass possibleenable_consent_screen=TrueSecurityWarning: Authorization bypass possibleenable_consent_screen=TrueValueError: JWT signing key required for OAuth Proxyjwt_signing_keysecrets.token_urlsafe(32)FASTMCP_JWT_SIGNING_KEYOAuthProxy(jwt_signing_key=...)ValueError: JWT signing key required for OAuth Proxyjwt_signing_keysecrets.token_urlsafe(32)FASTMCP_JWT_SIGNING_KEYOAuthProxy(jwt_signing_key=...)ValueError: Invalid data URI formatIcon.from_file("/path/icon.png", size="medium")Image.to_data_uri()ValueError: Invalid data URI formatIcon.from_file("/path/icon.png", size="medium")Image.to_data_uri()Warning: Lifespan runs per-server, not per-sessionWarning: Lifespan runs per-server, not per-sessionImportError: cannot import name 'BearerAuthProvider' from 'fastmcp.auth'BearerAuthProviderJWTVerifierOAuthProxyundefinedImportError: cannot import name 'BearerAuthProvider' from 'fastmcp.auth'BearerAuthProviderJWTVerifierOAuthProxyundefinedundefinedundefinedAttributeError: 'Context' object has no attribute 'get_http_request'Context.get_http_request()InitializeResultAttributeError: 'Context' object has no attribute 'get_http_request'Context.get_http_request()InitializeResultImportError: cannot import name 'Image' from 'fastmcp'fastmcp.ImageundefinedImportError: cannot import name 'Image' from 'fastmcp'fastmcp.Imageundefinedundefinedundefined/mcp/mcp/mcp/mcp/undefined/mcp/mcp/mcp/mcp/undefined
**Critical**: Must also pass `lifespan=mcp.lifespan` to FastAPI (see Error #17).
**关键提示**:必须同时将`lifespan=mcp.lifespan`传递给FastAPI(参考错误#17)。RuntimeError: No active context foundtask=TrueundefinedRuntimeError: No active context foundtask=Trueundefined
**Note**: Related to Error #17 (Lifespan Not Passed to ASGI App).
---
**注意**:与错误#17(未将生命周期传递给ASGI应用)相关。
---utils.pyhttpx.AsyncClientget_client()retry_with_backoff(func, max_retries=3, initial_delay=1.0, exponential_base=2.0)TimeBasedCache(ttl=300).get().set()pytestcreate_test_client(test_server)await client.call_tool()Client("server.py")list_tools()call_tool()list_resources()fastmcp dev server.py # Run with inspector
fastmcp install server.py # Install to Claude Desktop
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev # Debug loggingserver.pyrequirements.txt.envREADME.mdsrc/tests/pyproject.tomlutils.pyhttpx.AsyncClientget_client()retry_with_backoff(func, max_retries=3, initial_delay=1.0, exponential_base=2.0)TimeBasedCache(ttl=300).get().set()pytestcreate_test_client(test_server)await client.call_tool()Client("server.py")list_tools()call_tool()list_resources()fastmcp dev server.py # 带检查器运行
fastmcp install server.py # 安装到Claude桌面端
FASTMCP_LOG_LEVEL=DEBUG fastmcp dev # 调试日志server.pyrequirements.txt.envREADME.mdsrc/tests/pyproject.toml/jlowin/fastmcpimport_server()mount()fastmcp devtask=Truectx.sample(tools=[...])/jlowin/fastmcpimport_server()mount()fastmcp devtask=Truectx.sample(tools=[...])