fix: 修复流式聊天发生错误时继续处理后续事件的问题

- 增加 hasError 标记,错误发生后立即停止处理后续事件
- 在 done 事件处理前检查是否已有错误
- 在 finally 中确保 reader 被正确关闭
- 防止错误发生后,done 事件继续执行导致历史响应被错误回填
main
sp mac bookpro 2605 2026-06-03 01:16:24 +08:00
parent c30d301fbc
commit 92ef2c38c8
1 changed files with 76 additions and 65 deletions

View File

@ -748,83 +748,94 @@ export async function regenerateMessage(
return await consumeSSE(resp, handlers, signal);
}
async function consumeSSE(resp: Response, h: StreamEvents, signal?: AbortSignal) {
if (!resp.ok || !resp.body) {
const txt = await resp.text().catch(() => '');
h.onError?.(`HTTP ${resp.status}: ${txt}`);
return;
}
const reader = resp.body.getReader();
const decoder = new TextDecoder('utf-8');
let buf = '';
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
async function consumeSSE(resp: Response, h: StreamEvents, signal?: AbortSignal) {
if (!resp.ok || !resp.body) {
const txt = await resp.text().catch(() => '');
h.onError?.(`HTTP ${resp.status}: ${txt}`);
return;
}
const reader = resp.body.getReader();
const decoder = new TextDecoder('utf-8');
let buf = '';
let hasError = false; // 标记是否已发生错误,避免后续事件继续处理
try {
while (true) {
const { value, done } = await reader.read();
if (done || hasError) break;
buf += decoder.decode(value, { stream: true });
buf = buf.replace(/\r\n/g, '\n');
let idx;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const raw = buf.slice(0, idx);
buf = buf.slice(idx + 2);
if (!raw.trim() || raw.startsWith(':')) continue;
let event = 'message';
let dataStr = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
let idx;
while ((idx = buf.indexOf('\n\n')) !== -1 && !hasError) {
const raw = buf.slice(0, idx);
buf = buf.slice(idx + 2);
if (!raw.trim() || raw.startsWith(':')) continue;
let event = 'message';
let dataStr = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) {
let part = line.slice(5);
if (part.startsWith(' ')) part = part.slice(1);
dataStr += (dataStr ? '\n' : '') + part;
}
}
if (!dataStr) continue;
let data: any;
try {
data = JSON.parse(dataStr);
} catch {
continue;
}
switch (event) {
case 'meta':
h.onMeta?.(data);
break;
}
if (!dataStr) continue;
let data: any;
try {
data = JSON.parse(dataStr);
} catch {
continue;
}
switch (event) {
case 'meta':
h.onMeta?.(data);
break;
case 'retry':
h.onRetry?.(data);
break;
case 'reasoning_delta':
h.onReasoningDelta?.(data.content || '');
break;
case 'delta':
h.onDelta?.(data.content || '');
break;
case 'tool_call':
h.onToolCall?.(data);
break;
case 'tool_result':
h.onToolResult?.(data);
break;
case 'done':
h.onDone?.(data);
break;
case 'aborted':
h.onAborted?.(data);
break;
case 'error':
h.onError?.(data.message || 'stream error');
break;
}
}
}
} catch (e: any) {
if (signal?.aborted || e?.name === 'AbortError') {
// 静默:本地已 abort
return;
}
h.onError?.(e?.message ?? String(e));
}
case 'delta':
h.onDelta?.(data.content || '');
break;
case 'tool_call':
h.onToolCall?.(data);
break;
case 'tool_result':
h.onToolResult?.(data);
break;
case 'done':
// 只有在没有错误的情况下才处理 done 事件
if (!hasError) {
h.onDone?.(data);
}
break;
case 'aborted':
h.onAborted?.(data);
break;
case 'error':
hasError = true;
h.onError?.(data.message || 'stream error');
// 发生错误后立即停止读取
break;
}
}
}
} catch (e: any) {
if (signal?.aborted || e?.name === 'AbortError') {
// 静默:本地已 abort
return;
}
h.onError?.(e?.message ?? String(e));
} finally {
// 确保 reader 被释放
reader.cancel().catch(() => {});
}
}
// ============== Workflow (v1.1) ==============