一句话理解
SSE(Server-Sent Events)= 浏览器开一个长连接,服务器单向、源源不断地往浏览器推 text 消息。
它是 HTML5 标准的一部分,浏览器侧用 EventSource API 接收,服务器侧返回 Content-Type: text/event-stream 的特殊响应即可。NestJS 通过 @Sse() 装饰器原生支持。
1
2
3
4
浏览器 ──── GET /events ────→ 服务器
←── data: hello\n\n ─── 推
←── data: world\n\n ─── 推
←── data: ...\n\n ─── 推(连接持续打开)
SSE vs WebSocket vs 轮询
| 特性 | 轮询 (Polling) | SSE | WebSocket |
|---|---|---|---|
| 协议 | HTTP | HTTP(长连接) | ws://(升级协议) |
| 方向 | 客户端 → 服务器 | 服务器 → 客户端 | 双向 |
| 文本/二进制 | 文本 | 仅文本 | 文本 + 二进制 |
| 自动重连 | 自己实现 | 浏览器内置 | 自己实现 |
| 代理友好度 | 高 | 高(标准 HTTP) | 中(要 upgrade) |
| 复杂度 | 简单 | 简单 | 复杂 |
| 适用场景 | 状态偶尔查一下 | 通知、日志流、AI 流式响应 | 实时聊天、协作、游戏 |
经验法则:
- 只需要服务器推 → 选 SSE(更轻、有内置重连)
- 要双向、二进制、低延迟 → 选 WebSocket
- AI 对话流式输出 → SSE 是事实标准(OpenAI、Anthropic 都用 SSE)
SSE 协议格式
服务器返回的响应体是纯文本,每条消息由若干行 field: value 组成,两个连续换行 \n\n 表示一条消息结束:
1
2
3
4
5
6
7
8
9
10
11
data: hello world
data: {"id":1,"text":"hi"}
event: ping
data: keep-alive
id: 42
retry: 5000
data: 这是带 ID 和重连间隔的消息
字段说明:
| 字段 | 含义 |
|---|---|
data: |
消息内容(必填)。多行 data: 会被自动用 \n 拼接 |
event: |
事件名,前端用 addEventListener("ping", ...) 监听 |
id: |
消息 ID。断线重连时浏览器会自动带上 Last-Event-ID |
retry: |
断线后重连等待毫秒数(默认 3000ms) |
: |
注释行(冒号开头),常用作心跳保活 |
NestJS 服务端实现
1. 基础用法:@Sse() 装饰器
NestJS 的 @Sse() 把方法返回的 Observable<MessageEvent> 直接转成 SSE 响应:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Controller, Sse, MessageEvent } from "@nestjs/common";
import { interval, map, Observable } from "rxjs";
@Controller("events")
export class EventsController {
@Sse("stream")
stream(): Observable<MessageEvent> {
return interval(1000).pipe(
map((n) => ({
// MessageEvent 字段对应 SSE 协议字段
data: { count: n, time: new Date().toISOString() },
// 可选:
// id: String(n),
// type: "tick", // 对应 event: 字段
// retry: 5000,
})),
);
}
}
访问 GET /events/stream,浏览器会持续收到:
1
2
3
4
data: {"count":0,"time":"2026-04-29T..."}
data: {"count":1,"time":"2026-04-29T..."}
...
NestJS 会自动设置响应头:
Content-Type: text/event-stream、Cache-Control: no-cache、Connection: keep-alive。
2. 业务事件推送(基于 Subject)
实战中很少用 interval 定时推,更多是”业务事件触发就推”。用 RxJS Subject 做事件总线:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { Injectable } from "@nestjs/common";
import { Subject } from "rxjs";
@Injectable()
export class NotifyService {
// 用 Subject 做"广播",所有订阅者都能收到
private readonly events$ = new Subject<{ userId: string; payload: any }>();
// 业务代码调用:触发一次推送
push(userId: string, payload: any) {
this.events$.next({ userId, payload });
}
// SSE 控制器订阅这里
asObservable() {
return this.events$.asObservable();
}
}
@Controller("notify")
export class NotifyController {
constructor(private readonly notify: NotifyService) {}
@Sse(":userId")
subscribe(@Param("userId") userId: string): Observable<MessageEvent> {
// 只推给当前用户的事件
return this.notify.asObservable().pipe(
filter((e) => e.userId === userId),
map((e) => ({ data: e.payload })),
);
}
}
// 别处触发:notify.push("u123", { type: "order.paid", id: 9527 });
3. AI 流式响应(最常见的 SSE 场景)
把 OpenAI / Claude / 内部大模型的流式 token 透传给前端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Controller("chat")
export class ChatController {
@Sse("stream")
async *chat(@Query("q") q: string) {
// NestJS 也支持返回 AsyncIterable,自动转成 SSE
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: q }],
stream: true,
});
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content ?? "";
if (delta) {
yield { data: delta };
}
}
yield { data: "[DONE]", type: "end" };
}
}
NestJS 9+ 支持
@Sse()方法返回AsyncIterable,写起来比 RxJS 更直观。
浏览器侧:EventSource
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 基础用法
const es = new EventSource("/events/stream");
es.onopen = () => console.log("连上了");
es.onmessage = (e) => {
// 默认事件(没有 event: 字段的消息)
console.log("收到", JSON.parse(e.data));
};
// 监听具名事件
es.addEventListener("tick", (e) => {
console.log("tick:", e.data);
});
es.onerror = (err) => {
// 浏览器会自动按 retry 间隔重连,不需要手动处理
console.warn("断了,浏览器会自动重连", err);
};
// 主动关闭
es.close();
重要特性:
- 自动重连:连接断开后,浏览器按
retry:字段(默认 3s)自动重连 Last-Event-ID:重连时浏览器自动带上请求头Last-Event-ID: <最后收到的 id>,服务器据此从断点续推- 同源策略:
EventSource默认走同源,跨域要服务器配置 CORS(Access-Control-Allow-Origin)
跨域 + 携带 Cookie
1
const es = new EventSource("/events", { withCredentials: true });
服务器对应要返回:
1
2
Access-Control-Allow-Origin: https://app.example.com
Access-Control-Allow-Credentials: true
EventSource 不支持的能力
EventSource API 比较”裸”,做不到:
- 自定义请求头(如
Authorization: Bearer xxx)—— 只能把 token 放 URL query 或 cookie - POST 请求 —— 只能 GET
- 请求体 —— 没有
需要这些能力时,用 fetch + ReadableStream 自己实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const res = await fetch("/chat/stream", {
method: "POST",
headers: { Authorization: `Bearer ${token}` },
body: JSON.stringify({ q: "hello" }),
});
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按 \n\n 切分 SSE 消息
const events = buffer.split("\n\n");
buffer = events.pop()!; // 留下不完整的最后一段
for (const event of events) {
const data = event.replace(/^data: /, "");
console.log(data);
}
}
社区库 @microsoft/fetch-event-source 把这些细节封装好了,是 AI 流式应用的事实选择。
反向代理 / 部署注意
SSE 长连接经常被各种”中间人”截断,几个必踩的坑:
1. Nginx 必须关闭 buffering
1
2
3
4
5
6
7
8
9
10
11
12
location /events/ {
proxy_pass http://nest_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
# 关键:不缓冲!否则消息会攒一批才推到客户端
proxy_buffering off;
proxy_cache off;
# 长连接超时调大(默认 60s 会被截断)
proxy_read_timeout 24h;
}
或者在响应里加:
1
res.setHeader("X-Accel-Buffering", "no"); // 让 Nginx 单独这条不缓冲
2. NestJS 默认压缩会破坏 SSE
如果用了 compression 中间件,要排除 SSE 路径:
1
2
3
4
5
6
7
8
9
10
import compression from "compression";
app.use(
compression({
filter: (req, res) => {
if (req.path.startsWith("/events")) return false;
return compression.filter(req, res);
},
}),
);
3. 心跳保活
很多代理 / 负载均衡器对 60s 没流量的连接会强制关闭。每 15-30s 发一条注释行作为心跳:
1
2
3
4
5
6
7
@Sse("stream")
stream(): Observable<MessageEvent> {
return merge(
businessEvents$,
interval(20_000).pipe(map(() => ({ data: "", type: "heartbeat" }))),
);
}
或直接在 data: 之前发 : ping\n\n(注释行不会触发 onmessage)。
4. 浏览器并发连接限制
同一域名下 EventSource 连接数受 HTTP/1.1 限制(每个域名约 6 个)。多个 tab 同时打开会很快用完。解决方案:
- 后端用 HTTP/2(不限连接数)
- 前端用
SharedWorker共享一个 SSE 连接给多个 tab
QA: SSE 和 WebSocket 怎么选?
💬点击展开/收起
| 维度 | 选 SSE | 选 WebSocket |
|---|---|---|
| 通信方向 | 服务器 → 客户端(单向推送) | 双向(聊天、协作) |
| 数据类型 | 纯文本 / JSON | 文本 + 二进制(音视频、文件) |
| 自动重连 | ✅ 浏览器内置 | ❌ 自己实现 |
| 代理 / CDN | ✅ 标准 HTTP,到处能跑 | ❌ 要支持 upgrade,部分代理会拦 |
| 实现复杂度 | ✅ 几行代码 | ❌ 要管心跳、重连、状态机 |
| 适用场景 | 通知推送、日志流、AI 流式响应、行情推 | 实时聊天、协作编辑、游戏、白板 |
90% 的”服务器推消息”场景用 SSE 就够了。WebSocket 留给真正需要双向 + 二进制 + 低延迟的场景。
QA: SSE 怎么做鉴权?
💬点击展开/收起
EventSource 不支持自定义请求头,所以传统 Authorization: Bearer xxx 用不了。三种方案:
1. Cookie 鉴权(最简单)
1
const es = new EventSource("/events", { withCredentials: true });
服务器侧用 NestJS 的 JwtAuthGuard / 自定义 Guard 校验 cookie 即可。
2. URL Query 传 token(不推荐生产)
1
const es = new EventSource(`/events?token=${jwt}`);
缺点:token 进 access log、被 referer 泄漏。临时调试可以,正式环境别用。
3. 用 fetch + ReadableStream 替代 EventSource
可以自定义 header,能 POST 带 body:
1
2
3
4
await fetch("/events", {
headers: { Authorization: `Bearer ${jwt}` },
});
// 然后手动按 SSE 格式解析
推荐用 @microsoft/fetch-event-source 库,API 类似 EventSource 但全功能。
QA: 多实例部署时 SSE 怎么广播?
💬点击展开/收起
单实例时 Subject 直接广播没问题,但多实例(K8s 多 Pod)时,用户的 SSE 连接落在哪台机器是随机的。Subject.next() 只会通知本机连接的客户端。
解决方案是引入 Pub/Sub 中间件:
1
业务代码 ──→ Redis Pub/Sub ──→ 所有实例的 SSE Subject ──→ 推给本机连接的客户端
NestJS 实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Injectable()
export class NotifyService implements OnModuleInit {
private readonly local$ = new Subject<any>();
constructor(@Inject("REDIS") private readonly redis: Redis) {}
async onModuleInit() {
const sub = this.redis.duplicate();
await sub.subscribe("notify");
sub.on("message", (_ch, msg) => {
this.local$.next(JSON.parse(msg)); // 把跨机消息塞进本机 Subject
});
}
// 业务调用:跨机广播
async push(payload: any) {
await this.redis.publish("notify", JSON.stringify(payload));
}
asObservable() {
return this.local$.asObservable();
}
}
也可以用 NATS / Kafka / RabbitMQ 替代 Redis,看团队基础设施。
踩坑提示
- Nginx 不关 buffering → 消息延迟到一批才到:必须
proxy_buffering off compression中间件 → SSE 完全收不到:把/events路径排除- 多实例不同步:单进程
Subject在 K8s 多 Pod 下只能广播本机;用 Redis Pub/Sub EventSource不支持 POST + 自定义 header:要带 token / 请求体用fetch + ReadableStream- 忘了关 SSE 连接 → 内存泄漏:客户端
es.close();服务端要监听req.on('close')清理订阅 - 没心跳 → 60s 后被代理切断:每 20-30s 发一条注释行 / 自定义事件
- 多 Tab 打爆连接数:HTTP/1.1 单域名 6 个连接上限,开 HTTP/2 或 SharedWorker
小结
- SSE = 服务器单向推消息的轻量长连接,基于纯 HTTP,浏览器原生支持
EventSource - 协议核心:
Content-Type: text/event-stream+data: ...\n\n - NestJS 用
@Sse()+ 返回Observable<MessageEvent>/AsyncIterable即可 - AI 流式输出、通知推送、日志/进度流是它的主战场
- 部署关键:Nginx 关 buffering、加心跳、多实例用 Redis Pub/Sub
- 鉴权用 Cookie 或换
fetch + ReadableStream,需要双向通信换 WebSocket