流式响应与打字机效果:前端 AI 对话的用户体验优化

HTMLPAGE 团队
16 分钟阅读

详解 SSE 流式传输原理、前端接收处理、打字机动画实现及性能优化技巧

#流式响应 #SSE #打字机效果 #用户体验

为什么需要流式响应

当你调用 LLM API 时,模型可能需要 5-30 秒才能生成完整回答。如果等全部生成完再显示,用户会觉得"卡住了"。

流式响应让文字边生成边显示,配合打字机效果,用户体验直接提升一个档次。

本文目标

  1. 理解 SSE 流式传输原理
  2. 实现服务端流式输出
  3. 前端接收与解析流数据
  4. 打字机动画效果实现
  5. 性能优化与边界处理

SSE 协议基础

Server-Sent Events(SSE)是服务端向客户端单向推送数据的标准协议。

数据格式

data: 第一条消息\n\n
data: 第二条消息\n\n
data: {"type": "chunk", "content": "你好"}\n\n
  • 每条消息以 data: 开头
  • 消息以两个换行符 \n\n 结尾
  • 支持 JSON 格式数据

关键 HTTP 头

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

服务端实现

Nuxt 3 示例

// server/api/chat.post.ts
import { OpenAI } from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

/**
 * POST /api/chat — relays an OpenAI chat completion to the client as an SSE stream.
 *
 * Events written to the response:
 *   - `data: {"content":"…","done":false}` for every non-empty model delta
 *   - `data: {"done": true}` once the completion has finished
 *   - `data: {"error":"生成失败","done":true}` if the upstream stream fails
 *
 * When the client disconnects, the upstream OpenAI request is aborted so no
 * further tokens are generated for a reader that is gone.
 */
export default defineEventHandler(async (event) => {
  const { messages } = await readBody(event)

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  // SSE response headers
  setHeader(event, 'Content-Type', 'text/event-stream')
  setHeader(event, 'Cache-Control', 'no-cache')
  setHeader(event, 'Connection', 'keep-alive')

  const encoder = new TextEncoder()
  // Set by cancel(); after that the stream controller must not be touched.
  let cancelled = false

  return new ReadableStream({
    async start(controller) {
      // Writes one SSE event; a no-op once the client has gone away.
      const sendEvent = (data: string) => {
        if (!cancelled) {
          controller.enqueue(encoder.encode(`data: ${data}\n\n`))
        }
      }

      try {
        for await (const chunk of stream) {
          if (cancelled) break
          const content = chunk.choices[0]?.delta?.content || ''
          if (content) {
            sendEvent(JSON.stringify({ content, done: false }))
          }
        }

        // Completion signal
        sendEvent('{"done": true}')
      } catch (error) {
        if (!cancelled) {
          console.error('chat stream failed:', error)
          sendEvent(JSON.stringify({ error: '生成失败', done: true }))
        }
      } finally {
        // Closing an already-cancelled stream throws, so only close live ones.
        if (!cancelled) {
          controller.close()
        }
      }
    },
    cancel() {
      // The client disconnected: stop writing and abort the upstream request.
      cancelled = true
      stream.controller.abort()
    }
  })
})

Next.js 示例

// app/api/chat/route.ts
import { OpenAI } from 'openai'

const openai = new OpenAI()

/**
 * POST /api/chat — streams an OpenAI chat completion to the client as SSE.
 *
 * Events written to the response:
 *   - `data: {"content":"…"}` for every non-empty model delta
 *   - `data: {"error":"生成失败"}` if the upstream stream fails
 *   - `data: [DONE]` as the final event in both cases
 *
 * When the client disconnects, the upstream OpenAI request is aborted.
 */
export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()
  // Set by cancel(); after that the stream controller must not be touched.
  let cancelled = false

  const readable = new ReadableStream({
    async start(controller) {
      // Writes one SSE event; a no-op once the client has gone away.
      const sendEvent = (data: string) => {
        if (!cancelled) {
          controller.enqueue(encoder.encode(`data: ${data}\n\n`))
        }
      }

      try {
        for await (const chunk of stream) {
          if (cancelled) break
          const content = chunk.choices[0]?.delta?.content || ''
          if (content) {
            sendEvent(JSON.stringify({ content }))
          }
        }
      } catch (error) {
        if (!cancelled) {
          // Report the failure in-band so the client can show it instead of
          // seeing the connection simply break.
          console.error('chat stream failed:', error)
          sendEvent(JSON.stringify({ error: '生成失败' }))
        }
      } finally {
        sendEvent('[DONE]')
        // Closing an already-cancelled stream throws, so only close live ones.
        if (!cancelled) {
          controller.close()
        }
      }
    },
    cancel() {
      // The client disconnected: stop writing and abort the upstream request.
      cancelled = true
      stream.controller.abort()
    }
  })

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  })
}

前端接收流数据

使用 fetch + ReadableStream

/**
 * Posts `messages` to `/api/chat` and yields each SSE `data:` payload as parsed JSON.
 *
 * @param messages - Conversation sent as `{ messages }` in the JSON request body.
 * @param signal - Optional abort signal forwarded to `fetch` to cancel the request.
 * @returns An async generator of parsed payloads; it ends at `[DONE]` or end of stream.
 * @throws Error when the response status is not OK or the response has no body.
 */
async function* streamChat(messages: Message[], signal?: AbortSignal) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
    signal
  })

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`)
  }
  if (!response.body) {
    throw new Error('Response has no body')
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  try {
    while (true) {
      const { done, value } = await reader.read()

      // At end of stream, flush the decoder so a trailing multi-byte character is kept.
      buffer += done ? decoder.decode() : decoder.decode(value, { stream: true })

      const lines = buffer.split('\n')
      // Mid-stream the last piece may be an incomplete line: keep it for the next read.
      // At end of stream nothing more will arrive, so every remaining line is processed.
      buffer = done ? '' : lines.pop() ?? ''

      for (const line of lines) {
        // The space after "data:" is optional in SSE; trim() also drops a trailing "\r".
        if (!line.startsWith('data:')) continue
        const data = line.slice(5).trim()
        if (data === '') continue
        if (data === '[DONE]') return

        let parsed
        try {
          parsed = JSON.parse(data)
        } catch {
          console.warn('解析失败:', data)
          continue
        }
        yield parsed
      }

      if (done) return
    }
  } finally {
    // Runs on normal completion, on errors, and when the consumer stops early:
    // cancel the body so the connection is not left open.
    void reader.cancel().catch(() => {})
  }
}

// Usage: iterate the generator and log the `content` field of each parsed chunk
for await (const chunk of streamChat(messages)) {
  console.log(chunk.content)
}

封装成 Composable

// composables/useStreamChat.ts
/**
 * Composable that streams one chat reply from `/api/chat` into reactive state.
 *
 * @returns
 *   - `content`: text accumulated so far for the current reply
 *   - `isStreaming`: true while the latest request is in flight
 *   - `error`: the failure of the latest request (HTTP error, network error,
 *     or an `error` field reported by the server), otherwise `null`
 *   - `send(messages)`: starts a new request, aborting the previous one
 *   - `stop()`: aborts the current request
 */
export function useStreamChat() {
  const content = ref('')
  const isStreaming = ref(false)
  const error = ref<Error | null>(null)

  let abortController: AbortController | null = null

  // Handles one line of the SSE stream; throws when the server reports an error.
  function handleLine(line: string) {
    // The space after "data:" is optional in SSE; trim() also drops a trailing "\r".
    if (!line.startsWith('data:')) return
    const data = line.slice(5).trim()
    if (data === '' || data === '[DONE]') return

    let parsed: { content?: string; error?: string; done?: boolean }
    try {
      parsed = JSON.parse(data)
    } catch {
      console.warn('解析失败:', data)
      return
    }

    if (parsed.error) {
      throw new Error(parsed.error)
    }
    content.value += parsed.content ?? ''
  }

  async function send(messages: Message[]) {
    // Cancel the previous request
    abortController?.abort()
    const controller = new AbortController()
    abortController = controller

    content.value = ''
    isStreaming.value = true
    error.value = null

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages }),
        signal: controller.signal
      })

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
      if (!response.body) {
        throw new Error('Response has no body')
      }

      const reader = response.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      while (true) {
        const { done, value } = await reader.read()
        // A read may resolve just after this request was aborted; its data
        // must not leak into the state of the request that replaced it.
        if (controller.signal.aborted) break

        // At end of stream, flush the decoder and process the last line too.
        buffer += done ? decoder.decode() : decoder.decode(value, { stream: true })
        const lines = buffer.split('\n')
        buffer = done ? '' : lines.pop() ?? ''

        for (const line of lines) {
          handleLine(line)
        }

        if (done) break
      }
    } catch (e) {
      // Ignore aborts and failures of a request that has already been replaced.
      if (abortController === controller && (e as Error).name !== 'AbortError') {
        error.value = e instanceof Error ? e : new Error(String(e))
      }
    } finally {
      // Only the latest request may clear the flag; an aborted older request
      // finishing late must not hide that the newer one is still streaming.
      if (abortController === controller) {
        isStreaming.value = false
      }
    }
  }

  function stop() {
    abortController?.abort()
    isStreaming.value = false
  }

  return { content, isStreaming, error, send, stop }
}

打字机效果实现

基础版本

<script setup lang="ts">
// Basic variant: shows the text as it arrives, followed by a pulsing cursor.
const props = defineProps<{
  /** Full text received so far. */
  text: string
  /** Accepted for interface parity with the animated variant; not used here. */
  speed?: number
}>()

const displayText = ref('')

// Mirror the prop into local state. `immediate` makes the text that is already
// present at mount time show up too, instead of only later changes.
watch(() => props.text, (newText) => {
  displayText.value = newText
}, { immediate: true })
</script>

<template>
  <div class="whitespace-pre-wrap">
    {{ displayText }}
    <span class="animate-pulse">▌</span>
  </div>
</template>

平滑动画版本

<script setup lang="ts">
// Animated variant: reveals `text` one character at a time with a single timer
// loop, pausing longer after punctuation.
const props = defineProps<{
  /** Full text received so far; may keep growing while typing is in progress. */
  text: string
  /** Delay per ordinary character in milliseconds (default 20). */
  speed?: number
}>()

const displayText = ref('')
const isTyping = ref(false)
let typeIndex = 0
// Handle of the pending step; undefined while no typing loop is running.
let timer: ReturnType<typeof setTimeout> | undefined

// Delay after a character: longer for sentence endings, shorter for commas.
function delayFor(char: string): number {
  if (/[。!?.!?]/.test(char)) return 100
  if (/[,,]/.test(char)) return 50
  return props.speed ?? 20
}

function stopTyping() {
  if (timer !== undefined) {
    clearTimeout(timer)
    timer = undefined
  }
  isTyping.value = false
}

// One step of the loop. It always reads the latest `props.text`, so text that
// arrives while typing is picked up without starting a second loop.
function typeNextChar() {
  timer = undefined
  const target = props.text
  if (typeIndex >= target.length) {
    isTyping.value = false
    return
  }

  typeIndex++
  displayText.value = target.slice(0, typeIndex)
  timer = setTimeout(typeNextChar, delayFor(target.charAt(typeIndex - 1)))
}

watch(() => props.text, (newText) => {
  // The text was replaced rather than extended (e.g. a new reply): start over.
  if (!newText.startsWith(displayText.value)) {
    stopTyping()
    typeIndex = 0
    displayText.value = ''
  }

  // Start the loop only when none is running; a running loop catches up by itself.
  if (timer === undefined && typeIndex < newText.length) {
    isTyping.value = true
    typeNextChar()
  }
}, { immediate: true })

// Cancel the pending step so nothing runs after the component is gone.
onUnmounted(stopTyping)
</script>

<template>
  <div class="relative">
    <!-- Rendered as text, not HTML: model output must not be injected as markup -->
    <span class="whitespace-pre-wrap">{{ displayText }}</span>
    <span 
      v-if="isTyping" 
      class="inline-block w-2 h-5 bg-current animate-blink ml-0.5"
    ></span>
  </div>
</template>

<style scoped>
@keyframes blink {
  0%, 50% { opacity: 1; }
  51%, 100% { opacity: 0; }
}
.animate-blink {
  animation: blink 1s infinite;
}
</style>

支持 Markdown 渲染

<script setup lang="ts">
import DOMPurify from 'dompurify'
import { marked } from 'marked'

const props = defineProps<{ text: string }>()

// Markdown is converted to HTML and then sanitized before it reaches v-html.
const renderedHtml = computed(() => {
  const unsafeHtml = marked.parse(props.text) as string
  return DOMPurify.sanitize(unsafeHtml)
})
</script>

<template>
  <div class="prose dark:prose-invert max-w-none" v-html="renderedHtml"></div>
</template>

性能优化

1. 节流渲染

流式更新频率很高(每秒几十次),每次都触发渲染会卡顿。

/**
 * Returns a ref that follows `rawContent` but updates at most once per `interval`.
 *
 * A change that arrives after the interval has elapsed is applied immediately.
 * Changes that arrive sooner are merged into one trailing update, which applies
 * the latest value when the interval ends.
 *
 * @param rawContent - Source ref that may change many times per second.
 * @param interval - Minimum time between updates in milliseconds (default 50).
 * @returns The throttled ref, seeded with the current value of `rawContent`.
 */
function useThrottledContent(rawContent: Ref<string>, interval = 50) {
  // Seed with the current value so content that already exists is shown at once.
  const throttledContent = ref(rawContent.value)
  let lastUpdate = 0
  let pending = false

  watch(rawContent, (value) => {
    const now = Date.now()
    const elapsed = now - lastUpdate

    if (elapsed >= interval) {
      throttledContent.value = value
      lastUpdate = now
      return
    }

    // A trailing update is already scheduled; it will read the latest value.
    if (pending) return

    pending = true
    setTimeout(() => {
      throttledContent.value = rawContent.value
      lastUpdate = Date.now()
      pending = false
    }, interval - elapsed)
  })

  return throttledContent
}

// Usage: feed the streamed content through the throttle with a 50 ms interval
const { content } = useStreamChat()
const displayContent = useThrottledContent(content, 50)

2. 虚拟滚动长文本

<script setup lang="ts">
// For very long replies, render only the lines inside the visible area.
// Row height in pixels. The virtual list positions rows from this number, so
// every rendered row is pinned to exactly this height below.
const ROW_HEIGHT = 24

const { list, containerProps, wrapperProps } = useVirtualList(
  computed(() => content.value.split('\n')),
  { itemHeight: ROW_HEIGHT }
)
</script>

<template>
  <div v-bind="containerProps" class="h-96 overflow-auto">
    <div v-bind="wrapperProps">
      <!-- whitespace-pre keeps each line on one row so its height cannot grow by wrapping -->
      <div
        v-for="item in list"
        :key="item.index"
        class="whitespace-pre"
        :style="{ height: `${ROW_HEIGHT}px`, lineHeight: `${ROW_HEIGHT}px` }"
      >{{ item.data }}</div>
    </div>
  </div>
</template>

3. 自动滚动到底部

const chatContainer = ref<HTMLElement>()
const shouldAutoScroll = ref(true)

// Scroll handler: auto-scroll stays enabled only while the view is within
// 50px of the bottom, so a user who scrolled up to read is not pulled back down.
function handleScroll() {
  const el = chatContainer.value
  if (!el) return
  const distanceFromBottom = el.scrollHeight - el.scrollTop - el.clientHeight
  shouldAutoScroll.value = distanceFromBottom < 50
}

// Follow new content while auto-scroll is enabled.
watch(content, () => {
  if (!shouldAutoScroll.value) return

  // Wait for the DOM update so scrollHeight includes the new content.
  nextTick(() => {
    const el = chatContainer.value
    if (!el) return
    // Jump instead of animating: chunks arrive faster than a smooth scroll can
    // finish, and the scroll events fired mid-animation would be read by
    // handleScroll() as "user scrolled up", turning auto-scroll off.
    el.scrollTo({ top: el.scrollHeight, behavior: 'auto' })
  })
})

边界情况处理

连接中断

/**
 * Streams a chat reply and retries when the connection fails.
 *
 * Each retry sends the text received so far as `resumeFrom`. Whether the reply
 * actually continues from that point depends on the server honouring that
 * field, which this function cannot verify.
 *
 * @param messages - Conversation sent in the request body.
 * @param maxRetries - Maximum number of attempts before the last error is rethrown (default 3).
 * @returns An async generator of the chunks produced by `parseStream`.
 * @throws The last error once `maxRetries` attempts have failed.
 */
async function* streamWithReconnect(messages: Message[], maxRetries = 3) {
  let retries = 0
  let receivedContent = ''

  while (retries < maxRetries) {
    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ 
          messages,
          resumeFrom: receivedContent // lets the server continue where the stream broke
        })
      })

      // Treat HTTP failures like a broken connection so they are retried too.
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }

      for await (const chunk of parseStream(response)) {
        // Chunks without text (e.g. a completion marker) must not append "undefined".
        receivedContent += chunk.content ?? ''
        yield chunk
      }

      return // completed successfully
    } catch (e) {
      retries++
      if (retries >= maxRetries) throw e
      // Linear backoff: 1s, 2s, ...
      await new Promise<void>(r => setTimeout(r, 1000 * retries))
    }
  }
}

超时处理

/**
 * Starts a chat stream that is aborted if it has not finished within `timeout`.
 *
 * The timer starts when this function is called and covers the whole stream,
 * not the gap between chunks.
 *
 * @param messages - Conversation passed on to `streamChat`.
 * @param timeout - Time in milliseconds before the request is aborted (default 30000).
 * @returns `stream` to iterate the chunks, and `cancel` to abort manually.
 */
function streamWithTimeout(messages: Message[], timeout = 30000) {
  const controller = new AbortController()
  
  const timeoutId = setTimeout(() => {
    controller.abort()
  }, timeout)

  // Wraps the stream so the timer is released as soon as iteration ends
  // (completion, error, or the consumer stopping early) instead of staying
  // armed for the rest of the timeout.
  async function* guardedStream() {
    try {
      yield* streamChat(messages, controller.signal)
    } finally {
      clearTimeout(timeoutId)
    }
  }

  return {
    stream: guardedStream(),
    cancel: () => {
      clearTimeout(timeoutId)
      controller.abort()
    }
  }
}

完整组件示例

<script setup lang="ts">
// Chat page: finished messages are kept in `messages`; the reply currently
// being streamed is shown separately through useStreamChat().
const messages = ref<Message[]>([])
const inputText = ref('')
const { content, isStreaming, error, send, stop } = useStreamChat()

async function handleSubmit(event?: Event) {
  // With an IME (e.g. Chinese input), Enter confirms the candidate word.
  // That key press must not send the message.
  if (event instanceof KeyboardEvent && event.isComposing) return
  if (!inputText.value.trim() || isStreaming.value) return
  
  const userMessage: Message = { role: 'user', content: inputText.value }
  messages.value.push(userMessage)
  inputText.value = ''

  await send(messages.value)
  
  // Keep whatever was received, including a partial reply after "stop",
  // but do not add an empty assistant bubble when the request failed outright.
  if (content.value) {
    messages.value.push({ role: 'assistant', content: content.value })
  }
}
</script>

<template>
  <div class="flex flex-col h-screen">
    <!-- Message list -->
    <div class="flex-1 overflow-auto p-4 space-y-4">
      <div 
        v-for="(msg, i) in messages" 
        :key="i"
        :class="msg.role === 'user' ? 'text-right' : 'text-left'"
      >
        <div :class="[
          'inline-block max-w-[80%] p-3 rounded-lg',
          msg.role === 'user' ? 'bg-blue-500 text-white' : 'bg-gray-100'
        ]">
          {{ msg.content }}
        </div>
      </div>
      
      <!-- Reply being streamed -->
      <div v-if="isStreaming" class="text-left">
        <div class="inline-block max-w-[80%] p-3 rounded-lg bg-gray-100">
          <TypewriterText :text="content" />
        </div>
      </div>

      <!-- Failure of the last request -->
      <div v-if="error" class="text-left text-red-500">
        {{ error.message }}
      </div>
    </div>

    <!-- Input area -->
    <div class="border-t p-4">
      <div class="flex gap-2">
        <input 
          v-model="inputText"
          @keydown.enter="handleSubmit"
          class="flex-1 border rounded-lg px-4 py-2"
          placeholder="输入消息..."
          :disabled="isStreaming"
        />
        <button 
          v-if="isStreaming"
          @click="stop"
          class="px-4 py-2 bg-red-500 text-white rounded-lg"
        >
          停止
        </button>
        <button 
          v-else
          @click="handleSubmit"
          class="px-4 py-2 bg-blue-500 text-white rounded-lg"
        >
          发送
        </button>
      </div>
    </div>
  </div>
</template>

总结

流式响应 + 打字机效果 = 卓越的 AI 对话体验

关键点:

  1. 服务端:正确设置 SSE 响应头,逐 chunk 发送
  2. 前端接收:使用 ReadableStream 解析流数据
  3. 打字机:平滑动画,标点智能停顿
  4. 性能:节流渲染,自动滚动,长文本虚拟化

相关文章推荐: