eino-compose
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseOrchestration Overview
编排概述
The package provides three orchestration APIs:
github.com/cloudwego/eino/compose| API | Topology | Cycles | Type Alignment |
|---|---|---|---|
| Graph | Directed graph | Yes (Pregel mode) / No (DAG mode) | Whole input/output |
| Chain | Linear sequence | No | Whole input/output |
| Workflow | DAG | No | Field-level mapping |
*Chain is implemented on top of Graph in Pregel mode but enforces linear topology.
All three compile into which exposes Invoke, Stream, Collect, and Transform.
Runnable[I, O]go
import "github.com/cloudwego/eino/compose"github.com/cloudwego/eino/compose| API | 拓扑结构 | 是否支持循环 | 类型对齐方式 |
|---|---|---|---|
| Graph | 有向图 | 是(Pregel模式)/ 否(DAG模式) | 整体输入输出 |
| Chain | 线性序列 | 否 | 整体输入输出 |
| Workflow | DAG | 否 | 字段级映射 |
*Chain基于Pregel模式的Graph实现,但强制使用线性拓扑结构。
三者最终都会编译为类型,提供Invoke、Stream、Collect、Transform四种执行方法。
Runnable[I, O]go
import "github.com/cloudwego/eino/compose"Choosing an API
API选型
- Chain -- sequential pipeline: prompt -> model -> parser. Simplest API.
- Graph -- need branching, loops (ReAct agent), or fan-out/fan-in.
- Workflow -- need field-level mapping between nodes with different struct types; DAG only.
- Chain -- 适用于顺序流水线:提示词 -> 模型 -> 解析器,是最简单的API。
- Graph -- 适用于需要分支、循环(ReAct agent)、扇入/扇出的场景。
- Workflow -- 适用于需要在不同结构体类型的节点之间做字段级映射的场景,仅支持DAG。
Graph Quick Reference
Graph快速参考
go
g := compose.NewGraph[InputType, OutputType]()
// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)
// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)
// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)
// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)go
g := compose.NewGraph[InputType, OutputType]()
// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)
// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)
// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)
// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)Chain Quick Reference
Chain快速参考
go
chain := compose.NewChain[InputType, OutputType]()
chain.
AppendChatTemplate(tmpl).
AppendChatModel(model).
AppendLambda(compose.InvokableLambda(parseFn))
r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)Append methods: , , , , , , , , , , , , .
AppendChatModelAppendChatTemplateAppendToolsNodeAppendLambdaAppendGraphAppendParallelAppendBranchAppendPassthroughAppendRetrieverAppendEmbeddingAppendLoaderAppendIndexerAppendDocumentTransformergo
chain := compose.NewChain[InputType, OutputType]()
chain.
AppendChatTemplate(tmpl).
AppendChatModel(model).
AppendLambda(compose.InvokableLambda(parseFn))
r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)可用的Append方法包括:、、、、、、、、、、、、。
AppendChatModelAppendChatTemplateAppendToolsNodeAppendLambdaAppendGraphAppendParallelAppendBranchAppendPassthroughAppendRetrieverAppendEmbeddingAppendLoaderAppendIndexerAppendDocumentTransformerWorkflow Quick Reference
Workflow快速参考
go
wf := compose.NewWorkflow[InputStruct, OutputStruct]()
wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
AddInput(compose.START, compose.MapFields("FieldA", "InputField"))
wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
AddInput("node1", compose.ToField("Result"))
wf.End().AddInput("node2")
r, err := wf.Compile(ctx)Field mapping helpers: , , , , , .
MapFieldsToFieldFromFieldMapFieldPathsToFieldPathFromFieldPathgo
wf := compose.NewWorkflow[InputStruct, OutputStruct]()
wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
AddInput(compose.START, compose.MapFields("FieldA", "InputField"))
wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
AddInput("node1", compose.ToField("Result"))
wf.End().AddInput("node2")
r, err := wf.Compile(ctx)字段映射辅助方法包括:、、、、、。
MapFieldsToFieldFromFieldMapFieldPathsToFieldPathFromFieldPathStream Programming
流式编程
Four interaction modes on :
Runnable[I, O]| Mode | Input | Output | Lambda Constructor |
|---|---|---|---|
| Invoke | | | |
| Stream | | | |
| Collect | | | |
| Transform | | | |
Framework auto-converts between modes:
- Invoke call: all internal nodes run in Invoke mode.
- Stream/Collect/Transform call: all internal nodes run in Transform mode; missing modes are auto-filled.
Stream primitives live in :
github.com/cloudwego/eino/schemago
sr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()Runnable[I, O]| 模式 | 输入 | 输出 | Lambda构造器 |
|---|---|---|---|
| Invoke | | | |
| Stream | | | |
| Collect | | | |
| Transform | | | |
框架会自动在不同模式之间转换:
- Invoke 调用:所有内部节点以Invoke模式运行。
- Stream/Collect/Transform 调用:所有内部节点以Transform模式运行;缺失的模式实现会自动填充。
流式原语定义在包中:
github.com/cloudwego/eino/schemago
sr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()Compile & Run
编译与运行
go
r, err := g.Compile(ctx,
compose.WithGraphName("my_graph"),
compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)
// Non-streaming
out, err := r.Invoke(ctx, input)
// Streaming
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
chunk, err := stream.Recv()
if err == io.EOF { break }
if err != nil { return err }
process(chunk)
}go
r, err := g.Compile(ctx,
compose.WithGraphName("my_graph"),
compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)
// 非流式执行
out, err := r.Invoke(ctx, input)
// 流式执行
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
chunk, err := stream.Recv()
if err == io.EOF { break }
if err != nil { return err }
process(chunk)
}State Graph
状态图
Share state across nodes within a single request:
go
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
return &MyState{}
}))
g.AddLambdaNode("node", lambda,
compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
// read/write state before node executes
return in, nil
}),
compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
// read/write state after node executes
return out, nil
}),
)在单个请求的多个节点之间共享状态:
go
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
return &MyState{}
}))
g.AddLambdaNode("node", lambda,
compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
// 在节点执行前读写状态
return in, nil
}),
compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
// 在节点执行后读写状态
return out, nil
}),
)Instructions to Agent
给Agent的使用指引
When helping users build orchestration:
- Default to Graph for most use cases. Use Chain only for simple linear pipelines. Use Workflow when field-level mapping between different struct types is needed.
- Always show the Compile step -- returns
g.Compile(ctx).Runnable[I,O] - Always close StreamReaders -- use immediately after obtaining a stream.
defer sr.Close() - Upstream output type must match downstream input type (or use /
WithInputKeyfor map conversion).WithOutputKey - For cyclic graphs (e.g., ReAct agent), use default Pregel mode (). For DAGs, set
AnyPredecessor.AllPredecessor - Use to inject logging/tracing at runtime.
compose.WithCallbacks(handler) - Use with interrupt nodes for pause/resume workflows.
compose.WithCheckPointStore(store)
在帮助用户构建编排逻辑时,请遵循以下规则:
- 大多数场景默认使用Graph;仅简单线性流水线使用Chain;需要不同结构体间字段级映射时使用Workflow。
- 始终展示Compile步骤 -- 会返回
g.Compile(ctx)实例。Runnable[I,O] - 始终要关闭StreamReaders -- 获取流后立即使用。
defer sr.Close() - 上游输出类型必须匹配下游输入类型(也可使用/
WithInputKey做Map类型转换)。WithOutputKey - 对于循环图(例如ReAct agent),使用默认的Pregel模式();对于DAG,设置
AnyPredecessor。AllPredecessor - 使用在运行时注入日志/链路追踪能力。
compose.WithCallbacks(handler) - 结合和中断节点实现工作流的暂停/恢复。
compose.WithCheckPointStore(store)
Reference Files
参考文件
Read these files on-demand for detailed API, examples, and advanced usage:
- reference/graph.md -- Full Graph API, branches, state graph, cyclic graph, complete ReAct example
- reference/chain.md -- Chain API, when to use, parallel/branch in chain
- reference/workflow.md -- Workflow API, field-level mapping helpers, constraints
- reference/stream.md -- StreamReader/Writer, Pipe/Copy/Merge, lambda constructors, auto-conversion rules
- reference/callback.md -- Callback timings, handler registration, trigger rules, tracing example
- reference/call-option.md -- Per-request CallOption, component-type options, node targeting
- reference/checkpoint-and-state.md -- CheckPointStore, interrupt/resume, state management
按需读取以下文件获取详细API、示例和高级用法:
- reference/graph.md -- 完整Graph API、分支、状态图、循环图、完整ReAct示例
- reference/chain.md -- Chain API、适用场景、Chain中的并行/分支实现
- reference/workflow.md -- Workflow API、字段映射辅助方法、使用约束
- reference/stream.md -- StreamReader/Writer、Pipe/Copy/Merge、Lambda构造器、自动转换规则
- reference/callback.md -- 回调时机、处理器注册、触发规则、链路追踪示例
- reference/call-option.md -- 单请求CallOption、组件级选项、节点定向配置
- reference/checkpoint-and-state.md -- CheckPointStore、中断/恢复、状态管理