a2a-protocol
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseA2A Protocol Implementation Guide
A2A协议实现指南
This skill provides comprehensive knowledge for building, deploying, and interacting with agents using the Agent2Agent (A2A) Protocol v0.3.0.
Reference: https://a2a-protocol.org/latest/definitions/
本指南提供了使用Agent2Agent(A2A)协议v0.3.0构建、部署和交互Agent的全面知识。
Protocol Overview
协议概述
A2A is a standard protocol enabling AI agents to communicate and collaborate. It operates across three layers:
| Layer | Description |
|---|---|
| Data Model | Core structures (Task, Message, AgentCard, Part, Artifact) |
| Abstract Operations | Protocol-agnostic capabilities (SendMessage, GetTask, etc.) |
| Protocol Bindings | Concrete implementations (JSON-RPC 2.0, gRPC, HTTP/REST) |
A2A是一款支持AI Agent通信与协作的标准协议,它分为三个层级运作:
| 层级 | 描述 |
|---|---|
| 数据模型层 | 核心结构(Task、Message、AgentCard、Part、Artifact) |
| 抽象操作层 | 与协议无关的能力(SendMessage、GetTask等) |
| 协议绑定层 | 具体实现方式(JSON-RPC 2.0、gRPC、HTTP/REST) |
Core Data Structures
核心数据结构
1. AgentCard
1. AgentCard
Self-describing manifest hosted at :
/.well-known/agent-card.jsonjson
{
"name": "my_agent",
"description": "Agent description",
"url": "http://localhost:8080/",
"version": "1.0.0",
"protocolVersion": "0.3.0",
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"],
"capabilities": {
"streaming": true,
"pushNotifications": false,
"extendedAgentCard": false
},
"skills": [
{
"id": "skill_id",
"name": "Skill Name",
"description": "What this skill does",
"examples": ["Example query 1", "Example query 2"],
"tags": []
}
],
"securitySchemes": {},
"security": []
}托管在的自描述清单:
/.well-known/agent-card.jsonjson
{
"name": "my_agent",
"description": "Agent description",
"url": "http://localhost:8080/",
"version": "1.0.0",
"protocolVersion": "0.3.0",
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"],
"capabilities": {
"streaming": true,
"pushNotifications": false,
"extendedAgentCard": false
},
"skills": [
{
"id": "skill_id",
"name": "Skill Name",
"description": "What this skill does",
"examples": ["Example query 1", "Example query 2"],
"tags": []
}
],
"securitySchemes": {},
"security": []
}2. Task
2. Task
The core unit of work with lifecycle management:
json
{
"id": "task_123",
"contextId": "context_456",
"status": {
"state": "working",
"timestamp": "2024-01-01T00:00:00Z"
},
"artifacts": [],
"history": [],
"metadata": {}
}Task States:
| State | Description |
|---|---|
| Task received, not yet processing |
| Active processing |
| Awaiting client response |
| Awaiting authentication |
| Successfully finished |
| Terminated with error |
| Client-requested cancellation |
| Agent refused processing |
具备生命周期管理的核心工作单元:
json
{
"id": "task_123",
"contextId": "context_456",
"status": {
"state": "working",
"timestamp": "2024-01-01T00:00:00Z"
},
"artifacts": [],
"history": [],
"metadata": {}
}Task状态:
| 状态 | 描述 |
|---|---|
| 任务已接收,尚未开始处理 |
| 任务正在处理中 |
| 等待客户端响应 |
| 等待身份验证 |
| 任务已成功完成 |
| 任务执行失败并终止 |
| 客户端请求取消任务 |
| Agent拒绝处理任务 |
3. Message
3. Message
Communication unit between client and server:
json
{
"messageId": "msg_789",
"role": "user",
"parts": [
{
"type": "text",
"text": "Hello, agent!"
}
],
"contextId": "context_456",
"taskId": "task_123",
"metadata": {}
}Roles: |
useragent客户端与服务器之间的通信单元:
json
{
"messageId": "msg_789",
"role": "user",
"parts": [
{
"type": "text",
"text": "Hello, agent!"
}
],
"contextId": "context_456",
"taskId": "task_123",
"metadata": {}
}角色: |
useragent4. Part
4. Part
Content container supporting multiple types:
| Type | Structure | Description |
|---|---|---|
| Text | | Plain text, markdown, HTML |
| File | | File reference |
| Data | | Structured JSON |
支持多种类型的内容容器:
| 类型 | 结构 | 描述 |
|---|---|---|
| 文本 | | 纯文本、Markdown、HTML |
| 文件 | | 文件引用 |
| 数据 | | 结构化JSON数据 |
5. Artifact
5. Artifact
Task output representation:
json
{
"id": "artifact_001",
"name": "Result",
"description": "Calculation result",
"parts": [
{"type": "text", "text": "42"}
],
"metadata": {}
}任务输出的表现形式:
json
{
"id": "artifact_001",
"name": "Result",
"description": "Calculation result",
"parts": [
{"type": "text", "text": "42"}
],
"metadata": {}
}JSON-RPC 2.0 Operations
JSON-RPC 2.0操作
Method List
方法列表
| Method | Description |
|---|---|
| Send message, returns Task or Message |
| Streaming variant with SSE |
| Retrieve task state by ID |
| List tasks with filtering/pagination |
| Cancel an active task |
| Stream updates for existing task |
| Create webhook config |
| Get webhook config |
| List webhook configs |
| Delete webhook config |
| Get authenticated agent card |
| 方法 | 描述 |
|---|---|
| 发送消息,返回Task或Message |
| 基于SSE的流式传输变体 |
| 根据ID获取任务状态 |
| 列出任务,支持过滤与分页 |
| 取消活跃任务 |
| 流式获取现有任务的更新 |
| 创建Webhook配置 |
| 获取Webhook配置 |
| 列出Webhook配置 |
| 删除Webhook配置 |
| 获取经过身份验证的Agent卡片 |
Request Format
请求格式
json
{
"jsonrpc": "2.0",
"id": "request_id",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": "Hello"}],
"messageId": "msg_001"
}
}
}json
{
"jsonrpc": "2.0",
"id": "request_id",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": "Hello"}],
"messageId": "msg_001"
}
}
}Response Format
响应格式
Success:
json
{
"jsonrpc": "2.0",
"id": "request_id",
"result": {
"task": { ... }
}
}Error:
json
{
"jsonrpc": "2.0",
"id": "request_id",
"error": {
"code": -32000,
"message": "Task not found",
"data": { "taskId": "invalid_id" }
}
}成功响应:
json
{
"jsonrpc": "2.0",
"id": "request_id",
"result": {
"task": { ... }
}
}错误响应:
json
{
"jsonrpc": "2.0",
"id": "request_id",
"error": {
"code": -32000,
"message": "Task not found",
"data": { "taskId": "invalid_id" }
}
}Error Codes
错误码
| Code | Name | Description |
|---|---|---|
| Parse Error | Invalid JSON |
| Invalid Request | Invalid JSON-RPC structure |
| Method Not Found | Unknown method |
| Invalid Params | Invalid method parameters |
| Internal Error | Server error |
| TaskNotFoundError | Task does not exist |
| PushNotificationNotSupportedError | Webhooks not supported |
| UnsupportedOperationError | Feature not available |
| ContentTypeNotSupportedError | Unsupported media type |
| VersionNotSupportedError | Protocol version mismatch |
| 代码 | 名称 | 描述 |
|---|---|---|
| Parse Error | JSON格式无效 |
| Invalid Request | JSON-RPC结构无效 |
| Method Not Found | 方法不存在 |
| Invalid Params | 方法参数无效 |
| Internal Error | 服务器内部错误 |
| TaskNotFoundError | 任务不存在 |
| PushNotificationNotSupportedError | 不支持Webhook |
| UnsupportedOperationError | 功能不可用 |
| ContentTypeNotSupportedError | 不支持的媒体类型 |
| VersionNotSupportedError | 协议版本不匹配 |
Streaming (Server-Sent Events)
流式传输(Server-Sent Events)
Stream Response Format
流式响应格式
event: message
data: {"task": {...}}
event: message
data: {"statusUpdate": {"taskId": "...", "state": "working"}}
event: message
data: {"artifactUpdate": {"taskId": "...", "artifact": {...}}}
event: done
data: {"status": "complete"}event: message
data: {"task": {...}}
event: message
data: {"statusUpdate": {"taskId": "...", "state": "working"}}
event: message
data: {"artifactUpdate": {"taskId": "...", "artifact": {...}}}
event: done
data: {"status": "complete"}Event Types
事件类型
| Event | Description |
|---|---|
| Initial task state |
| Direct response message |
| Task state change |
| New or updated artifact |
Ordering Guarantee: Events MUST be delivered in generation order.
| 事件 | 描述 |
|---|---|
| 初始任务状态 |
| 直接响应消息 |
| 任务状态变更 |
| 新增或更新的输出产物 |
顺序保证: 事件必须按照生成顺序交付。
Security Schemes
安全方案
Supported Authentication
支持的身份验证方式
| Scheme | Description |
|---|---|
| API Key | Header, query, or cookie |
| HTTP Auth | Bearer, Basic, Digest |
| OAuth 2.0 | Authorization Code, Client Credentials, Device Code |
| OpenID Connect | Identity layer on OAuth 2.0 |
| Mutual TLS | Certificate-based auth |
| 方案 | 描述 |
|---|---|
| API密钥 | 通过请求头、查询参数或Cookie传递 |
| HTTP认证 | Bearer、Basic、Digest认证 |
| OAuth 2.0 | 授权码、客户端凭证、设备码模式 |
| OpenID Connect | 基于OAuth 2.0的身份层 |
| 双向TLS | 基于证书的身份验证 |
Example Security Declaration
安全声明示例
json
{
"securitySchemes": {
"apiKey": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
},
"oauth2": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": "https://auth.example.com/token",
"scopes": {
"agent:read": "Read agent data",
"agent:write": "Execute tasks"
}
}
}
}
},
"security": [{"apiKey": []}, {"oauth2": ["agent:read"]}]
}json
{
"securitySchemes": {
"apiKey": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
},
"oauth2": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": "https://auth.example.com/token",
"scopes": {
"agent:read": "读取Agent数据",
"agent:write": "执行任务"
}
}
}
}
},
"security": [{"apiKey": []}, {"oauth2": ["agent:read"]}]
}Implementation Guide
实现指南
Dependencies
依赖安装
bash
pip install a2a-sdk uvicorn python-dotenvbash
pip install a2a-sdk uvicorn python-dotenvServer Implementation (3-File Pattern)
服务器实现(三文件模式)
1. agent.py - Agent Definition
1. agent.py - Agent定义
python
from a2a.types import AgentCapabilities, AgentSkill, AgentCard, ContentTypes
def create_agent_card(url: str) -> AgentCard:
return AgentCard(
name="my_agent",
description="Agent description",
url=url,
version="1.0.0",
protocolVersion="0.3.0",
defaultInputModes=[ContentTypes.TEXT],
defaultOutputModes=[ContentTypes.TEXT],
capabilities=AgentCapabilities(
streaming=True,
pushNotifications=False,
),
skills=[
AgentSkill(
id="main_skill",
name="Main Skill",
description="What this agent does",
examples=["Example query"],
tags=["category"],
)
],
)python
from a2a.types import AgentCapabilities, AgentSkill, AgentCard, ContentTypes
def create_agent_card(url: str) -> AgentCard:
return AgentCard(
name="my_agent",
description="Agent description",
url=url,
version="1.0.0",
protocolVersion="0.3.0",
defaultInputModes=[ContentTypes.TEXT],
defaultOutputModes=[ContentTypes.TEXT],
capabilities=AgentCapabilities(
streaming=True,
pushNotifications=False,
),
skills=[
AgentSkill(
id="main_skill",
name="Main Skill",
description="What this agent does",
examples=["Example query"],
tags=["category"],
)
],
)2. agent_executor.py - Business Logic
2. agent_executor.py - 业务逻辑
python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Part, TextPart, Task, TaskState, TaskStatus
from a2a.utils import completed_task, new_artifact, working_task
class MyAgentExecutor(AgentExecutor):
async def execute(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
user_input = context.get_user_input()
# Signal working state (optional, for long tasks)
await event_queue.enqueue_event(
working_task(context.task_id, context.context_id)
)
# --- YOUR AGENT LOGIC HERE ---
result = await self.process(user_input)
# ------------------------------
# Create response parts
parts = [Part(root=TextPart(text=result))]
# Complete task with artifact
await event_queue.enqueue_event(
completed_task(
context.task_id,
context.context_id,
artifacts=[new_artifact(parts, f"result_{context.task_id}")],
history=[context.message],
)
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> Task | None:
# Handle cancellation request
return None
async def process(self, input_text: str) -> str:
# Implement your logic
return f"Processed: {input_text}"python
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Part, TextPart, Task, TaskState, TaskStatus
from a2a.utils import completed_task, new_artifact, working_task
class MyAgentExecutor(AgentExecutor):
async def execute(
self,
context: RequestContext,
event_queue: EventQueue
) -> None:
user_input = context.get_user_input()
# 发送任务处理中状态(可选,适用于长任务)
await event_queue.enqueue_event(
working_task(context.task_id, context.context_id)
)
# --- 在此处编写你的Agent逻辑 ---
result = await self.process(user_input)
# ------------------------------
# 创建响应内容
parts = [Part(root=TextPart(text=result))]
# 完成任务并返回输出产物
await event_queue.enqueue_event(
completed_task(
context.task_id,
context.context_id,
artifacts=[new_artifact(parts, f"result_{context.task_id}")],
history=[context.message],
)
)
async def cancel(
self,
context: RequestContext,
event_queue: EventQueue
) -> Task | None:
# 处理任务取消请求
return None
async def process(self, input_text: str) -> str:
# 实现自定义处理逻辑
return f"Processed: {input_text}"3. main.py - Server Entry Point
3. main.py - 服务器入口
python
import uvicorn
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication
from a2a.server.tasks import InMemoryTaskStore
from .agent import create_agent_card
from .agent_executor import MyAgentExecutor
def main():
host = "0.0.0.0"
port = 8080
url = f"http://{host}:{port}/"
agent_card = create_agent_card(url)
handler = DefaultRequestHandler(
agent_executor=MyAgentExecutor(),
task_store=InMemoryTaskStore(),
)
app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=handler,
)
print(f"A2A Agent running at {url}")
print(f"Agent Card: {url}.well-known/agent-card.json")
uvicorn.run(app.build(), host=host, port=port)
if __name__ == "__main__":
main()python
import uvicorn
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.apps import A2AStarletteApplication
from a2a.server.tasks import InMemoryTaskStore
from .agent import create_agent_card
from .agent_executor import MyAgentExecutor
def main():
host = "0.0.0.0"
port = 8080
url = f"http://{host}:{port}/"
agent_card = create_agent_card(url)
handler = DefaultRequestHandler(
agent_executor=MyAgentExecutor(),
task_store=InMemoryTaskStore(),
)
app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=handler,
)
print(f"A2A Agent运行于 {url}")
print(f"Agent卡片地址: {url}.well-known/agent-card.json")
uvicorn.run(app.build(), host=host, port=port)
if __name__ == "__main__":
main()Client Implementation
客户端实现
python
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
SendMessageRequest,
MessageSendParams,
Message,
Part,
TextPart,
)
async def call_agent(agent_url: str, query: str):
async with httpx.AsyncClient(timeout=60.0) as http:
# 1. Discover agent
resolver = A2ACardResolver(
base_url=agent_url,
httpx_client=http
)
card = await resolver.get_agent_card()
print(f"Connected to: {card.name} v{card.version}")
# 2. Create client
client = A2AClient(http, card, url=agent_url)
# 3. Build message
message = Message(
role="user",
parts=[Part(root=TextPart(text=query))],
)
# 4. Send request
request = SendMessageRequest(
params=MessageSendParams(message=message)
)
response = await client.send_message(request)
return responsepython
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
SendMessageRequest,
MessageSendParams,
Message,
Part,
TextPart,
)
async def call_agent(agent_url: str, query: str):
async with httpx.AsyncClient(timeout=60.0) as http:
# 1. 发现Agent
resolver = A2ACardResolver(
base_url=agent_url,
httpx_client=http
)
card = await resolver.get_agent_card()
print(f"已连接至: {card.name} v{card.version}")
# 2. 创建客户端
client = A2AClient(http, card, url=agent_url)
# 3. 构建消息
message = Message(
role="user",
parts=[Part(root=TextPart(text=query))],
)
# 4. 发送请求
request = SendMessageRequest(
params=MessageSendParams(message=message)
)
response = await client.send_message(request)
return responseStreaming client
流式客户端
async def call_agent_streaming(agent_url: str, query: str):
async with httpx.AsyncClient(timeout=None) as http:
resolver = A2ACardResolver(base_url=agent_url, httpx_client=http)
card = await resolver.get_agent_card()
client = A2AClient(http, card, url=agent_url)
message = Message(
role="user",
parts=[Part(root=TextPart(text=query))],
)
request = SendMessageRequest(
params=MessageSendParams(message=message)
)
async for event in client.send_message_streaming(request):
if hasattr(event, 'task'):
print(f"Task: {event.task.status.state}")
elif hasattr(event, 'artifact'):
print(f"Artifact: {event.artifact}")
---async def call_agent_streaming(agent_url: str, query: str):
async with httpx.AsyncClient(timeout=None) as http:
resolver = A2ACardResolver(base_url=agent_url, httpx_client=http)
card = await resolver.get_agent_card()
client = A2AClient(http, card, url=agent_url)
message = Message(
role="user",
parts=[Part(root=TextPart(text=query))],
)
request = SendMessageRequest(
params=MessageSendParams(message=message)
)
async for event in client.send_message_streaming(request):
if hasattr(event, 'task'):
print(f"任务状态: {event.task.status.state}")
elif hasattr(event, 'artifact'):
print(f"输出产物: {event.artifact}")
---Best Practices
最佳实践
1. Agent Discovery
1. Agent发现
Always fetch before interaction to adapt to capability changes.
AgentCard在与Agent交互前,务必获取以适配其能力变化。
AgentCard2. Streaming
2. 流式传输
Use SSE for long-running tasks to provide real-time updates.
对于长时间运行的任务,使用SSE实现实时更新。
3. Artifacts vs Messages
3. 输出产物与消息的区别
- Artifacts: Final deliverables (files, structured data)
- Messages: Conversational updates, status information
- 输出产物(Artifacts):最终交付成果(文件、结构化数据)
- 消息(Messages):会话式更新、状态信息
4. Error Handling
4. 错误处理
python
try:
response = await client.send_message(request)
except A2AError as e:
if e.code == -32000:
print("Task not found")
elif e.code == -32602:
print("Invalid parameters")python
try:
response = await client.send_message(request)
except A2AError as e:
if e.code == -32000:
print("任务不存在")
elif e.code == -32602:
print("参数无效")5. Pagination
5. 分页
python
undefinedpython
undefinedList tasks with pagination
分页列出任务
params = TaskQueryParams(
contextId="ctx_123",
status=["completed", "failed"],
pageSize=50,
pageToken=None, # For first page
)
response = await client.list_tasks(params)
next_page_token = response.nextPageToken
undefinedparams = TaskQueryParams(
contextId="ctx_123",
status=["completed", "failed"],
pageSize=50,
pageToken=None, # 第一页
)
response = await client.list_tasks(params)
next_page_token = response.nextPageToken
undefined6. Push Notifications
6. 推送通知
python
undefinedpython
undefinedConfigure webhook for async updates
配置Webhook以接收异步更新
config = PushNotificationConfig(
url="https://myserver.com/webhook",
authentication={
"type": "bearer",
"token": "secret_token"
}
)
await client.create_push_notification_config(task_id, config)
---config = PushNotificationConfig(
url="https://myserver.com/webhook",
authentication={
"type": "bearer",
"token": "secret_token"
}
)
await client.create_push_notification_config(task_id, config)
---Quick Reference
快速参考
Endpoints
端点
| Endpoint | Method | Description |
|---|---|---|
| GET | Public agent card |
| POST | JSON-RPC endpoint |
| 端点 | 请求方法 | 描述 |
|---|---|---|
| GET | 公开Agent卡片 |
| POST | JSON-RPC端点 |
Headers
请求头
| Header | Description |
|---|---|
| |
| |
| Protocol version (e.g., |
| 请求头 | 描述 |
|---|---|
| |
| |
| 协议版本(例如: |
SDK Utilities
SDK工具函数
python
from a2a.utils import (
completed_task, # Create completed task event
failed_task, # Create failed task event
working_task, # Create working status event
input_required, # Request user input
new_artifact, # Create new artifact
new_message, # Create new message
)python
from a2a.utils import (
completed_task, # 创建任务完成事件
failed_task, # 创建任务失败事件
working_task, # 创建任务处理中状态事件
input_required, # 请求用户输入
new_artifact, # 创建新的输出产物
new_message, # 创建新消息
)References
参考链接
- Official Spec: https://a2a-protocol.org/latest/specification/
- Definitions: https://a2a-protocol.org/latest/definitions/
- Python SDK: https://pypi.org/project/a2a-sdk/
- GitHub: https://github.com/a2a-protocol/a2a-python