diff --git a/src/api.ts b/src/api.ts index 9208cf4..951dc93 100644 --- a/src/api.ts +++ b/src/api.ts @@ -748,83 +748,94 @@ export async function regenerateMessage( return await consumeSSE(resp, handlers, signal); } -async function consumeSSE(resp: Response, h: StreamEvents, signal?: AbortSignal) { - if (!resp.ok || !resp.body) { - const txt = await resp.text().catch(() => ''); - h.onError?.(`HTTP ${resp.status}: ${txt}`); - return; - } - const reader = resp.body.getReader(); - const decoder = new TextDecoder('utf-8'); - let buf = ''; - try { - while (true) { - const { value, done } = await reader.read(); - if (done) break; - buf += decoder.decode(value, { stream: true }); +async function consumeSSE(resp: Response, h: StreamEvents, signal?: AbortSignal) { + if (!resp.ok || !resp.body) { + const txt = await resp.text().catch(() => ''); + h.onError?.(`HTTP ${resp.status}: ${txt}`); + return; + } + const reader = resp.body.getReader(); + const decoder = new TextDecoder('utf-8'); + let buf = ''; + let hasError = false; // 标记是否已发生错误,避免后续事件继续处理 + + try { + while (true) { + const { value, done } = await reader.read(); + if (done || hasError) break; + + buf += decoder.decode(value, { stream: true }); buf = buf.replace(/\r\n/g, '\n'); - - let idx; - while ((idx = buf.indexOf('\n\n')) !== -1) { - const raw = buf.slice(0, idx); - buf = buf.slice(idx + 2); - if (!raw.trim() || raw.startsWith(':')) continue; - - let event = 'message'; - let dataStr = ''; - for (const line of raw.split('\n')) { - if (line.startsWith('event:')) event = line.slice(6).trim(); + + let idx; + while ((idx = buf.indexOf('\n\n')) !== -1 && !hasError) { + const raw = buf.slice(0, idx); + buf = buf.slice(idx + 2); + if (!raw.trim() || raw.startsWith(':')) continue; + + let event = 'message'; + let dataStr = ''; + for (const line of raw.split('\n')) { + if (line.startsWith('event:')) event = line.slice(6).trim(); else if (line.startsWith('data:')) { let part = line.slice(5); if (part.startsWith(' ')) part = part.slice(1); dataStr += (dataStr ? '\n' : '') + part; } - } - if (!dataStr) continue; - let data: any; - try { - data = JSON.parse(dataStr); - } catch { - continue; - } - switch (event) { - case 'meta': - h.onMeta?.(data); - break; + } + if (!dataStr) continue; + let data: any; + try { + data = JSON.parse(dataStr); + } catch { + continue; + } + switch (event) { + case 'meta': + h.onMeta?.(data); + break; case 'retry': h.onRetry?.(data); break; case 'reasoning_delta': h.onReasoningDelta?.(data.content || ''); break; - case 'delta': - h.onDelta?.(data.content || ''); - break; - case 'tool_call': - h.onToolCall?.(data); - break; - case 'tool_result': - h.onToolResult?.(data); - break; - case 'done': - h.onDone?.(data); - break; - case 'aborted': - h.onAborted?.(data); - break; - case 'error': - h.onError?.(data.message || 'stream error'); - break; - } - } - } - } catch (e: any) { - if (signal?.aborted || e?.name === 'AbortError') { - // 静默:本地已 abort - return; - } - h.onError?.(e?.message ?? String(e)); - } + case 'delta': + h.onDelta?.(data.content || ''); + break; + case 'tool_call': + h.onToolCall?.(data); + break; + case 'tool_result': + h.onToolResult?.(data); + break; + case 'done': + // 只有在没有错误的情况下才处理 done 事件 + if (!hasError) { + h.onDone?.(data); + } + break; + case 'aborted': + h.onAborted?.(data); + break; + case 'error': + hasError = true; + h.onError?.(data.message || 'stream error'); + // 发生错误后立即停止读取 + break; + } + } + } + } catch (e: any) { + if (signal?.aborted || e?.name === 'AbortError') { + // 静默:本地已 abort + return; + } + h.onError?.(e?.message ?? String(e)); + } finally { + // 确保 reader 被释放 + reader.cancel().catch(() => {}); + } } // ============== Workflow (v1.1) ==============