Vercel AI SDK (Python)

```bash
uv add vercel-ai-sdk
```

```python
import vercel_ai_sdk as ai
```

Core workflow


`ai.run(root, *args, checkpoint=None, cancel_on_hooks=False)` is the entry point. It creates a `Runtime` (stored in a context var), starts `root` as a background task, processes an internal step queue, and yields `Message` objects. All SDK functions (`stream_step`, `execute_tool`, hooks) require this Runtime context -- they must be called inside `ai.run()`.

The root function is any async function. If it declares a parameter typed `ai.Runtime`, it is auto-injected.
```python
@ai.tool
async def talk_to_mothership(question: str) -> str:
    """Contact the mothership for important decisions."""
    return "Soon."

async def agent(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    return await ai.stream_loop(
        llm,
        messages=ai.make_messages(system="You are a robot assistant.", user=query),
        tools=[talk_to_mothership],
    )

llm = ai.ai_gateway.GatewayModel(model="anthropic/claude-opus-4.6")
async for msg in ai.run(agent, llm, "When will the robots take over?"):
    print(msg.text_delta, end="")
```
`@ai.tool` turns an async function into a `Tool`. The schema is extracted from its type hints and docstring. If a tool declares a `runtime: ai.Runtime` parameter, it is auto-injected (and excluded from the LLM schema). Tools are registered globally by name.
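The schema-extraction step can be sketched with the standard library alone. This is an illustrative approximation, not the SDK's actual implementation -- `extract_schema` and the exact output shape are hypothetical:

```python
import inspect
import typing

def extract_schema(fn) -> dict:
    """Build a JSON-schema-like dict from a function's type hints and docstring."""
    hints = typing.get_type_hints(fn)
    hints.pop("return", None)  # the return type is not part of the parameter schema
    type_map = {str: "string", int: "integer", float: "number", bool: "boolean"}
    properties = {name: {"type": type_map.get(tp, "object")} for name, tp in hints.items()}
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {"type": "object", "properties": properties, "required": list(properties)},
    }

async def talk_to_mothership(question: str) -> str:
    """Contact the mothership for important decisions."""
    return "Soon."

schema = extract_schema(talk_to_mothership)
# schema["name"] == "talk_to_mothership"
# schema["parameters"]["properties"] == {"question": {"type": "string"}}
```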
`ai.stream_step(llm, messages, tools=None, label=None, output_type=None)` -- a single LLM call. Returns a `StreamResult` with `.text`, `.tool_calls`, `.output`, `.usage`, and `.last_message`.

`ai.stream_loop(llm, messages, tools, label=None, output_type=None)` -- the agent loop: calls the LLM → executes tools → repeats until there are no tool calls. Returns the final `StreamResult`.
Both are thin convenience wrappers (not magical -- a user could reimplement them). `stream_step` is an `@ai.stream`-decorated function that calls `llm.stream()`; `stream_loop` calls `stream_step` in a while loop with `ai.execute_tool()` between iterations.

`ai.execute_tool(tool_call, message=None)` runs a tool call by name from the global registry. Malformed JSON and invalid arguments are handled gracefully -- they are reported as tool errors so the LLM can retry rather than crash.

Multi-agent


Use `asyncio.gather` with labels to run agents in parallel:
```python
async def multi(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    r1, r2 = await asyncio.gather(
        ai.stream_loop(llm, msgs1, tools=[t1], label="researcher"),
        ai.stream_loop(llm, msgs2, tools=[t2], label="analyst"),
    )
    return await ai.stream_loop(
        llm,
        ai.make_messages(user=f"{r1.text}\n{r2.text}"),
        tools=[],
        label="summary",
    )
```
The `label` field on messages lets the consumer distinguish which agent produced output (e.g. `msg.label == "researcher"`).
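The consumer-side routing can be illustrated with plain asyncio -- `Msg`, `merged_stream`, and `demux` below are stand-ins for the SDK's interleaved `Message` stream, not its API:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Msg:
    label: str
    text_delta: str

async def merged_stream():
    """Stand-in for ai.run(): interleaved deltas from two labelled agents."""
    for msg in [Msg("researcher", "foun"), Msg("analyst", "tren"),
                Msg("researcher", "d 3 sources"), Msg("analyst", "d is upward")]:
        await asyncio.sleep(0)
        yield msg

async def demux() -> dict[str, str]:
    buffers: dict[str, str] = {}
    async for msg in merged_stream():
        # Route each delta to the buffer for the agent that produced it.
        buffers[msg.label] = buffers.get(msg.label, "") + msg.text_delta
    return buffers

out = asyncio.run(demux())
# out == {"researcher": "found 3 sources", "analyst": "trend is upward"}
```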

Messages


`ai.make_messages(system=None, user=str)` builds a message list. `Message` is a Pydantic model with `role`, `parts` (a list of `TextPart | ToolPart | ReasoningPart | HookPart | StructuredOutputPart`), `label`, and `usage`. Serialize with `msg.model_dump()`, restore with `ai.Message.model_validate(data)`.
Key properties for consuming streamed output:
  • `msg.text_delta` -- current text chunk (use for live streaming display)
  • `msg.text` -- full accumulated text
  • `msg.tool_calls` -- list of `ToolPart` objects
  • `msg.output` -- validated Pydantic instance (when using `output_type`)
  • `msg.is_done` -- true when all parts have finished streaming
  • `msg.get_hook_part()` -- find a hook suspension part (for human-in-the-loop)

Customization


Custom loop


When `stream_loop` doesn't fit (conditional tool execution, approval gates, custom routing), use `stream_step` in a manual loop:
```python
async def agent(llm: ai.LanguageModel, query: str) -> ai.StreamResult:
    messages = ai.make_messages(system="...", user=query)
    tools = [get_weather, get_population]

    while True:
        result = await ai.stream_step(llm, messages, tools)
        if not result.tool_calls:
            return result
        messages.append(result.last_message)
        await asyncio.gather(
            *(ai.execute_tool(tc, message=result.last_message) for tc in result.tool_calls)
        )
```

Custom stream


`@ai.stream` wires an async generator (yielding `Message`) into the Runtime's step queue. This is what makes streaming visible to `ai.run()` and enables checkpoint replay -- calling `llm.stream()` directly would bypass both.
```python
from collections.abc import AsyncGenerator

@ai.stream
async def custom_step(llm: ai.LanguageModel, messages: list[ai.Message]) -> AsyncGenerator[ai.Message, None]:
    async for msg in llm.stream(messages=messages, tools=[...]):
        msg.label = "custom"
        yield msg

result = await custom_step(llm, messages)  # returns StreamResult
```
Tools can also stream intermediate progress via `runtime.put_message()`:
```python
@ai.tool
async def long_task(input: str, runtime: ai.Runtime) -> str:
    """Streams progress back to the caller."""
    for step in ["Connecting...", "Processing..."]:
        await runtime.put_message(
            ai.Message(role="assistant", parts=[ai.TextPart(text=step, state="streaming")], label="progress")
        )
    return "final result"
```

Hooks


Hooks are typed suspension points for human-in-the-loop workflows. Decorate a Pydantic model to define the resolution schema:

```python
@ai.hook
class Approval(pydantic.BaseModel):
    granted: bool
    reason: str
```
Inside agent code -- this blocks until resolved:

```python
approval = await Approval.create("approve_send_email", metadata={"tool": "send_email"})
if approval.granted:
    await ai.execute_tool(tc, message=result.last_message)
else:
    tc.set_error(f"Rejected: {approval.reason}")
```
From outside (an API handler, the iterator loop):

```python
Approval.resolve("approve_send_email", {"granted": True, "reason": "User approved"})
Approval.cancel("approve_send_email")
```
Long-running mode (`cancel_on_hooks=False`, the default): `create()` blocks until `resolve()` or `cancel()` is called externally. Use this for websocket/interactive UIs.

Serverless mode (`cancel_on_hooks=True`): unresolved hooks are cancelled and the run ends. Inspect `result.pending_hooks` and `result.checkpoint` to resume later.
Consuming hooks in the iterator:

```python
async for msg in ai.run(agent, llm, query):
    if (hook := msg.get_hook_part()) and hook.status == "pending":
        answer = input(f"Approve {hook.hook_id}? [y/n] ")
        Approval.resolve(hook.hook_id, {"granted": answer == "y", "reason": "operator"})
        continue
    print(msg.text_delta, end="")

Checkpoints


`Checkpoint` records completed steps (LLM calls), tool executions, and hook resolutions. On replay, cached results are returned without re-executing.

```python
data = result.checkpoint.model_dump()  # serialize (JSON-safe dict)
checkpoint = ai.Checkpoint.model_validate(data)  # restore
result = ai.run(agent, llm, query, checkpoint=checkpoint)  # replay completed work
```

The primary use case is serverless hook re-entry.
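The replay behaviour amounts to a step cache. A minimal sketch, assuming steps are keyed by their position in the run (the SDK's actual keying may differ):

```python
cache: dict[int, str] = {}   # plays the role of the checkpoint

def run_step(index: int, compute, cache: dict[int, str]) -> str:
    """Return the cached result on replay; execute and record otherwise."""
    if index in cache:
        return cache[index]
    result = compute()
    cache[index] = result
    return result

calls = []
def expensive():
    calls.append(1)          # stands in for an LLM call
    return "step output"

first = run_step(0, expensive, cache)   # executes the step
replay = run_step(0, expensive, cache)  # served from the checkpoint
# len(calls) == 1: the expensive step ran only once
```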

Adapters

Providers

Vercel AI Gateway (recommended)

Uses the `AI_GATEWAY_API_KEY` env var:

```python
llm = ai.ai_gateway.GatewayModel(model="anthropic/claude-opus-4.6", thinking=True, budget_tokens=10000)
```

Direct

```python
llm = ai.openai.OpenAIModel(model="gpt-5")
llm = ai.anthropic.AnthropicModel(model="claude-opus-4-6", thinking=True, budget_tokens=10000)
```

All providers implement `LanguageModel` with `stream()` (an async generator of `Message`) and `buffer()` (returns the final `Message`). The Gateway routes Anthropic models through the native Anthropic API for full feature support, and other models through an OpenAI-compatible endpoint.

AI SDK UI

For streaming to an AI SDK frontend (`useChat`, etc.):

```python
from vercel_ai_sdk.ai_sdk_ui import to_sse_stream, to_messages, UI_MESSAGE_STREAM_HEADERS

messages = to_messages(request.messages)
return StreamingResponse(to_sse_stream(ai.run(agent, llm, query)), headers=UI_MESSAGE_STREAM_HEADERS)
```
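For intuition, a Server-Sent Events frame is just a `data:` line carrying a JSON payload, followed by a blank line. `to_sse_event` below is a hypothetical illustration of that wire format, not the SDK's `to_sse_stream`:

```python
import json

def to_sse_event(payload: dict) -> str:
    """Encode one message as an SSE frame: 'data: <json>' plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

event = to_sse_event({"type": "text-delta", "delta": "Hel"})
# event == 'data: {"type": "text-delta", "delta": "Hel"}\n\n'
```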

Other features

Structured output

Pass a Pydantic model as `output_type`:

```python
class Forecast(pydantic.BaseModel):
    city: str
    temperature: float

result = await ai.stream_step(llm, messages, output_type=Forecast)
result.output.city  # validated Pydantic instance
```
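Validation on the receiving end is plain Pydantic. A sketch of what happens to the model's JSON output (the raw string here is fabricated for illustration):

```python
import pydantic

class Forecast(pydantic.BaseModel):
    city: str
    temperature: float

# Stand-in for the raw JSON text an LLM returns when constrained to a schema.
raw = '{"city": "Berlin", "temperature": 21.5}'
forecast = Forecast.model_validate_json(raw)
# forecast.city == "Berlin"; a payload missing a field raises pydantic.ValidationError
```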

It also works directly on the model:

```python
msg = await llm.buffer(messages, output_type=Forecast)
```

MCP

```python
tools = await ai.mcp.get_http_tools("https://mcp.example.com/mcp", headers={...}, tool_prefix="docs")
tools = await ai.mcp.get_stdio_tools("npx", "-y", "@anthropic/mcp-server-filesystem", "/tmp", tool_prefix="fs")
```

Returns `Tool` objects usable in `stream_step`/`stream_loop`. Connections are pooled per `ai.run()` and cleaned up automatically.

Telemetry


```python
ai.telemetry.enable()  # OTel-based, emits gen_ai.* spans for runs/steps/tools
```