rxjs 是什么?解决了什么问题?

把"异步事件流"当数组来玩,前端和 NestJS 都在用

Posted by chanweiyan on April 28, 2026

Marble diagrams - 弹珠图

Marble diagrams - 弹珠图

RxJS 是什么

RxJS = Reactive Extensions for JavaScript,把异步数据”事件流化”,再用一套类似数组方法的操作符(map / filter / merge / debounceTime…)来组合处理。

一句话理解:

Promise 描述”一次性的异步值”,Observable 描述”随时间推移的多个异步值”。

核心三件套:

概念 类比 作用
Observable 一个”异步事件水管” 描述数据从哪来、什么时候来
Operator 水管中间的”过滤器/变压器” 转换、组合、过滤数据流
Subscription 打开水龙头 订阅才会真正开始流动

代码长这样:

1
2
3
4
5
6
7
8
9
10
11
12
import { fromEvent } from "rxjs";
import { map, debounceTime, filter } from "rxjs/operators";

const input = document.querySelector("input")!;

fromEvent(input, "input") // 把 DOM 事件变成 Observable
  .pipe(
    debounceTime(300),                       // 停顿 300ms 才往下走
    map((e) => (e.target as HTMLInputElement).value),
    filter((v) => v.length >= 2),            // 至少 2 个字符
  )
  .subscribe((keyword) => search(keyword));

如果用纯 JS 写,要自己管 setTimeout 防抖、cleanup、上一次请求取消……同样逻辑,RxJS 几行就完了。

它解决了什么问题

前端 / Node 里”异步”的种类太多:

  • 用户事件(click、input、scroll、keydown)
  • HTTP 请求
  • WebSocket / SSE 推送
  • 定时器(setInterval、requestAnimationFrame)
  • 文件 / 流读写

每一种都有自己的 API(addEventListener、Promise、ws.on、fs.createReadStream…),写起来风格各异,组合起来更难

“用户停止输入 300ms 后发请求,请求过程中再次输入要取消上一次请求,最新结果到了渲染列表。”

用 Promise / async-await 写完整逻辑要 50 行;用 RxJS:

1
2
3
4
5
6
7
8
fromEvent(input, "input")
  .pipe(
    debounceTime(300),
    map((e) => (e.target as HTMLInputElement).value),
    filter((v) => v.length >= 2),
    switchMap((keyword) => fetchSearch(keyword)), // ← switchMap 自动取消上一次
  )
  .subscribe(renderList);

RxJS 解决的核心问题

  1. 统一异步模型:事件、HTTP、WebSocket、定时器都被抽象成 Observable,用同一套操作符处理
  2. 声明式组合:链式 pipe() 像写 SQL,关心”做什么”而不是”怎么做”
  3. 天然管理时间维度:防抖、节流、超时、重试、取消,每个都是一个操作符
  4. 优雅地取消unsubscribe() 一个调用就能停止整条流;switchMap 自动取消旧请求
  5. 背压(backpressure)友好:源头快、消费慢的场景,可用 throttle / buffer 等控制

5 个真实业务场景

1. 搜索框:防抖 + 取消旧请求(前端经典)

1
2
3
4
5
6
7
8
fromEvent(input, "input")
  .pipe(
    map((e) => (e.target as HTMLInputElement).value.trim()),
    debounceTime(300),
    distinctUntilChanged(),                   // 关键词没变就不发请求
    switchMap((kw) => (kw ? api.search(kw) : of([]))), // 自动取消上一次
  )
  .subscribe((list) => setResults(list));

痛点解决:手写防抖 + AbortController + 状态管理大约要 30 行;RxJS 6 行。

2. 表单联动 / 多个数据源合并

订单详情页要同时拿用户、商品、物流三个接口,全部到齐再渲染

1
2
3
4
5
6
7
8
9
import { forkJoin } from "rxjs";

forkJoin({
  user: api.getUser(userId),
  product: api.getProduct(productId),
  logistics: api.getLogistics(orderId),
}).subscribe(({ user, product, logistics }) => {
  render({ user, product, logistics });
});

等价于 Promise.all,但能继续用所有 RxJS 操作符(重试、超时、错误恢复)。

3. WebSocket / SSE 实时推送

聊天室、行情看板、订单状态推送:

1
2
3
4
5
6
7
8
9
10
11
import { webSocket } from "rxjs/webSocket";
import { retry, filter } from "rxjs/operators";

const ws$ = webSocket<Message>("wss://api.example.com/chat");

ws$
  .pipe(
    filter((msg) => msg.roomId === currentRoomId),
    retry({ delay: 3000 }), // 断线 3s 后重连
  )
  .subscribe((msg) => appendMessage(msg));

痛点解决:原生 WebSocket 要自己写心跳、重连、消息过滤;这里几行操作符串起来。

4. NestJS Interceptor:请求超时 + 重试 + 缓存

NestJS 控制器返回值会被自动包成 Observable,Interceptor 里能直接用 RxJS 操作符:

1
2
3
4
5
6
7
8
9
10
11
12
@Injectable()
export class TimeoutInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler) {
    return next.handle().pipe(
      timeout(5000),                                     // 5 秒超时
      retry({ count: 2, delay: 500 }),                   // 失败重试 2 次
      catchError((err) =>
        throwError(() => new InternalServerErrorException(err.message)),
      ),
    );
  }
}

痛点解决:传统 Promise 里实现”超时 + 重试 + 错误转换”要嵌套一堆 try/catch,这里 4 个操作符搞定。NestJS 微服务通道里,return throwError(() => err) 也是 RxJS 模式(详见 nestjs 切换不同上下文)。

5. 拖拽 / 鼠标手势

经典的 “mousedown → mousemove → mouseup” 拖拽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const mouseDown$ = fromEvent<MouseEvent>(box, "mousedown");
const mouseMove$ = fromEvent<MouseEvent>(document, "mousemove");
const mouseUp$ = fromEvent<MouseEvent>(document, "mouseup");

mouseDown$
  .pipe(
    switchMap((start) =>
      mouseMove$.pipe(
        map((move) => ({ x: move.clientX - start.clientX, y: move.clientY - start.clientY })),
        takeUntil(mouseUp$), // 松手就停
      ),
    ),
  )
  .subscribe(({ x, y }) => {
    box.style.transform = `translate(${x}px, ${y}px)`;
  });

痛点解决:原生写法要自己挂 / 摘事件监听器,处理状态机;RxJS 用 takeUntil 描述”什么时候结束”,逻辑直白。

Observable vs Promise vs async/await

维度 Promise / async-await Observable
值的数量 一次性(单值) 0 个、1 个、多个、无限个
求值时机 立即(创建即执行) 惰性(订阅才执行)
能否取消 不行(要 AbortController 配合) unsubscribe() 一键停
组合操作 Promise.all / race 等少数 100+ 操作符(map / merge / switchMap / retry…)
时间维度 没有 debounceTime / throttleTime / delay
学习曲线 较陡(操作符 + 冷热流)

怎么选

  • 单次异步(一个 fetch、一个 setTimeout)→ async/await 就够了
  • 多次事件、需要时间控制、需要取消、需要组合多源 → RxJS 优势明显

在 NestJS 里 RxJS 用在哪

NestJS 内核就用 RxJS,业务里以下几处会直接接触:

  1. Interceptorintercept(context, next)next.handle() 返回 Observable
  2. 微服务(@MessagePattern:handler 返回值会被包成 Observable;错误必须用 throwError
  3. HttpService@nestjs/axios):http.get() 返回 Observable,可链 pipe(retry, catchError, map)
  4. 流式响应(SSE、@Sse('events')):直接 return 一个 Observable,Nest 自动转成 SSE 流
1
2
3
4
5
// SSE 示例:每秒推一个事件
@Sse("events")
events(): Observable<{ data: { time: number } }> {
  return interval(1000).pipe(map((n) => ({ data: { time: n } })));
}

QA: RxJS 学习曲线那么陡,值不值得学?

💬点击展开/收起

取决于你做什么

  • 写普通 CRUD 后台、表单页:基本用不上 RxJS,async/await 完全够。强行用反而显得复杂
  • 写 Angular:必学,Angular 内核就是 RxJS(HttpClient、Router、Reactive Forms 全都返回 Observable)
  • 写 NestJS 微服务、Interceptor、SSE:至少要会读,能用 pipe + map + catchError + throwError + timeout + retry 这 6 个就够日常
  • 写复杂前端交互(搜索建议、实时协作、拖拽、动画):值得花 2 天学,回报极高

学习路径建议

  1. 先理解 “Observable vs Promise” 的差异(惰性 + 多值 + 可取消)
  2. 掌握 6 个高频操作符:map / filter / tap / switchMap / catchError / debounceTime
  3. 理解”冷流(cold)vs 热流(hot)”:决定多个订阅者会不会共享数据源
  4. 会用 rxjs.dev 的弹珠图(Marble Diagram)思考

不要一上来就背 100+ 操作符——80% 的场景用上面那 6 个就解决了。

QA: switchMap、mergeMap、concatMap、exhaustMap 区别?(高频面试)

💬点击展开/收起

它们都是”把一个流里的每个值,映射成另一个流”,区别在新流来了之后怎么处理上一个

操作符 新值到来时 典型场景
switchMap 取消上一个,切换到新的 搜索框(旧请求作废)
mergeMap 并发执行,全都保留 批量上传(互不干扰)
concatMap 排队,等上一个结束 顺序写库 / 顺序播放音频
exhaustMap 忽略新值,直到上一个完 防重复提交按钮、防抖动登录

记忆口诀:switch 切换 / merge 并发 / concat 排队 / exhaust 忽略

QA: 弹珠图(Marble Diagram)是什么意思?

💬点击展开/收起

弹珠图是 RxJS 社区用来可视化描述一个 Observable 随时间发出值的图形化表示法——把时间画成一条横线,每个值画成线上的”弹珠”。它是理解操作符行为最直观的工具。

基本符号

1
2
3
4
─── 时间轴(从左到右)
○   一个值(弹珠)
|   complete(流正常结束)
X   error(流异常终止)

举例:

1
--1--2--3--|

含义:等一会儿发出 1,再等一会儿发出 2,再发出 3,然后流结束。

用弹珠图理解操作符

操作符通常画成”两条线 + 中间一个盒子”:上面是输入流,盒子是操作符名,下面是输出流。

map(x => x * 2)

1
2
3
input:  --1--2--3--|
        map(x => x * 2)
output: --2--4--6--|

filter(x => x > 1)

1
2
3
input:  --1--2--3--|
        filter(x => x > 1)
output: -----2--3--|

debounceTime(20)(防抖):

1
2
3
input:  -1-2-3----4--|
        debounceTime(20)
output: ---------3----4--|

1 2 3 间隔太短只保留最后一个 34 后面没人打扰,原样发出。

switchMap(搜索框场景):

1
2
3
4
5
6
input1:  --a--b--c--|
                a → ---1---2|
                b → ---4|
                c → ---5---6|
output:  -------1--4----5---6|
                ↑ a 的内层流被 b 取消,b 的又被 c 切掉

merge vs concat

1
2
3
4
5
A: --1----2----3|
B: ----a----b|

merge(A, B):  --1-a--2-b--3|         ← 并发,谁先到谁先发
concat(A, B): --1----2----3----a----b| ← 排队,A 完了再 B

在哪里能看到 / 用到

  1. 官方文档 rxjs.dev:每个操作符都有 SVG 弹珠图
  2. rxmarbles.com:交互式弹珠图,可以拖动弹珠实时看输出变化,学操作符神器
  3. 测试:RxJS 自带 TestScheduler 支持用文本字符串写弹珠图做单元测试:
1
2
3
4
5
6
7
8
9
10
11
import { TestScheduler } from "rxjs/testing";

const scheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

scheduler.run(({ cold, expectObservable }) => {
  const source = cold("--1--2--3|");
  const expected = "    --2--4--6|";
  expectObservable(source.pipe(map((x) => x * 2))).toBe(expected);
});

字符串约定:

  • - = 一个时间帧(10ms)
  • 1 2 a = 发出的值
  • | = complete
  • # = error
  • () = 同一帧发出多个值,如 --(12)--|

一句话总结

弹珠图 = Observable 的”时序乐谱”。横轴是时间,弹珠是值,操作符把上面一行变成下面一行。看懂弹珠图 = 看懂操作符

学习时遇到不熟的操作符,直接去 rxmarbles.com 拖一下,比看 5 篇博客都快。

踩坑提示

  1. 忘了 subscribe 整个流不会执行:Observable 是惰性的,没人订阅它就静止
  2. subscribe 后忘了 unsubscribe:组件销毁、请求中断时不取消,会导致内存泄漏 / 重复回调。Angular 用 takeUntilDestroyed,React 在 useEffect 的清理函数里调
  3. 冷热流混淆fromEventwebSocket 是热流(多订阅共享),offromHttpClient.get 是冷流(每次订阅都重新开始)
  4. switchMap 滥用:批量上传场景用 switchMap 会把前面文件全部取消,应该用 mergeMap
  5. NestJS handler 直接 throw 在 rpc 上下文不生效:要返回 throwError(() => err),详见 nestjs 切换不同上下文

小结

  • RxJS 把”异步事件”统一抽象成 Observable + 操作符,组合复杂异步逻辑像写 SQL 一样声明式
  • 它解决的是”多次异步、有时间维度、需要取消和组合“的问题,普通一次性请求用 Promise 就够了
  • 业务上常用:搜索防抖、实时推送、多源合并、超时重试、拖拽手势
  • NestJS 的 Interceptor、微服务、SSE、HttpService 都用 RxJS——会用 6 个高频操作符就能覆盖大多数场景