Marble diagrams - 弹珠图
RxJS 是什么
RxJS = Reactive Extensions for JavaScript,把异步数据”事件流化”,再用一套类似数组方法的操作符(map / filter / merge / debounceTime…)来组合处理。
一句话理解:
Promise 描述”一次性的异步值”,Observable 描述”随时间推移的多个异步值”。
核心三件套:
| 概念 | 类比 | 作用 |
|---|---|---|
| Observable | 一个”异步事件水管” | 描述数据从哪来、什么时候来 |
| Operator | 水管中间的”过滤器/变压器” | 转换、组合、过滤数据流 |
| Subscription | 打开水龙头 | 订阅才会真正开始流动 |
代码长这样:
1
2
3
4
5
6
7
8
9
10
11
12
import { fromEvent } from "rxjs";
import { map, debounceTime, filter } from "rxjs/operators";
const input = document.querySelector("input")!;
fromEvent(input, "input") // 把 DOM 事件变成 Observable
.pipe(
debounceTime(300), // 停顿 300ms 才往下走
map((e) => (e.target as HTMLInputElement).value),
filter((v) => v.length >= 2), // 至少 2 个字符
)
.subscribe((keyword) => search(keyword));
如果用纯 JS 写,要自己管 setTimeout 防抖、cleanup、上一次请求取消……同样逻辑,RxJS 几行就完了。
它解决了什么问题
前端 / Node 里”异步”的种类太多:
- 用户事件(click、input、scroll、keydown)
- HTTP 请求
- WebSocket / SSE 推送
- 定时器(setInterval、requestAnimationFrame)
- 文件 / 流读写
每一种都有自己的 API(addEventListener、Promise、ws.on、fs.createReadStream…),写起来风格各异,组合起来更难:
“用户停止输入 300ms 后发请求,请求过程中再次输入要取消上一次请求,最新结果到了渲染列表。”
用 Promise / async-await 写完整逻辑要 50 行;用 RxJS:
1
2
3
4
5
6
7
8
fromEvent(input, "input")
.pipe(
debounceTime(300),
map((e) => (e.target as HTMLInputElement).value),
filter((v) => v.length >= 2),
switchMap((keyword) => fetchSearch(keyword)), // ← switchMap 自动取消上一次
)
.subscribe(renderList);
RxJS 解决的核心问题:
- 统一异步模型:事件、HTTP、WebSocket、定时器都被抽象成 Observable,用同一套操作符处理
- 声明式组合:链式
pipe()像写 SQL,关心”做什么”而不是”怎么做” - 天然管理时间维度:防抖、节流、超时、重试、取消,每个都是一个操作符
- 优雅地取消:
unsubscribe()一个调用就能停止整条流;switchMap自动取消旧请求 - 背压(backpressure)友好:源头快、消费慢的场景,可用
throttle/buffer等控制
5 个真实业务场景
1. 搜索框:防抖 + 取消旧请求(前端经典)
1
2
3
4
5
6
7
8
fromEvent(input, "input")
.pipe(
map((e) => (e.target as HTMLInputElement).value.trim()),
debounceTime(300),
distinctUntilChanged(), // 关键词没变就不发请求
switchMap((kw) => (kw ? api.search(kw) : of([]))), // 自动取消上一次
)
.subscribe((list) => setResults(list));
痛点解决:手写防抖 + AbortController + 状态管理大约要 30 行;RxJS 6 行。
2. 表单联动 / 多个数据源合并
订单详情页要同时拿用户、商品、物流三个接口,全部到齐再渲染:
1
2
3
4
5
6
7
8
9
import { forkJoin } from "rxjs";
forkJoin({
user: api.getUser(userId),
product: api.getProduct(productId),
logistics: api.getLogistics(orderId),
}).subscribe(({ user, product, logistics }) => {
render({ user, product, logistics });
});
等价于 Promise.all,但能继续用所有 RxJS 操作符(重试、超时、错误恢复)。
3. WebSocket / SSE 实时推送
聊天室、行情看板、订单状态推送:
1
2
3
4
5
6
7
8
9
10
11
import { webSocket } from "rxjs/webSocket";
import { retry, filter } from "rxjs/operators";
const ws$ = webSocket<Message>("wss://api.example.com/chat");
ws$
.pipe(
filter((msg) => msg.roomId === currentRoomId),
retry({ delay: 3000 }), // 断线 3s 后重连
)
.subscribe((msg) => appendMessage(msg));
痛点解决:原生 WebSocket 要自己写心跳、重连、消息过滤;这里几行操作符串起来。
4. NestJS Interceptor:请求超时 + 重试 + 缓存
NestJS 控制器返回值会被自动包成 Observable,Interceptor 里能直接用 RxJS 操作符:
1
2
3
4
5
6
7
8
9
10
11
12
@Injectable()
export class TimeoutInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler) {
return next.handle().pipe(
timeout(5000), // 5 秒超时
retry({ count: 2, delay: 500 }), // 失败重试 2 次
catchError((err) =>
throwError(() => new InternalServerErrorException(err.message)),
),
);
}
}
痛点解决:传统 Promise 里实现”超时 + 重试 + 错误转换”要嵌套一堆 try/catch,这里 4 个操作符搞定。NestJS 微服务通道里,return throwError(() => err) 也是 RxJS 模式(详见 nestjs 切换不同上下文)。
5. 拖拽 / 鼠标手势
经典的 “mousedown → mousemove → mouseup” 拖拽:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const mouseDown$ = fromEvent<MouseEvent>(box, "mousedown");
const mouseMove$ = fromEvent<MouseEvent>(document, "mousemove");
const mouseUp$ = fromEvent<MouseEvent>(document, "mouseup");
mouseDown$
.pipe(
switchMap((start) =>
mouseMove$.pipe(
map((move) => ({ x: move.clientX - start.clientX, y: move.clientY - start.clientY })),
takeUntil(mouseUp$), // 松手就停
),
),
)
.subscribe(({ x, y }) => {
box.style.transform = `translate(${x}px, ${y}px)`;
});
痛点解决:原生写法要自己挂 / 摘事件监听器,处理状态机;RxJS 用 takeUntil 描述”什么时候结束”,逻辑直白。
Observable vs Promise vs async/await
| 维度 | Promise / async-await | Observable |
|---|---|---|
| 值的数量 | 一次性(单值) | 0 个、1 个、多个、无限个 |
| 求值时机 | 立即(创建即执行) | 惰性(订阅才执行) |
| 能否取消 | 不行(要 AbortController 配合) | unsubscribe() 一键停 |
| 组合操作 | Promise.all / race 等少数 |
100+ 操作符(map / merge / switchMap / retry…) |
| 时间维度 | 没有 | debounceTime / throttleTime / delay |
| 学习曲线 | 低 | 较陡(操作符 + 冷热流) |
怎么选:
- 单次异步(一个 fetch、一个 setTimeout)→
async/await就够了 - 多次事件、需要时间控制、需要取消、需要组合多源 → RxJS 优势明显
在 NestJS 里 RxJS 用在哪
NestJS 内核就用 RxJS,业务里以下几处会直接接触:
- Interceptor:
intercept(context, next)的next.handle()返回 Observable - 微服务(
@MessagePattern):handler 返回值会被包成 Observable;错误必须用throwError抛 HttpService(@nestjs/axios):http.get()返回 Observable,可链pipe(retry, catchError, map)- 流式响应(SSE、
@Sse('events')):直接 return 一个 Observable,Nest 自动转成 SSE 流
1
2
3
4
5
// SSE 示例:每秒推一个事件
@Sse("events")
events(): Observable<{ data: { time: number } }> {
return interval(1000).pipe(map((n) => ({ data: { time: n } })));
}
QA: RxJS 学习曲线那么陡,值不值得学?
💬点击展开/收起
取决于你做什么:
- 写普通 CRUD 后台、表单页:基本用不上 RxJS,
async/await完全够。强行用反而显得复杂 - 写 Angular:必学,Angular 内核就是 RxJS(HttpClient、Router、Reactive Forms 全都返回 Observable)
- 写 NestJS 微服务、Interceptor、SSE:至少要会读,能用
pipe + map + catchError + throwError + timeout + retry这 6 个就够日常 - 写复杂前端交互(搜索建议、实时协作、拖拽、动画):值得花 2 天学,回报极高
学习路径建议:
- 先理解 “Observable vs Promise” 的差异(惰性 + 多值 + 可取消)
- 掌握 6 个高频操作符:
map/filter/tap/switchMap/catchError/debounceTime - 理解”冷流(cold)vs 热流(hot)”:决定多个订阅者会不会共享数据源
- 会用 rxjs.dev 的弹珠图(Marble Diagram)思考
不要一上来就背 100+ 操作符——80% 的场景用上面那 6 个就解决了。
QA: switchMap、mergeMap、concatMap、exhaustMap 区别?(高频面试)
💬点击展开/收起
它们都是”把一个流里的每个值,映射成另一个流”,区别在新流来了之后怎么处理上一个:
| 操作符 | 新值到来时 | 典型场景 |
|---|---|---|
| switchMap | 取消上一个,切换到新的 | 搜索框(旧请求作废) |
| mergeMap | 并发执行,全都保留 | 批量上传(互不干扰) |
| concatMap | 排队,等上一个结束 | 顺序写库 / 顺序播放音频 |
| exhaustMap | 忽略新值,直到上一个完 | 防重复提交按钮、防抖动登录 |
记忆口诀:switch 切换 / merge 并发 / concat 排队 / exhaust 忽略。
QA: 弹珠图(Marble Diagram)是什么意思?
💬点击展开/收起
弹珠图是 RxJS 社区用来可视化描述一个 Observable 随时间发出值的图形化表示法——把时间画成一条横线,每个值画成线上的”弹珠”。它是理解操作符行为最直观的工具。
基本符号
1
2
3
4
─── 时间轴(从左到右)
○ 一个值(弹珠)
| complete(流正常结束)
X error(流异常终止)
举例:
1
--1--2--3--|
含义:等一会儿发出 1,再等一会儿发出 2,再发出 3,然后流结束。
用弹珠图理解操作符
操作符通常画成”两条线 + 中间一个盒子”:上面是输入流,盒子是操作符名,下面是输出流。
map(x => x * 2):
1
2
3
input: --1--2--3--|
map(x => x * 2)
output: --2--4--6--|
filter(x => x > 1):
1
2
3
input: --1--2--3--|
filter(x => x > 1)
output: -----2--3--|
debounceTime(20)(防抖):
1
2
3
input: -1-2-3----4--|
debounceTime(20)
output: ---------3----4--|
1 2 3 间隔太短只保留最后一个 3;4 后面没人打扰,原样发出。
switchMap(搜索框场景):
1
2
3
4
5
6
input1: --a--b--c--|
a → ---1---2|
b → ---4|
c → ---5---6|
output: -------1--4----5---6|
↑ a 的内层流被 b 取消,b 的又被 c 切掉
merge vs concat:
1
2
3
4
5
A: --1----2----3|
B: ----a----b|
merge(A, B): --1-a--2-b--3| ← 并发,谁先到谁先发
concat(A, B): --1----2----3----a----b| ← 排队,A 完了再 B
在哪里能看到 / 用到
- 官方文档 rxjs.dev:每个操作符都有 SVG 弹珠图
- rxmarbles.com:交互式弹珠图,可以拖动弹珠实时看输出变化,学操作符神器
- 测试:RxJS 自带
TestScheduler支持用文本字符串写弹珠图做单元测试:
1
2
3
4
5
6
7
8
9
10
11
import { TestScheduler } from "rxjs/testing";
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
scheduler.run(({ cold, expectObservable }) => {
const source = cold("--1--2--3|");
const expected = " --2--4--6|";
expectObservable(source.pipe(map((x) => x * 2))).toBe(expected);
});
字符串约定:
-= 一个时间帧(10ms)12a= 发出的值|= complete#= error()= 同一帧发出多个值,如--(12)--|
一句话总结
弹珠图 = Observable 的”时序乐谱”。横轴是时间,弹珠是值,操作符把上面一行变成下面一行。看懂弹珠图 = 看懂操作符。
学习时遇到不熟的操作符,直接去 rxmarbles.com 拖一下,比看 5 篇博客都快。
踩坑提示
- 忘了 subscribe 整个流不会执行:Observable 是惰性的,没人订阅它就静止
- subscribe 后忘了 unsubscribe:组件销毁、请求中断时不取消,会导致内存泄漏 / 重复回调。Angular 用
takeUntilDestroyed,React 在 useEffect 的清理函数里调 - 冷热流混淆:
fromEvent、webSocket是热流(多订阅共享),of、from、HttpClient.get是冷流(每次订阅都重新开始) switchMap滥用:批量上传场景用switchMap会把前面文件全部取消,应该用mergeMap- NestJS handler 直接 throw 在 rpc 上下文不生效:要返回
throwError(() => err),详见 nestjs 切换不同上下文
小结
- RxJS 把”异步事件”统一抽象成 Observable + 操作符,组合复杂异步逻辑像写 SQL 一样声明式
- 它解决的是”多次异步、有时间维度、需要取消和组合“的问题,普通一次性请求用 Promise 就够了
- 业务上常用:搜索防抖、实时推送、多源合并、超时重试、拖拽手势
- NestJS 的 Interceptor、微服务、SSE、HttpService 都用 RxJS——会用 6 个高频操作符就能覆盖大多数场景