uni.request 配置流式输出
问题背景
在项目中,需要调用通义千问 API 实现多轮对话功能。为了能够实时接收服务器返回的响应数据,需要配置 uni.request 发送 POST 请求时的流式输出(streaming output),以便能够实时接收服务器返回的响应数据。
- 服务端对话接口
import json
from fastapi import APIRouter, Depends
from app.api.v1.chat.schema import ChatRequest
from fastapi.responses import StreamingResponse, JSONResponse
from app.core.rag import RAG
# 定义模块路由
router = APIRouter(
prefix="/chat", # 接口前缀:/v1/chat
tags=["聊天接口"], # 文档分类标签
# dependencies=[Depends(verify_user_token)] # 该模块所有接口复用鉴权依赖
)
# 通用工具函数:初始化RAG和会话配置
def _init_rag_session(request: ChatRequest):
"""初始化RAG实例和会话配置"""
session_config = {
"configurable": {
"session_id": request.session_id
}
}
rag = RAG()
return rag, session_config
# 流式响应接口
@router.post("/stream", response_class=StreamingResponse)
async def chat_stream(request: ChatRequest):
"""
流式聊天接口
返回SSE格式的流式响应,专门处理流式输出场景
"""
rag, session_config = _init_rag_session(request)
async def generate_response():
try:
# 获取流式响应迭代器
response_stream = rag.chain.stream({"question": request.prompt}, session_config)
# 逐个yield响应块(SSE格式)
for chunk in response_stream:
yield f"data: {json.dumps({'chunk': chunk}, ensure_ascii=False)}\n\n"
# 发送结束标记
yield "data: [DONE]\n\n"
except Exception as e:
# 错误处理:返回标准错误格式
error_data = json.dumps({"error": str(e)}, ensure_ascii=False)
yield f"data: {error_data}\n\n"
# 返回SSE格式的流式响应
return StreamingResponse(
generate_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
# 非流式响应接口
@router.post("/completion", response_class=JSONResponse)
async def chat_completion(request: ChatRequest):
"""
非流式聊天接口
返回完整的JSON响应,专门处理一次性输出场景
"""
try:
rag, session_config = _init_rag_session(request)
# 非流式调用
response = rag.chain.invoke({"question": request.prompt}, session_config)
return JSONResponse(
content={"session_id": request.session_id, "response": response},
status_code=200
)
except Exception as e:
# 统一异常处理
return JSONResponse(
content={"session_id": request.session_id, "error": str(e)},
status_code=500
)解决方案
起初,我希望通过云函数调用服务器的 AI 对话接口。于是我交给 Trae 处理。Trae 在 uniCloud 云函数中调用服务端对话接口后无法返回数据。经过多次与 Trae 沟通,仍然无法接受数据。
通过查看 uniCloud 云函数的文档,终于发现 uniCloud 云函数默认是不支持流式输出的。
为了解决这个问题,需要使用客户端请求接口 uni.request 并配置 enableChunked: true,开启流式输出。
当初首选 uniCloud 云函数调用服务端对话接口,是因为考虑到云函数的安全性高。但是,由于服务端对话接口默认不支持流式输出,导致无法实时接收服务器返回的响应数据。最终方案确定为:云函数负责用户校验和获取请求地址,在前端使用 uni.request 发送 POST 请求。
接下来我参考了网上的资料,将问题和方案继续交给 Trae 处理。分别编写 uniapp 请求工具类和前端调用代码。
在工具类中,由于 H5 不支持流式输出,所以需要兼容非流式接口。此外,接收和解析流式输出的响应数据也可以在工具类中实现,前端调用时监听 onData 事件,只需要专注于 UI 更新即可。
- 请求工具类
// 通用请求函数(自动判断流式并控制 enableChunked)
const request = (url, method, data, headers) => {
return new Promise((resolve, reject) => {
// 合并默认 header
const mergedHeaders = {
'Content-Type': 'application/json',
...headers,
};
// 核心:根据接口路径判断是否为流式接口
const isStreamApi = url.includes('/chat/stream');
// 非流式:直接返回简单对象
if (!isStreamApi) {
uni.request({
url,
method,
data,
header: mergedHeaders,
enableChunked: false,
success: (res) => {
const result = res.data?.response || res.data;
resolve({
onData: (callback) => callback(result),
onHeaders: () => { },
abort: () => { },
getResult: () => Promise.resolve(result),
});
},
fail: reject,
});
return;
}
// 流式处理
const requestTask = uni.request({
url,
method,
data,
header: mergedHeaders,
enableChunked: true,
success: () => { }, // 流式响应通过 onChunkReceived 处理
fail: reject,
});
// 统一的响应处理对象(适配 UI 刷新)
const responseHandler = {
/**
* 监听数据(核心:直接返回可渲染的纯文本片段)
* @param {Function} callback - 接收文本片段的回调 (textChunk) => void
* @param {Object} options - 可选配置:{ speed: 100 } 逐字输出速率(ms)
*/
onData: (callback, options = { speed: 100 }) => {
const { speed } = options;
let chunkCache = '';
if (requestTask.onChunkReceived) {
requestTask.onChunkReceived((res) => {
// 1. 解析 ArrayBuffer 为字符串
let chunkStr = '';
if (res.data instanceof ArrayBuffer) {
chunkStr = new TextDecoder('utf-8').decode(res.data);
} else {
chunkStr = res.data || '';
}
// 2. 合并缓存
chunkStr = chunkCache + chunkStr;
chunkCache = '';
// 3. 按行分割 SSE 格式数据
const lines = chunkStr.split('\n');
const validLines = [];
// 处理最后一行可能不完整的情况
if (lines.length > 0) {
const lastLine = lines[lines.length - 1];
if (!lastLine.startsWith('data: ') || !lastLine.trim()) {
chunkCache = lastLine;
lines.pop();
}
}
// 过滤有效行并解析
lines.forEach(line => {
const jsonStr = line.replace('data: ', '').trim();
try {
const jsonData = JSON.parse(jsonStr);
validLines.push(jsonData.chunk || jsonData.error || '');
} catch (e) {
validLines.push(jsonStr);
}
});
// 4. 按速率逐行触发回调
let lineIndex = 0;
const interval = setInterval(() => {
if (lineIndex >= validLines.length) {
clearInterval(interval);
return;
}
callback(validLines[lineIndex]);
lineIndex++;
}, speed);
});
} else {
// 降级处理
console.warn('当前平台不支持流式,已降级');
requestTask.success = function (res) {
callback(res.data?.response || res.data);
};
}
},
onHeaders: (callback) => {
if (requestTask.onHeadersReceived) {
requestTask.onHeadersReceived((res) => callback(res.header));
}
},
abort: () => {
if (requestTask.abort) {
requestTask.abort();
}
},
getResult: () => {
return new Promise((resolve) => {
let fullText = '';
responseHandler.onData((chunk) => {
fullText += chunk;
}, { speed: 0 });
setTimeout(() => resolve(fullText), 100);
});
},
};
resolve(responseHandler);
});
};
// GET/POST 方法保持不变
const get = (url, data) => request(url, 'GET', data);
const post = (url, data, headers) => request(url, 'POST', data, headers);
export { get, post };注意
流式 API 支持 : onChunkReceived 和 onHeadersReceived 方法在不同平台的支持情况不同,代码中已做降级处理,但仍需注意部分平台可能完全不支持。在 uniapp 中,onChunkReceived 方法在部分平台(如 H5)上不支持,导致无法实时接收服务器返回的响应数据。
enableChunked 参数 :仅在小程序中支持,其他平台(如 H5)可能会忽略此参数。
- 前端调用代码
// 发送消息
sendMessage() {
// 如果传入了content,使用传入的内容,否则使用输入框的内容
if (!this.inputMessage) return
// 添加用户消息
this.messages.push({
role: "human",
content: this.inputMessage
},{
role: "ai",
content: "思考中..."
})
// 清空输入框
this.inputMessage = ""
// 滚动到最新消息
this.scrollToBottom()
// 发送消息中
this.isSending = true
// 调用智能客服
this.chatWithBot()
},
// 智能客服
async chatWithBot() {
let isStream = false
// #ifdef MP-WEIXIN
isStream = true
// #endif
// 请求智能客服URL
const {session_id, url} = await chatbotCloudObj.getUrl("chat", isStream)
const prompt = this.messages[this.messages.length - 2].content
const res = await post(url, {
prompt: prompt,
session_id: session_id,
})
// 核心:监听 onData,直接拿到纯文本片段
res.onData((textChunk) => {
console.log(textChunk)
// 首次触发清空「思考中...」
if (this.messages[this.messages.length - 1].content === "思考中...") {
this.messages[this.messages.length - 1].content = "";
}
// 处理结束标记(兼容非流式:非流式不会返回 [DONE],需单独处理)
const isDone = textChunk.includes("[DONE]") || (!isStream && textChunk);
if (isDone) {
// 非流式:移除可能的结束标记,确保内容干净
const cleanText = textChunk.replace("[DONE]", "").trim();
if (cleanText) {
this.messages[this.messages.length - 1].content += cleanText;
}
this.isSending = false;
this.$forceUpdate();
this.scrollToBottom();
return;
}
// 追加文本片段(直接渲染)
this.messages[this.messages.length - 1].content += textChunk;
// 强制刷新 UI 并滚动到底部
this.$forceUpdate();
this.scrollToBottom();
}, { speed: isStream ? 100 : 0 }); // 可选:控制逐字输出速率(ms)
}注意
流式响应适配 :仅在微信小程序环境下启用流式处理 ( #ifdef MP-WEIXIN ),其他平台(如 H5)使用非流式响应。
输出速率控制 :根据是否为流式响应设置不同的输出速率 ( speed: isStream ? 100 : 0 )。