temporal-go
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseTemporal Go SDK
Temporal Go SDK
Build durable, fault-tolerant distributed applications using the Temporal Go SDK (). Temporal provides workflow orchestration with automatic retries, state persistence, and exactly-once execution semantics.
go.temporal.io/sdk使用Temporal Go SDK () 构建持久化、容错的分布式应用程序。Temporal提供工作流编排功能,支持自动重试、状态持久化和恰好一次执行语义。
go.temporal.io/sdkCore Principles
核心原则
- Deterministic Workflows: Workflow code must be deterministic. Never use ,
time.Now(),randrange, goroutines, or I/O directly in Workflows.map - Activities for Side Effects: All non-deterministic operations (HTTP calls, DB queries, file I/O) go in Activities.
- Struct Parameters: Use single struct parameters and return values for forward compatibility.
- Idempotent Activities: Activities may be retried; design them to be safely re-executable.
- Explicit Error Handling: Use Temporal's typed errors (,
ApplicationError,TimeoutError) for control flow.CanceledError
- 确定性工作流:工作流代码必须具有确定性。绝不要在工作流中直接使用、
time.Now()、map遍历、goroutine或I/O操作。rand - 活动处理副作用:所有非确定性操作(HTTP调用、数据库查询、文件I/O)都应放在活动中执行。
- 结构体参数:使用单个结构体作为参数和返回值,以保证向前兼容性。
- 幂等活动:活动可能会被重试;设计时要确保它们可以安全地重复执行。
- 显式错误处理:使用Temporal的类型化错误(、
ApplicationError、TimeoutError)进行控制流管理。CanceledError
Project Structure
项目结构
.
├── cmd/
│ ├── worker/ # Worker process entry point
│ └── starter/ # Workflow starter (client) entry point
├── workflows/ # Workflow definitions
│ ├── order.go
│ └── order_test.go
├── activities/ # Activity definitions
│ ├── payment.go
│ └── payment_test.go
├── shared/ # Shared types (params, results, constants)
│ └── types.go
├── go.mod
└── go.sum.
├── cmd/
│ ├── worker/ # 工作器进程入口
│ └── starter/ # 工作流启动器(客户端)入口
├── workflows/ # 工作流定义
│ ├── order.go
│ └── order_test.go
├── activities/ # 活动定义
│ ├── payment.go
│ └── payment_test.go
├── shared/ # 共享类型(参数、结果、常量)
│ └── types.go
├── go.mod
└── go.sumImports
导入
go
import (
// Client - for connecting to Temporal and starting workflows
"go.temporal.io/sdk/client"
// Workflow - for writing workflow definitions
"go.temporal.io/sdk/workflow"
// Activity - for writing activity definitions
"go.temporal.io/sdk/activity"
// Worker - for creating and running workers
"go.temporal.io/sdk/worker"
// Temporal - for error types and retry policies
"go.temporal.io/sdk/temporal"
// Testing - for test environments
"go.temporal.io/sdk/testsuite"
// Logging
"go.temporal.io/sdk/log"
)go
import (
// Client - 用于连接Temporal并启动工作流
"go.temporal.io/sdk/client"
// Workflow - 用于编写工作流定义
"go.temporal.io/sdk/workflow"
// Activity - 用于编写活动定义
"go.temporal.io/sdk/activity"
// Worker - 用于创建和运行工作器
"go.temporal.io/sdk/worker"
// Temporal - 用于错误类型和重试策略
"go.temporal.io/sdk/temporal"
// Testing - 用于测试环境
"go.temporal.io/sdk/testsuite"
// Logging
"go.temporal.io/sdk/log"
)Workflow Definitions
工作流定义
Basic Workflow
基础工作流
A Workflow Definition is an exportable function. The first parameter must be . Return or . Use struct params for forward compatibility.
workflow.Contexterror(result, error)go
package workflows
import (
"time"
"go.temporal.io/sdk/workflow"
)
type OrderInput struct {
OrderID string
CustomerID string
Items []string
Amount float64
}
type OrderResult struct {
OrderID string
PaymentID string
TrackingCode string
CompletedAt string
}
func ProcessOrder(ctx workflow.Context, input OrderInput) (*OrderResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("ProcessOrder started", "orderID", input.OrderID)
// Set activity options (StartToCloseTimeout OR ScheduleToCloseTimeout required)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Use nil struct pointer to call struct-method Activities
var a *Activities
var paymentResult PaymentResult
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &paymentResult)
if err != nil {
return nil, err
}
var shippingResult ShippingResult
err = workflow.ExecuteActivity(ctx, a.ShipOrder, input).Get(ctx, &shippingResult)
if err != nil {
return nil, err
}
return &OrderResult{
OrderID: input.OrderID,
PaymentID: paymentResult.PaymentID,
TrackingCode: shippingResult.TrackingCode,
CompletedAt: workflow.Now(ctx).Format(time.RFC3339),
}, nil
}工作流定义是一个可导出的函数。第一个参数必须为。返回值为或。使用结构体参数以保证向前兼容性。
workflow.Contexterror(result, error)go
package workflows
import (
"time"
"go.temporal.io/sdk/workflow"
)
type OrderInput struct {
OrderID string
CustomerID string
Items []string
Amount float64
}
type OrderResult struct {
OrderID string
PaymentID string
TrackingCode string
CompletedAt string
}
func ProcessOrder(ctx workflow.Context, input OrderInput) (*OrderResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("ProcessOrder started", "orderID", input.OrderID)
// 设置活动选项(必须指定StartToCloseTimeout或ScheduleToCloseTimeout)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 使用nil结构体指针调用结构体方法类型的活动
var a *Activities
var paymentResult PaymentResult
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &paymentResult)
if err != nil {
return nil, err
}
var shippingResult ShippingResult
err = workflow.ExecuteActivity(ctx, a.ShipOrder, input).Get(ctx, &shippingResult)
if err != nil {
return nil, err
}
return &OrderResult{
OrderID: input.OrderID,
PaymentID: paymentResult.PaymentID,
TrackingCode: shippingResult.TrackingCode,
CompletedAt: workflow.Now(ctx).Format(time.RFC3339),
}, nil
}Determinism Rules
确定性规则
Workflow code MUST be deterministic. Use Temporal SDK replacements for non-deterministic Go constructs:
| Go Construct | Temporal Replacement | Package |
|---|---|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
工作流代码必须具有确定性。使用Temporal SDK替代Go中的非确定性构造:
| Go构造 | Temporal替代方案 | 包 |
|---|---|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
Logging (replay-safe)
日志(重放安全)
Always use -- it suppresses duplicate logs during replay:
workflow.GetLogger(ctx)go
logger := workflow.GetLogger(ctx)
logger.Info("Processing started", "orderID", orderID)
logger.Error("Activity failed", "Error", err)始终使用——它会在重放期间抑制重复日志:
workflow.GetLogger(ctx)go
logger := workflow.GetLogger(ctx)
logger.Info("Processing started", "orderID", orderID)
logger.Error("Activity failed", "Error", err)Side Effects
副作用
Capture non-deterministic values (random numbers, UUIDs) so they are recorded in history and replayed consistently:
go
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Intn(100)
})
var randomValue int
encodedRandom.Get(&randomValue)捕获非确定性值(随机数、UUID),以便它们被记录到历史中并一致地重放:
go
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Intn(100)
})
var randomValue int
encodedRandom.Get(&randomValue)Selectors (replacing select
)
select选择器(替代select
)
selectUse to wait on multiple Futures and Channels. The selector picks the first ready callback.
workflow.Selectorgo
func SampleTimerWorkflow(ctx workflow.Context, timeout time.Duration) error {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
var processingDone bool
// Add a Future (activity result)
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
processingDone = true
cancelHandler() // cancel the timer
})
// Add a timer Future
timerFuture := workflow.NewTimer(childCtx, timeout)
selector.AddFuture(timerFuture, func(f workflow.Future) {
if !processingDone {
_ = workflow.ExecuteActivity(ctx, SendNotificationActivity).Get(ctx, nil)
}
})
// Wait for the first to complete
selector.Select(ctx)
// If timer fired first, still wait for processing to finish
if !processingDone {
selector.Select(ctx)
}
return nil
}使用等待多个Future和Channel。选择器会选择第一个就绪的回调函数。
workflow.Selectorgo
func SampleTimerWorkflow(ctx workflow.Context, timeout time.Duration) error {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
var processingDone bool
// 添加一个Future(活动结果)
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
processingDone = true
cancelHandler() // 取消计时器
})
// 添加一个计时器Future
timerFuture := workflow.NewTimer(childCtx, timeout)
selector.AddFuture(timerFuture, func(f workflow.Future) {
if !processingDone {
_ = workflow.ExecuteActivity(ctx, SendNotificationActivity).Get(ctx, nil)
}
})
// 等待第一个完成
selector.Select(ctx)
// 如果计时器先触发,仍需等待处理完成
if !processingDone {
selector.Select(ctx)
}
return nil
}Timers
计时器
go
// Simple sleep
workflow.Sleep(ctx, 5*time.Minute)
// Cancellable timer via NewTimer
timerCtx, timerCancel := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, 30*time.Minute)
// Cancel the timer when no longer needed
timerCancel()go
// 简单睡眠
workflow.Sleep(ctx, 5*time.Minute)
// 通过NewTimer创建可取消的计时器
timerCtx, timerCancel := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, 30*time.Minute)
// 不再需要时取消计时器
timerCancel()Goroutines (workflow.Go)
Goroutine(workflow.Go)
Never use native -- use for deterministic coroutines:
goworkflow.Go()go
func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) ([]string, error) {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
var results []string
var err error
for i := 0; i < parallelism; i++ {
input := fmt.Sprint(i) // capture outside lambda
workflow.Go(ctx, func(gCtx workflow.Context) {
// IMPORTANT: use gCtx (the goroutine context), not ctx
var result string
err = workflow.ExecuteActivity(gCtx, ProcessItem, input).Get(gCtx, &result)
if err != nil {
return
}
results = append(results, result)
})
}
// Wait for all goroutines to complete
_ = workflow.Await(ctx, func() bool {
return err != nil || len(results) == parallelism
})
return results, err
}绝不要使用原生——使用实现确定性协程:
goworkflow.Go()go
func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) ([]string, error) {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
var results []string
var err error
for i := 0; i < parallelism; i++ {
input := fmt.Sprint(i) // 在lambda外部捕获变量
workflow.Go(ctx, func(gCtx workflow.Context) {
// 重要:使用gCtx(协程上下文),而非ctx
var result string
err = workflow.ExecuteActivity(gCtx, ProcessItem, input).Get(gCtx, &result)
if err != nil {
return
}
results = append(results, result)
})
}
// 等待所有协程完成
_ = workflow.Await(ctx, func() bool {
return err != nil || len(results) == parallelism
})
return results, err
}Await and AwaitWithTimeout
Await和AwaitWithTimeout
Block a Workflow until a condition becomes true, without polling:
go
// Block indefinitely until condition is met
err := workflow.Await(ctx, func() bool {
return isApproved
})
// Block with a timeout
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return isApproved
})
if err != nil {
return err // canceled
}
if !ok {
return temporal.NewApplicationError("approval timed out", "timeout")
}阻塞工作流直到条件满足,无需轮询:
go
// 无限期阻塞直到条件满足
err := workflow.Await(ctx, func() bool {
return isApproved
})
// 带超时的阻塞
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return isApproved
})
if err != nil {
return err // 已取消
}
if !ok {
return temporal.NewApplicationError("审批超时", "timeout")
}Child Workflows
子工作流
Execute a Workflow from within another Workflow:
go
func ParentWorkflow(ctx workflow.Context) (string, error) {
cwo := workflow.ChildWorkflowOptions{
WorkflowID: "child-workflow-id",
// Optional: TaskQueue, RetryPolicy, etc.
}
ctx = workflow.WithChildOptions(ctx, cwo)
var result string
err := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, "input-data").Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Child workflow started", "input", input)
return "processed: " + input, nil
}在一个工作流中执行另一个工作流:
go
func ParentWorkflow(ctx workflow.Context) (string, error) {
cwo := workflow.ChildWorkflowOptions{
WorkflowID: "child-workflow-id",
// 可选:TaskQueue、RetryPolicy等
}
ctx = workflow.WithChildOptions(ctx, cwo)
var result string
err := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, "input-data").Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}
func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Child workflow started", "input", input)
return "processed: " + input, nil
}Continue-As-New
Continue-As-New
For long-running Workflows, use Continue-As-New to keep Event History from growing too large. This closes the current Workflow Execution and starts a fresh one with the same Workflow ID:
go
func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
// Check if Temporal suggests continuing as new
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
// ... do work, update state ...
// Explicitly continue as new after N iterations
state.Iteration++
if state.Iteration >= 1000 {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
return nil
}When using Update or Signal handlers, wait for them to finish before continuing as new:
go
err := workflow.Await(ctx, func() bool {
return workflow.AllHandlersFinished(ctx)
})
if err != nil {
return err
}
return workflow.NewContinueAsNewError(ctx, MyWorkflow, updatedState)对于长时间运行的工作流,使用Continue-As-New避免事件历史记录过大。这会关闭当前工作流执行,并使用相同的Workflow ID启动一个新的工作流:
go
func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
// 检查Temporal是否建议使用Continue-As-New
if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
// ...执行工作,更新状态...
// 在N次迭代后显式使用Continue-As-New
state.Iteration++
if state.Iteration >= 1000 {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
return nil
}当使用Update或Signal处理器时,在Continue-As-New前等待它们完成:
go
err := workflow.Await(ctx, func() bool {
return workflow.AllHandlersFinished(ctx)
})
if err != nil {
return err
}
return workflow.NewContinueAsNewError(ctx, MyWorkflow, updatedState)Signals
信号
Signals are asynchronous messages sent to a running Workflow to change its state.
信号是发送给运行中工作流的异步消息,用于更改其状态。
Handle Signals in a Workflow
在工作流中处理信号
go
const ApproveSignal = "approve"
type ApproveInput struct {
ApproverName string
}
func ApprovalWorkflow(ctx workflow.Context, orderID string) error {
logger := workflow.GetLogger(ctx)
// Get the signal channel
signalChan := workflow.GetSignalChannel(ctx, ApproveSignal)
// Block until signal is received
var approval ApproveInput
signalChan.Receive(ctx, &approval)
logger.Info("Received approval", "approver", approval.ApproverName)
// Continue workflow execution after signal...
return nil
}go
const ApproveSignal = "approve"
type ApproveInput struct {
ApproverName string
}
func ApprovalWorkflow(ctx workflow.Context, orderID string) error {
logger := workflow.GetLogger(ctx)
// 获取信号通道
signalChan := workflow.GetSignalChannel(ctx, ApproveSignal)
// 阻塞直到收到信号
var approval ApproveInput
signalChan.Receive(ctx, &approval)
logger.Info("Received approval", "approver", approval.ApproverName)
// 收到信号后继续工作流执行...
return nil
}Listen for Signals in a Background Goroutine
在后台协程中监听信号
go
func OrderWorkflow(ctx workflow.Context) error {
var lastSignalData string
// Listen for signals in a separate goroutine
workflow.Go(ctx, func(gCtx workflow.Context) {
signalChan := workflow.GetSignalChannel(gCtx, "my-signal")
for {
selector := workflow.NewSelector(gCtx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(gCtx, &lastSignalData)
})
selector.Select(gCtx)
}
})
// Main workflow logic continues...
// lastSignalData is updated whenever a signal arrives
return nil
}go
func OrderWorkflow(ctx workflow.Context) error {
var lastSignalData string
// 在单独的协程中监听信号
workflow.Go(ctx, func(gCtx workflow.Context) {
signalChan := workflow.GetSignalChannel(gCtx, "my-signal")
for {
selector := workflow.NewSelector(gCtx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(gCtx, &lastSignalData)
})
selector.Select(gCtx)
}
})
// 主工作流逻辑继续执行...
// 每当收到信号时,lastSignalData会被更新
return nil
}Drain Signals Before Continue-As-New
在Continue-As-New前处理完所有信号
Always drain buffered signals before completing or continuing as new:
go
signalChan := workflow.GetSignalChannel(ctx, "my-signal")
for {
var data string
ok := signalChan.ReceiveAsync(&data)
if !ok {
break
}
// process data
}在完成或Continue-As-New前,务必处理完所有缓冲的信号:
go
signalChan := workflow.GetSignalChannel(ctx, "my-signal")
for {
var data string
ok := signalChan.ReceiveAsync(&data)
if !ok {
break
}
// 处理数据
}Queries
查询
Queries are synchronous, read-only operations that inspect Workflow state. Query handlers must NOT mutate state.
查询是同步的只读操作,用于检查工作流状态。查询处理器不得修改状态。
Set a Query Handler
设置查询处理器
go
func QueryableWorkflow(ctx workflow.Context) error {
currentState := "initialized"
// Register a query handler
err := workflow.SetQueryHandler(ctx, "get-state", func() (string, error) {
return currentState, nil
})
if err != nil {
return err
}
// Query handlers with input parameters
err = workflow.SetQueryHandler(ctx, "get-item", func(itemID string) (*Item, error) {
item, ok := items[itemID]
if !ok {
return nil, fmt.Errorf("item %s not found", itemID)
}
return item, nil
})
if err != nil {
return err
}
// Workflow logic that updates currentState...
currentState = "processing"
workflow.Sleep(ctx, time.Minute)
currentState = "done"
return nil
}go
func QueryableWorkflow(ctx workflow.Context) error {
currentState := "initialized"
// 注册查询处理器
err := workflow.SetQueryHandler(ctx, "get-state", func() (string, error) {
return currentState, nil
})
if err != nil {
return err
}
// 带输入参数的查询处理器
err = workflow.SetQueryHandler(ctx, "get-item", func(itemID string) (*Item, error) {
item, ok := items[itemID]
if !ok {
return nil, fmt.Errorf("item %s not found", itemID)
}
return item, nil
})
if err != nil {
return err
}
// 更新currentState的工作流逻辑...
currentState = "processing"
workflow.Sleep(ctx, time.Minute)
currentState = "done"
return nil
}Updates
更新
Updates are synchronous, trackable requests that can mutate Workflow state and return results. They support validators for rejecting invalid requests before they are written to history.
更新是同步的、可追踪的请求,可修改工作流状态并返回结果。它们支持验证器,用于在写入历史前拒绝无效请求。
Set an Update Handler
设置更新处理器
go
const FetchAndAdd = "fetch_and_add"
const Done = "done"
func CounterWorkflow(ctx workflow.Context) (int, error) {
counter := 0
// Update handler with validator
err := workflow.SetUpdateHandlerWithOptions(
ctx,
FetchAndAdd,
func(ctx workflow.Context, addend int) (int, error) {
previous := counter
counter += addend
return previous, nil
},
workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, addend int) error {
if addend < 0 {
return fmt.Errorf("addend must be non-negative (%d)", addend)
}
return nil
},
},
)
if err != nil {
return 0, err
}
// Wait for a "done" signal to finish
_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
return counter, ctx.Err()
}go
const FetchAndAdd = "fetch_and_add"
const Done = "done"
func CounterWorkflow(ctx workflow.Context) (int, error) {
counter := 0
// 带验证器的更新处理器
err := workflow.SetUpdateHandlerWithOptions(
ctx,
FetchAndAdd,
func(ctx workflow.Context, addend int) (int, error) {
previous := counter
counter += addend
return previous, nil
},
workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, addend int) error {
if addend < 0 {
return fmt.Errorf("addend must be non-negative (%d)", addend)
}
return nil
},
},
)
if err != nil {
return 0, err
}
// 等待"done"信号以结束
_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
return counter, ctx.Err()
}Update Handlers with Activities and Mutexes
包含活动和互斥锁的更新处理器
For safe concurrent access, use :
workflow.Mutexgo
type Manager struct {
state map[string]string
nodeLock workflow.Mutex
}
func (m *Manager) AssignNode(ctx workflow.Context, input AssignInput) (AssignResult, error) {
// Wait until ready
err := workflow.Await(ctx, func() bool { return m.isStarted })
if err != nil {
return AssignResult{}, err
}
// Acquire mutex for safe concurrent access
err = m.nodeLock.Lock(ctx)
if err != nil {
return AssignResult{}, err
}
defer m.nodeLock.Unlock()
// Execute activity while holding lock
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToCloseTimeout: 10 * time.Second,
})
err = workflow.ExecuteActivity(actCtx, DoAssignment, input).Get(actCtx, nil)
if err != nil {
return AssignResult{}, err
}
m.state[input.NodeID] = input.JobName
return AssignResult{NodeID: input.NodeID}, nil
}为了安全的并发访问,使用:
workflow.Mutexgo
type Manager struct {
state map[string]string
nodeLock workflow.Mutex
}
func (m *Manager) AssignNode(ctx workflow.Context, input AssignInput) (AssignResult, error) {
// 等待就绪
err := workflow.Await(ctx, func() bool { return m.isStarted })
if err != nil {
return AssignResult{}, err
}
// 获取互斥锁以保证安全并发访问
err = m.nodeLock.Lock(ctx)
if err != nil {
return AssignResult{}, err
}
defer m.nodeLock.Unlock()
// 持有锁时执行活动
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToCloseTimeout: 10 * time.Second,
})
err = workflow.ExecuteActivity(actCtx, DoAssignment, input).Get(actCtx, nil)
if err != nil {
return AssignResult{}, err
}
m.state[input.NodeID] = input.JobName
return AssignResult{NodeID: input.NodeID}, nil
}Activity Definitions
活动定义
Basic Activity
基础活动
Activities are normal Go functions. The first parameter should be . They can perform any operation (I/O, HTTP calls, DB queries).
context.Contextgo
package activities
import (
"context"
"go.temporal.io/sdk/activity"
)
// Simple function activity
func SendEmail(ctx context.Context, recipient, subject, body string) error {
logger := activity.GetLogger(ctx)
logger.Info("Sending email", "to", recipient)
// ... actual email sending logic ...
return nil
}活动是普通的Go函数。第一个参数应为。它们可以执行任何操作(I/O、HTTP调用、数据库查询)。
context.Contextgo
package activities
import (
"context"
"go.temporal.io/sdk/activity"
)
// 简单函数式活动
func SendEmail(ctx context.Context, recipient, subject, body string) error {
logger := activity.GetLogger(ctx)
logger.Info("Sending email", "to", recipient)
// ...实际的邮件发送逻辑...
return nil
}Struct-Based Activities (recommended for dependencies)
基于结构体的活动(推荐用于依赖管理)
Use struct methods when Activities need shared dependencies (DB pools, HTTP clients, configs). This is the recommended pattern.
go
type Activities struct {
DBPool *pgxpool.Pool
APIClient *http.Client
Config *AppConfig
}
type PaymentInput struct {
OrderID string
Amount float64
Currency string
}
type PaymentResult struct {
PaymentID string
Status string
}
func (a *Activities) ChargePayment(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
logger := activity.GetLogger(ctx)
logger.Info("Charging payment", "orderID", input.OrderID, "amount", input.Amount)
// Use shared dependencies
result, err := a.APIClient.Post(/* ... */)
if err != nil {
return nil, err
}
return &PaymentResult{
PaymentID: result.ID,
Status: "charged",
}, nil
}
func (a *Activities) RefundPayment(ctx context.Context, input PaymentInput) error {
// Compensation activity for saga pattern
logger := activity.GetLogger(ctx)
logger.Info("Refunding payment", "orderID", input.OrderID)
// ... refund logic ...
return nil
}When calling struct-method activities from a Workflow, use a nil struct pointer:
go
// In workflow code:
var a *Activities // nil pointer is fine -- used only for method resolution
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &result)当活动需要共享依赖(数据库连接池、HTTP客户端、配置)时,使用结构体方法。这是推荐的模式。
go
type Activities struct {
DBPool *pgxpool.Pool
APIClient *http.Client
Config *AppConfig
}
type PaymentInput struct {
OrderID string
Amount float64
Currency string
}
type PaymentResult struct {
PaymentID string
Status string
}
func (a *Activities) ChargePayment(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
logger := activity.GetLogger(ctx)
logger.Info("Charging payment", "orderID", input.OrderID, "amount", input.Amount)
// 使用共享依赖
result, err := a.APIClient.Post(/* ... */)
if err != nil {
return nil, err
}
return &PaymentResult{
PaymentID: result.ID,
Status: "charged",
}, nil
}
func (a *Activities) RefundPayment(ctx context.Context, input PaymentInput) error {
// Saga模式的补偿活动
logger := activity.GetLogger(ctx)
logger.Info("Refunding payment", "orderID", input.OrderID)
// ...退款逻辑...
return nil
}从工作流中调用结构体方法类型的活动时,使用nil结构体指针:
go
// 在工作流代码中:
var a *Activities // nil指针即可——仅用于方法解析
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &result)Activity Heartbeating
活动心跳
Long-running Activities should heartbeat to report progress. If the Activity fails and retries, it can resume from the last heartbeated progress.
go
func BatchProcessingActivity(ctx context.Context, startIdx, endIdx int) error {
logger := activity.GetLogger(ctx)
// Resume from last heartbeat on retry
i := startIdx
if activity.HasHeartbeatDetails(ctx) {
var lastCompleted int
if err := activity.GetHeartbeatDetails(ctx, &lastCompleted); err == nil {
i = lastCompleted + 1
logger.Info("Resuming from heartbeat", "index", i)
}
}
for ; i <= endIdx; i++ {
// Do work for item i...
logger.Info("Processing item", "index", i)
// Record progress -- also delivers cancellation signals
activity.RecordHeartbeat(ctx, i)
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
}Set in the Workflow's :
HeartbeatTimeoutActivityOptionsgo
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: 30 * time.Second, // Must heartbeat at least every 30s
}长时间运行的活动应发送心跳以报告进度。如果活动失败并重试,它可以从最后一次心跳的进度处恢复。
go
func BatchProcessingActivity(ctx context.Context, startIdx, endIdx int) error {
logger := activity.GetLogger(ctx)
// 重试时从最后一次心跳处恢复
i := startIdx
if activity.HasHeartbeatDetails(ctx) {
var lastCompleted int
if err := activity.GetHeartbeatDetails(ctx, &lastCompleted); err == nil {
i = lastCompleted + 1
logger.Info("Resuming from heartbeat", "index", i)
}
}
for ; i <= endIdx; i++ {
// 处理第i项...
logger.Info("Processing item", "index", i)
// 记录进度——同时传递取消信号
activity.RecordHeartbeat(ctx, i)
// 检查是否被取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
}在工作流的中设置:
ActivityOptionsHeartbeatTimeoutgo
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: 30 * time.Second, // 必须至少每30秒发送一次心跳
}Activity Errors
活动错误
go
import "go.temporal.io/sdk/temporal"
// Retryable error (default) -- will be retried per RetryPolicy
return temporal.NewApplicationError("temporary failure", "TempError", details)
// Non-retryable error -- will NOT be retried regardless of RetryPolicy
return temporal.NewNonRetryableApplicationError("bad input", "ValidationError", nil, details)
// Standard Go errors are converted to retryable ApplicationError automatically
return fmt.Errorf("something went wrong: %w", err)go
import "go.temporal.io/sdk/temporal"
// 可重试错误(默认)——会根据RetryPolicy进行重试
return temporal.NewApplicationError("temporary failure", "TempError", details)
// 不可重试错误——无论RetryPolicy如何,都不会重试
return temporal.NewNonRetryableApplicationError("bad input", "ValidationError", nil, details)
// 标准Go错误会自动转换为可重试的ApplicationError
return fmt.Errorf("something went wrong: %w", err)Worker Setup
工作器设置
A Worker polls a Task Queue for Workflow Tasks and Activity Tasks.
go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"myapp/activities"
"myapp/workflows"
)
func main() {
// Create a Temporal client (heavyweight -- create once per process)
c, err := client.Dial(client.Options{
HostPort: "localhost:7233", // default
Namespace: "default", // default
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// Create a Worker that listens on a specific Task Queue
w := worker.New(c, "my-task-queue", worker.Options{
// Optional: tune worker concurrency
// MaxConcurrentActivityExecutionSize: defaults to 1000
// MaxConcurrentWorkflowTaskExecutionSize: defaults to 1000
})
// Register Workflow functions
w.RegisterWorkflow(workflows.ProcessOrder)
w.RegisterWorkflow(workflows.ApprovalWorkflow)
// Register Activity struct (registers all exported methods)
w.RegisterActivity(&activities.Activities{
DBPool: dbPool,
APIClient: httpClient,
Config: config,
})
// Register standalone Activity functions
w.RegisterActivity(activities.SendEmail)
// Run the Worker (blocks until interrupt signal)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}工作器轮询任务队列以获取工作流任务和活动任务。
go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"myapp/activities"
"myapp/workflows"
)
func main() {
// 创建Temporal客户端(重量级——每个进程创建一次)
c, err := client.Dial(client.Options{
HostPort: "localhost:7233", // 默认值
Namespace: "default", // 默认值
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// 创建监听特定任务队列的工作器
w := worker.New(c, "my-task-queue", worker.Options{
// 可选:调整工作器并发数
// MaxConcurrentActivityExecutionSize: 默认值为1000
// MaxConcurrentWorkflowTaskExecutionSize: 默认值为1000
})
// 注册工作流函数
w.RegisterWorkflow(workflows.ProcessOrder)
w.RegisterWorkflow(workflows.ApprovalWorkflow)
// 注册活动结构体(注册所有导出方法)
w.RegisterActivity(&activities.Activities{
DBPool: dbPool,
APIClient: httpClient,
Config: config,
})
// 注册独立的活动函数
w.RegisterActivity(activities.SendEmail)
// 运行工作器(阻塞直到收到中断信号)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}Custom Registration Names
自定义注册名称
go
// Custom Workflow type name
w.RegisterWorkflowWithOptions(workflows.ProcessOrder, workflow.RegisterOptions{
Name: "OrderProcessing",
})
// Custom Activity type name
w.RegisterActivityWithOptions(activities.SendEmail, activity.RegisterOptions{
Name: "EmailSender",
})go
// 自定义工作流类型名称
w.RegisterWorkflowWithOptions(workflows.ProcessOrder, workflow.RegisterOptions{
Name: "OrderProcessing",
})
// 自定义活动类型名称
w.RegisterActivityWithOptions(activities.SendEmail, activity.RegisterOptions{
Name: "EmailSender",
})Client Usage
客户端使用
Starting a Workflow
启动工作流
go
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
"myapp/workflows"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: "order-12345", // Workflow ID (should be meaningful)
TaskQueue: "my-task-queue", // Must match Worker's task queue
// Optional:
// WorkflowExecutionTimeout: 24 * time.Hour,
// RetryPolicy: &temporal.RetryPolicy{...},
}
input := workflows.OrderInput{
OrderID: "12345",
CustomerID: "cust-789",
Items: []string{"item-a", "item-b"},
Amount: 99.99,
}
we, err := c.ExecuteWorkflow(context.Background(), options, workflows.ProcessOrder, input)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
// Synchronously wait for the result
var result workflows.OrderResult
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Workflow failed", err)
}
log.Printf("Workflow result: %+v\n", result)
}go
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
"myapp/workflows"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: "order-12345", // 工作流ID(应具有实际意义)
TaskQueue: "my-task-queue", // 必须与工作器的任务队列匹配
// 可选:
// WorkflowExecutionTimeout: 24 * time.Hour,
// RetryPolicy: &temporal.RetryPolicy{...},
}
input := workflows.OrderInput{
OrderID: "12345",
CustomerID: "cust-789",
Items: []string{"item-a", "item-b"},
Amount: 99.99,
}
we, err := c.ExecuteWorkflow(context.Background(), options, workflows.ProcessOrder, input)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
// 同步等待结果
var result workflows.OrderResult
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Workflow failed", err)
}
log.Printf("Workflow result: %+v\n", result)
}Sending a Signal
发送信号
go
// Signal from client
err = c.SignalWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID (empty = current run)
"approve", // Signal name
ApproveInput{ApproverName: "Alice"},
)
// Signal-With-Start: signal if running, otherwise start + signal
_, err = c.SignalWithStartWorkflow(
context.Background(),
"order-12345", // Workflow ID
"approve", // Signal name
ApproveInput{ApproverName: "Alice"}, // Signal arg
client.StartWorkflowOptions{
TaskQueue: "my-task-queue",
},
workflows.ApprovalWorkflow, // Workflow function
orderInput, // Workflow arg
)go
// 从客户端发送信号
err = c.SignalWorkflow(
context.Background(),
"order-12345", // 工作流ID
"", // Run ID(空值表示当前运行实例)
"approve", // 信号名称
ApproveInput{ApproverName: "Alice"},
)
// Signal-With-Start:如果工作流正在运行则发送信号,否则启动工作流并发送信号
_, err = c.SignalWithStartWorkflow(
context.Background(),
"order-12345", // 工作流ID
"approve", // 信号名称
ApproveInput{ApproverName: "Alice"}, // 信号参数
client.StartWorkflowOptions{
TaskQueue: "my-task-queue",
},
workflows.ApprovalWorkflow, // 工作流函数
orderInput, // 工作流参数
)Signal from Another Workflow
从另一个工作流发送信号
go
// Inside a Workflow:
err := workflow.SignalExternalWorkflow(ctx, "target-workflow-id", "", "signal-name", signalData).Get(ctx, nil)go
// 在工作流内部:
err := workflow.SignalExternalWorkflow(ctx, "target-workflow-id", "", "signal-name", signalData).Get(ctx, nil)Querying a Workflow
查询工作流
go
result, err := c.QueryWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID
"get-state", // Query type
// Optional query args...
)
if err != nil {
log.Fatalln("Query failed", err)
}
var state string
err = result.Get(&state)go
result, err := c.QueryWorkflow(
context.Background(),
"order-12345", // 工作流ID
"", // Run ID
"get-state", // 查询类型
// 可选的查询参数...
)
if err != nil {
log.Fatalln("Query failed", err)
}
var state string
err = result.Get(&state)Sending an Update
发送更新
go
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
updateHandle, err := c.UpdateWorkflow(ctxWithTimeout, client.UpdateWorkflowOptions{
WorkflowID: "counter-workflow-id",
RunID: "", // empty = current run
UpdateName: "fetch_and_add",
WaitForStage: client.WorkflowUpdateStageAccepted, // or WorkflowUpdateStageCompleted
Args: []interface{}{5},
})
if err != nil {
log.Fatalf("Update failed: %v", err)
}
var previousValue int
err = updateHandle.Get(ctxWithTimeout, &previousValue)go
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
updateHandle, err := c.UpdateWorkflow(ctxWithTimeout, client.UpdateWorkflowOptions{
WorkflowID: "counter-workflow-id",
RunID: "", // 空值表示当前运行实例
UpdateName: "fetch_and_add",
WaitForStage: client.WorkflowUpdateStageAccepted, // 或WorkflowUpdateStageCompleted
Args: []interface{}{5},
})
if err != nil {
log.Fatalf("Update failed: %v", err)
}
var previousValue int
err = updateHandle.Get(ctxWithTimeout, &previousValue)Cancelling a Workflow
取消工作流
go
err = c.CancelWorkflow(context.Background(), "order-12345", "")go
err = c.CancelWorkflow(context.Background(), "order-12345", "")Error Handling
错误处理
Workflow Error Types
工作流错误类型
Handle different error types from Activity or Child Workflow failures:
go
err := workflow.ExecuteActivity(ctx, a.DoSomething, input).Get(ctx, nil)
if err != nil {
var applicationErr *temporal.ApplicationError
if errors.As(err, &applicationErr) {
// Application-level error (from Activity code)
switch applicationErr.Type() {
case "ValidationError":
// Handle validation error
case "NotFound":
// Handle not found
default:
return err
}
}
var canceledErr *temporal.CanceledError
if errors.As(err, &canceledErr) {
// Workflow or Activity was canceled
return err
}
var timeoutErr *temporal.TimeoutError
if errors.As(err, &timeoutErr) {
switch timeoutErr.TimeoutType() {
case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
// Activity took too long
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
// Activity stopped heartbeating
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
// No Worker picked up the task
}
}
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
// Activity panicked -- panicErr.Error() and panicErr.StackTrace() available
}
}处理活动或子工作流失败产生的不同错误类型:
go
err := workflow.ExecuteActivity(ctx, a.DoSomething, input).Get(ctx, nil)
if err != nil {
var applicationErr *temporal.ApplicationError
if errors.As(err, &applicationErr) {
// 应用级错误(来自活动代码)
switch applicationErr.Type() {
case "ValidationError":
// 处理验证错误
case "NotFound":
// 处理未找到错误
default:
return err
}
}
var canceledErr *temporal.CanceledError
if errors.As(err, &canceledErr) {
// 工作流或活动已被取消
return err
}
var timeoutErr *temporal.TimeoutError
if errors.As(err, &timeoutErr) {
switch timeoutErr.TimeoutType() {
case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
// 活动执行时间过长
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
// 活动停止发送心跳
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
// 没有工作器拾取任务
}
}
var panicErr *temporal.PanicError
if errors.As(err, &panicErr) {
// 活动发生panic——可获取panicErr.Error()和panicErr.StackTrace()
}
}Non-Retryable Errors
不可重试错误
go
// In Activity code: return a non-retryable error to stop retries immediately
return temporal.NewNonRetryableApplicationError(
"invalid input: email is required",
"ValidationError",
nil, // cause
)
// In Workflow code: check specific error types in NonRetryableErrorTypes
ao := workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
NonRetryableErrorTypes: []string{"ValidationError", "NotFound"},
},
}go
// 在活动代码中:返回不可重试错误以立即停止重试
return temporal.NewNonRetryableApplicationError(
"invalid input: email is required",
"ValidationError",
nil, // 原因
)
// 在工作流代码中:在NonRetryableErrorTypes中指定特定错误类型
ao := workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
NonRetryableErrorTypes: []string{"ValidationError", "NotFound"},
},
}Retry Policies
重试策略
Activity Retry Policy
活动重试策略
Activities retry by default. Customize with :
temporal.RetryPolicygo
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // First retry delay (default: 1s)
BackoffCoefficient: 2.0, // Multiplier for each retry (default: 2.0)
MaximumInterval: time.Minute, // Cap on retry delay (default: 100x InitialInterval)
MaximumAttempts: 5, // 0 = unlimited (default: 0)
NonRetryableErrorTypes: []string{ // Error types that stop retries
"ValidationError",
},
}
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
HeartbeatTimeout: 10 * time.Second,
RetryPolicy: retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)活动默认会重试。使用自定义重试行为:
temporal.RetryPolicygo
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // 首次重试延迟(默认:1秒)
BackoffCoefficient: 2.0, // 每次重试的延迟乘数(默认:2.0)
MaximumInterval: time.Minute, // 重试延迟上限(默认:InitialInterval的100倍)
MaximumAttempts: 5, // 0表示无限重试(默认:0)
NonRetryableErrorTypes: []string{ // 停止重试的错误类型
"ValidationError",
},
}
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
HeartbeatTimeout: 10 * time.Second,
RetryPolicy: retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)Workflow Retry Policy
工作流重试策略
Workflows do NOT retry by default. Add a retry policy when starting the Workflow:
go
options := client.StartWorkflowOptions{
ID: "my-workflow",
TaskQueue: "my-queue",
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
},
}工作流默认不会重试。启动工作流时添加重试策略:
go
options := client.StartWorkflowOptions{
ID: "my-workflow",
TaskQueue: "my-queue",
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 100 * time.Second,
},
}Activity Options Reference
活动选项参考
go
ao := workflow.ActivityOptions{
// REQUIRED: at least one of these two must be set
StartToCloseTimeout: 30 * time.Second, // Max time for a single attempt
ScheduleToCloseTimeout: 5 * time.Minute, // Max total time including retries
// OPTIONAL
ScheduleToStartTimeout: time.Minute, // Max time waiting for a Worker
HeartbeatTimeout: 10 * time.Second, // Must heartbeat within this interval
TaskQueueName: "special-queue", // Override parent's task queue
WaitForCancellation: true, // Wait for Activity to handle cancellation
RetryPolicy: &temporal.RetryPolicy{},
ActivityID: "custom-id",
}
ctx = workflow.WithActivityOptions(ctx, ao)go
ao := workflow.ActivityOptions{
// 必填:必须设置以下两项中的至少一项
StartToCloseTimeout: 30 * time.Second, // 单次尝试的最长时间
ScheduleToCloseTimeout: 5 * time.Minute, // 包括重试在内的总最长时间
// 可选
ScheduleToStartTimeout: time.Minute, // 等待工作器拾取任务的最长时间
HeartbeatTimeout: 10 * time.Second, // 必须在此间隔内发送心跳
TaskQueueName: "special-queue", // 覆盖父级的任务队列
WaitForCancellation: true, // 等待活动处理取消请求
RetryPolicy: &temporal.RetryPolicy{},
ActivityID: "custom-id",
}
ctx = workflow.WithActivityOptions(ctx, ao)Testing
测试
Workflow Unit Tests with Mocked Activities
使用模拟活动的工作流单元测试
go
package workflows_test
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)
func Test_ProcessOrderWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Mock activities
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
&PaymentResult{PaymentID: "pay-123", Status: "charged"}, nil,
)
env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "TRACK-456"}, nil,
)
// Execute the workflow
input := OrderInput{OrderID: "order-1", Amount: 99.99}
env.ExecuteWorkflow(ProcessOrder, input)
// Assert
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result OrderResult
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, "pay-123", result.PaymentID)
require.Equal(t, "TRACK-456", result.TrackingCode)
}go
package workflows_test
import (
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)
func Test_ProcessOrderWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// 模拟活动
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
&PaymentResult{PaymentID: "pay-123", Status: "charged"}, nil,
)
env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "TRACK-456"}, nil,
)
// 执行工作流
input := OrderInput{OrderID: "order-1", Amount: 99.99}
env.ExecuteWorkflow(ProcessOrder, input)
// 断言
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result OrderResult
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, "pay-123", result.PaymentID)
require.Equal(t, "TRACK-456", result.TrackingCode)
}Test Activity Failure
测试活动失败场景
go
func Test_ProcessOrder_PaymentFails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
nil, temporal.NewApplicationError("payment declined", "PaymentError"),
)
env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1"})
require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.Error(t, err)
var appErr *temporal.ApplicationError
require.True(t, errors.As(err, &appErr))
require.Equal(t, "PaymentError", appErr.Type())
}go
func Test_ProcessOrder_PaymentFails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
nil, temporal.NewApplicationError("payment declined", "PaymentError"),
)
env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1"})
require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.Error(t, err)
var appErr *temporal.ApplicationError
require.True(t, errors.As(err, &appErr))
require.Equal(t, "PaymentError", appErr.Type())
}Mock with Custom Implementation
使用自定义实现进行模拟
go
func Test_ProcessOrder_ValidatesInput(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
func(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
// Custom implementation that validates the input
require.Equal(t, "order-1", input.OrderID)
require.Equal(t, 99.99, input.Amount)
return &PaymentResult{PaymentID: "pay-123"}, nil
},
)
env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "TRACK-456"}, nil,
)
env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1", Amount: 99.99})
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}go
func Test_ProcessOrder_ValidatesInput(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
var a *Activities
env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
func(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
// 验证输入的自定义实现
require.Equal(t, "order-1", input.OrderID)
require.Equal(t, 99.99, input.Amount)
return &PaymentResult{PaymentID: "pay-123"}, nil
},
)
env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "TRACK-456"}, nil,
)
env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1", Amount: 99.99})
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}Activity Unit Tests
活动单元测试
go
func Test_ChargePayment(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
a := &Activities{
APIClient: mockHTTPClient,
Config: testConfig,
}
env.RegisterActivity(a)
result, err := env.ExecuteActivity(a.ChargePayment, PaymentInput{
OrderID: "order-1",
Amount: 50.00,
})
require.NoError(t, err)
var paymentResult PaymentResult
require.NoError(t, result.Get(&paymentResult))
require.Equal(t, "charged", paymentResult.Status)
}go
func Test_ChargePayment(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()
a := &Activities{
APIClient: mockHTTPClient,
Config: testConfig,
}
env.RegisterActivity(a)
result, err := env.ExecuteActivity(a.ChargePayment, PaymentInput{
OrderID: "order-1",
Amount: 50.00,
})
require.NoError(t, err)
var paymentResult PaymentResult
require.NoError(t, result.Get(&paymentResult))
require.Equal(t, "charged", paymentResult.Status)
}Test with Signals
带信号的测试
go
func Test_ApprovalWorkflow_WithSignal(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Send signal after a delay
env.RegisterDelayedCallback(func() {
env.SignalWorkflow("approve", ApproveInput{ApproverName: "Alice"})
}, time.Millisecond*10)
env.ExecuteWorkflow(ApprovalWorkflow, "order-1")
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}go
func Test_ApprovalWorkflow_WithSignal(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// 延迟发送信号
env.RegisterDelayedCallback(func() {
env.SignalWorkflow("approve", ApproveInput{ApproverName: "Alice"})
}, time.Millisecond*10)
env.ExecuteWorkflow(ApprovalWorkflow, "order-1")
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}Test with Queries
带查询的测试
go
func Test_QueryableWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
result, err := env.QueryWorkflow("get-state")
require.NoError(t, err)
var state string
require.NoError(t, result.Get(&state))
require.Equal(t, "processing", state)
}, time.Millisecond*10)
env.ExecuteWorkflow(QueryableWorkflow)
}go
func Test_QueryableWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
result, err := env.QueryWorkflow("get-state")
require.NoError(t, err)
var state string
require.NoError(t, result.Get(&state))
require.Equal(t, "processing", state)
}, time.Millisecond*10)
env.ExecuteWorkflow(QueryableWorkflow)
}Integration Tests with Dev Server
使用开发服务器的集成测试
go
func Test_Integration_WithDevServer(t *testing.T) {
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
ClientOptions: &client.Options{HostPort: ""}, // random port
})
require.NoError(t, err)
defer func() { _ = server.Stop() }()
c := server.Client()
taskQueue := "integration-test-queue"
// Start worker in background
w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(ProcessOrder)
w.RegisterActivity(&Activities{/* deps */})
go func() { _ = w.Run(worker.InterruptCh()) }()
defer w.Stop()
// Execute workflow
we, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
ID: "test-order-1",
TaskQueue: taskQueue,
}, ProcessOrder, OrderInput{OrderID: "test-1", Amount: 50.00})
require.NoError(t, err)
var result OrderResult
err = we.Get(context.Background(), &result)
require.NoError(t, err)
require.Equal(t, "test-1", result.OrderID)
}go
func Test_Integration_WithDevServer(t *testing.T) {
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
ClientOptions: &client.Options{HostPort: ""}, // 随机端口
})
require.NoError(t, err)
defer func() { _ = server.Stop() }()
c := server.Client()
taskQueue := "integration-test-queue"
// 在后台启动工作器
w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(ProcessOrder)
w.RegisterActivity(&Activities{/* 依赖项 */})
go func() { _ = w.Run(worker.InterruptCh()) }()
defer w.Stop()
// 执行工作流
we, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
ID: "test-order-1",
TaskQueue: taskQueue,
}, ProcessOrder, OrderInput{OrderID: "test-1", Amount: 50.00})
require.NoError(t, err)
var result OrderResult
err = we.Get(context.Background(), &result)
require.NoError(t, err)
require.Equal(t, "test-1", result.OrderID)
}Test Suite Pattern (testify suite)
测试套件模式(testify suite)
go
type OrderWorkflowSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *OrderWorkflowSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *OrderWorkflowSuite) AfterTest(suiteName, testName string) {
s.env.AssertExpectations(s.T())
}
func (s *OrderWorkflowSuite) Test_HappyPath() {
var a *Activities
s.env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
&PaymentResult{PaymentID: "p1"}, nil,
)
s.env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "t1"}, nil,
)
s.env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "1"})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
}
func TestOrderWorkflowSuite(t *testing.T) {
suite.Run(t, new(OrderWorkflowSuite))
}go
type OrderWorkflowSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *OrderWorkflowSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *OrderWorkflowSuite) AfterTest(suiteName, testName string) {
s.env.AssertExpectations(s.T())
}
func (s *OrderWorkflowSuite) Test_HappyPath() {
var a *Activities
s.env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
&PaymentResult{PaymentID: "p1"}, nil,
)
s.env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
&ShippingResult{TrackingCode: "t1"}, nil,
)
s.env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "1"})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
}
func TestOrderWorkflowSuite(t *testing.T) {
suite.Run(t, new(OrderWorkflowSuite))
}Key Patterns
关键模式
Saga / Compensation Pattern
Saga / 补偿模式
Use Go's to build compensation logic. If a later step fails, earlier steps are automatically compensated in reverse order.
defergo
func TransferMoney(ctx workflow.Context, details TransferDetails) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Step 1: Withdraw
err = workflow.ExecuteActivity(ctx, Withdraw, details).Get(ctx, nil)
if err != nil {
return err
}
// Compensate Step 1 if later steps fail
defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, details).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}
}()
// Step 2: Deposit
err = workflow.ExecuteActivity(ctx, Deposit, details).Get(ctx, nil)
if err != nil {
return err // triggers WithdrawCompensation via defer
}
// Compensate Step 2 if later steps fail
defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, details).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}
}()
// Step 3: Notify (if this fails, both Deposit and Withdraw are compensated)
err = workflow.ExecuteActivity(ctx, SendNotification, details).Get(ctx, nil)
if err != nil {
return err // triggers DepositCompensation then WithdrawCompensation
}
return nil
}使用Go的构建补偿逻辑。如果后续步骤失败,前面的步骤会自动按相反顺序进行补偿。
defergo
func TransferMoney(ctx workflow.Context, details TransferDetails) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 步骤1:取款
err = workflow.ExecuteActivity(ctx, Withdraw, details).Get(ctx, nil)
if err != nil {
return err
}
// 如果后续步骤失败,补偿步骤1
defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, details).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}
}()
// 步骤2:存款
err = workflow.ExecuteActivity(ctx, Deposit, details).Get(ctx, nil)
if err != nil {
return err // 通过defer触发取款补偿
}
// 如果后续步骤失败,补偿步骤2
defer func() {
if err != nil {
errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, details).Get(ctx, nil)
err = multierr.Append(err, errCompensation)
}
}()
// 步骤3:通知(如果此步骤失败,存款和取款都会被补偿)
err = workflow.ExecuteActivity(ctx, SendNotification, details).Get(ctx, nil)
if err != nil {
return err // 通过defer触发存款补偿,然后是取款补偿
}
return nil
}Cancellation with Cleanup
带清理的取消
When a Workflow is canceled, use to run cleanup Activities:
workflow.NewDisconnectedContextgo
func CancellableWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true, // Wait for Activity to acknowledge cancel
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Cleanup runs even after cancellation
defer func() {
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}
// Get a new context that is NOT canceled
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("Cleanup failed", "Error", err)
}
}()
// Long-running activity that can be canceled
var result string
err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, &result)
return err
}当工作流被取消时,使用运行清理活动:
workflow.NewDisconnectedContextgo
func CancellableWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true, // 等待活动确认取消
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 即使被取消也会运行清理
defer func() {
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}
// 获取一个未被取消的新上下文
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("Cleanup failed", "Error", err)
}
}()
// 可被取消的长时间运行活动
var result string
err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, &result)
return err
}Polling Pattern: Frequent (Activity with Heartbeat)
轮询模式:频繁(带心跳的活动)
Poll inside a single long-running Activity using heartbeats:
go
// Activity: polls until success, heartbeating to stay alive
func PollForResult(ctx context.Context) (string, error) {
for {
result, err := callExternalService(ctx)
if err == nil {
return result, nil
}
activity.RecordHeartbeat(ctx)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(5 * time.Second):
// poll again
}
}
}
// Workflow: use long timeout + heartbeat
func PollingWorkflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: 30 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, PollForResult).Get(ctx, &result)
return result, err
}在单个长时间运行的活动中使用心跳进行轮询:
go
// 活动:轮询直到成功,发送心跳以保持活跃
func PollForResult(ctx context.Context) (string, error) {
for {
result, err := callExternalService(ctx)
if err == nil {
return result, nil
}
activity.RecordHeartbeat(ctx)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(5 * time.Second):
// 再次轮询
}
}
}
// 工作流:使用长时间超时 + 心跳
func PollingWorkflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: 30 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, PollForResult).Get(ctx, &result)
return result, err
}Polling Pattern: Infrequent (Activity Retries)
轮询模式:不频繁(活动重试)
Use the retry policy itself as the polling mechanism:
go
func InfrequentPollingWorkflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Second, // Short: just check once
RetryPolicy: &temporal.RetryPolicy{
BackoffCoefficient: 1, // Constant interval
InitialInterval: 60 * time.Second, // Poll every 60s
// MaximumAttempts: 0 means unlimited retries
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, CheckService).Get(ctx, &result)
return result, err
}
// Activity: check once and return error if not ready
func CheckService(ctx context.Context) (string, error) {
result, err := callExternalService(ctx)
if err != nil {
return "", err // Will be retried per RetryPolicy
}
return result, nil
}使用重试策略本身作为轮询机制:
go
func InfrequentPollingWorkflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Second, // 短时间:仅检查一次
RetryPolicy: &temporal.RetryPolicy{
BackoffCoefficient: 1, // 固定间隔
InitialInterval: 60 * time.Second, // 每60秒轮询一次
// MaximumAttempts: 0表示无限重试
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, CheckService).Get(ctx, &result)
return result, err
}
// 活动:检查一次,如果未就绪则返回错误
func CheckService(ctx context.Context) (string, error) {
result, err := callExternalService(ctx)
if err != nil {
return "", err // 将根据RetryPolicy进行重试
}
return result, nil
}Parallel Activities with Futures
并行活动与Future
Launch multiple Activities in parallel and collect results:
go
func ParallelWorkflow(ctx workflow.Context, items []string) ([]Result, error) {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
// Launch all activities
var futures []workflow.Future
for _, item := range items {
future := workflow.ExecuteActivity(ctx, ProcessItem, item)
futures = append(futures, future)
}
// Collect results (in order)
var results []Result
for _, future := range futures {
var result Result
if err := future.Get(ctx, &result); err != nil {
return nil, err
}
results = append(results, result)
}
return results, nil
}并行启动多个活动并收集结果:
go
func ParallelWorkflow(ctx workflow.Context, items []string) ([]Result, error) {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
// 启动所有活动
var futures []workflow.Future
for _, item := range items {
future := workflow.ExecuteActivity(ctx, ProcessItem, item)
futures = append(futures, future)
}
// 按顺序收集结果
var results []Result
for _, future := range futures {
var result Result
if err := future.Get(ctx, &result); err != nil {
return nil, err
}
results = append(results, result)
}
return results, nil
}Process Results as They Complete (Selector)
结果完成即处理(选择器)
go
func ProcessAsCompleted(ctx workflow.Context, items []string) error {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
selector := workflow.NewSelector(ctx)
var processErr error
for _, item := range items {
future := workflow.ExecuteActivity(ctx, ProcessItem, item)
selector.AddFuture(future, func(f workflow.Future) {
var result Result
if err := f.Get(ctx, &result); err != nil {
processErr = err
return
}
// Process result immediately as it arrives
workflow.GetLogger(ctx).Info("Got result", "value", result)
})
}
// Wait for all
for range items {
selector.Select(ctx)
if processErr != nil {
return processErr
}
}
return nil
}go
func ProcessAsCompleted(ctx workflow.Context, items []string) error {
ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
ctx = workflow.WithActivityOptions(ctx, ao)
selector := workflow.NewSelector(ctx)
var processErr error
for _, item := range items {
future := workflow.ExecuteActivity(ctx, ProcessItem, item)
selector.AddFuture(future, func(f workflow.Future) {
var result Result
if err := f.Get(ctx, &result); err != nil {
processErr = err
return
}
// 结果到达后立即处理
workflow.GetLogger(ctx).Info("Got result", "value", result)
})
}
// 等待所有完成
for range items {
selector.Select(ctx)
if processErr != nil {
return processErr
}
}
return nil
}Updatable Timer
可更新计时器
A timer that can be rescheduled via Signals:
go
type UpdatableTimer struct {
wakeUpTime time.Time
}
func (u *UpdatableTimer) SleepUntil(ctx workflow.Context, wakeUpTime time.Time, updateCh workflow.ReceiveChannel) error {
u.wakeUpTime = wakeUpTime
timerFired := false
for !timerFired && ctx.Err() == nil {
timerCtx, timerCancel := workflow.WithCancel(ctx)
duration := u.wakeUpTime.Sub(workflow.Now(timerCtx))
timer := workflow.NewTimer(timerCtx, duration)
workflow.NewSelector(timerCtx).
AddFuture(timer, func(f workflow.Future) {
if f.Get(timerCtx, nil) == nil {
timerFired = true
}
}).
AddReceive(updateCh, func(c workflow.ReceiveChannel, more bool) {
timerCancel() // cancel current timer
c.Receive(timerCtx, &u.wakeUpTime) // update to new time
}).
Select(timerCtx)
}
return ctx.Err()
}
func ReminderWorkflow(ctx workflow.Context, initialTime time.Time) error {
timer := UpdatableTimer{}
err := workflow.SetQueryHandler(ctx, "getWakeUpTime", func() (time.Time, error) {
return timer.wakeUpTime, nil
})
if err != nil {
return err
}
return timer.SleepUntil(ctx, initialTime, workflow.GetSignalChannel(ctx, "updateWakeUpTime"))
}可通过信号重新调度的计时器:
go
type UpdatableTimer struct {
wakeUpTime time.Time
}
func (u *UpdatableTimer) SleepUntil(ctx workflow.Context, wakeUpTime time.Time, updateCh workflow.ReceiveChannel) error {
u.wakeUpTime = wakeUpTime
timerFired := false
for !timerFired && ctx.Err() == nil {
timerCtx, timerCancel := workflow.WithCancel(ctx)
duration := u.wakeUpTime.Sub(workflow.Now(timerCtx))
timer := workflow.NewTimer(timerCtx, duration)
workflow.NewSelector(timerCtx).
AddFuture(timer, func(f workflow.Future) {
if f.Get(timerCtx, nil) == nil {
timerFired = true
}
}).
AddReceive(updateCh, func(c workflow.ReceiveChannel, more bool) {
timerCancel() // 取消当前计时器
c.Receive(timerCtx, &u.wakeUpTime) // 更新为新时间
}).
Select(timerCtx)
}
return ctx.Err()
}
func ReminderWorkflow(ctx workflow.Context, initialTime time.Time) error {
timer := UpdatableTimer{}
err := workflow.SetQueryHandler(ctx, "getWakeUpTime", func() (time.Time, error) {
return timer.wakeUpTime, nil
})
if err != nil {
return err
}
return timer.SleepUntil(ctx, initialTime, workflow.GetSignalChannel(ctx, "updateWakeUpTime"))
}Ordered Signal Processing with Await
使用Await的有序信号处理
Process signals that may arrive out of order:
go
type SignalTracker struct {
Signal1Received bool
Signal2Received bool
Signal3Received bool
FirstSignalTime time.Time
}
func (s *SignalTracker) Listen(ctx workflow.Context) {
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal1Received = true
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal2Received = true
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal3Received = true
})
selector.Select(ctx)
if s.FirstSignalTime.IsZero() {
s.FirstSignalTime = workflow.Now(ctx)
}
}
}
func OrderedSignalsWorkflow(ctx workflow.Context) error {
var tracker SignalTracker
// Listen for signals in background goroutine
workflow.Go(ctx, tracker.Listen)
// Wait for Signal1 first
err := workflow.Await(ctx, func() bool { return tracker.Signal1Received })
if err != nil {
return err
}
// Wait for Signal2 with timeout
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return tracker.Signal2Received
})
if err != nil {
return err
}
if !ok {
return temporal.NewApplicationError("timed out waiting for Signal2", "timeout")
}
// Wait for Signal3 with timeout
ok, err = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return tracker.Signal3Received
})
if err != nil {
return err
}
if !ok {
return temporal.NewApplicationError("timed out waiting for Signal3", "timeout")
}
return nil
}处理可能乱序到达的信号:
go
type SignalTracker struct {
Signal1Received bool
Signal2Received bool
Signal3Received bool
FirstSignalTime time.Time
}
func (s *SignalTracker) Listen(ctx workflow.Context) {
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal1Received = true
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal2Received = true
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
s.Signal3Received = true
})
selector.Select(ctx)
if s.FirstSignalTime.IsZero() {
s.FirstSignalTime = workflow.Now(ctx)
}
}
}
func OrderedSignalsWorkflow(ctx workflow.Context) error {
var tracker SignalTracker
// 在后台协程中监听信号
workflow.Go(ctx, tracker.Listen)
// 先等待Signal1
err := workflow.Await(ctx, func() bool { return tracker.Signal1Received })
if err != nil {
return err
}
// 带超时等待Signal2
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return tracker.Signal2Received
})
if err != nil {
return err
}
if !ok {
return temporal.NewApplicationError("timed out waiting for Signal2", "timeout")
}
// 带超时等待Signal3
ok, err = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
return tracker.Signal3Received
})
if err != nil {
return err
}
if !ok {
return temporal.NewApplicationError("timed out waiting for Signal3", "timeout")
}
return nil
}Workflow Info
工作流信息
Access metadata about the current Workflow Execution:
go
info := workflow.GetInfo(ctx)
info.WorkflowExecution.ID // Workflow ID
info.WorkflowExecution.RunID // Run ID
info.WorkflowType.Name // Workflow type name
info.TaskQueueName // Task Queue
info.Attempt // Current attempt (starts at 1)
info.GetContinueAsNewSuggested() // True when history is getting large
info.GetCurrentHistoryLength() // Current history event count访问当前工作流执行的元数据:
go
info := workflow.GetInfo(ctx)
info.WorkflowExecution.ID // 工作流ID
info.WorkflowExecution.RunID // Run ID
info.WorkflowType.Name // 工作流类型名称
info.TaskQueueName // 任务队列
info.Attempt // 当前尝试次数(从1开始)
info.GetContinueAsNewSuggested() // 当历史记录过大时返回True
info.GetCurrentHistoryLength() // 当前历史事件数量DeterministicKeys
DeterministicKeys
When iterating over maps in Workflow code, always use to get a sorted, deterministic key order:
workflow.DeterministicKeysgo
myMap := map[string]string{"b": "2", "a": "1", "c": "3"}
// WRONG: range over map is non-deterministic
// for k, v := range myMap { ... }
// CORRECT: use DeterministicKeys
for _, k := range workflow.DeterministicKeys(myMap) {
v := myMap[k]
// process k, v
}在工作流代码中遍历map时,始终使用获取排序后的确定性键顺序:
workflow.DeterministicKeysgo
myMap := map[string]string{"b": "2", "a": "1", "c": "3"}
// 错误:map遍历是非确定性的
// for k, v := range myMap { ... }
// 正确:使用DeterministicKeys
for _, k := range workflow.DeterministicKeys(myMap) {
v := myMap[k]
// 处理k, v
}