rxjs

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

RxJS (Reactive Extensions for JavaScript)

RxJS(JavaScript响应式扩展)

Version: 8.x (2025) Tags: Reactive Programming, Observables, Async
References: Docs — operators, API • Angular RxJSGitHub
版本: 8.x(2025年) 标签: 响应式编程、Observables、异步
参考资料: 官方文档 — 操作符、API • Angular RxJS指南GitHub仓库

API Changes

API变更

This section documents recent version-specific API changes.
  • NEW: RxJS 8 — Modern TypeScript types, smaller bundles, better tree-shaking source
  • NEW: Improved interop helpers — Simpler conversion to/from Signals with
    toSignal
    and
    toObservable
  • NEW: RxJS 8 ergonomics — Compact operator signatures and safer defaults
  • DEPRECATED: Legacy import style — Use RxJS 7+ pipeable operators instead of patch imports
本部分记录了近期各版本的特定API变化:
  • 新增:RxJS 8 — 现代化TypeScript类型、更小的打包体积、更优的tree-shaking支持 来源
  • 新增:改进的互操作辅助工具 — 通过
    toSignal
    toObservable
    可以更简单地与Signals互相转换
  • 新增:RxJS 8易用性优化 — 更简洁的操作符签名和更安全的默认配置
  • 已弃用:旧版导入风格 — 请使用RxJS 7+的管道式操作符,而非补丁式导入

Best Practices

最佳实践

  • Use AsyncPipe in templates — Handles subscription/unsubscription automatically, prevents memory leaks
ts
@Component({
  template: `
    <div *ngIf="data$ | async as data">
      {{ data.name }}
    </div>
  `
})
export class MyComponent {
  data$ = this.service.getData();
}
  • Use proper flattening operators — Choose based on use case:
ts
// switchMap - cancel previous, keep latest (search)
search(term: string): Observable<SearchResult[]> {
  return this.searchService.search(term).pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term => term ? this.http.get(...) : of([]))
  );
}

// mergeMap - run concurrently (multiple API calls)
this.items$.pipe(
  mergeMap(item => this.save(item))
);

// concatMap - run sequentially (order matters)
this.orders$.pipe(
  concatMap(order => this.processOrder(order))
);

// exhaustMap - ignore while running (form submit)
this.submit$.pipe(
  exhaustMap(() => this.submitForm())
);
  • Always unsubscribe — Prevent memory leaks
ts
// Use takeUntil pattern
private destroy$ = new Subject<void>();

ngOnInit() {
  this.data$.pipe(takeUntil(this.destroy$)).subscribe();
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

// Or use takeUntilDestroyed (Angular 16+)
private destroy$ = takeUntilDestroyed();
  • Use
    catchError
    for error handling
ts
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error(error);
    return of([]); // Return fallback value
  })
);
  • Use
    shareReplay(1)
    for caching shared observables
ts
this.data$ = this.http.get('/api/data').pipe(
  shareReplay(1)
);
  • Name observables with
    $
    suffix — Improves readability
  • Use Signals for UI state, RxJS for events — Modern Angular pattern
ts
// Convert Observable to Signal
users = toSignal(this.users$, { initialValue: [] });

// Convert Signal to Observable (if needed)
name$ = toObservable(this.name);
  • 在模板中使用AsyncPipe — 自动处理订阅/取消订阅,防止内存泄漏
ts
@Component({
  template: `
    <div *ngIf="data$ | async as data">
      {{ data.name }}
    </div>
  `
})
export class MyComponent {
  data$ = this.service.getData();
}
  • 选用合适的拍平操作符 — 根据使用场景选择:
ts
// switchMap - 取消之前的请求,保留最新结果(适用于搜索场景)
search(term: string): Observable<SearchResult[]> {
  return this.searchService.search(term).pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term => term ? this.http.get(...) : of([]))
  );
}

// mergeMap - 并发执行(适用于多个API调用场景)
this.items$.pipe(
  mergeMap(item => this.save(item))
);

// concatMap - 顺序执行(适用于对顺序有要求的场景)
this.orders$.pipe(
  concatMap(order => this.processOrder(order))
);

// exhaustMap - 运行中忽略新请求(适用于表单提交场景)
this.submit$.pipe(
  exhaustMap(() => this.submitForm())
);
  • 始终取消订阅 — 防止内存泄漏
ts
// 使用takeUntil模式
private destroy$ = new Subject<void>();

ngOnInit() {
  this.data$.pipe(takeUntil(this.destroy$)).subscribe();
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

// 或者使用takeUntilDestroyed(Angular 16+支持)
private destroy$ = takeUntilDestroyed();
  • 使用
    catchError
    进行错误处理
ts
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error(error);
    return of([]); // 返回兜底值
  })
);
  • 使用
    shareReplay(1)
    缓存共享的observables
ts
this.data$ = this.http.get('/api/data').pipe(
  shareReplay(1)
);
  • 给observables添加
    $
    后缀命名 — 提升代码可读性
  • UI状态使用Signals,事件处理使用RxJS — 现代Angular开发模式
ts
// 将Observable转换为Signal
users = toSignal(this.users$, { initialValue: [] });

// 如需将Signal转换为Observable
name$ = toObservable(this.name);