angular-rxjs-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Angular RxJS Patterns

Angular RxJS 模式

Master RxJS in Angular for handling async operations, data streams, and reactive programming patterns.
掌握Angular中的RxJS,用于处理异步操作、数据流和响应式编程模式。

Observable Creation

Observable 创建

Basic Observable Creation

基础 Observable 创建

typescript
import { Observable, of, from, interval, fromEvent } from 'rxjs';

// of - emit values in sequence
const numbers$ = of(1, 2, 3, 4, 5);

// from - convert array, promise, or iterable
const fromArray$ = from([1, 2, 3]);
const fromPromise$ = from(fetch('/api/data'));

// interval - emit numbers at intervals
const timer$ = interval(1000); // Every second

// fromEvent - DOM events
const clicks$ = fromEvent(document, 'click');

// Custom observable
const custom$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});
typescript
import { Observable, of, from, interval, fromEvent } from 'rxjs';

// of - 按顺序发射值
const numbers$ = of(1, 2, 3, 4, 5);

// from - 转换数组、Promise或可迭代对象
const fromArray$ = from([1, 2, 3]);
const fromPromise$ = from(fetch('/api/data'));

// interval - 按间隔发射数字
const timer$ = interval(1000); // 每秒一次

// fromEvent - DOM事件
const clicks$ = fromEvent(document, 'click');

// 自定义Observable
const custom$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

HttpClient Observables

HttpClient Observables

typescript
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class DataService {
  constructor(private http: HttpClient) {}

  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data');
  }

  getItem(id: string): Observable<Data> {
    return this.http.get<Data>(`/api/data/${id}`);
  }

  createItem(data: Data): Observable<Data> {
    return this.http.post<Data>('/api/data', data);
  }

  updateItem(id: string, data: Data): Observable<Data> {
    return this.http.put<Data>(`/api/data/${id}`, data);
  }

  deleteItem(id: string): Observable<void> {
    return this.http.delete<void>(`/api/data/${id}`);
  }
}
typescript
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class DataService {
  constructor(private http: HttpClient) {}

  getData(): Observable<Data[]> {
    return this.http.get<Data[]>('/api/data');
  }

  getItem(id: string): Observable<Data> {
    return this.http.get<Data>(`/api/data/${id}`);
  }

  createItem(data: Data): Observable<Data> {
    return this.http.post<Data>('/api/data', data);
  }

  updateItem(id: string, data: Data): Observable<Data> {
    return this.http.put<Data>(`/api/data/${id}`, data);
  }

  deleteItem(id: string): Observable<void> {
    return this.http.delete<void>(`/api/data/${id}`);
  }
}

Common Operators

常用操作符

Transformation Operators

转换操作符

typescript
import { map, pluck, switchMap, mergeMap, concatMap } from 'rxjs/operators';
import { of } from 'rxjs';

// map - transform values
const numbers$ = of(1, 2, 3).pipe(
  map(n => n * 2) // 2, 4, 6
);

// pluck - extract property (deprecated, use map)
const users$ = of(
  { name: 'John', age: 30 },
  { name: 'Jane', age: 25 }
).pipe(
  map(user => user.name) // 'John', 'Jane'
);

// switchMap - cancel previous, emit new
searchControl.valueChanges.pipe(
  switchMap(term => this.searchService.search(term))
).subscribe(results => {
  this.results = results;
});

// mergeMap - run in parallel
const ids$ = of(1, 2, 3);
ids$.pipe(
  mergeMap(id => this.getUser(id)) // All requests in parallel
).subscribe();

// concatMap - run in sequence
ids$.pipe(
  concatMap(id => this.getUser(id)) // One at a time
).subscribe();
typescript
import { map, pluck, switchMap, mergeMap, concatMap } from 'rxjs/operators';
import { of } from 'rxjs';

// map - 转换值
const numbers$ = of(1, 2, 3).pipe(
  map(n => n * 2) // 2, 4, 6
);

// pluck - 提取属性(已废弃,使用map)
const users$ = of(
  { name: 'John', age: 30 },
  { name: 'Jane', age: 25 }
).pipe(
  map(user => user.name) // 'John', 'Jane'
);

// switchMap - 取消前一个,发射新值
searchControl.valueChanges.pipe(
  switchMap(term => this.searchService.search(term))
).subscribe(results => {
  this.results = results;
});

// mergeMap - 并行执行
const ids$ = of(1, 2, 3);
ids$.pipe(
  mergeMap(id => this.getUser(id)) // 所有请求并行执行
).subscribe();

// concatMap - 按顺序执行
ids$.pipe(
  concatMap(id => this.getUser(id)) // 逐个执行
).subscribe();

Filtering Operators

过滤操作符

typescript
import { filter, take, takeUntil, takeWhile, distinctUntilChanged } from 'rxjs/operators';

// filter - only emit matching values
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0) // 2, 4
);

// take - first N values
interval(1000).pipe(
  take(5) // First 5 emissions
);

// takeUntil - until another observable emits
const destroy$ = new Subject();
source$.pipe(
  takeUntil(destroy$)
).subscribe();

// distinctUntilChanged - skip duplicate consecutive values
of(1, 1, 2, 2, 3, 3).pipe(
  distinctUntilChanged() // 1, 2, 3
);
typescript
import { filter, take, takeUntil, takeWhile, distinctUntilChanged } from 'rxjs/operators';

// filter - 仅发射匹配的值
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0) // 2, 4
);

// take - 取前N个值
interval(1000).pipe(
  take(5) // 前5次发射
);

// takeUntil - 直到另一个Observable发射值
const destroy$ = new Subject();
source$.pipe(
  takeUntil(destroy$)
).subscribe();

// distinctUntilChanged - 跳过连续的重复值
of(1, 1, 2, 2, 3, 3).pipe(
  distinctUntilChanged() // 1, 2, 3
);

Combination Operators

组合操作符

typescript
import { combineLatest, merge, concat, forkJoin, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';

// combineLatest - emit when any source emits
combineLatest([
  this.user$,
  this.settings$
]).pipe(
  map(([user, settings]) => ({ user, settings }))
).subscribe();

// merge - emit from any source
merge(
  this.clicks$,
  this.hovers$
).subscribe();

// concat - emit in sequence
concat(
  this.loadUser$,
  this.loadSettings$
).subscribe();

// forkJoin - wait for all to complete
forkJoin({
  user: this.getUser(),
  posts: this.getPosts(),
  comments: this.getComments()
}).subscribe(({ user, posts, comments }) => {
  // All complete
});

// zip - pair values from sources
zip(
  of(1, 2, 3),
  of('a', 'b', 'c')
).pipe(
  map(([num, letter]) => `${num}${letter}`)
); // '1a', '2b', '3c'
typescript
import { combineLatest, merge, concat, forkJoin, zip } from 'rxjs';
import { startWith } from 'rxjs/operators';

// combineLatest - 当任意源发射值时触发
combineLatest([
  this.user$,
  this.settings$
]).pipe(
  map(([user, settings]) => ({ user, settings }))
).subscribe();

// merge - 从任意源发射值
merge(
  this.clicks$,
  this.hovers$
).subscribe();

// concat - 按顺序发射
concat(
  this.loadUser$,
  this.loadSettings$
).subscribe();

// forkJoin - 等待所有源完成
forkJoin({
  user: this.getUser(),
  posts: this.getPosts(),
  comments: this.getComments()
}).subscribe(({ user, posts, comments }) => {
  // 所有请求已完成
});

// zip - 将多个源的值配对
zip(
  of(1, 2, 3),
  of('a', 'b', 'c')
).pipe(
  map(([num, letter]) => `${num}${letter}`)
); // '1a', '2b', '3c'

Utility Operators

工具操作符

typescript
import { tap, delay, debounceTime, throttleTime, distinctUntilChanged } from 'rxjs/operators';

// tap - side effects (logging, etc.)
source$.pipe(
  tap(value => console.log('Value:', value)),
  map(value => value * 2)
);

// delay - delay emissions
of(1, 2, 3).pipe(
  delay(1000) // Delay 1 second
);

// debounceTime - wait for pause in emissions
searchControl.valueChanges.pipe(
  debounceTime(300) // Wait 300ms after user stops typing
);

// throttleTime - emit first value, ignore for duration
clicks$.pipe(
  throttleTime(1000) // Only once per second
);

// distinctUntilChanged - skip duplicates
input$.pipe(
  distinctUntilChanged() // Only when value changes
);
typescript
import { tap, delay, debounceTime, throttleTime, distinctUntilChanged } from 'rxjs/operators';

// tap - 副作用操作(如日志记录等)
source$.pipe(
  tap(value => console.log('Value:', value)),
  map(value => value * 2)
);

// delay - 延迟发射值
of(1, 2, 3).pipe(
  delay(1000) // 延迟1秒
);

// debounceTime - 等待发射暂停后触发
searchControl.valueChanges.pipe(
  debounceTime(300) // 用户停止输入后等待300ms
);

// throttleTime - 发射第一个值,忽略后续持续时间内的发射
clicks$.pipe(
  throttleTime(1000) // 每秒仅触发一次
);

// distinctUntilChanged - 跳过重复值
input$.pipe(
  distinctUntilChanged() // 仅当值变化时触发
);

Error Handling

错误处理

catchError - Handle Errors

catchError - 处理错误

typescript
import { catchError } from 'rxjs/operators';
import { of, EMPTY, throwError } from 'rxjs';

// Return fallback value
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // Return empty array
  })
);

// Return empty observable
source$.pipe(
  catchError(() => EMPTY) // Complete without emitting
);

// Re-throw error
source$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return throwError(() => new Error('Custom error'));
  })
);

// Handle different error types
source$.pipe(
  catchError(error => {
    if (error.status === 404) {
      return of(null);
    }
    return throwError(() => error);
  })
);
typescript
import { catchError } from 'rxjs/operators';
import { of, EMPTY, throwError } from 'rxjs';

// 返回回退值
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // 返回空数组
  })
);

// 返回空Observable
source$.pipe(
  catchError(() => EMPTY) // 完成且不发射值
);

// 重新抛出错误
source$.pipe(
  catchError(error => {
    console.error('Error:', error);
    return throwError(() => new Error('Custom error'));
  })
);

// 处理不同类型的错误
source$.pipe(
  catchError(error => {
    if (error.status === 404) {
      return of(null);
    }
    return throwError(() => error);
  })
);

retry and retryWhen

retry 和 retryWhen

typescript
import { retry, retryWhen, delay, take } from 'rxjs/operators';

// Simple retry
this.http.get('/api/data').pipe(
  retry(3) // Retry up to 3 times
);

// Retry with delay
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      delay(1000), // Wait 1 second
      take(3) // Max 3 retries
    )
  )
);

// Exponential backoff
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      mergeMap((error, index) => {
        if (index >= 3) {
          return throwError(() => error);
        }
        const delayMs = Math.pow(2, index) * 1000;
        return of(error).pipe(delay(delayMs));
      })
    )
  )
);
typescript
import { retry, retryWhen, delay, take } from 'rxjs/operators';

// 简单重试
this.http.get('/api/data').pipe(
  retry(3) // 最多重试3次
);

// 带延迟的重试
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      delay(1000), // 等待1秒
      take(3) // 最多重试3次
    )
  )
);

// 指数退避重试
this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      mergeMap((error, index) => {
        if (index >= 3) {
          return throwError(() => error);
        }
        const delayMs = Math.pow(2, index) * 1000;
        return of(error).pipe(delay(delayMs));
      })
    )
  )
);

Subscription Management

订阅管理

Manual Subscription Cleanup

手动清理订阅

typescript
import { Component, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-my-component'
})
export class MyComponent implements OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    // Add subscriptions
    this.subscription.add(
      this.data$.subscribe(data => {
        this.data = data;
      })
    );

    this.subscription.add(
      this.other$.subscribe(other => {
        this.other = other;
      })
    );
  }

  ngOnDestroy() {
    // Unsubscribe from all
    this.subscription.unsubscribe();
  }
}
typescript
import { Component, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-my-component'
})
export class MyComponent implements OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    // 添加订阅
    this.subscription.add(
      this.data$.subscribe(data => {
        this.data = data;
      })
    );

    this.subscription.add(
      this.other$.subscribe(other => {
        this.other = other;
      })
    );
  }

  ngOnDestroy() {
    // 取消所有订阅
    this.subscription.unsubscribe();
  }
}

takeUntil Pattern

takeUntil 模式

typescript
import { Component, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'app-my-component'
})
export class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.data$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });

    this.other$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(other => {
      this.other = other;
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
typescript
import { Component, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'app-my-component'
})
export class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.data$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });

    this.other$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(other => {
      this.other = other;
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Async Pipe (No Manual Unsubscribe)

Async Pipe(无需手动取消订阅)

typescript
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-user-list',
  template: `
    <div *ngIf="users$ | async as users">
      <div *ngFor="let user of users">
        {{ user.name }}
      </div>
    </div>

    <div *ngIf="loading$ | async">Loading...</div>
    <div *ngIf="error$ | async as error">Error: {{ error }}</div>
  `
})
export class UserListComponent {
  users$: Observable<User[]>;
  loading$: Observable<boolean>;
  error$: Observable<string | null>;

  constructor(private userService: UserService) {
    this.users$ = this.userService.getUsers();
    this.loading$ = this.userService.loading$;
    this.error$ = this.userService.error$;
  }
}
typescript
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-user-list',
  template: `
    <div *ngIf="users$ | async as users">
      <div *ngFor="let user of users">
        {{ user.name }}
      </div>
    </div>

    <div *ngIf="loading$ | async">Loading...</div>
    <div *ngIf="error$ | async as error">Error: {{ error }}</div>
  `
})
export class UserListComponent {
  users$: Observable<User[]>;
  loading$: Observable<boolean>;
  error$: Observable<string | null>;

  constructor(private userService: UserService) {
    this.users$ = this.userService.getUsers();
    this.loading$ = this.userService.loading$;
    this.error$ = this.userService.error$;
  }
}

Subjects

Subjects

Subject - Multicast

Subject - 多播

typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// Multiple subscribers
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));

subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2
typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// 多个订阅者
subject.subscribe(val => console.log('A:', val));
subject.subscribe(val => console.log('B:', val));

subject.next(1); // A: 1, B: 1
subject.next(2); // A: 2, B: 2

BehaviorSubject - Current Value

BehaviorSubject - 当前值

typescript
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<number>(0); // Initial value

subject.subscribe(val => console.log('A:', val)); // A: 0

subject.next(1); // A: 1
subject.next(2); // A: 2

subject.subscribe(val => console.log('B:', val)); // B: 2 (latest value)

// Common pattern for state management
@Injectable({
  providedIn: 'root'
})
export class StateService {
  private stateSubject = new BehaviorSubject<State>(initialState);
  state$ = this.stateSubject.asObservable();

  updateState(newState: State) {
    this.stateSubject.next(newState);
  }

  get currentState(): State {
    return this.stateSubject.value;
  }
}
typescript
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<number>(0); // 初始值

subject.subscribe(val => console.log('A:', val)); // A: 0

subject.next(1); // A: 1
subject.next(2); // A: 2

subject.subscribe(val => console.log('B:', val)); // B: 2(最新值)

// 状态管理的常用模式
@Injectable({
  providedIn: 'root'
})
export class StateService {
  private stateSubject = new BehaviorSubject<State>(initialState);
  state$ = this.stateSubject.asObservable();

  updateState(newState: State) {
    this.stateSubject.next(newState);
  }

  get currentState(): State {
    return this.stateSubject.value;
  }
}

ReplaySubject - Buffer Values

ReplaySubject - 缓存值

typescript
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(2); // Buffer last 2 values

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(val => console.log('A:', val)); // A: 2, A: 3

subject.next(4); // A: 4

subject.subscribe(val => console.log('B:', val)); // B: 3, B: 4
typescript
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(2); // 缓存最后2个值

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(val => console.log('A:', val)); // A: 2, A: 3

subject.next(4); // A: 4

subject.subscribe(val => console.log('B:', val)); // B: 3, B: 4

AsyncSubject - Last Value on Complete

AsyncSubject - 完成时发射最后一个值

typescript
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe(val => console.log('A:', val));

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete(); // A: 3 (only last value when complete)
typescript
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe(val => console.log('A:', val));

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete(); // A: 3(仅在完成时发射最后一个值)

Hot vs Cold Observables

热 Observable vs 冷 Observable

Cold Observable - Unicast

冷 Observable - 单播

typescript
// Each subscription creates new execution
const cold$ = interval(1000);

cold$.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  cold$.subscribe(val => console.log('B:', val)); // B: 0, 1, 2... (separate execution)
}, 2000);
typescript
// 每个订阅都会创建新的执行
const cold$ = interval(1000);

cold$.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  cold$.subscribe(val => console.log('B:', val)); // B: 0, 1, 2...(独立执行)
}, 2000);

Hot Observable - Multicast

热 Observable - 多播

typescript
import { Subject, interval } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

// Using Subject
const subject = new Subject();
const source$ = interval(1000);
source$.subscribe(subject);

subject.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  subject.subscribe(val => console.log('B:', val)); // B: 2, 3, 4... (shared)
}, 2000);

// Using share operator
const shared$ = interval(1000).pipe(share());

shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
  shared$.subscribe(val => console.log('B:', val)); // Shares source
}, 2000);

// Using shareReplay
const cached$ = this.http.get('/api/data').pipe(
  shareReplay(1) // Cache last 1 value
);

// Multiple subscribers get cached result
cached$.subscribe();
cached$.subscribe(); // No second HTTP request
typescript
import { Subject, interval } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

// 使用Subject
const subject = new Subject();
const source$ = interval(1000);
source$.subscribe(subject);

subject.subscribe(val => console.log('A:', val)); // A: 0, 1, 2...
setTimeout(() => {
  subject.subscribe(val => console.log('B:', val)); // B: 2, 3, 4...(共享执行)
}, 2000);

// 使用share操作符
const shared$ = interval(1000).pipe(share());

shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
  shared$.subscribe(val => console.log('B:', val)); // 共享源
}, 2000);

// 使用shareReplay
const cached$ = this.http.get('/api/data').pipe(
  shareReplay(1) // 缓存最后1个值
);

// 多个订阅者获取缓存结果
cached$.subscribe();
cached$.subscribe(); // 不会发起第二次HTTP请求

RxJS in Services

服务中的RxJS

Data Service with State

带状态的数据服务

typescript
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable } from 'rxjs';
import { tap, catchError, finalize } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class UserService {
  private usersSubject = new BehaviorSubject<User[]>([]);
  private loadingSubject = new BehaviorSubject<boolean>(false);
  private errorSubject = new BehaviorSubject<string | null>(null);

  users$ = this.usersSubject.asObservable();
  loading$ = this.loadingSubject.asObservable();
  error$ = this.errorSubject.asObservable();

  constructor(private http: HttpClient) {}

  loadUsers(): void {
    this.loadingSubject.next(true);
    this.errorSubject.next(null);

    this.http.get<User[]>('/api/users').pipe(
      tap(users => this.usersSubject.next(users)),
      catchError(error => {
        this.errorSubject.next(error.message);
        return of([]);
      }),
      finalize(() => this.loadingSubject.next(false))
    ).subscribe();
  }

  getUser(id: string): Observable<User> {
    return this.http.get<User>(`/api/users/${id}`);
  }
}
typescript
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable } from 'rxjs';
import { tap, catchError, finalize } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class UserService {
  private usersSubject = new BehaviorSubject<User[]>([]);
  private loadingSubject = new BehaviorSubject<boolean>(false);
  private errorSubject = new BehaviorSubject<string | null>(null);

  users$ = this.usersSubject.asObservable();
  loading$ = this.loadingSubject.asObservable();
  error$ = this.errorSubject.asObservable();

  constructor(private http: HttpClient) {}

  loadUsers(): void {
    this.loadingSubject.next(true);
    this.errorSubject.next(null);

    this.http.get<User[]>('/api/users').pipe(
      tap(users => this.usersSubject.next(users)),
      catchError(error => {
        this.errorSubject.next(error.message);
        return of([]);
      }),
      finalize(() => this.loadingSubject.next(false))
    ).subscribe();
  }

  getUser(id: string): Observable<User> {
    return this.http.get<User>(`/api/users/${id}`);
  }
}

Search Service with Debounce

带防抖的搜索服务

typescript
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class SearchService {
  private searchTerms = new Subject<string>();

  results$: Observable<SearchResult[]>;

  constructor(private http: HttpClient) {
    this.results$ = this.searchTerms.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term => this.search(term))
    );
  }

  search(term: string): Observable<SearchResult[]> {
    if (!term.trim()) {
      return of([]);
    }
    return this.http.get<SearchResult[]>(`/api/search?q=${term}`);
  }

  setSearchTerm(term: string): void {
    this.searchTerms.next(term);
  }
}
typescript
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class SearchService {
  private searchTerms = new Subject<string>();

  results$: Observable<SearchResult[]>;

  constructor(private http: HttpClient) {
    this.results$ = this.searchTerms.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term => this.search(term))
    );
  }

  search(term: string): Observable<SearchResult[]> {
    if (!term.trim()) {
      return of([]);
    }
    return this.http.get<SearchResult[]>(`/api/search?q=${term}`);
  }

  setSearchTerm(term: string): void {
    this.searchTerms.next(term);
  }
}

Testing RxJS

测试RxJS

Testing Observables

测试Observables

typescript
import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';

describe('UserService', () => {
  let service: UserService;
  let httpMock: HttpTestingController;

  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [HttpClientTestingModule],
      providers: [UserService]
    });

    service = TestBed.inject(UserService);
    httpMock = TestBed.inject(HttpTestingController);
  });

  afterEach(() => {
    httpMock.verify();
  });

  it('should fetch users', () => {
    const mockUsers = [{ id: 1, name: 'John' }];

    service.getUsers().subscribe(users => {
      expect(users).toEqual(mockUsers);
    });

    const req = httpMock.expectOne('/api/users');
    expect(req.request.method).toBe('GET');
    req.flush(mockUsers);
  });
});
typescript
import { TestBed } from '@angular/core/testing';
import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing';

describe('UserService', () => {
  let service: UserService;
  let httpMock: HttpTestingController;

  beforeEach(() => {
    TestBed.configureTestingModule({
      imports: [HttpClientTestingModule],
      providers: [UserService]
    });

    service = TestBed.inject(UserService);
    httpMock = TestBed.inject(HttpTestingController);
  });

  afterEach(() => {
    httpMock.verify();
  });

  it('should fetch users', () => {
    const mockUsers = [{ id: 1, name: 'John' }];

    service.getUsers().subscribe(users => {
      expect(users).toEqual(mockUsers);
    });

    const req = httpMock.expectOne('/api/users');
    expect(req.request.method).toBe('GET');
    req.flush(mockUsers);
  });
});

Testing with Marble Diagrams

使用Marble图测试

typescript
import { TestScheduler } from 'rxjs/testing';

describe('Marble tests', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should debounce', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('-a-b-c|');
      const expected = '-----c|';

      const result$ = source$.pipe(debounceTime(20));
      expectObservable(result$).toBe(expected);
    });
  });
});
typescript
import { TestScheduler } from 'rxjs/testing';

describe('Marble tests', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should debounce', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('-a-b-c|');
      const expected = '-----c|';

      const result$ = source$.pipe(debounceTime(20));
      expectObservable(result$).toBe(expected);
    });
  });
});

When to Use This Skill

何时使用该技能

Use angular-rxjs-patterns when building modern, production-ready applications that require:
  • Complex async data flows
  • Real-time updates and streaming data
  • Efficient HTTP request management
  • Form input handling with debouncing
  • State management with observables
  • Error handling and retry logic
  • Combining multiple async sources
  • Memory-safe subscription management
在构建现代、生产级应用时,当需要以下功能时使用angular-rxjs-patterns:
  • 复杂的异步数据流
  • 实时更新和流式数据
  • 高效的HTTP请求管理
  • 带防抖的表单输入处理
  • 使用Observables进行状态管理
  • 错误处理和重试逻辑
  • 组合多个异步源
  • 内存安全的订阅管理

RxJS Best Practices in Angular

Angular中的RxJS最佳实践

  1. Use async pipe - Automatic subscription management
  2. takeUntil for cleanup - Unsubscribe in ngOnDestroy
  3. shareReplay for caching - Avoid duplicate HTTP requests
  4. debounceTime for inputs - Reduce API calls
  5. switchMap for cancellation - Cancel old requests
  6. catchError for errors - Always handle errors
  7. BehaviorSubject for state - Share current state
  8. Avoid nested subscriptions - Use operators instead
  9. Use operators over imperative code - More declarative
  10. Test observables properly - Use marble diagrams
  1. 使用async pipe - 自动管理订阅
  2. 使用takeUntil进行清理 - 在ngOnDestroy中取消订阅
  3. 使用shareReplay进行缓存 - 避免重复HTTP请求
  4. 对输入使用debounceTime - 减少API调用
  5. 使用switchMap进行取消 - 取消旧请求
  6. 使用catchError处理错误 - 始终处理错误
  7. 使用BehaviorSubject进行状态管理 - 共享当前状态
  8. 避免嵌套订阅 - 使用操作符替代
  9. 使用操作符而非命令式代码 - 更具声明性
  10. 正确测试Observables - 使用Marble图

Common RxJS Mistakes

常见RxJS错误

  1. Not unsubscribing - Memory leaks
  2. Nested subscriptions - Callback hell
  3. Not using operators - Imperative instead of declarative
  4. Subscribing in services - Return observables instead
  5. Not handling errors - Silent failures
  6. Using Subject incorrectly - Prefer BehaviorSubject for state
  7. Not using shareReplay - Duplicate HTTP requests
  8. Forgetting to complete subjects - Memory leaks
  9. Using subscribe in templates - Use async pipe
  10. Not understanding hot vs cold - Unexpected behavior
  1. 未取消订阅 - 内存泄漏
  2. 嵌套订阅 - 回调地狱
  3. 未使用操作符 - 命令式而非声明式
  4. 在服务中订阅 - 应返回Observables
  5. 未处理错误 - 静默失败
  6. 错误使用Subject - 状态管理优先使用BehaviorSubject
  7. 未使用shareReplay - 重复HTTP请求
  8. 未完成Subject - 内存泄漏
  9. 在模板中使用subscribe - 使用async pipe
  10. 不理解热/冷Observable - 意外行为

Resources

资源