golang-samber-ro

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese
Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use
ultrathink
when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
角色定位:你是一名Go工程师,当数据以异步或无限的方式流动时,会选择使用响应式流。你使用samber/ro构建声明式管道,而非手动编写goroutine/channel逻辑,但你也清楚何时使用简单的切片+samber/lo就足够。
思考模式:在设计高级响应式管道,或是在Cold/Hot Observable、Subject和组合操作符之间做选择时,使用
ultrathink
。错误的架构会导致资源泄漏或事件丢失。

samber/ro — Reactive Streams for Go

samber/ro — Go语言的响应式流库

Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more informations. Context7 can help as a discoverability platform.
这是ReactiveX的Go语言实现。以泛型优先、类型安全、可组合的管道处理异步数据流,支持自动背压、错误传播、context集成以及资源清理。包含150+操作符、5种Subject类型、40+插件。
官方资源
本技能内容并非详尽无遗。如需更多信息,请参考库文档和代码示例。Context7可作为发现平台提供帮助。

Why samber/ro (Streams vs Slices)

为什么选择samber/ro(流 vs 切片)

Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators.
samber/ro
solves this with declarative, chainable stream operators.
When to use which tool:
ScenarioToolWhy
Transform a slice (map, filter, reduce)
samber/lo
Finite, synchronous, eager — no stream overhead needed
Simple goroutine fan-out with error handling
errgroup
Standard lib, lightweight, sufficient for bounded concurrency
Infinite event stream (WebSocket, tickers, file watcher)
samber/ro
Declarative pipeline with backpressure, retry, timeout, combine
Real-time data enrichment from multiple async sources
samber/ro
CombineLatest/Zip compose dependent streams without manual select
Pub/sub with multiple consumers sharing one source
samber/ro
Hot observables (Share/Subjects) handle multicast natively
Key differences: lo vs ro
Aspect
samber/lo
samber/ro
DataFinite slicesInfinite streams
ExecutionSynchronous, blockingAsynchronous, non-blocking
EvaluationEager (allocates intermediate slices)Lazy (processes items as they arrive)
TimingImmediateTime-aware (delay, throttle, interval, timeout)
Error modelReturn
(T, error)
per call
Error channel propagates through pipeline
Use caseCollection transformsEvent-driven, real-time, async pipelines
对于复杂的异步管道,Go的channel + goroutine会变得难以维护:手动关闭channel、繁琐的goroutine生命周期管理、嵌套select中的错误传播,以及缺乏可组合的操作符。
samber/ro
通过声明式、可链式调用的流操作符解决了这些问题。
何时使用哪种工具:
场景工具原因
转换切片(map、filter、reduce)
samber/lo
有限、同步、立即执行——无需流处理的开销
带错误处理的简单goroutine扇出
errgroup
标准库、轻量,足以应对有界并发场景
无限事件流(WebSocket、定时器、文件监视器)
samber/ro
声明式管道,支持背压、重试、超时、组合
从多个异步源实时丰富数据
samber/ro
CombineLatest/Zip可组合依赖流,无需手动编写select
多消费者共享同一源的发布/订阅
samber/ro
Hot Observable(Share/Subjects)原生支持多播
lo与ro的核心区别:
维度
samber/lo
samber/ro
数据有限切片无限流
执行同步、阻塞异步、非阻塞
求值立即执行(分配中间切片)延迟执行(随数据到达处理)
时序即时支持时间控制(延迟、节流、间隔、超时)
错误模型每次调用返回
(T, error)
错误通过管道中的错误通道传播
适用场景集合转换事件驱动、实时处理、异步管道

Installation

安装

bash
go get github.com/samber/ro
bash
go get github.com/samber/ro

Core Concepts

核心概念

Four building blocks:
  1. Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
  2. Observer — a consumer with three callbacks:
    onNext(T)
    ,
    onError(error)
    ,
    onComplete()
  3. Operator — a function that transforms an observable into another observable, chained via
    Pipe
  4. Subscription — the connection between observable and observer. Call
    .Wait()
    to block or
    .Unsubscribe()
    to cancel
go
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)
四个核心构建块:
  1. Observable — 一种随时间发射值的数据源,默认是Cold类型:每个订阅者都会触发独立的从头执行
  2. Observer — 消费者,包含三个回调:
    onNext(T)
    onError(error)
    onComplete()
  3. Operator — 将一个Observable转换为另一个Observable的函数,通过
    Pipe
    链式调用
  4. Subscription — Observable与Observer之间的连接。调用
    .Wait()
    会阻塞,调用
    .Unsubscribe()
    可取消连接
go
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// 输出: "even-0", "even-2", "even-4", "Done!"

// 或者同步收集结果:
values, err := ro.Collect(observable)

Cold vs Hot Observables

Cold与Hot Observable

Cold (default): each
.Subscribe()
starts a new independent execution. Safe and predictable — use by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
Convert withBehavior
Share()
Cold → hot with reference counting. Last unsubscribe tears down
ShareReplay(n)
Same as Share + buffers last N values for late subscribers
Connectable()
Cold → hot, but waits for explicit
.Connect()
call
SubjectsNatively hot — call
.Send()
,
.Error()
,
.Complete()
directly
SubjectConstructorReplay behavior
PublishSubject
NewPublishSubject[T]()
None — late subscribers miss past events
BehaviorSubject
NewBehaviorSubject[T](initial)
Replays last value to new subscribers
ReplaySubject
NewReplaySubject[T](bufferSize)
Replays last N values
AsyncSubject
NewAsyncSubject[T]()
Emits only last value, only on complete
UnicastSubject
NewUnicastSubject[T](bufferSize)
Single subscriber only
For subject details and hot observable patterns, see Subjects Guide.
Cold(默认):每次
.Subscribe()
都会启动一个独立的执行流程。安全且可预测——默认使用该类型。
Hot:多个订阅者共享同一个执行流程。当数据源开销较大(如WebSocket、数据库轮询),或订阅者需要查看相同事件时使用。
转换方式行为
Share()
Cold → Hot,带引用计数。最后一个订阅者取消订阅时会销毁流
ShareReplay(n)
与Share相同,同时缓存最后N个值给晚加入的订阅者
Connectable()
Cold → Hot,但需等待显式调用
.Connect()
才会启动
Subjects原生Hot类型——可直接调用
.Send()
.Error()
.Complete()
Subject类型构造函数重放行为
PublishSubject
NewPublishSubject[T]()
无——晚加入的订阅者会错过之前的事件
BehaviorSubject
NewBehaviorSubject[T](initial)
向新订阅者重放最后一个值
ReplaySubject
NewReplaySubject[T](bufferSize)
重放最后N个值
AsyncSubject
NewAsyncSubject[T]()
仅在完成时发射最后一个值
UnicastSubject
NewUnicastSubject[T](bufferSize)
仅支持单个订阅者
如需了解Subject的详细信息和Hot Observable的模式,请查看Subjects指南

Operator Quick Reference

操作符快速参考

CategoryKey operatorsPurpose
Creation
Just
,
FromSlice
,
FromChannel
,
Range
,
Interval
,
Defer
,
Future
Create observables from various sources
Transform
Map
,
MapErr
,
FlatMap
,
Scan
,
Reduce
,
GroupBy
Transform or accumulate stream values
Filter
Filter
,
Take
,
TakeLast
,
Skip
,
Distinct
,
Find
,
First
,
Last
Selectively emit values
Combine
Merge
,
Concat
,
Zip2
Zip6
,
CombineLatest2
CombineLatest5
,
Race
Merge multiple observables
Error
Catch
,
OnErrorReturn
,
OnErrorResumeNextWith
,
Retry
,
RetryWithConfig
Recover from errors
Timing
Delay
,
DelayEach
,
Timeout
,
ThrottleTime
,
SampleTime
,
BufferWithTime
Control emission timing
Side effect
Tap
/
Do
,
TapOnNext
,
TapOnError
,
TapOnComplete
Observe without altering stream
Terminal
Collect
,
ToSlice
,
ToChannel
,
ToMap
Consume stream into Go types
Use typed
Pipe2
,
Pipe3
...
Pipe25
for compile-time type safety across operator chains. The untyped
Pipe
uses
any
and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
分类核心操作符用途
创建
Just
,
FromSlice
,
FromChannel
,
Range
,
Interval
,
Defer
,
Future
从各种源创建Observable
转换
Map
,
MapErr
,
FlatMap
,
Scan
,
Reduce
,
GroupBy
转换或累积流中的值
过滤
Filter
,
Take
,
TakeLast
,
Skip
,
Distinct
,
Find
,
First
,
Last
选择性发射值
组合
Merge
,
Concat
,
Zip2
Zip6
,
CombineLatest2
CombineLatest5
,
Race
合并多个Observable
错误处理
Catch
,
OnErrorReturn
,
OnErrorResumeNextWith
,
Retry
,
RetryWithConfig
从错误中恢复
时序控制
Delay
,
DelayEach
,
Timeout
,
ThrottleTime
,
SampleTime
,
BufferWithTime
控制值的发射时机
副作用
Tap
/
Do
,
TapOnNext
,
TapOnError
,
TapOnComplete
观察流而不修改其内容
终端操作
Collect
,
ToSlice
,
ToChannel
,
ToMap
将流转换为Go原生类型
使用带类型的
Pipe2
,
Pipe3
...
Pipe25
可在操作符链中获得编译时类型安全。无类型的
Pipe
使用
any
类型,会丢失类型检查。
如需完整的操作符目录(150+带签名的操作符),请查看操作符指南

Common Mistakes

常见错误

MistakeWhy it failsFix
Using
ro.OnNext()
without error handler
Errors are silently dropped — bugs hide in productionUse
ro.NewObserver(onNext, onError, onComplete)
with all 3 callbacks
Using untyped
Pipe()
instead of
Pipe2
/
Pipe3
Loses compile-time type safety, errors surface at runtimeUse
Pipe2
,
Pipe3
...
Pipe25
for typed operator chains
Forgetting
.Unsubscribe()
on infinite streams
Goroutine leak — the observable runs foreverUse
TakeUntil(signal)
, context cancellation, or explicit
Unsubscribe()
Using
Share()
when cold is sufficient
Unnecessary complexity, harder to reason about lifecycleUse hot observables only when multiple consumers need the same stream
Using
samber/ro
for finite slice transforms
Stream overhead (goroutines, subscriptions) for a synchronous operationUse
samber/lo
— it's simpler, faster, and purpose-built for slices
Not propagating context for cancellationStreams ignore shutdown signals, causing resource leaks on terminationChain
ContextWithTimeout
or
ThrowOnContextCancel
in the pipeline
错误失败原因修复方案
使用
ro.OnNext()
但未设置错误处理器
错误会被静默丢弃——生产环境中bug会隐藏使用
ro.NewObserver(onNext, onError, onComplete)
,同时设置三个回调
使用无类型的
Pipe()
而非
Pipe2
/
Pipe3
丢失编译时类型安全,错误仅在运行时暴露使用
Pipe2
,
Pipe3
...
Pipe25
构建带类型的操作符链
无限流忘记调用
.Unsubscribe()
Goroutine泄漏——Observable会一直运行使用
TakeUntil(signal)
、context取消,或显式调用
Unsubscribe()
当Cold Observable足够时使用
Share()
不必要的复杂度,生命周期更难推理仅当多个消费者需要共享同一流时才使用Hot Observable
使用
samber/ro
处理有限切片转换
同步操作却带来了流处理的开销(goroutine、订阅)使用
samber/lo
——它更简单、更快,专为切片处理设计
未传播用于取消的context流会忽略关闭信号,导致终止时资源泄漏在管道中链式调用
ContextWithTimeout
ThrowOnContextCancel

Best Practices

最佳实践

  1. Always handle all three events — use
    NewObserver(onNext, onError, onComplete)
    , not just
    OnNext
    . Unhandled errors cause silent data loss
  2. Use
    Collect()
    for synchronous consumption
    — when the stream is finite and you need
    []T
    ,
    Collect
    blocks until complete and returns the slice + error
  3. Prefer typed Pipe functions
    Pipe2
    ,
    Pipe3
    ...
    Pipe25
    catch type mismatches at compile time. Reserve untyped
    Pipe
    for dynamic operator chains
  4. Bound infinite streams — use
    Take(n)
    ,
    TakeUntil(signal)
    ,
    Timeout(d)
    , or context cancellation. Unbounded streams leak goroutines
  5. Use
    Tap
    /
    Do
    for observability
    — log, trace, or meter emissions without altering the stream. Chain
    TapOnError
    for error monitoring
  6. Prefer
    samber/lo
    for simple transforms
    — if the data is a finite slice and you need Map/Filter/Reduce, use
    lo
    . Reach for
    ro
    when data arrives over time, from multiple sources, or needs retry/timeout/backpressure
  1. 始终处理三种事件——使用
    NewObserver(onNext, onError, onComplete)
    ,不要只使用
    OnNext
    。未处理的错误会导致数据静默丢失
  2. 使用
    Collect()
    进行同步消费
    ——当流是有限的,且你需要
    []T
    时,
    Collect
    会阻塞直到流完成,返回切片+错误
  3. 优先使用带类型的Pipe函数——
    Pipe2
    ,
    Pipe3
    ...
    Pipe25
    会在编译时捕获类型不匹配问题。仅在动态操作符链中使用无类型的
    Pipe
  4. 限制无限流的生命周期——使用
    Take(n)
    TakeUntil(signal)
    Timeout(d)
    或context取消。无界流会导致Goroutine泄漏
  5. 使用
    Tap
    /
    Do
    实现可观察性
    ——记录日志、追踪或计量发射的值,而不修改流。链式调用
    TapOnError
    进行错误监控
  6. 简单转换优先使用
    samber/lo
    ——如果数据是有限切片,且你需要Map/Filter/Reduce,使用
    lo
    。当数据随时间到达、来自多个源,或需要重试/超时/背压时,再使用
    ro

Plugin Ecosystem

插件生态

40+ plugins extend ro with domain-specific operators:
CategoryPluginsImport path prefix
EncodingJSON, CSV, Base64, Gob
plugins/encoding/...
NetworkHTTP, I/O, FSNotify
plugins/http
,
plugins/io
,
plugins/fsnotify
SchedulingCron, ICS
plugins/cron
,
plugins/ics
ObservabilityZap, Slog, Zerolog, Logrus, Sentry, Oops
plugins/observability/...
,
plugins/samber/oops
Rate limitingNative, Ulule
plugins/ratelimit/...
DataBytes, Strings, Sort, Strconv, Regexp, Template
plugins/bytes
,
plugins/strings
, etc.
SystemProcess, Signal
plugins/proc
,
plugins/signal
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
40+插件扩展了ro的领域特定操作符:
分类插件导入路径前缀
编码JSON、CSV、Base64、Gob
plugins/encoding/...
网络HTTP、I/O、FSNotify
plugins/http
,
plugins/io
,
plugins/fsnotify
调度Cron、ICS
plugins/cron
,
plugins/ics
可观察性Zap、Slog、Zerolog、Logrus、Sentry、Oops
plugins/observability/...
,
plugins/samber/oops
限流原生、Ulule
plugins/ratelimit/...
数据Bytes、Strings、Sort、Strconv、Regexp、Template
plugins/bytes
,
plugins/strings
系统进程、信号
plugins/proc
,
plugins/signal
如需完整的插件目录(含导入路径和使用示例),请查看插件生态
如需了解真实世界的响应式模式(重试+超时、WebSocket扇出、优雅关闭、流组合),请查看模式
如果你在使用samber/ro时遇到bug或意外行为,请在github.com/samber/ro/issues提交问题。

Cross-References

交叉参考

  • → See
    samber/cc-skills-golang@golang-samber-lo
    skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
  • → See
    samber/cc-skills-golang@golang-samber-mo
    skill for monadic types (Option, Result, Either) that compose with ro pipelines
  • → See
    samber/cc-skills-golang@golang-samber-hot
    skill for in-memory caching (also available as an ro plugin)
  • → See
    samber/cc-skills-golang@golang-concurrency
    skill for goroutine/channel patterns when reactive streams are overkill
  • → See
    samber/cc-skills-golang@golang-observability
    skill for monitoring reactive pipelines in production
  • → 如需有限切片转换(Map、Filter、Reduce、GroupBy),请查看
    samber/cc-skills-golang@golang-samber-lo
    技能——当数据已在切片中时使用lo
  • → 如需与ro管道组合的单子类型(Option、Result、Either),请查看
    samber/cc-skills-golang@golang-samber-mo
    技能
  • → 如需内存缓存(也可作为ro插件使用),请查看
    samber/cc-skills-golang@golang-samber-hot
    技能
  • → 当响应式流过于重量级时,如需goroutine/channel模式,请查看
    samber/cc-skills-golang@golang-concurrency
    技能
  • → 如需在生产环境中监控响应式管道,请查看
    samber/cc-skills-golang@golang-observability
    技能