nestjs SSE, EventSource

服务器主动推消息给浏览器的"轻量版 WebSocket"

Posted by chanweiyan on April 29, 2026

一句话理解

SSE(Server-Sent Events)= 浏览器开一个长连接,服务器单向、源源不断地往浏览器推 text 消息。

它是 HTML5 标准的一部分,浏览器侧用 EventSource API 接收,服务器侧返回 Content-Type: text/event-stream 的特殊响应即可。NestJS 通过 @Sse() 装饰器原生支持。

1
2
3
4
浏览器 ──── GET /events ────→ 服务器
       ←── data: hello\n\n ─── 推
       ←── data: world\n\n ─── 推
       ←── data: ...\n\n   ─── 推(连接持续打开)

SSE vs WebSocket vs 轮询

特性 轮询 (Polling) SSE WebSocket
协议 HTTP HTTP(长连接) ws://(升级协议)
方向 客户端 → 服务器 服务器 → 客户端 双向
文本/二进制 文本 仅文本 文本 + 二进制
自动重连 自己实现 浏览器内置 自己实现
代理友好度 高(标准 HTTP) 中(要 upgrade)
复杂度 简单 简单 复杂
适用场景 状态偶尔查一下 通知、日志流、AI 流式响应 实时聊天、协作、游戏

经验法则

  • 只需要服务器推 → 选 SSE(更轻、有内置重连)
  • 要双向、二进制、低延迟 → 选 WebSocket
  • AI 对话流式输出 → SSE 是事实标准(OpenAI、Anthropic 都用 SSE)

SSE 协议格式

服务器返回的响应体是纯文本,每条消息由若干行 field: value 组成,两个连续换行 \n\n 表示一条消息结束

1
2
3
4
5
6
7
8
9
10
11
data: hello world

data: {"id":1,"text":"hi"}

event: ping
data: keep-alive

id: 42
retry: 5000
data: 这是带 ID 和重连间隔的消息

字段说明:

字段 含义
data: 消息内容(必填)。多行 data: 会被自动用 \n 拼接
event: 事件名,前端用 addEventListener("ping", ...) 监听
id: 消息 ID。断线重连时浏览器会自动带上 Last-Event-ID
retry: 断线后重连等待毫秒数(默认 3000ms)
: 注释行(冒号开头),常用作心跳保活

NestJS 服务端实现

1. 基础用法:@Sse() 装饰器

NestJS 的 @Sse() 把方法返回的 Observable<MessageEvent> 直接转成 SSE 响应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { Controller, Sse, MessageEvent } from "@nestjs/common";
import { interval, map, Observable } from "rxjs";

@Controller("events")
export class EventsController {
  @Sse("stream")
  stream(): Observable<MessageEvent> {
    return interval(1000).pipe(
      map((n) => ({
        // MessageEvent 字段对应 SSE 协议字段
        data: { count: n, time: new Date().toISOString() },
        // 可选:
        // id:    String(n),
        // type:  "tick",      // 对应 event: 字段
        // retry: 5000,
      })),
    );
  }
}

访问 GET /events/stream,浏览器会持续收到:

1
2
3
4
data: {"count":0,"time":"2026-04-29T..."}

data: {"count":1,"time":"2026-04-29T..."}
...

NestJS 会自动设置响应头:Content-Type: text/event-streamCache-Control: no-cacheConnection: keep-alive

2. 业务事件推送(基于 Subject)

实战中很少用 interval 定时推,更多是”业务事件触发就推”。用 RxJS Subject 做事件总线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { Injectable } from "@nestjs/common";
import { Subject } from "rxjs";

@Injectable()
export class NotifyService {
  // 用 Subject 做"广播",所有订阅者都能收到
  private readonly events$ = new Subject<{ userId: string; payload: any }>();

  // 业务代码调用:触发一次推送
  push(userId: string, payload: any) {
    this.events$.next({ userId, payload });
  }

  // SSE 控制器订阅这里
  asObservable() {
    return this.events$.asObservable();
  }
}

@Controller("notify")
export class NotifyController {
  constructor(private readonly notify: NotifyService) {}

  @Sse(":userId")
  subscribe(@Param("userId") userId: string): Observable<MessageEvent> {
    // 只推给当前用户的事件
    return this.notify.asObservable().pipe(
      filter((e) => e.userId === userId),
      map((e) => ({ data: e.payload })),
    );
  }
}

// 别处触发:notify.push("u123", { type: "order.paid", id: 9527 });

3. AI 流式响应(最常见的 SSE 场景)

把 OpenAI / Claude / 内部大模型的流式 token 透传给前端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Controller("chat")
export class ChatController {
  @Sse("stream")
  async *chat(@Query("q") q: string) {
    // NestJS 也支持返回 AsyncIterable,自动转成 SSE
    const stream = await openai.chat.completions.create({
      model: "gpt-4",
      messages: [{ role: "user", content: q }],
      stream: true,
    });

    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content ?? "";
      if (delta) {
        yield { data: delta };
      }
    }
    yield { data: "[DONE]", type: "end" };
  }
}

NestJS 9+ 支持 @Sse() 方法返回 AsyncIterable,写起来比 RxJS 更直观。

浏览器侧:EventSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 基础用法
const es = new EventSource("/events/stream");

es.onopen = () => console.log("连上了");

es.onmessage = (e) => {
  // 默认事件(没有 event: 字段的消息)
  console.log("收到", JSON.parse(e.data));
};

// 监听具名事件
es.addEventListener("tick", (e) => {
  console.log("tick:", e.data);
});

es.onerror = (err) => {
  // 浏览器会自动按 retry 间隔重连,不需要手动处理
  console.warn("断了,浏览器会自动重连", err);
};

// 主动关闭
es.close();

重要特性

  • 自动重连:连接断开后,浏览器按 retry: 字段(默认 3s)自动重连
  • Last-Event-ID:重连时浏览器自动带上请求头 Last-Event-ID: <最后收到的 id>,服务器据此从断点续推
  • 同源策略EventSource 默认走同源,跨域要服务器配置 CORS(Access-Control-Allow-Origin
1
const es = new EventSource("/events", { withCredentials: true });

服务器对应要返回:

1
2
Access-Control-Allow-Origin: https://app.example.com
Access-Control-Allow-Credentials: true

EventSource 不支持的能力

EventSource API 比较”裸”,做不到:

  • 自定义请求头(如 Authorization: Bearer xxx)—— 只能把 token 放 URL query 或 cookie
  • POST 请求 —— 只能 GET
  • 请求体 —— 没有

需要这些能力时,用 fetch + ReadableStream 自己实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const res = await fetch("/chat/stream", {
  method: "POST",
  headers: { Authorization: `Bearer ${token}` },
  body: JSON.stringify({ q: "hello" }),
});

const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });

  // 按 \n\n 切分 SSE 消息
  const events = buffer.split("\n\n");
  buffer = events.pop()!; // 留下不完整的最后一段
  for (const event of events) {
    const data = event.replace(/^data: /, "");
    console.log(data);
  }
}

社区库 @microsoft/fetch-event-source 把这些细节封装好了,是 AI 流式应用的事实选择。

反向代理 / 部署注意

SSE 长连接经常被各种”中间人”截断,几个必踩的坑:

1. Nginx 必须关闭 buffering

1
2
3
4
5
6
7
8
9
10
11
12
location /events/ {
  proxy_pass http://nest_backend;
  proxy_http_version 1.1;
  proxy_set_header Connection "";

  # 关键:不缓冲!否则消息会攒一批才推到客户端
  proxy_buffering off;
  proxy_cache off;

  # 长连接超时调大(默认 60s 会被截断)
  proxy_read_timeout 24h;
}

或者在响应里加:

1
res.setHeader("X-Accel-Buffering", "no"); // 让 Nginx 单独这条不缓冲

2. NestJS 默认压缩会破坏 SSE

如果用了 compression 中间件,要排除 SSE 路径:

1
2
3
4
5
6
7
8
9
10
import compression from "compression";

app.use(
  compression({
    filter: (req, res) => {
      if (req.path.startsWith("/events")) return false;
      return compression.filter(req, res);
    },
  }),
);

3. 心跳保活

很多代理 / 负载均衡器对 60s 没流量的连接会强制关闭。每 15-30s 发一条注释行作为心跳:

1
2
3
4
5
6
7
@Sse("stream")
stream(): Observable<MessageEvent> {
  return merge(
    businessEvents$,
    interval(20_000).pipe(map(() => ({ data: "", type: "heartbeat" }))),
  );
}

或直接在 data: 之前发 : ping\n\n(注释行不会触发 onmessage)。

4. 浏览器并发连接限制

同一域名下 EventSource 连接数受 HTTP/1.1 限制(每个域名约 6 个)。多个 tab 同时打开会很快用完。解决方案:

  • 后端用 HTTP/2(不限连接数)
  • 前端用 SharedWorker 共享一个 SSE 连接给多个 tab

QA: SSE 和 WebSocket 怎么选?

💬点击展开/收起
维度 选 SSE 选 WebSocket
通信方向 服务器 → 客户端(单向推送) 双向(聊天、协作)
数据类型 纯文本 / JSON 文本 + 二进制(音视频、文件)
自动重连 ✅ 浏览器内置 ❌ 自己实现
代理 / CDN ✅ 标准 HTTP,到处能跑 ❌ 要支持 upgrade,部分代理会拦
实现复杂度 ✅ 几行代码 ❌ 要管心跳、重连、状态机
适用场景 通知推送、日志流、AI 流式响应、行情推 实时聊天、协作编辑、游戏、白板

90% 的”服务器推消息”场景用 SSE 就够了。WebSocket 留给真正需要双向 + 二进制 + 低延迟的场景。

QA: SSE 怎么做鉴权?

💬点击展开/收起

EventSource 不支持自定义请求头,所以传统 Authorization: Bearer xxx 用不了。三种方案:

1. Cookie 鉴权(最简单)

1
const es = new EventSource("/events", { withCredentials: true });

服务器侧用 NestJS 的 JwtAuthGuard / 自定义 Guard 校验 cookie 即可。

2. URL Query 传 token(不推荐生产)

1
const es = new EventSource(`/events?token=${jwt}`);

缺点:token 进 access log、被 referer 泄漏。临时调试可以,正式环境别用。

3. 用 fetch + ReadableStream 替代 EventSource

可以自定义 header,能 POST 带 body:

1
2
3
4
await fetch("/events", {
  headers: { Authorization: `Bearer ${jwt}` },
});
// 然后手动按 SSE 格式解析

推荐用 @microsoft/fetch-event-source 库,API 类似 EventSource 但全功能。

QA: 多实例部署时 SSE 怎么广播?

💬点击展开/收起

单实例时 Subject 直接广播没问题,但多实例(K8s 多 Pod)时,用户的 SSE 连接落在哪台机器是随机的Subject.next() 只会通知本机连接的客户端。

解决方案是引入 Pub/Sub 中间件

1
业务代码 ──→ Redis Pub/Sub ──→ 所有实例的 SSE Subject ──→ 推给本机连接的客户端

NestJS 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Injectable()
export class NotifyService implements OnModuleInit {
  private readonly local$ = new Subject<any>();

  constructor(@Inject("REDIS") private readonly redis: Redis) {}

  async onModuleInit() {
    const sub = this.redis.duplicate();
    await sub.subscribe("notify");
    sub.on("message", (_ch, msg) => {
      this.local$.next(JSON.parse(msg)); // 把跨机消息塞进本机 Subject
    });
  }

  // 业务调用:跨机广播
  async push(payload: any) {
    await this.redis.publish("notify", JSON.stringify(payload));
  }

  asObservable() {
    return this.local$.asObservable();
  }
}

也可以用 NATS / Kafka / RabbitMQ 替代 Redis,看团队基础设施。

踩坑提示

  1. Nginx 不关 buffering → 消息延迟到一批才到:必须 proxy_buffering off
  2. compression 中间件 → SSE 完全收不到:把 /events 路径排除
  3. 多实例不同步:单进程 Subject 在 K8s 多 Pod 下只能广播本机;用 Redis Pub/Sub
  4. EventSource 不支持 POST + 自定义 header:要带 token / 请求体用 fetch + ReadableStream
  5. 忘了关 SSE 连接 → 内存泄漏:客户端 es.close();服务端要监听 req.on('close') 清理订阅
  6. 没心跳 → 60s 后被代理切断:每 20-30s 发一条注释行 / 自定义事件
  7. 多 Tab 打爆连接数:HTTP/1.1 单域名 6 个连接上限,开 HTTP/2 或 SharedWorker

小结

  • SSE = 服务器单向推消息的轻量长连接,基于纯 HTTP,浏览器原生支持 EventSource
  • 协议核心:Content-Type: text/event-stream + data: ...\n\n
  • NestJS 用 @Sse() + 返回 Observable<MessageEvent> / AsyncIterable 即可
  • AI 流式输出、通知推送、日志/进度流是它的主战场
  • 部署关键:Nginx 关 buffering、加心跳、多实例用 Redis Pub/Sub
  • 鉴权用 Cookie 或换 fetch + ReadableStream,需要双向通信换 WebSocket