# Vercel AI SDK (Python)
```bash
uv add vercel-ai-sdk
```

```python
import vercel_ai_sdk as ai
```

## Core workflow
`ai.run(root, *args, checkpoint=None, cancel_on_hooks=False)` is the entry point: it creates a `Runtime`, runs `root`, and yields each `Message` produced by `stream_step` and `execute_tool` calls inside it. The root function is any async function. If it declares a param typed `ai.Runtime`, it's auto-injected.

```python
@ai.tool
async def talk_to_mothership(question: str) -> str:
    """Contact the mothership for important decisions."""
    return "Soon."

async def agent(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    return await ai.stream_loop(
        llm,
        messages=ai.make_messages(system="You are a robot assistant.", user=query),
        tools=[talk_to_mothership],
    )

llm = ai.ai_gateway.GatewayModel(model="anthropic/claude-opus-4.6")

async for msg in ai.run(agent, llm, "When will the robots take over?"):
    print(msg.text_delta, end="")
```

- `@ai.tool` turns an async function into a `Tool`; a param typed `runtime: ai.Runtime` is auto-injected.
- `ai.stream_step(llm, messages, tools=None, label=None, output_type=None)` returns a `StreamResult` with `.text`, `.tool_calls`, `.output`, `.usage`, and `.last_message`.
- `ai.stream_loop(llm, messages, tools, label=None, output_type=None)` returns a `StreamResult`.
- `ai.execute_tool(tool_call, message=None)` executes one tool call.

Both are thin convenience wrappers (not magical -- they could be reimplemented by the user). `stream_step` is an `@ai.stream`-decorated function that calls `llm.stream()`. `stream_loop` calls `stream_step` in a while loop with `ai.execute_tool()` between iterations.
## Multi-agent
Use `asyncio.gather` with labels to run agents in parallel:

```python
async def multi(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    r1, r2 = await asyncio.gather(
        ai.stream_loop(llm, msgs1, tools=[t1], label="researcher"),
        ai.stream_loop(llm, msgs2, tools=[t2], label="analyst"),
    )
    return await ai.stream_loop(
        llm,
        ai.make_messages(user=f"{r1.text}\n{r2.text}"),
        tools=[],
        label="summary",
    )
```

The `label` field on messages lets the consumer distinguish which agent produced output (e.g. `msg.label == "researcher"`).
## Messages
`ai.make_messages(system=None, user=str)` builds a message list. Each `Message` has a `role`, `parts` (`TextPart | ToolPart | ReasoningPart | HookPart | StructuredOutputPart`), a `label`, and `usage`. Serialize with `msg.model_dump()`; restore with `ai.Message.model_validate(data)`.

Key properties for consuming streamed output:

- `msg.text_delta` -- current text chunk (use for live streaming display)
- `msg.text` -- full accumulated text
- `msg.tool_calls` -- list of `ToolPart` objects
- `msg.output` -- validated Pydantic instance (when using `output_type`)
- `msg.is_done` -- true when all parts finished streaming
- `msg.get_hook_part()` -- find a hook suspension part (for human-in-the-loop)
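The relationship between `text_delta` and `text` can be sketched with a hypothetical stand-in message class (not the SDK's implementation): each yielded message carries only the newest chunk in `text_delta`, while `text` holds the running accumulation.

```python
from dataclasses import dataclass

@dataclass
class StubMessage:
    """Hypothetical stand-in for ai.Message, keeping only the two text fields."""
    text_delta: str  # just the newest chunk
    text: str        # everything accumulated so far

def as_stream(chunks: list[str]) -> list[StubMessage]:
    acc = ""
    out = []
    for chunk in chunks:
        acc += chunk
        out.append(StubMessage(text_delta=chunk, text=acc))
    return out

msgs = as_stream(["Hel", "lo, ", "world"])
print(msgs[-1].text)  # Hello, world
```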
## Customization

### Custom loop
When `stream_loop` doesn't fit (conditional tool execution, approval gates, custom routing), use `stream_step` in a manual loop:

```python
async def agent(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    messages = ai.make_messages(system="...", user=query)
    tools = [get_weather, get_population]
    while True:
        result = await ai.stream_step(llm, messages, tools)
        if not result.tool_calls:
            return result
        messages.append(result.last_message)
        await asyncio.gather(*(ai.execute_tool(tc, message=result.last_message) for tc in result.tool_calls))
```
### Custom stream
`@ai.stream` wraps an async generator of `Message` so its output flows through `ai.run()`; inside, call `llm.stream()` directly:

```python
@ai.stream
async def custom_step(llm: ai.LanguageModel, messages: list[ai.Message]) -> AsyncGenerator[ai.Message]:
    async for msg in llm.stream(messages=messages, tools=[...]):
        msg.label = "custom"
        yield msg

result = await custom_step(llm, messages)  # returns StreamResult
```

Tools can also stream intermediate progress via `runtime.put_message()`:

```python
@ai.tool
async def long_task(input: str, runtime: ai.Runtime) -> str:
    """Streams progress back to the caller."""
    for step in ["Connecting...", "Processing..."]:
        await runtime.put_message(
            ai.Message(role="assistant", parts=[ai.TextPart(text=step, state="streaming")], label="progress")
        )
    return "final result"
```
## Hooks
Hooks are typed suspension points for human-in-the-loop. Decorate a Pydantic model to define the resolution schema:

```python
@ai.hook
class Approval(pydantic.BaseModel):
    granted: bool
    reason: str
```

Inside agent code, creating a hook blocks until it is resolved:

```python
approval = await Approval.create("approve_send_email", metadata={"tool": "send_email"})
if approval.granted:
    await ai.execute_tool(tc, message=result.last_message)
else:
    tc.set_error(f"Rejected: {approval.reason}")
```

From outside (API handler, iterator loop):

```python
Approval.resolve("approve_send_email", {"granted": True, "reason": "User approved"})
Approval.cancel("approve_send_email")
```

Long-running mode (`cancel_on_hooks=False`, default): `create()` blocks until `resolve()` or `cancel()` is called externally. Use for websocket/interactive UIs.

Serverless mode (`cancel_on_hooks=True`): unresolved hooks are cancelled and the run ends. Inspect `result.pending_hooks` and `result.checkpoint` to resume later.

Consuming hooks in the iterator:

```python
async for msg in ai.run(agent, llm, query):
    if (hook := msg.get_hook_part()) and hook.status == "pending":
        answer = input(f"Approve {hook.hook_id}? [y/n] ")
        Approval.resolve(hook.hook_id, {"granted": answer == "y", "reason": "operator"})
        continue
    print(msg.text_delta, end="")
```
## Checkpoints
A run's `Checkpoint` is serializable:

```python
data = result.checkpoint.model_dump()                      # serialize (JSON-safe dict)
checkpoint = ai.Checkpoint.model_validate(data)            # restore
result = ai.run(agent, llm, query, checkpoint=checkpoint)  # replay completed work
```

Primary use case is serverless hook re-entry.
## Adapters

### Providers
**Vercel AI Gateway (recommended)** -- uses the `AI_GATEWAY_API_KEY` env var:

```python
llm = ai.ai_gateway.GatewayModel(model="anthropic/claude-opus-4.6", thinking=True, budget_tokens=10000)
```

**Direct:**

```python
llm = ai.openai.OpenAIModel(model="gpt-5")
llm = ai.anthropic.AnthropicModel(model="claude-opus-4-6", thinking=True, budget_tokens=10000)
```

All implement `LanguageModel` with `stream()` (async generator of `Message`) and `buffer()` (returns final `Message`). Gateway routes Anthropic models through the native Anthropic API for full feature support, and others through an OpenAI-compatible endpoint.
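The shared interface can be sketched as a structural type; this `Protocol` is a hypothetical illustration (the SDK's actual base class may differ), with a toy conforming model showing `buffer()` as "consume the stream, keep the final message":

```python
import asyncio
from typing import Protocol, runtime_checkable

@runtime_checkable
class LanguageModelLike(Protocol):
    """Structural sketch of the documented interface (hypothetical name)."""
    def stream(self, messages, **kwargs): ...        # async generator of messages
    async def buffer(self, messages, **kwargs): ...  # final message only

class FakeModel:
    """Toy conforming model: echoes each input message as a 'chunk'."""
    def stream(self, messages, **kwargs):
        async def gen():
            for m in messages:
                yield m
        return gen()

    async def buffer(self, messages, **kwargs):
        final = None
        async for m in self.stream(messages, **kwargs):
            final = m
        return final

assert isinstance(FakeModel(), LanguageModelLike)  # structural check
final = asyncio.run(FakeModel().buffer(["chunk-1", "chunk-2"]))
print(final)  # chunk-2
```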
### AI SDK UI
For streaming to an AI SDK frontend (`useChat`, etc.):

```python
from vercel_ai_sdk.ai_sdk_ui import to_sse_stream, to_messages, UI_MESSAGE_STREAM_HEADERS

messages = to_messages(request.messages)
return StreamingResponse(to_sse_stream(ai.run(agent, llm, query)), headers=UI_MESSAGE_STREAM_HEADERS)
```

## Other features
### Structured output
Pass a Pydantic model as `output_type`:

```python
class Forecast(pydantic.BaseModel):
    city: str
    temperature: float

result = await ai.stream_step(llm, messages, output_type=Forecast)
result.output.city  # validated Pydantic instance
```

Also works directly on the model:

```python
msg = await llm.buffer(messages, output_type=Forecast)
```
### MCP
```python
tools = await ai.mcp.get_http_tools("https://mcp.example.com/mcp", headers={...}, tool_prefix="docs")
tools = await ai.mcp.get_stdio_tools("npx", "-y", "@anthropic/mcp-server-filesystem", "/tmp", tool_prefix="fs")
```

Returns `Tool` objects usable in `stream_step`/`stream_loop`. Connections are pooled per `ai.run()` and cleaned up automatically.

### Telemetry
```python
ai.telemetry.enable()  # OTel-based, emits gen_ai.* spans for runs/steps/tools
```