Why streaming responses matter
When you call an LLM API, the model can take 5-30 seconds to produce a complete answer. If you wait for the full response before showing anything, the app feels frozen to the user.
Streaming lets the text appear as it is generated. Combined with a typewriter effect, it noticeably improves the perceived responsiveness of the conversation.
What this article covers
- How SSE streaming works
- Implementing streaming output on the server
- Receiving and parsing the stream on the frontend
- Building a typewriter animation
- Performance optimizations and edge cases
SSE protocol basics
Server-Sent Events (SSE) is a standard protocol for pushing data from the server to the client in one direction over a single HTTP connection.
Data format
data: first message\n\n
data: second message\n\n
data: {"type": "chunk", "content": "Hello"}\n\n
- Each message starts with data:
- A message ends with two newline characters \n\n
- The payload can be plain text or JSON
Key HTTP headers
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
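Browsers can also consume SSE natively through the EventSource API, but EventSource only supports GET requests without a body, which is why the chat examples below use fetch plus ReadableStream instead. A minimal sketch of the native API, assuming a hypothetical GET endpoint /api/notifications:
// Native SSE consumption (hypothetical /api/notifications endpoint)
const source = new EventSource('/api/notifications')
source.onmessage = (event) => {
  console.log('received:', event.data) // the text after "data: "
}
source.onerror = () => {
  source.close() // EventSource reconnects automatically unless closed
}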
Server-side implementation
Nuxt 3 example
// server/api/chat.post.ts
import { OpenAI } from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

export default defineEventHandler(async (event) => {
  const { messages } = await readBody(event)

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  // Set the SSE response headers
  setHeader(event, 'Content-Type', 'text/event-stream')
  setHeader(event, 'Cache-Control', 'no-cache')
  setHeader(event, 'Connection', 'keep-alive')

  // Return the stream; h3 pipes a ReadableStream straight to the response
  return new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder()
      try {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content || ''
          if (content) {
            const data = JSON.stringify({ content, done: false })
            controller.enqueue(encoder.encode(`data: ${data}\n\n`))
          }
        }
        // Send a completion signal
        controller.enqueue(encoder.encode(`data: {"done": true}\n\n`))
      } catch (error) {
        const errorData = JSON.stringify({ error: 'Generation failed', done: true })
        controller.enqueue(encoder.encode(`data: ${errorData}\n\n`))
      } finally {
        controller.close()
      }
    }
  })
})
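One deployment detail worth noting: reverse proxies such as nginx may buffer the response and defeat streaming. A common mitigation (an addition on top of the Nuxt handler above, not part of the original example) is to disable proxy buffering explicitly:
// Hint to nginx not to buffer this response
setHeader(event, 'X-Accel-Buffering', 'no')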
Next.js example
// app/api/chat/route.ts
import { OpenAI } from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()
  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || ''
        if (content) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
        }
      }
      controller.enqueue(encoder.encode('data: [DONE]\n\n'))
      controller.close()
    }
  })

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  })
}
Receiving the stream on the frontend
Using fetch + ReadableStream
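The snippets from here on reference a Message type that the original code never defines; a minimal assumed shape is:
// Assumed shape of the Message type used throughout the examples
interface Message {
  role: 'system' | 'user' | 'assistant'
  content: string
}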
async function* streamChat(messages: Message[], signal?: AbortSignal) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
    signal // optional; used by the timeout helper later in this article
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`)
  }

  const reader = response.body!.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })

    // Split into lines; keep the last (possibly incomplete) line in the buffer
    const lines = buffer.split('\n')
    buffer = lines.pop() || ''

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6).trim()
        if (data === '[DONE]') return
        try {
          const parsed = JSON.parse(data)
          if (parsed.done) return // JSON-style completion signal
          yield parsed
        } catch (e) {
          console.warn('Failed to parse chunk:', data)
        }
      }
    }
  }
}

// Usage
for await (const chunk of streamChat(messages)) {
  console.log(chunk.content)
}
Wrapping it in a composable
// composables/useStreamChat.ts
export function useStreamChat() {
  const content = ref('')
  const isStreaming = ref(false)
  const error = ref<Error | null>(null)

  let abortController: AbortController | null = null

  async function send(messages: Message[]) {
    // Cancel any in-flight request
    abortController?.abort()
    abortController = new AbortController()

    content.value = ''
    isStreaming.value = true
    error.value = null

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages }),
        signal: abortController.signal
      })
      if (!response.ok) throw new Error(`HTTP ${response.status}`)

      const reader = response.body!.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      while (true) {
        const { done, value } = await reader.read()
        if (done) break

        buffer += decoder.decode(value, { stream: true })
        const lines = buffer.split('\n')
        buffer = lines.pop() || ''

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue
          const data = line.slice(6).trim()
          if (data === '[DONE]') continue
          try {
            const parsed = JSON.parse(data)
            if (parsed.error) throw new Error(parsed.error)
            content.value += parsed.content || ''
          } catch (e) {
            // Skip lines that are not valid JSON; surface real errors
            if (e instanceof SyntaxError) continue
            throw e
          }
        }
      }
    } catch (e) {
      if ((e as Error).name !== 'AbortError') {
        error.value = e as Error
      }
    } finally {
      isStreaming.value = false
    }
  }

  function stop() {
    abortController?.abort()
    isStreaming.value = false
  }

  return { content, isStreaming, error, send, stop }
}
Typewriter effect
Basic version
<script setup lang="ts">
const props = defineProps<{
  text: string
}>()

const displayText = ref('')

// The stream already delivers text incrementally, so the basic version
// simply mirrors whatever has arrived so far and shows a blinking cursor
watch(() => props.text, (newText) => {
  displayText.value = newText
}, { immediate: true })
</script>

<template>
  <div class="whitespace-pre-wrap">
    {{ displayText }}
    <span class="animate-pulse">▌</span>
  </div>
</template>
Smooth animation version
<script setup lang="ts">
const props = defineProps<{
  text: string
  speed?: number // delay per character in milliseconds
}>()

const displayText = ref('')
const isTyping = ref(false)

let typeIndex = 0
let timer: ReturnType<typeof setTimeout> | null = null

watch(() => props.text, (newText) => {
  // If the text was reset (e.g. a new message started), start over
  if (!newText.startsWith(displayText.value)) {
    displayText.value = ''
    typeIndex = 0
  }
  // New content arrived: start typing unless a loop is already running
  if (!isTyping.value && typeIndex < newText.length) {
    typeNextChar()
  }
}, { immediate: true })

function typeNextChar() {
  // Always read the latest prop so a running loop picks up new chunks
  const targetText = props.text

  if (typeIndex >= targetText.length) {
    isTyping.value = false
    return
  }

  isTyping.value = true
  displayText.value = targetText.slice(0, typeIndex + 1)
  typeIndex++

  // Pause a little longer after punctuation for a more natural rhythm
  const char = targetText[typeIndex - 1]
  const delay = /[。!?.!?]/.test(char) ? 100 :
                /[,,]/.test(char) ? 50 :
                props.speed || 20

  timer = setTimeout(typeNextChar, delay)
}

onUnmounted(() => {
  if (timer) clearTimeout(timer)
})
</script>

<template>
  <div class="relative">
    <span class="whitespace-pre-wrap">{{ displayText }}</span>
    <span
      v-if="isTyping"
      class="inline-block w-2 h-5 bg-current animate-blink ml-0.5"
    ></span>
  </div>
</template>

<style scoped>
@keyframes blink {
  0%, 50% { opacity: 1; }
  51%, 100% { opacity: 0; }
}
.animate-blink {
  animation: blink 1s infinite;
}
</style>
Markdown rendering support
<script setup lang="ts">
import { marked } from 'marked'
import DOMPurify from 'dompurify'

const props = defineProps<{
  text: string
}>()

const renderedHtml = computed(() => {
  const html = marked.parse(props.text) as string
  return DOMPurify.sanitize(html)
})
</script>

<template>
  <div
    class="prose dark:prose-invert max-w-none"
    v-html="renderedHtml"
  ></div>
</template>
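One wrinkle with rendering Markdown while it is still streaming: an unfinished code fence can make the layout jump around until the closing fence arrives. A small workaround is sketched below; the helper name is an assumption of this article, not part of marked itself.
// Hypothetical helper: temporarily close an unterminated code fence so partial
// Markdown parses cleanly while the rest of the block is still streaming in
function closeUnfinishedFence(text: string): string {
  const fenceCount = (text.match(/```/g) || []).length
  return fenceCount % 2 === 1 ? text + '\n```' : text
}
Call it on props.text before handing the string to marked.parse; once the real closing fence arrives the count becomes even again and the helper is a no-op.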
Performance optimization
1. Throttled rendering
Streamed updates arrive at a high rate (dozens of chunks per second), and re-rendering on every one of them can make the UI stutter, so throttle how often the displayed content updates.
function useThrottledContent(rawContent: Ref<string>, interval = 50) {
  const throttledContent = ref('')
  let lastUpdate = 0
  let pending = false

  watch(rawContent, (value) => {
    const now = Date.now()
    if (now - lastUpdate >= interval) {
      throttledContent.value = value
      lastUpdate = now
    } else if (!pending) {
      // Schedule a trailing update so the final chunk is never dropped
      pending = true
      setTimeout(() => {
        throttledContent.value = rawContent.value
        lastUpdate = Date.now()
        pending = false
      }, interval - (now - lastUpdate))
    }
  })

  return throttledContent
}

// Usage
const { content } = useStreamChat()
const displayContent = useThrottledContent(content, 50)
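If the project already depends on VueUse, its refThrottled utility (formerly named useThrottle in older releases) covers the same need; a brief sketch, assuming a recent @vueuse/core:
import { refThrottled } from '@vueuse/core'

const { content } = useStreamChat()
// Emits at most one update per 50 ms, including a trailing update
const displayContent = refThrottled(content, 50)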
2. Virtual scrolling for long replies
<script setup lang="ts">
import { useVirtualList } from '@vueuse/core'

// For very long replies, render only the lines inside the viewport
const { list, containerProps, wrapperProps } = useVirtualList(
  computed(() => content.value.split('\n')),
  { itemHeight: 24 }
)
</script>

<template>
  <div v-bind="containerProps" class="h-96 overflow-auto">
    <div v-bind="wrapperProps">
      <div v-for="item in list" :key="item.index">
        {{ item.data }}
      </div>
    </div>
  </div>
</template>
3. Auto-scroll to the bottom
const chatContainer = ref<HTMLElement>()
const shouldAutoScroll = ref(true)

// Detect manual scrolling; bind this to @scroll on the container element
function handleScroll() {
  const el = chatContainer.value!
  const isAtBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 50
  shouldAutoScroll.value = isAtBottom
}

// Scroll down when new content arrives, unless the user has scrolled up
watch(content, () => {
  if (shouldAutoScroll.value) {
    nextTick(() => {
      chatContainer.value?.scrollTo({
        top: chatContainer.value.scrollHeight,
        behavior: 'smooth'
      })
    })
  }
})
Edge case handling
Connection interruptions
// Note: parseStream stands for the SSE parsing loop from streamChat above
async function* streamWithReconnect(messages: Message[], maxRetries = 3) {
  let retries = 0
  let receivedContent = ''

  while (retries < maxRetries) {
    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages,
          resumeFrom: receivedContent // requires server-side resume support
        })
      })

      // Process the stream as usual...
      for await (const chunk of parseStream(response)) {
        receivedContent += chunk.content
        yield chunk
      }
      return // finished successfully
    } catch (e) {
      retries++
      if (retries >= maxRetries) throw e
      // Back off a little longer on each retry
      await new Promise(r => setTimeout(r, 1000 * retries))
    }
  }
}
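The resumeFrom field is nothing the OpenAI API understands on its own; the server has to implement the resume itself. One possible sketch (the prompt wording and variable names here are assumptions, not a fixed API): feed the partial answer back as an assistant message and ask the model to continue.
// Hypothetical sketch of resume support inside the Nuxt handler
const { messages, resumeFrom } = await readBody(event)

const promptMessages = resumeFrom
  ? [
      ...messages,
      { role: 'assistant', content: resumeFrom },
      { role: 'user', content: 'Continue exactly where the previous answer stopped.' }
    ]
  : messages

const stream = await openai.chat.completions.create({
  model: 'gpt-4-turbo-preview',
  messages: promptMessages,
  stream: true
})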
Timeout handling
function streamWithTimeout(messages: Message[], timeout = 30000) {
  const controller = new AbortController()

  // Abort the request if it does not finish within the timeout window
  const timeoutId = setTimeout(() => {
    controller.abort()
  }, timeout)

  return {
    // streamChat accepts an optional AbortSignal (see the version above)
    stream: streamChat(messages, controller.signal),
    cancel: () => {
      clearTimeout(timeoutId)
      controller.abort()
    }
  }
}
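A usage sketch; calling cancel in finally also clears the timer when the stream ends normally (aborting an already finished request is harmless):
const { stream, cancel } = streamWithTimeout(messages)
try {
  for await (const chunk of stream) {
    console.log(chunk.content)
  }
} finally {
  cancel()
}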
Complete component example
<script setup lang="ts">
const messages = ref<Message[]>([])
const inputText = ref('')
const { content, isStreaming, send, stop } = useStreamChat()

async function handleSubmit() {
  if (!inputText.value.trim() || isStreaming.value) return

  const userMessage: Message = { role: 'user', content: inputText.value }
  messages.value.push(userMessage)
  inputText.value = ''

  await send(messages.value)
  // Persist the streamed reply once it is complete
  messages.value.push({ role: 'assistant', content: content.value })
}
</script>

<template>
  <div class="flex flex-col h-screen">
    <!-- Message list -->
    <div class="flex-1 overflow-auto p-4 space-y-4">
      <div
        v-for="(msg, i) in messages"
        :key="i"
        :class="msg.role === 'user' ? 'text-right' : 'text-left'"
      >
        <div :class="[
          'inline-block max-w-[80%] p-3 rounded-lg',
          msg.role === 'user' ? 'bg-blue-500 text-white' : 'bg-gray-100'
        ]">
          {{ msg.content }}
        </div>
      </div>

      <!-- Reply currently streaming in -->
      <div v-if="isStreaming" class="text-left">
        <div class="inline-block max-w-[80%] p-3 rounded-lg bg-gray-100">
          <TypewriterText :text="content" />
        </div>
      </div>
    </div>

    <!-- Input area -->
    <div class="border-t p-4">
      <div class="flex gap-2">
        <input
          v-model="inputText"
          @keydown.enter="handleSubmit"
          class="flex-1 border rounded-lg px-4 py-2"
          placeholder="Type a message..."
          :disabled="isStreaming"
        />
        <button
          v-if="isStreaming"
          @click="stop"
          class="px-4 py-2 bg-red-500 text-white rounded-lg"
        >
          Stop
        </button>
        <button
          v-else
          @click="handleSubmit"
          class="px-4 py-2 bg-blue-500 text-white rounded-lg"
        >
          Send
        </button>
      </div>
    </div>
  </div>
</template>
Summary
Streaming responses + a typewriter effect = a far better AI chat experience.
Key points:
- Server: set the SSE response headers correctly and emit each chunk as it arrives
- Frontend: read and parse the stream with ReadableStream
- Typewriter: smooth animation with slightly longer pauses at punctuation
- Performance: throttled rendering, auto-scroll, and virtualized rendering for long replies