temporal-go

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Temporal Go SDK

Temporal Go SDK

Build durable, fault-tolerant distributed applications using the Temporal Go SDK (
go.temporal.io/sdk
). Temporal provides workflow orchestration with automatic retries, state persistence, and exactly-once execution semantics.
使用Temporal Go SDK (
go.temporal.io/sdk
) 构建持久化、容错的分布式应用程序。Temporal提供工作流编排功能,支持自动重试、状态持久化和恰好一次执行语义。

Core Principles

核心原则

  • Deterministic Workflows: Workflow code must be deterministic. Never use
    time.Now()
    ,
    rand
    ,
    map
    range, goroutines, or I/O directly in Workflows.
  • Activities for Side Effects: All non-deterministic operations (HTTP calls, DB queries, file I/O) go in Activities.
  • Struct Parameters: Use single struct parameters and return values for forward compatibility.
  • Idempotent Activities: Activities may be retried; design them to be safely re-executable.
  • Explicit Error Handling: Use Temporal's typed errors (
    ApplicationError
    ,
    TimeoutError
    ,
    CanceledError
    ) for control flow.
  • 确定性工作流:工作流代码必须具有确定性。绝不要在工作流中直接使用
    time.Now()
    rand
    、map遍历、goroutine或I/O操作。
  • 活动处理副作用:所有非确定性操作(HTTP调用、数据库查询、文件I/O)都应放在活动中执行。
  • 结构体参数:使用单个结构体作为参数和返回值,以保证向前兼容性。
  • 幂等活动:活动可能会被重试;设计时要确保它们可以安全地重复执行。
  • 显式错误处理:使用Temporal的类型化错误(
    ApplicationError
    TimeoutError
    CanceledError
    )进行控制流管理。

Project Structure

项目结构

.
├── cmd/
│   ├── worker/           # Worker process entry point
│   └── starter/          # Workflow starter (client) entry point
├── workflows/            # Workflow definitions
│   ├── order.go
│   └── order_test.go
├── activities/           # Activity definitions
│   ├── payment.go
│   └── payment_test.go
├── shared/               # Shared types (params, results, constants)
│   └── types.go
├── go.mod
└── go.sum
.
├── cmd/
│   ├── worker/           # 工作器进程入口
│   └── starter/          # 工作流启动器(客户端)入口
├── workflows/            # 工作流定义
│   ├── order.go
│   └── order_test.go
├── activities/           # 活动定义
│   ├── payment.go
│   └── payment_test.go
├── shared/               # 共享类型(参数、结果、常量)
│   └── types.go
├── go.mod
└── go.sum

Imports

导入

go
import (
    // Client - for connecting to Temporal and starting workflows
    "go.temporal.io/sdk/client"

    // Workflow - for writing workflow definitions
    "go.temporal.io/sdk/workflow"

    // Activity - for writing activity definitions
    "go.temporal.io/sdk/activity"

    // Worker - for creating and running workers
    "go.temporal.io/sdk/worker"

    // Temporal - for error types and retry policies
    "go.temporal.io/sdk/temporal"

    // Testing - for test environments
    "go.temporal.io/sdk/testsuite"

    // Logging
    "go.temporal.io/sdk/log"
)
go
import (
    // Client - 用于连接Temporal并启动工作流
    "go.temporal.io/sdk/client"

    // Workflow - 用于编写工作流定义
    "go.temporal.io/sdk/workflow"

    // Activity - 用于编写活动定义
    "go.temporal.io/sdk/activity"

    // Worker - 用于创建和运行工作器
    "go.temporal.io/sdk/worker"

    // Temporal - 用于错误类型和重试策略
    "go.temporal.io/sdk/temporal"

    // Testing - 用于测试环境
    "go.temporal.io/sdk/testsuite"

    // Logging
    "go.temporal.io/sdk/log"
)

Workflow Definitions

工作流定义

Basic Workflow

基础工作流

A Workflow Definition is an exportable function. The first parameter must be
workflow.Context
. Return
error
or
(result, error)
. Use struct params for forward compatibility.
go
package workflows

import (
    "time"
    "go.temporal.io/sdk/workflow"
)

type OrderInput struct {
    OrderID    string
    CustomerID string
    Items      []string
    Amount     float64
}

type OrderResult struct {
    OrderID       string
    PaymentID     string
    TrackingCode  string
    CompletedAt   string
}

func ProcessOrder(ctx workflow.Context, input OrderInput) (*OrderResult, error) {
    logger := workflow.GetLogger(ctx)
    logger.Info("ProcessOrder started", "orderID", input.OrderID)

    // Set activity options (StartToCloseTimeout OR ScheduleToCloseTimeout required)
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Use nil struct pointer to call struct-method Activities
    var a *Activities

    var paymentResult PaymentResult
    err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &paymentResult)
    if err != nil {
        return nil, err
    }

    var shippingResult ShippingResult
    err = workflow.ExecuteActivity(ctx, a.ShipOrder, input).Get(ctx, &shippingResult)
    if err != nil {
        return nil, err
    }

    return &OrderResult{
        OrderID:      input.OrderID,
        PaymentID:    paymentResult.PaymentID,
        TrackingCode: shippingResult.TrackingCode,
        CompletedAt:  workflow.Now(ctx).Format(time.RFC3339),
    }, nil
}
工作流定义是一个可导出的函数。第一个参数必须为
workflow.Context
。返回值为
error
(result, error)
。使用结构体参数以保证向前兼容性。
go
package workflows

import (
    "time"
    "go.temporal.io/sdk/workflow"
)

type OrderInput struct {
    OrderID    string
    CustomerID string
    Items      []string
    Amount     float64
}

type OrderResult struct {
    OrderID       string
    PaymentID     string
    TrackingCode  string
    CompletedAt   string
}

func ProcessOrder(ctx workflow.Context, input OrderInput) (*OrderResult, error) {
    logger := workflow.GetLogger(ctx)
    logger.Info("ProcessOrder started", "orderID", input.OrderID)

    // 设置活动选项(必须指定StartToCloseTimeout或ScheduleToCloseTimeout)
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 使用nil结构体指针调用结构体方法类型的活动
    var a *Activities

    var paymentResult PaymentResult
    err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &paymentResult)
    if err != nil {
        return nil, err
    }

    var shippingResult ShippingResult
    err = workflow.ExecuteActivity(ctx, a.ShipOrder, input).Get(ctx, &shippingResult)
    if err != nil {
        return nil, err
    }

    return &OrderResult{
        OrderID:      input.OrderID,
        PaymentID:    paymentResult.PaymentID,
        TrackingCode: shippingResult.TrackingCode,
        CompletedAt:  workflow.Now(ctx).Format(time.RFC3339),
    }, nil
}

Determinism Rules

确定性规则

Workflow code MUST be deterministic. Use Temporal SDK replacements for non-deterministic Go constructs:
Go ConstructTemporal ReplacementPackage
time.Now()
workflow.Now(ctx)
workflow
time.Sleep()
workflow.Sleep(ctx, d)
workflow
go func()
workflow.Go(ctx, func(ctx workflow.Context) {...})
workflow
chan
workflow.Channel
/
workflow.NewChannel(ctx)
workflow
select
workflow.Selector
/
workflow.NewSelector(ctx)
workflow
context.Context
workflow.Context
workflow
rand.Intn()
workflow.SideEffect(ctx, func(...) interface{})
workflow
log.Println()
workflow.GetLogger(ctx).Info(...)
workflow
range
over
map
workflow.DeterministicKeys(m)
then iterate
workflow
工作流代码必须具有确定性。使用Temporal SDK替代Go中的非确定性构造:
Go构造Temporal替代方案
time.Now()
workflow.Now(ctx)
workflow
time.Sleep()
workflow.Sleep(ctx, d)
workflow
go func()
workflow.Go(ctx, func(ctx workflow.Context) {...})
workflow
chan
workflow.Channel
/
workflow.NewChannel(ctx)
workflow
select
workflow.Selector
/
workflow.NewSelector(ctx)
workflow
context.Context
workflow.Context
workflow
rand.Intn()
workflow.SideEffect(ctx, func(...) interface{})
workflow
log.Println()
workflow.GetLogger(ctx).Info(...)
workflow
range
遍历
map
workflow.DeterministicKeys(m)
后再遍历
workflow

Logging (replay-safe)

日志(重放安全)

Always use
workflow.GetLogger(ctx)
-- it suppresses duplicate logs during replay:
go
logger := workflow.GetLogger(ctx)
logger.Info("Processing started", "orderID", orderID)
logger.Error("Activity failed", "Error", err)
始终使用
workflow.GetLogger(ctx)
——它会在重放期间抑制重复日志:
go
logger := workflow.GetLogger(ctx)
logger.Info("Processing started", "orderID", orderID)
logger.Error("Activity failed", "Error", err)

Side Effects

副作用

Capture non-deterministic values (random numbers, UUIDs) so they are recorded in history and replayed consistently:
go
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return rand.Intn(100)
})
var randomValue int
encodedRandom.Get(&randomValue)
捕获非确定性值(随机数、UUID),以便它们被记录到历史中并一致地重放:
go
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return rand.Intn(100)
})
var randomValue int
encodedRandom.Get(&randomValue)

Selectors (replacing
select
)

选择器(替代
select

Use
workflow.Selector
to wait on multiple Futures and Channels. The selector picks the first ready callback.
go
func SampleTimerWorkflow(ctx workflow.Context, timeout time.Duration) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    childCtx, cancelHandler := workflow.WithCancel(ctx)
    selector := workflow.NewSelector(ctx)

    var processingDone bool

    // Add a Future (activity result)
    f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
    selector.AddFuture(f, func(f workflow.Future) {
        processingDone = true
        cancelHandler() // cancel the timer
    })

    // Add a timer Future
    timerFuture := workflow.NewTimer(childCtx, timeout)
    selector.AddFuture(timerFuture, func(f workflow.Future) {
        if !processingDone {
            _ = workflow.ExecuteActivity(ctx, SendNotificationActivity).Get(ctx, nil)
        }
    })

    // Wait for the first to complete
    selector.Select(ctx)

    // If timer fired first, still wait for processing to finish
    if !processingDone {
        selector.Select(ctx)
    }

    return nil
}
使用
workflow.Selector
等待多个Future和Channel。选择器会选择第一个就绪的回调函数。
go
func SampleTimerWorkflow(ctx workflow.Context, timeout time.Duration) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    childCtx, cancelHandler := workflow.WithCancel(ctx)
    selector := workflow.NewSelector(ctx)

    var processingDone bool

    // 添加一个Future(活动结果)
    f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
    selector.AddFuture(f, func(f workflow.Future) {
        processingDone = true
        cancelHandler() // 取消计时器
    })

    // 添加一个计时器Future
    timerFuture := workflow.NewTimer(childCtx, timeout)
    selector.AddFuture(timerFuture, func(f workflow.Future) {
        if !processingDone {
            _ = workflow.ExecuteActivity(ctx, SendNotificationActivity).Get(ctx, nil)
        }
    })

    // 等待第一个完成
    selector.Select(ctx)

    // 如果计时器先触发,仍需等待处理完成
    if !processingDone {
        selector.Select(ctx)
    }

    return nil
}

Timers

计时器

go
// Simple sleep
workflow.Sleep(ctx, 5*time.Minute)

// Cancellable timer via NewTimer
timerCtx, timerCancel := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, 30*time.Minute)

// Cancel the timer when no longer needed
timerCancel()
go
// 简单睡眠
workflow.Sleep(ctx, 5*time.Minute)

// 通过NewTimer创建可取消的计时器
timerCtx, timerCancel := workflow.WithCancel(ctx)
timer := workflow.NewTimer(timerCtx, 30*time.Minute)

// 不再需要时取消计时器
timerCancel()

Goroutines (workflow.Go)

Goroutine(workflow.Go)

Never use native
go
-- use
workflow.Go()
for deterministic coroutines:
go
func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) ([]string, error) {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    var results []string
    var err error

    for i := 0; i < parallelism; i++ {
        input := fmt.Sprint(i) // capture outside lambda
        workflow.Go(ctx, func(gCtx workflow.Context) {
            // IMPORTANT: use gCtx (the goroutine context), not ctx
            var result string
            err = workflow.ExecuteActivity(gCtx, ProcessItem, input).Get(gCtx, &result)
            if err != nil {
                return
            }
            results = append(results, result)
        })
    }

    // Wait for all goroutines to complete
    _ = workflow.Await(ctx, func() bool {
        return err != nil || len(results) == parallelism
    })
    return results, err
}
绝不要使用原生
go
——使用
workflow.Go()
实现确定性协程:
go
func SampleGoroutineWorkflow(ctx workflow.Context, parallelism int) ([]string, error) {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    var results []string
    var err error

    for i := 0; i < parallelism; i++ {
        input := fmt.Sprint(i) // 在lambda外部捕获变量
        workflow.Go(ctx, func(gCtx workflow.Context) {
            // 重要:使用gCtx(协程上下文),而非ctx
            var result string
            err = workflow.ExecuteActivity(gCtx, ProcessItem, input).Get(gCtx, &result)
            if err != nil {
                return
            }
            results = append(results, result)
        })
    }

    // 等待所有协程完成
    _ = workflow.Await(ctx, func() bool {
        return err != nil || len(results) == parallelism
    })
    return results, err
}

Await and AwaitWithTimeout

Await和AwaitWithTimeout

Block a Workflow until a condition becomes true, without polling:
go
// Block indefinitely until condition is met
err := workflow.Await(ctx, func() bool {
    return isApproved
})

// Block with a timeout
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
    return isApproved
})
if err != nil {
    return err // canceled
}
if !ok {
    return temporal.NewApplicationError("approval timed out", "timeout")
}
阻塞工作流直到条件满足,无需轮询:
go
// 无限期阻塞直到条件满足
err := workflow.Await(ctx, func() bool {
    return isApproved
})

// 带超时的阻塞
ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
    return isApproved
})
if err != nil {
    return err // 已取消
}
if !ok {
    return temporal.NewApplicationError("审批超时", "timeout")
}

Child Workflows

子工作流

Execute a Workflow from within another Workflow:
go
func ParentWorkflow(ctx workflow.Context) (string, error) {
    cwo := workflow.ChildWorkflowOptions{
        WorkflowID: "child-workflow-id",
        // Optional: TaskQueue, RetryPolicy, etc.
    }
    ctx = workflow.WithChildOptions(ctx, cwo)

    var result string
    err := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, "input-data").Get(ctx, &result)
    if err != nil {
        return "", err
    }
    return result, nil
}

func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
    logger := workflow.GetLogger(ctx)
    logger.Info("Child workflow started", "input", input)
    return "processed: " + input, nil
}
在一个工作流中执行另一个工作流:
go
func ParentWorkflow(ctx workflow.Context) (string, error) {
    cwo := workflow.ChildWorkflowOptions{
        WorkflowID: "child-workflow-id",
        // 可选:TaskQueue、RetryPolicy等
    }
    ctx = workflow.WithChildOptions(ctx, cwo)

    var result string
    err := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, "input-data").Get(ctx, &result)
    if err != nil {
        return "", err
    }
    return result, nil
}

func ChildWorkflow(ctx workflow.Context, input string) (string, error) {
    logger := workflow.GetLogger(ctx)
    logger.Info("Child workflow started", "input", input)
    return "processed: " + input, nil
}

Continue-As-New

Continue-As-New

For long-running Workflows, use Continue-As-New to keep Event History from growing too large. This closes the current Workflow Execution and starts a fresh one with the same Workflow ID:
go
func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
    // Check if Temporal suggests continuing as new
    if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
        return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
    }

    // ... do work, update state ...

    // Explicitly continue as new after N iterations
    state.Iteration++
    if state.Iteration >= 1000 {
        return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
    }

    return nil
}
When using Update or Signal handlers, wait for them to finish before continuing as new:
go
err := workflow.Await(ctx, func() bool {
    return workflow.AllHandlersFinished(ctx)
})
if err != nil {
    return err
}
return workflow.NewContinueAsNewError(ctx, MyWorkflow, updatedState)
对于长时间运行的工作流,使用Continue-As-New避免事件历史记录过大。这会关闭当前工作流执行,并使用相同的Workflow ID启动一个新的工作流:
go
func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
    // 检查Temporal是否建议使用Continue-As-New
    if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
        return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
    }

    // ...执行工作,更新状态...

    // 在N次迭代后显式使用Continue-As-New
    state.Iteration++
    if state.Iteration >= 1000 {
        return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
    }

    return nil
}
当使用Update或Signal处理器时,在Continue-As-New前等待它们完成:
go
err := workflow.Await(ctx, func() bool {
    return workflow.AllHandlersFinished(ctx)
})
if err != nil {
    return err
}
return workflow.NewContinueAsNewError(ctx, MyWorkflow, updatedState)

Signals

信号

Signals are asynchronous messages sent to a running Workflow to change its state.
信号是发送给运行中工作流的异步消息,用于更改其状态。

Handle Signals in a Workflow

在工作流中处理信号

go
const ApproveSignal = "approve"

type ApproveInput struct {
    ApproverName string
}

func ApprovalWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Get the signal channel
    signalChan := workflow.GetSignalChannel(ctx, ApproveSignal)

    // Block until signal is received
    var approval ApproveInput
    signalChan.Receive(ctx, &approval)
    logger.Info("Received approval", "approver", approval.ApproverName)

    // Continue workflow execution after signal...
    return nil
}
go
const ApproveSignal = "approve"

type ApproveInput struct {
    ApproverName string
}

func ApprovalWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // 获取信号通道
    signalChan := workflow.GetSignalChannel(ctx, ApproveSignal)

    // 阻塞直到收到信号
    var approval ApproveInput
    signalChan.Receive(ctx, &approval)
    logger.Info("Received approval", "approver", approval.ApproverName)

    // 收到信号后继续工作流执行...
    return nil
}

Listen for Signals in a Background Goroutine

在后台协程中监听信号

go
func OrderWorkflow(ctx workflow.Context) error {
    var lastSignalData string

    // Listen for signals in a separate goroutine
    workflow.Go(ctx, func(gCtx workflow.Context) {
        signalChan := workflow.GetSignalChannel(gCtx, "my-signal")
        for {
            selector := workflow.NewSelector(gCtx)
            selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
                c.Receive(gCtx, &lastSignalData)
            })
            selector.Select(gCtx)
        }
    })

    // Main workflow logic continues...
    // lastSignalData is updated whenever a signal arrives
    return nil
}
go
func OrderWorkflow(ctx workflow.Context) error {
    var lastSignalData string

    // 在单独的协程中监听信号
    workflow.Go(ctx, func(gCtx workflow.Context) {
        signalChan := workflow.GetSignalChannel(gCtx, "my-signal")
        for {
            selector := workflow.NewSelector(gCtx)
            selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
                c.Receive(gCtx, &lastSignalData)
            })
            selector.Select(gCtx)
        }
    })

    // 主工作流逻辑继续执行...
    // 每当收到信号时,lastSignalData会被更新
    return nil
}

Drain Signals Before Continue-As-New

在Continue-As-New前处理完所有信号

Always drain buffered signals before completing or continuing as new:
go
signalChan := workflow.GetSignalChannel(ctx, "my-signal")
for {
    var data string
    ok := signalChan.ReceiveAsync(&data)
    if !ok {
        break
    }
    // process data
}
在完成或Continue-As-New前,务必处理完所有缓冲的信号:
go
signalChan := workflow.GetSignalChannel(ctx, "my-signal")
for {
    var data string
    ok := signalChan.ReceiveAsync(&data)
    if !ok {
        break
    }
    // 处理数据
}

Queries

查询

Queries are synchronous, read-only operations that inspect Workflow state. Query handlers must NOT mutate state.
查询是同步的只读操作,用于检查工作流状态。查询处理器不得修改状态。

Set a Query Handler

设置查询处理器

go
func QueryableWorkflow(ctx workflow.Context) error {
    currentState := "initialized"

    // Register a query handler
    err := workflow.SetQueryHandler(ctx, "get-state", func() (string, error) {
        return currentState, nil
    })
    if err != nil {
        return err
    }

    // Query handlers with input parameters
    err = workflow.SetQueryHandler(ctx, "get-item", func(itemID string) (*Item, error) {
        item, ok := items[itemID]
        if !ok {
            return nil, fmt.Errorf("item %s not found", itemID)
        }
        return item, nil
    })
    if err != nil {
        return err
    }

    // Workflow logic that updates currentState...
    currentState = "processing"
    workflow.Sleep(ctx, time.Minute)
    currentState = "done"
    return nil
}
go
func QueryableWorkflow(ctx workflow.Context) error {
    currentState := "initialized"

    // 注册查询处理器
    err := workflow.SetQueryHandler(ctx, "get-state", func() (string, error) {
        return currentState, nil
    })
    if err != nil {
        return err
    }

    // 带输入参数的查询处理器
    err = workflow.SetQueryHandler(ctx, "get-item", func(itemID string) (*Item, error) {
        item, ok := items[itemID]
        if !ok {
            return nil, fmt.Errorf("item %s not found", itemID)
        }
        return item, nil
    })
    if err != nil {
        return err
    }

    // 更新currentState的工作流逻辑...
    currentState = "processing"
    workflow.Sleep(ctx, time.Minute)
    currentState = "done"
    return nil
}

Updates

更新

Updates are synchronous, trackable requests that can mutate Workflow state and return results. They support validators for rejecting invalid requests before they are written to history.
更新是同步的、可追踪的请求,可修改工作流状态并返回结果。它们支持验证器,用于在写入历史前拒绝无效请求。

Set an Update Handler

设置更新处理器

go
const FetchAndAdd = "fetch_and_add"
const Done = "done"

func CounterWorkflow(ctx workflow.Context) (int, error) {
    counter := 0

    // Update handler with validator
    err := workflow.SetUpdateHandlerWithOptions(
        ctx,
        FetchAndAdd,
        func(ctx workflow.Context, addend int) (int, error) {
            previous := counter
            counter += addend
            return previous, nil
        },
        workflow.UpdateHandlerOptions{
            Validator: func(ctx workflow.Context, addend int) error {
                if addend < 0 {
                    return fmt.Errorf("addend must be non-negative (%d)", addend)
                }
                return nil
            },
        },
    )
    if err != nil {
        return 0, err
    }

    // Wait for a "done" signal to finish
    _ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
    return counter, ctx.Err()
}
go
const FetchAndAdd = "fetch_and_add"
const Done = "done"

func CounterWorkflow(ctx workflow.Context) (int, error) {
    counter := 0

    // 带验证器的更新处理器
    err := workflow.SetUpdateHandlerWithOptions(
        ctx,
        FetchAndAdd,
        func(ctx workflow.Context, addend int) (int, error) {
            previous := counter
            counter += addend
            return previous, nil
        },
        workflow.UpdateHandlerOptions{
            Validator: func(ctx workflow.Context, addend int) error {
                if addend < 0 {
                    return fmt.Errorf("addend must be non-negative (%d)", addend)
                }
                return nil
            },
        },
    )
    if err != nil {
        return 0, err
    }

    // 等待"done"信号以结束
    _ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
    return counter, ctx.Err()
}

Update Handlers with Activities and Mutexes

包含活动和互斥锁的更新处理器

For safe concurrent access, use
workflow.Mutex
:
go
type Manager struct {
    state    map[string]string
    nodeLock workflow.Mutex
}

func (m *Manager) AssignNode(ctx workflow.Context, input AssignInput) (AssignResult, error) {
    // Wait until ready
    err := workflow.Await(ctx, func() bool { return m.isStarted })
    if err != nil {
        return AssignResult{}, err
    }

    // Acquire mutex for safe concurrent access
    err = m.nodeLock.Lock(ctx)
    if err != nil {
        return AssignResult{}, err
    }
    defer m.nodeLock.Unlock()

    // Execute activity while holding lock
    actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToCloseTimeout: 10 * time.Second,
    })
    err = workflow.ExecuteActivity(actCtx, DoAssignment, input).Get(actCtx, nil)
    if err != nil {
        return AssignResult{}, err
    }

    m.state[input.NodeID] = input.JobName
    return AssignResult{NodeID: input.NodeID}, nil
}
为了安全的并发访问,使用
workflow.Mutex
go
type Manager struct {
    state    map[string]string
    nodeLock workflow.Mutex
}

func (m *Manager) AssignNode(ctx workflow.Context, input AssignInput) (AssignResult, error) {
    // 等待就绪
    err := workflow.Await(ctx, func() bool { return m.isStarted })
    if err != nil {
        return AssignResult{}, err
    }

    // 获取互斥锁以保证安全并发访问
    err = m.nodeLock.Lock(ctx)
    if err != nil {
        return AssignResult{}, err
    }
    defer m.nodeLock.Unlock()

    // 持有锁时执行活动
    actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToCloseTimeout: 10 * time.Second,
    })
    err = workflow.ExecuteActivity(actCtx, DoAssignment, input).Get(actCtx, nil)
    if err != nil {
        return AssignResult{}, err
    }

    m.state[input.NodeID] = input.JobName
    return AssignResult{NodeID: input.NodeID}, nil
}

Activity Definitions

活动定义

Basic Activity

基础活动

Activities are normal Go functions. The first parameter should be
context.Context
. They can perform any operation (I/O, HTTP calls, DB queries).
go
package activities

import (
    "context"
    "go.temporal.io/sdk/activity"
)

// Simple function activity
func SendEmail(ctx context.Context, recipient, subject, body string) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Sending email", "to", recipient)
    // ... actual email sending logic ...
    return nil
}
活动是普通的Go函数。第一个参数应为
context.Context
。它们可以执行任何操作(I/O、HTTP调用、数据库查询)。
go
package activities

import (
    "context"
    "go.temporal.io/sdk/activity"
)

// 简单函数式活动
func SendEmail(ctx context.Context, recipient, subject, body string) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Sending email", "to", recipient)
    // ...实际的邮件发送逻辑...
    return nil
}

Struct-Based Activities (recommended for dependencies)

基于结构体的活动(推荐用于依赖管理)

Use struct methods when Activities need shared dependencies (DB pools, HTTP clients, configs). This is the recommended pattern.
go
type Activities struct {
    DBPool    *pgxpool.Pool
    APIClient *http.Client
    Config    *AppConfig
}

type PaymentInput struct {
    OrderID  string
    Amount   float64
    Currency string
}

type PaymentResult struct {
    PaymentID string
    Status    string
}

func (a *Activities) ChargePayment(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("Charging payment", "orderID", input.OrderID, "amount", input.Amount)

    // Use shared dependencies
    result, err := a.APIClient.Post(/* ... */)
    if err != nil {
        return nil, err
    }

    return &PaymentResult{
        PaymentID: result.ID,
        Status:    "charged",
    }, nil
}

func (a *Activities) RefundPayment(ctx context.Context, input PaymentInput) error {
    // Compensation activity for saga pattern
    logger := activity.GetLogger(ctx)
    logger.Info("Refunding payment", "orderID", input.OrderID)
    // ... refund logic ...
    return nil
}
When calling struct-method activities from a Workflow, use a nil struct pointer:
go
// In workflow code:
var a *Activities  // nil pointer is fine -- used only for method resolution
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &result)
当活动需要共享依赖(数据库连接池、HTTP客户端、配置)时,使用结构体方法。这是推荐的模式。
go
type Activities struct {
    DBPool    *pgxpool.Pool
    APIClient *http.Client
    Config    *AppConfig
}

type PaymentInput struct {
    OrderID  string
    Amount   float64
    Currency string
}

type PaymentResult struct {
    PaymentID string
    Status    string
}

func (a *Activities) ChargePayment(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("Charging payment", "orderID", input.OrderID, "amount", input.Amount)

    // 使用共享依赖
    result, err := a.APIClient.Post(/* ... */)
    if err != nil {
        return nil, err
    }

    return &PaymentResult{
        PaymentID: result.ID,
        Status:    "charged",
    }, nil
}

func (a *Activities) RefundPayment(ctx context.Context, input PaymentInput) error {
    // Saga模式的补偿活动
    logger := activity.GetLogger(ctx)
    logger.Info("Refunding payment", "orderID", input.OrderID)
    // ...退款逻辑...
    return nil
}
从工作流中调用结构体方法类型的活动时,使用nil结构体指针:
go
// 在工作流代码中:
var a *Activities  // nil指针即可——仅用于方法解析
err := workflow.ExecuteActivity(ctx, a.ChargePayment, input).Get(ctx, &result)

Activity Heartbeating

活动心跳

Long-running Activities should heartbeat to report progress. If the Activity fails and retries, it can resume from the last heartbeated progress.
go
func BatchProcessingActivity(ctx context.Context, startIdx, endIdx int) error {
    logger := activity.GetLogger(ctx)

    // Resume from last heartbeat on retry
    i := startIdx
    if activity.HasHeartbeatDetails(ctx) {
        var lastCompleted int
        if err := activity.GetHeartbeatDetails(ctx, &lastCompleted); err == nil {
            i = lastCompleted + 1
            logger.Info("Resuming from heartbeat", "index", i)
        }
    }

    for ; i <= endIdx; i++ {
        // Do work for item i...
        logger.Info("Processing item", "index", i)

        // Record progress -- also delivers cancellation signals
        activity.RecordHeartbeat(ctx, i)

        // Check for cancellation
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }

    return nil
}
Set
HeartbeatTimeout
in the Workflow's
ActivityOptions
:
go
ao := workflow.ActivityOptions{
    StartToCloseTimeout: 24 * time.Hour,
    HeartbeatTimeout:    30 * time.Second,  // Must heartbeat at least every 30s
}
长时间运行的活动应发送心跳以报告进度。如果活动失败并重试,它可以从最后一次心跳的进度处恢复。
go
func BatchProcessingActivity(ctx context.Context, startIdx, endIdx int) error {
    logger := activity.GetLogger(ctx)

    // 重试时从最后一次心跳处恢复
    i := startIdx
    if activity.HasHeartbeatDetails(ctx) {
        var lastCompleted int
        if err := activity.GetHeartbeatDetails(ctx, &lastCompleted); err == nil {
            i = lastCompleted + 1
            logger.Info("Resuming from heartbeat", "index", i)
        }
    }

    for ; i <= endIdx; i++ {
        // 处理第i项...
        logger.Info("Processing item", "index", i)

        // 记录进度——同时传递取消信号
        activity.RecordHeartbeat(ctx, i)

        // 检查是否被取消
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }

    return nil
}
在工作流的
ActivityOptions
中设置
HeartbeatTimeout
go
ao := workflow.ActivityOptions{
    StartToCloseTimeout: 24 * time.Hour,
    HeartbeatTimeout:    30 * time.Second,  // 必须至少每30秒发送一次心跳
}

Activity Errors

活动错误

go
import "go.temporal.io/sdk/temporal"

// Retryable error (default) -- will be retried per RetryPolicy
return temporal.NewApplicationError("temporary failure", "TempError", details)

// Non-retryable error -- will NOT be retried regardless of RetryPolicy
return temporal.NewNonRetryableApplicationError("bad input", "ValidationError", nil, details)

// Standard Go errors are converted to retryable ApplicationError automatically
return fmt.Errorf("something went wrong: %w", err)
go
import "go.temporal.io/sdk/temporal"

// 可重试错误(默认)——会根据RetryPolicy进行重试
return temporal.NewApplicationError("temporary failure", "TempError", details)

// 不可重试错误——无论RetryPolicy如何,都不会重试
return temporal.NewNonRetryableApplicationError("bad input", "ValidationError", nil, details)

// 标准Go错误会自动转换为可重试的ApplicationError
return fmt.Errorf("something went wrong: %w", err)

Worker Setup

工作器设置

A Worker polls a Task Queue for Workflow Tasks and Activity Tasks.
go
package main

import (
    "log"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"

    "myapp/activities"
    "myapp/workflows"
)

func main() {
    // Create a Temporal client (heavyweight -- create once per process)
    c, err := client.Dial(client.Options{
        HostPort:  "localhost:7233",       // default
        Namespace: "default",              // default
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()

    // Create a Worker that listens on a specific Task Queue
    w := worker.New(c, "my-task-queue", worker.Options{
        // Optional: tune worker concurrency
        // MaxConcurrentActivityExecutionSize:     defaults to 1000
        // MaxConcurrentWorkflowTaskExecutionSize: defaults to 1000
    })

    // Register Workflow functions
    w.RegisterWorkflow(workflows.ProcessOrder)
    w.RegisterWorkflow(workflows.ApprovalWorkflow)

    // Register Activity struct (registers all exported methods)
    w.RegisterActivity(&activities.Activities{
        DBPool:    dbPool,
        APIClient: httpClient,
        Config:    config,
    })

    // Register standalone Activity functions
    w.RegisterActivity(activities.SendEmail)

    // Run the Worker (blocks until interrupt signal)
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start worker", err)
    }
}
工作器轮询任务队列以获取工作流任务和活动任务。
go
package main

import (
    "log"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"

    "myapp/activities"
    "myapp/workflows"
)

func main() {
    // 创建Temporal客户端(重量级——每个进程创建一次)
    c, err := client.Dial(client.Options{
        HostPort:  "localhost:7233",       // 默认值
        Namespace: "default",              // 默认值
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()

    // 创建监听特定任务队列的工作器
    w := worker.New(c, "my-task-queue", worker.Options{
        // 可选:调整工作器并发数
        // MaxConcurrentActivityExecutionSize:     默认值为1000
        // MaxConcurrentWorkflowTaskExecutionSize: 默认值为1000
    })

    // 注册工作流函数
    w.RegisterWorkflow(workflows.ProcessOrder)
    w.RegisterWorkflow(workflows.ApprovalWorkflow)

    // 注册活动结构体(注册所有导出方法)
    w.RegisterActivity(&activities.Activities{
        DBPool:    dbPool,
        APIClient: httpClient,
        Config:    config,
    })

    // 注册独立的活动函数
    w.RegisterActivity(activities.SendEmail)

    // 运行工作器(阻塞直到收到中断信号)
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start worker", err)
    }
}

Custom Registration Names

自定义注册名称

go
// Custom Workflow type name
w.RegisterWorkflowWithOptions(workflows.ProcessOrder, workflow.RegisterOptions{
    Name: "OrderProcessing",
})

// Custom Activity type name
w.RegisterActivityWithOptions(activities.SendEmail, activity.RegisterOptions{
    Name: "EmailSender",
})
go
// 自定义工作流类型名称
w.RegisterWorkflowWithOptions(workflows.ProcessOrder, workflow.RegisterOptions{
    Name: "OrderProcessing",
})

// 自定义活动类型名称
w.RegisterActivityWithOptions(activities.SendEmail, activity.RegisterOptions{
    Name: "EmailSender",
})

Client Usage

客户端使用

Starting a Workflow

启动工作流

go
package main

import (
    "context"
    "log"

    "go.temporal.io/sdk/client"
    "myapp/workflows"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort: client.DefaultHostPort,
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()

    options := client.StartWorkflowOptions{
        ID:        "order-12345",          // Workflow ID (should be meaningful)
        TaskQueue: "my-task-queue",        // Must match Worker's task queue
        // Optional:
        // WorkflowExecutionTimeout: 24 * time.Hour,
        // RetryPolicy: &temporal.RetryPolicy{...},
    }

    input := workflows.OrderInput{
        OrderID:    "12345",
        CustomerID: "cust-789",
        Items:      []string{"item-a", "item-b"},
        Amount:     99.99,
    }

    we, err := c.ExecuteWorkflow(context.Background(), options, workflows.ProcessOrder, input)
    if err != nil {
        log.Fatalln("Unable to execute workflow", err)
    }

    log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

    // Synchronously wait for the result
    var result workflows.OrderResult
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Fatalln("Workflow failed", err)
    }
    log.Printf("Workflow result: %+v\n", result)
}
go
package main

import (
    "context"
    "log"

    "go.temporal.io/sdk/client"
    "myapp/workflows"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort: client.DefaultHostPort,
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()

    options := client.StartWorkflowOptions{
        ID:        "order-12345",          // 工作流ID(应具有实际意义)
        TaskQueue: "my-task-queue",        // 必须与工作器的任务队列匹配
        // 可选:
        // WorkflowExecutionTimeout: 24 * time.Hour,
        // RetryPolicy: &temporal.RetryPolicy{...},
    }

    input := workflows.OrderInput{
        OrderID:    "12345",
        CustomerID: "cust-789",
        Items:      []string{"item-a", "item-b"},
        Amount:     99.99,
    }

    we, err := c.ExecuteWorkflow(context.Background(), options, workflows.ProcessOrder, input)
    if err != nil {
        log.Fatalln("Unable to execute workflow", err)
    }

    log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

    // 同步等待结果
    var result workflows.OrderResult
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Fatalln("Workflow failed", err)
    }
    log.Printf("Workflow result: %+v\n", result)
}

Sending a Signal

发送信号

go
// Signal from client
err = c.SignalWorkflow(
    context.Background(),
    "order-12345",        // Workflow ID
    "",                   // Run ID (empty = current run)
    "approve",            // Signal name
    ApproveInput{ApproverName: "Alice"},
)

// Signal-With-Start: signal if running, otherwise start + signal
_, err = c.SignalWithStartWorkflow(
    context.Background(),
    "order-12345",        // Workflow ID
    "approve",            // Signal name
    ApproveInput{ApproverName: "Alice"},  // Signal arg
    client.StartWorkflowOptions{
        TaskQueue: "my-task-queue",
    },
    workflows.ApprovalWorkflow,  // Workflow function
    orderInput,                  // Workflow arg
)
go
// 从客户端发送信号
err = c.SignalWorkflow(
    context.Background(),
    "order-12345",        // 工作流ID
    "",                   // Run ID(空值表示当前运行实例)
    "approve",            // 信号名称
    ApproveInput{ApproverName: "Alice"},
)

// Signal-With-Start:如果工作流正在运行则发送信号,否则启动工作流并发送信号
_, err = c.SignalWithStartWorkflow(
    context.Background(),
    "order-12345",        // 工作流ID
    "approve",            // 信号名称
    ApproveInput{ApproverName: "Alice"},  // 信号参数
    client.StartWorkflowOptions{
        TaskQueue: "my-task-queue",
    },
    workflows.ApprovalWorkflow,  // 工作流函数
    orderInput,                  // 工作流参数
)

Signal from Another Workflow

从另一个工作流发送信号

go
// Inside a Workflow:
err := workflow.SignalExternalWorkflow(ctx, "target-workflow-id", "", "signal-name", signalData).Get(ctx, nil)
go
// 在工作流内部:
err := workflow.SignalExternalWorkflow(ctx, "target-workflow-id", "", "signal-name", signalData).Get(ctx, nil)

Querying a Workflow

查询工作流

go
result, err := c.QueryWorkflow(
    context.Background(),
    "order-12345",    // Workflow ID
    "",               // Run ID
    "get-state",      // Query type
    // Optional query args...
)
if err != nil {
    log.Fatalln("Query failed", err)
}
var state string
err = result.Get(&state)
go
result, err := c.QueryWorkflow(
    context.Background(),
    "order-12345",    // 工作流ID
    "",               // Run ID
    "get-state",      // 查询类型
    // 可选的查询参数...
)
if err != nil {
    log.Fatalln("Query failed", err)
}
var state string
err = result.Get(&state)

Sending an Update

发送更新

go
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

updateHandle, err := c.UpdateWorkflow(ctxWithTimeout, client.UpdateWorkflowOptions{
    WorkflowID:   "counter-workflow-id",
    RunID:        "",                                       // empty = current run
    UpdateName:   "fetch_and_add",
    WaitForStage: client.WorkflowUpdateStageAccepted,       // or WorkflowUpdateStageCompleted
    Args:         []interface{}{5},
})
if err != nil {
    log.Fatalf("Update failed: %v", err)
}

var previousValue int
err = updateHandle.Get(ctxWithTimeout, &previousValue)
go
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

updateHandle, err := c.UpdateWorkflow(ctxWithTimeout, client.UpdateWorkflowOptions{
    WorkflowID:   "counter-workflow-id",
    RunID:        "",                                       // 空值表示当前运行实例
    UpdateName:   "fetch_and_add",
    WaitForStage: client.WorkflowUpdateStageAccepted,       // 或WorkflowUpdateStageCompleted
    Args:         []interface{}{5},
})
if err != nil {
    log.Fatalf("Update failed: %v", err)
}

var previousValue int
err = updateHandle.Get(ctxWithTimeout, &previousValue)

Cancelling a Workflow

取消工作流

go
err = c.CancelWorkflow(context.Background(), "order-12345", "")
go
err = c.CancelWorkflow(context.Background(), "order-12345", "")

Error Handling

错误处理

Workflow Error Types

工作流错误类型

Handle different error types from Activity or Child Workflow failures:
go
err := workflow.ExecuteActivity(ctx, a.DoSomething, input).Get(ctx, nil)
if err != nil {
    var applicationErr *temporal.ApplicationError
    if errors.As(err, &applicationErr) {
        // Application-level error (from Activity code)
        switch applicationErr.Type() {
        case "ValidationError":
            // Handle validation error
        case "NotFound":
            // Handle not found
        default:
            return err
        }
    }

    var canceledErr *temporal.CanceledError
    if errors.As(err, &canceledErr) {
        // Workflow or Activity was canceled
        return err
    }

    var timeoutErr *temporal.TimeoutError
    if errors.As(err, &timeoutErr) {
        switch timeoutErr.TimeoutType() {
        case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
            // Activity took too long
        case enumspb.TIMEOUT_TYPE_HEARTBEAT:
            // Activity stopped heartbeating
        case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
            // No Worker picked up the task
        }
    }

    var panicErr *temporal.PanicError
    if errors.As(err, &panicErr) {
        // Activity panicked -- panicErr.Error() and panicErr.StackTrace() available
    }
}
处理活动或子工作流失败产生的不同错误类型:
go
err := workflow.ExecuteActivity(ctx, a.DoSomething, input).Get(ctx, nil)
if err != nil {
    var applicationErr *temporal.ApplicationError
    if errors.As(err, &applicationErr) {
        // 应用级错误(来自活动代码)
        switch applicationErr.Type() {
        case "ValidationError":
            // 处理验证错误
        case "NotFound":
            // 处理未找到错误
        default:
            return err
        }
    }

    var canceledErr *temporal.CanceledError
    if errors.As(err, &canceledErr) {
        // 工作流或活动已被取消
        return err
    }

    var timeoutErr *temporal.TimeoutError
    if errors.As(err, &timeoutErr) {
        switch timeoutErr.TimeoutType() {
        case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
            // 活动执行时间过长
        case enumspb.TIMEOUT_TYPE_HEARTBEAT:
            // 活动停止发送心跳
        case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
            // 没有工作器拾取任务
        }
    }

    var panicErr *temporal.PanicError
    if errors.As(err, &panicErr) {
        // 活动发生panic——可获取panicErr.Error()和panicErr.StackTrace()
    }
}

Non-Retryable Errors

不可重试错误

go
// In Activity code: return a non-retryable error to stop retries immediately
return temporal.NewNonRetryableApplicationError(
    "invalid input: email is required",
    "ValidationError",
    nil, // cause
)

// In Workflow code: check specific error types in NonRetryableErrorTypes
ao := workflow.ActivityOptions{
    RetryPolicy: &temporal.RetryPolicy{
        NonRetryableErrorTypes: []string{"ValidationError", "NotFound"},
    },
}
go
// 在活动代码中:返回不可重试错误以立即停止重试
return temporal.NewNonRetryableApplicationError(
    "invalid input: email is required",
    "ValidationError",
    nil, // 原因
)

// 在工作流代码中:在NonRetryableErrorTypes中指定特定错误类型
ao := workflow.ActivityOptions{
    RetryPolicy: &temporal.RetryPolicy{
        NonRetryableErrorTypes: []string{"ValidationError", "NotFound"},
    },
}

Retry Policies

重试策略

Activity Retry Policy

活动重试策略

Activities retry by default. Customize with
temporal.RetryPolicy
:
go
retryPolicy := &temporal.RetryPolicy{
    InitialInterval:        time.Second,       // First retry delay (default: 1s)
    BackoffCoefficient:     2.0,               // Multiplier for each retry (default: 2.0)
    MaximumInterval:        time.Minute,       // Cap on retry delay (default: 100x InitialInterval)
    MaximumAttempts:        5,                 // 0 = unlimited (default: 0)
    NonRetryableErrorTypes: []string{          // Error types that stop retries
        "ValidationError",
    },
}

ao := workflow.ActivityOptions{
    StartToCloseTimeout: 2 * time.Minute,
    HeartbeatTimeout:    10 * time.Second,
    RetryPolicy:         retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)
活动默认会重试。使用
temporal.RetryPolicy
自定义重试行为:
go
retryPolicy := &temporal.RetryPolicy{
    InitialInterval:        time.Second,       // 首次重试延迟(默认:1秒)
    BackoffCoefficient:     2.0,               // 每次重试的延迟乘数(默认:2.0)
    MaximumInterval:        time.Minute,       // 重试延迟上限(默认:InitialInterval的100倍)
    MaximumAttempts:        5,                 // 0表示无限重试(默认:0)
    NonRetryableErrorTypes: []string{          // 停止重试的错误类型
        "ValidationError",
    },
}

ao := workflow.ActivityOptions{
    StartToCloseTimeout: 2 * time.Minute,
    HeartbeatTimeout:    10 * time.Second,
    RetryPolicy:         retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)

Workflow Retry Policy

工作流重试策略

Workflows do NOT retry by default. Add a retry policy when starting the Workflow:
go
options := client.StartWorkflowOptions{
    ID:        "my-workflow",
    TaskQueue: "my-queue",
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    100 * time.Second,
    },
}
工作流默认不会重试。启动工作流时添加重试策略:
go
options := client.StartWorkflowOptions{
    ID:        "my-workflow",
    TaskQueue: "my-queue",
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    100 * time.Second,
    },
}

Activity Options Reference

活动选项参考

go
ao := workflow.ActivityOptions{
    // REQUIRED: at least one of these two must be set
    StartToCloseTimeout:    30 * time.Second,     // Max time for a single attempt
    ScheduleToCloseTimeout: 5 * time.Minute,      // Max total time including retries

    // OPTIONAL
    ScheduleToStartTimeout: time.Minute,           // Max time waiting for a Worker
    HeartbeatTimeout:       10 * time.Second,       // Must heartbeat within this interval
    TaskQueueName:          "special-queue",        // Override parent's task queue
    WaitForCancellation:    true,                   // Wait for Activity to handle cancellation
    RetryPolicy:            &temporal.RetryPolicy{},
    ActivityID:             "custom-id",
}
ctx = workflow.WithActivityOptions(ctx, ao)
go
ao := workflow.ActivityOptions{
    // 必填:必须设置以下两项中的至少一项
    StartToCloseTimeout:    30 * time.Second,     // 单次尝试的最长时间
    ScheduleToCloseTimeout: 5 * time.Minute,      // 包括重试在内的总最长时间

    // 可选
    ScheduleToStartTimeout: time.Minute,           // 等待工作器拾取任务的最长时间
    HeartbeatTimeout:       10 * time.Second,       // 必须在此间隔内发送心跳
    TaskQueueName:          "special-queue",        // 覆盖父级的任务队列
    WaitForCancellation:    true,                   // 等待活动处理取消请求
    RetryPolicy:            &temporal.RetryPolicy{},
    ActivityID:             "custom-id",
}
ctx = workflow.WithActivityOptions(ctx, ao)

Testing

测试

Workflow Unit Tests with Mocked Activities

使用模拟活动的工作流单元测试

go
package workflows_test

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
    "go.temporal.io/sdk/testsuite"
)

func Test_ProcessOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        &PaymentResult{PaymentID: "pay-123", Status: "charged"}, nil,
    )
    env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "TRACK-456"}, nil,
    )

    // Execute the workflow
    input := OrderInput{OrderID: "order-1", Amount: 99.99}
    env.ExecuteWorkflow(ProcessOrder, input)

    // Assert
    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())

    var result OrderResult
    require.NoError(t, env.GetWorkflowResult(&result))
    require.Equal(t, "pay-123", result.PaymentID)
    require.Equal(t, "TRACK-456", result.TrackingCode)
}
go
package workflows_test

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
    "go.temporal.io/sdk/testsuite"
)

func Test_ProcessOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // 模拟活动
    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        &PaymentResult{PaymentID: "pay-123", Status: "charged"}, nil,
    )
    env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "TRACK-456"}, nil,
    )

    // 执行工作流
    input := OrderInput{OrderID: "order-1", Amount: 99.99}
    env.ExecuteWorkflow(ProcessOrder, input)

    // 断言
    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())

    var result OrderResult
    require.NoError(t, env.GetWorkflowResult(&result))
    require.Equal(t, "pay-123", result.PaymentID)
    require.Equal(t, "TRACK-456", result.TrackingCode)
}

Test Activity Failure

测试活动失败场景

go
func Test_ProcessOrder_PaymentFails(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        nil, temporal.NewApplicationError("payment declined", "PaymentError"),
    )

    env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1"})

    require.True(t, env.IsWorkflowCompleted())
    err := env.GetWorkflowError()
    require.Error(t, err)

    var appErr *temporal.ApplicationError
    require.True(t, errors.As(err, &appErr))
    require.Equal(t, "PaymentError", appErr.Type())
}
go
func Test_ProcessOrder_PaymentFails(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        nil, temporal.NewApplicationError("payment declined", "PaymentError"),
    )

    env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1"})

    require.True(t, env.IsWorkflowCompleted())
    err := env.GetWorkflowError()
    require.Error(t, err)

    var appErr *temporal.ApplicationError
    require.True(t, errors.As(err, &appErr))
    require.Equal(t, "PaymentError", appErr.Type())
}

Mock with Custom Implementation

使用自定义实现进行模拟

go
func Test_ProcessOrder_ValidatesInput(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        func(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
            // Custom implementation that validates the input
            require.Equal(t, "order-1", input.OrderID)
            require.Equal(t, 99.99, input.Amount)
            return &PaymentResult{PaymentID: "pay-123"}, nil
        },
    )

    env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "TRACK-456"}, nil,
    )

    env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1", Amount: 99.99})
    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}
go
func Test_ProcessOrder_ValidatesInput(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    var a *Activities
    env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        func(ctx context.Context, input PaymentInput) (*PaymentResult, error) {
            // 验证输入的自定义实现
            require.Equal(t, "order-1", input.OrderID)
            require.Equal(t, 99.99, input.Amount)
            return &PaymentResult{PaymentID: "pay-123"}, nil
        },
    )

    env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "TRACK-456"}, nil,
    )

    env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "order-1", Amount: 99.99})
    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

Activity Unit Tests

活动单元测试

go
func Test_ChargePayment(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestActivityEnvironment()

    a := &Activities{
        APIClient: mockHTTPClient,
        Config:    testConfig,
    }
    env.RegisterActivity(a)

    result, err := env.ExecuteActivity(a.ChargePayment, PaymentInput{
        OrderID: "order-1",
        Amount:  50.00,
    })

    require.NoError(t, err)
    var paymentResult PaymentResult
    require.NoError(t, result.Get(&paymentResult))
    require.Equal(t, "charged", paymentResult.Status)
}
go
func Test_ChargePayment(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestActivityEnvironment()

    a := &Activities{
        APIClient: mockHTTPClient,
        Config:    testConfig,
    }
    env.RegisterActivity(a)

    result, err := env.ExecuteActivity(a.ChargePayment, PaymentInput{
        OrderID: "order-1",
        Amount:  50.00,
    })

    require.NoError(t, err)
    var paymentResult PaymentResult
    require.NoError(t, result.Get(&paymentResult))
    require.Equal(t, "charged", paymentResult.Status)
}

Test with Signals

带信号的测试

go
func Test_ApprovalWorkflow_WithSignal(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Send signal after a delay
    env.RegisterDelayedCallback(func() {
        env.SignalWorkflow("approve", ApproveInput{ApproverName: "Alice"})
    }, time.Millisecond*10)

    env.ExecuteWorkflow(ApprovalWorkflow, "order-1")

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}
go
func Test_ApprovalWorkflow_WithSignal(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // 延迟发送信号
    env.RegisterDelayedCallback(func() {
        env.SignalWorkflow("approve", ApproveInput{ApproverName: "Alice"})
    }, time.Millisecond*10)

    env.ExecuteWorkflow(ApprovalWorkflow, "order-1")

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

Test with Queries

带查询的测试

go
func Test_QueryableWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    env.RegisterDelayedCallback(func() {
        result, err := env.QueryWorkflow("get-state")
        require.NoError(t, err)
        var state string
        require.NoError(t, result.Get(&state))
        require.Equal(t, "processing", state)
    }, time.Millisecond*10)

    env.ExecuteWorkflow(QueryableWorkflow)
}
go
func Test_QueryableWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    env.RegisterDelayedCallback(func() {
        result, err := env.QueryWorkflow("get-state")
        require.NoError(t, err)
        var state string
        require.NoError(t, result.Get(&state))
        require.Equal(t, "processing", state)
    }, time.Millisecond*10)

    env.ExecuteWorkflow(QueryableWorkflow)
}

Integration Tests with Dev Server

使用开发服务器的集成测试

go
func Test_Integration_WithDevServer(t *testing.T) {
    server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
        ClientOptions: &client.Options{HostPort: ""}, // random port
    })
    require.NoError(t, err)
    defer func() { _ = server.Stop() }()

    c := server.Client()
    taskQueue := "integration-test-queue"

    // Start worker in background
    w := worker.New(c, taskQueue, worker.Options{})
    w.RegisterWorkflow(ProcessOrder)
    w.RegisterActivity(&Activities{/* deps */})

    go func() { _ = w.Run(worker.InterruptCh()) }()
    defer w.Stop()

    // Execute workflow
    we, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
        ID:        "test-order-1",
        TaskQueue: taskQueue,
    }, ProcessOrder, OrderInput{OrderID: "test-1", Amount: 50.00})
    require.NoError(t, err)

    var result OrderResult
    err = we.Get(context.Background(), &result)
    require.NoError(t, err)
    require.Equal(t, "test-1", result.OrderID)
}
go
func Test_Integration_WithDevServer(t *testing.T) {
    server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
        ClientOptions: &client.Options{HostPort: ""}, // 随机端口
    })
    require.NoError(t, err)
    defer func() { _ = server.Stop() }()

    c := server.Client()
    taskQueue := "integration-test-queue"

    // 在后台启动工作器
    w := worker.New(c, taskQueue, worker.Options{})
    w.RegisterWorkflow(ProcessOrder)
    w.RegisterActivity(&Activities{/* 依赖项 */})

    go func() { _ = w.Run(worker.InterruptCh()) }()
    defer w.Stop()

    // 执行工作流
    we, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
        ID:        "test-order-1",
        TaskQueue: taskQueue,
    }, ProcessOrder, OrderInput{OrderID: "test-1", Amount: 50.00})
    require.NoError(t, err)

    var result OrderResult
    err = we.Get(context.Background(), &result)
    require.NoError(t, err)
    require.Equal(t, "test-1", result.OrderID)
}

Test Suite Pattern (testify suite)

测试套件模式(testify suite)

go
type OrderWorkflowSuite struct {
    suite.Suite
    testsuite.WorkflowTestSuite
    env *testsuite.TestWorkflowEnvironment
}

func (s *OrderWorkflowSuite) SetupTest() {
    s.env = s.NewTestWorkflowEnvironment()
}

func (s *OrderWorkflowSuite) AfterTest(suiteName, testName string) {
    s.env.AssertExpectations(s.T())
}

func (s *OrderWorkflowSuite) Test_HappyPath() {
    var a *Activities
    s.env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        &PaymentResult{PaymentID: "p1"}, nil,
    )
    s.env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "t1"}, nil,
    )

    s.env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "1"})
    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())
}

func TestOrderWorkflowSuite(t *testing.T) {
    suite.Run(t, new(OrderWorkflowSuite))
}
go
type OrderWorkflowSuite struct {
    suite.Suite
    testsuite.WorkflowTestSuite
    env *testsuite.TestWorkflowEnvironment
}

func (s *OrderWorkflowSuite) SetupTest() {
    s.env = s.NewTestWorkflowEnvironment()
}

func (s *OrderWorkflowSuite) AfterTest(suiteName, testName string) {
    s.env.AssertExpectations(s.T())
}

func (s *OrderWorkflowSuite) Test_HappyPath() {
    var a *Activities
    s.env.OnActivity(a.ChargePayment, mock.Anything, mock.Anything).Return(
        &PaymentResult{PaymentID: "p1"}, nil,
    )
    s.env.OnActivity(a.ShipOrder, mock.Anything, mock.Anything).Return(
        &ShippingResult{TrackingCode: "t1"}, nil,
    )

    s.env.ExecuteWorkflow(ProcessOrder, OrderInput{OrderID: "1"})
    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())
}

func TestOrderWorkflowSuite(t *testing.T) {
    suite.Run(t, new(OrderWorkflowSuite))
}

Key Patterns

关键模式

Saga / Compensation Pattern

Saga / 补偿模式

Use Go's
defer
to build compensation logic. If a later step fails, earlier steps are automatically compensated in reverse order.
go
func TransferMoney(ctx workflow.Context, details TransferDetails) (err error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Step 1: Withdraw
    err = workflow.ExecuteActivity(ctx, Withdraw, details).Get(ctx, nil)
    if err != nil {
        return err
    }

    // Compensate Step 1 if later steps fail
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, details).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // Step 2: Deposit
    err = workflow.ExecuteActivity(ctx, Deposit, details).Get(ctx, nil)
    if err != nil {
        return err  // triggers WithdrawCompensation via defer
    }

    // Compensate Step 2 if later steps fail
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, details).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // Step 3: Notify (if this fails, both Deposit and Withdraw are compensated)
    err = workflow.ExecuteActivity(ctx, SendNotification, details).Get(ctx, nil)
    if err != nil {
        return err  // triggers DepositCompensation then WithdrawCompensation
    }

    return nil
}
使用Go的
defer
构建补偿逻辑。如果后续步骤失败,前面的步骤会自动按相反顺序进行补偿。
go
func TransferMoney(ctx workflow.Context, details TransferDetails) (err error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 步骤1:取款
    err = workflow.ExecuteActivity(ctx, Withdraw, details).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 如果后续步骤失败,补偿步骤1
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, details).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // 步骤2:存款
    err = workflow.ExecuteActivity(ctx, Deposit, details).Get(ctx, nil)
    if err != nil {
        return err  // 通过defer触发取款补偿
    }

    // 如果后续步骤失败,补偿步骤2
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, details).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // 步骤3:通知(如果此步骤失败,存款和取款都会被补偿)
    err = workflow.ExecuteActivity(ctx, SendNotification, details).Get(ctx, nil)
    if err != nil {
        return err  // 通过defer触发存款补偿,然后是取款补偿
    }

    return nil
}

Cancellation with Cleanup

带清理的取消

When a Workflow is canceled, use
workflow.NewDisconnectedContext
to run cleanup Activities:
go
func CancellableWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Minute,
        HeartbeatTimeout:    5 * time.Second,
        WaitForCancellation: true,  // Wait for Activity to acknowledge cancel
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Cleanup runs even after cancellation
    defer func() {
        if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
            return
        }
        // Get a new context that is NOT canceled
        newCtx, _ := workflow.NewDisconnectedContext(ctx)
        err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil)
        if err != nil {
            workflow.GetLogger(ctx).Error("Cleanup failed", "Error", err)
        }
    }()

    // Long-running activity that can be canceled
    var result string
    err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, &result)
    return err
}
当工作流被取消时,使用
workflow.NewDisconnectedContext
运行清理活动:
go
func CancellableWorkflow(ctx workflow.Context) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Minute,
        HeartbeatTimeout:    5 * time.Second,
        WaitForCancellation: true,  // 等待活动确认取消
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 即使被取消也会运行清理
    defer func() {
        if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
            return
        }
        // 获取一个未被取消的新上下文
        newCtx, _ := workflow.NewDisconnectedContext(ctx)
        err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil)
        if err != nil {
            workflow.GetLogger(ctx).Error("Cleanup failed", "Error", err)
        }
    }()

    // 可被取消的长时间运行活动
    var result string
    err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, &result)
    return err
}

Polling Pattern: Frequent (Activity with Heartbeat)

轮询模式:频繁(带心跳的活动)

Poll inside a single long-running Activity using heartbeats:
go
// Activity: polls until success, heartbeating to stay alive
func PollForResult(ctx context.Context) (string, error) {
    for {
        result, err := callExternalService(ctx)
        if err == nil {
            return result, nil
        }

        activity.RecordHeartbeat(ctx)

        select {
        case <-ctx.Done():
            return "", ctx.Err()
        case <-time.After(5 * time.Second):
            // poll again
        }
    }
}

// Workflow: use long timeout + heartbeat
func PollingWorkflow(ctx workflow.Context) (string, error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour,
        HeartbeatTimeout:    30 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var result string
    err := workflow.ExecuteActivity(ctx, PollForResult).Get(ctx, &result)
    return result, err
}
在单个长时间运行的活动中使用心跳进行轮询:
go
// 活动:轮询直到成功,发送心跳以保持活跃
func PollForResult(ctx context.Context) (string, error) {
    for {
        result, err := callExternalService(ctx)
        if err == nil {
            return result, nil
        }

        activity.RecordHeartbeat(ctx)

        select {
        case <-ctx.Done():
            return "", ctx.Err()
        case <-time.After(5 * time.Second):
            // 再次轮询
        }
    }
}

// 工作流:使用长时间超时 + 心跳
func PollingWorkflow(ctx workflow.Context) (string, error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour,
        HeartbeatTimeout:    30 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var result string
    err := workflow.ExecuteActivity(ctx, PollForResult).Get(ctx, &result)
    return result, err
}

Polling Pattern: Infrequent (Activity Retries)

轮询模式:不频繁(活动重试)

Use the retry policy itself as the polling mechanism:
go
func InfrequentPollingWorkflow(ctx workflow.Context) (string, error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 2 * time.Second,  // Short: just check once
        RetryPolicy: &temporal.RetryPolicy{
            BackoffCoefficient: 1,              // Constant interval
            InitialInterval:    60 * time.Second, // Poll every 60s
            // MaximumAttempts: 0 means unlimited retries
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var result string
    err := workflow.ExecuteActivity(ctx, CheckService).Get(ctx, &result)
    return result, err
}

// Activity: check once and return error if not ready
func CheckService(ctx context.Context) (string, error) {
    result, err := callExternalService(ctx)
    if err != nil {
        return "", err  // Will be retried per RetryPolicy
    }
    return result, nil
}
使用重试策略本身作为轮询机制:
go
func InfrequentPollingWorkflow(ctx workflow.Context) (string, error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 2 * time.Second,  // 短时间:仅检查一次
        RetryPolicy: &temporal.RetryPolicy{
            BackoffCoefficient: 1,              // 固定间隔
            InitialInterval:    60 * time.Second, // 每60秒轮询一次
            // MaximumAttempts: 0表示无限重试
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var result string
    err := workflow.ExecuteActivity(ctx, CheckService).Get(ctx, &result)
    return result, err
}

// 活动:检查一次,如果未就绪则返回错误
func CheckService(ctx context.Context) (string, error) {
    result, err := callExternalService(ctx)
    if err != nil {
        return "", err  // 将根据RetryPolicy进行重试
    }
    return result, nil
}

Parallel Activities with Futures

并行活动与Future

Launch multiple Activities in parallel and collect results:
go
func ParallelWorkflow(ctx workflow.Context, items []string) ([]Result, error) {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Launch all activities
    var futures []workflow.Future
    for _, item := range items {
        future := workflow.ExecuteActivity(ctx, ProcessItem, item)
        futures = append(futures, future)
    }

    // Collect results (in order)
    var results []Result
    for _, future := range futures {
        var result Result
        if err := future.Get(ctx, &result); err != nil {
            return nil, err
        }
        results = append(results, result)
    }
    return results, nil
}
并行启动多个活动并收集结果:
go
func ParallelWorkflow(ctx workflow.Context, items []string) ([]Result, error) {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 启动所有活动
    var futures []workflow.Future
    for _, item := range items {
        future := workflow.ExecuteActivity(ctx, ProcessItem, item)
        futures = append(futures, future)
    }

    // 按顺序收集结果
    var results []Result
    for _, future := range futures {
        var result Result
        if err := future.Get(ctx, &result); err != nil {
            return nil, err
        }
        results = append(results, result)
    }
    return results, nil
}

Process Results as They Complete (Selector)

结果完成即处理(选择器)

go
func ProcessAsCompleted(ctx workflow.Context, items []string) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    selector := workflow.NewSelector(ctx)
    var processErr error

    for _, item := range items {
        future := workflow.ExecuteActivity(ctx, ProcessItem, item)
        selector.AddFuture(future, func(f workflow.Future) {
            var result Result
            if err := f.Get(ctx, &result); err != nil {
                processErr = err
                return
            }
            // Process result immediately as it arrives
            workflow.GetLogger(ctx).Info("Got result", "value", result)
        })
    }

    // Wait for all
    for range items {
        selector.Select(ctx)
        if processErr != nil {
            return processErr
        }
    }
    return nil
}
go
func ProcessAsCompleted(ctx workflow.Context, items []string) error {
    ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Second}
    ctx = workflow.WithActivityOptions(ctx, ao)

    selector := workflow.NewSelector(ctx)
    var processErr error

    for _, item := range items {
        future := workflow.ExecuteActivity(ctx, ProcessItem, item)
        selector.AddFuture(future, func(f workflow.Future) {
            var result Result
            if err := f.Get(ctx, &result); err != nil {
                processErr = err
                return
            }
            // 结果到达后立即处理
            workflow.GetLogger(ctx).Info("Got result", "value", result)
        })
    }

    // 等待所有完成
    for range items {
        selector.Select(ctx)
        if processErr != nil {
            return processErr
        }
    }
    return nil
}

Updatable Timer

可更新计时器

A timer that can be rescheduled via Signals:
go
type UpdatableTimer struct {
    wakeUpTime time.Time
}

func (u *UpdatableTimer) SleepUntil(ctx workflow.Context, wakeUpTime time.Time, updateCh workflow.ReceiveChannel) error {
    u.wakeUpTime = wakeUpTime
    timerFired := false

    for !timerFired && ctx.Err() == nil {
        timerCtx, timerCancel := workflow.WithCancel(ctx)
        duration := u.wakeUpTime.Sub(workflow.Now(timerCtx))
        timer := workflow.NewTimer(timerCtx, duration)

        workflow.NewSelector(timerCtx).
            AddFuture(timer, func(f workflow.Future) {
                if f.Get(timerCtx, nil) == nil {
                    timerFired = true
                }
            }).
            AddReceive(updateCh, func(c workflow.ReceiveChannel, more bool) {
                timerCancel()                         // cancel current timer
                c.Receive(timerCtx, &u.wakeUpTime)    // update to new time
            }).
            Select(timerCtx)
    }
    return ctx.Err()
}

func ReminderWorkflow(ctx workflow.Context, initialTime time.Time) error {
    timer := UpdatableTimer{}

    err := workflow.SetQueryHandler(ctx, "getWakeUpTime", func() (time.Time, error) {
        return timer.wakeUpTime, nil
    })
    if err != nil {
        return err
    }

    return timer.SleepUntil(ctx, initialTime, workflow.GetSignalChannel(ctx, "updateWakeUpTime"))
}
可通过信号重新调度的计时器:
go
type UpdatableTimer struct {
    wakeUpTime time.Time
}

func (u *UpdatableTimer) SleepUntil(ctx workflow.Context, wakeUpTime time.Time, updateCh workflow.ReceiveChannel) error {
    u.wakeUpTime = wakeUpTime
    timerFired := false

    for !timerFired && ctx.Err() == nil {
        timerCtx, timerCancel := workflow.WithCancel(ctx)
        duration := u.wakeUpTime.Sub(workflow.Now(timerCtx))
        timer := workflow.NewTimer(timerCtx, duration)

        workflow.NewSelector(timerCtx).
            AddFuture(timer, func(f workflow.Future) {
                if f.Get(timerCtx, nil) == nil {
                    timerFired = true
                }
            }).
            AddReceive(updateCh, func(c workflow.ReceiveChannel, more bool) {
                timerCancel()                         // 取消当前计时器
                c.Receive(timerCtx, &u.wakeUpTime)    // 更新为新时间
            }).
            Select(timerCtx)
    }
    return ctx.Err()
}

func ReminderWorkflow(ctx workflow.Context, initialTime time.Time) error {
    timer := UpdatableTimer{}

    err := workflow.SetQueryHandler(ctx, "getWakeUpTime", func() (time.Time, error) {
        return timer.wakeUpTime, nil
    })
    if err != nil {
        return err
    }

    return timer.SleepUntil(ctx, initialTime, workflow.GetSignalChannel(ctx, "updateWakeUpTime"))
}

Ordered Signal Processing with Await

使用Await的有序信号处理

Process signals that may arrive out of order:
go
type SignalTracker struct {
    Signal1Received bool
    Signal2Received bool
    Signal3Received bool
    FirstSignalTime time.Time
}

func (s *SignalTracker) Listen(ctx workflow.Context) {
    for {
        selector := workflow.NewSelector(ctx)
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal1Received = true
        })
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal2Received = true
        })
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal3Received = true
        })
        selector.Select(ctx)
        if s.FirstSignalTime.IsZero() {
            s.FirstSignalTime = workflow.Now(ctx)
        }
    }
}

func OrderedSignalsWorkflow(ctx workflow.Context) error {
    var tracker SignalTracker

    // Listen for signals in background goroutine
    workflow.Go(ctx, tracker.Listen)

    // Wait for Signal1 first
    err := workflow.Await(ctx, func() bool { return tracker.Signal1Received })
    if err != nil {
        return err
    }

    // Wait for Signal2 with timeout
    ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
        return tracker.Signal2Received
    })
    if err != nil {
        return err
    }
    if !ok {
        return temporal.NewApplicationError("timed out waiting for Signal2", "timeout")
    }

    // Wait for Signal3 with timeout
    ok, err = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
        return tracker.Signal3Received
    })
    if err != nil {
        return err
    }
    if !ok {
        return temporal.NewApplicationError("timed out waiting for Signal3", "timeout")
    }

    return nil
}
处理可能乱序到达的信号:
go
type SignalTracker struct {
    Signal1Received bool
    Signal2Received bool
    Signal3Received bool
    FirstSignalTime time.Time
}

func (s *SignalTracker) Listen(ctx workflow.Context) {
    for {
        selector := workflow.NewSelector(ctx)
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal1Received = true
        })
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal2Received = true
        })
        selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) {
            c.Receive(ctx, nil)
            s.Signal3Received = true
        })
        selector.Select(ctx)
        if s.FirstSignalTime.IsZero() {
            s.FirstSignalTime = workflow.Now(ctx)
        }
    }
}

func OrderedSignalsWorkflow(ctx workflow.Context) error {
    var tracker SignalTracker

    // 在后台协程中监听信号
    workflow.Go(ctx, tracker.Listen)

    // 先等待Signal1
    err := workflow.Await(ctx, func() bool { return tracker.Signal1Received })
    if err != nil {
        return err
    }

    // 带超时等待Signal2
    ok, err := workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
        return tracker.Signal2Received
    })
    if err != nil {
        return err
    }
    if !ok {
        return temporal.NewApplicationError("timed out waiting for Signal2", "timeout")
    }

    // 带超时等待Signal3
    ok, err = workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
        return tracker.Signal3Received
    })
    if err != nil {
        return err
    }
    if !ok {
        return temporal.NewApplicationError("timed out waiting for Signal3", "timeout")
    }

    return nil
}

Workflow Info

工作流信息

Access metadata about the current Workflow Execution:
go
info := workflow.GetInfo(ctx)
info.WorkflowExecution.ID        // Workflow ID
info.WorkflowExecution.RunID     // Run ID
info.WorkflowType.Name           // Workflow type name
info.TaskQueueName               // Task Queue
info.Attempt                     // Current attempt (starts at 1)
info.GetContinueAsNewSuggested() // True when history is getting large
info.GetCurrentHistoryLength()   // Current history event count
访问当前工作流执行的元数据:
go
info := workflow.GetInfo(ctx)
info.WorkflowExecution.ID        // 工作流ID
info.WorkflowExecution.RunID     // Run ID
info.WorkflowType.Name           // 工作流类型名称
info.TaskQueueName               // 任务队列
info.Attempt                     // 当前尝试次数(从1开始)
info.GetContinueAsNewSuggested() // 当历史记录过大时返回True
info.GetCurrentHistoryLength()   // 当前历史事件数量

DeterministicKeys

DeterministicKeys

When iterating over maps in Workflow code, always use
workflow.DeterministicKeys
to get a sorted, deterministic key order:
go
myMap := map[string]string{"b": "2", "a": "1", "c": "3"}

// WRONG: range over map is non-deterministic
// for k, v := range myMap { ... }

// CORRECT: use DeterministicKeys
for _, k := range workflow.DeterministicKeys(myMap) {
    v := myMap[k]
    // process k, v
}
在工作流代码中遍历map时,始终使用
workflow.DeterministicKeys
获取排序后的确定性键顺序:
go
myMap := map[string]string{"b": "2", "a": "1", "c": "3"}

// 错误:map遍历是非确定性的
// for k, v := range myMap { ... }

// 正确:使用DeterministicKeys
for _, k := range workflow.DeterministicKeys(myMap) {
    v := myMap[k]
    // 处理k, v
}