diff --git a/src/handlers/ollama-handler.js b/src/handlers/ollama-handler.js index 2a1abfb..4744280 100644 --- a/src/handlers/ollama-handler.js +++ b/src/handlers/ollama-handler.js @@ -576,29 +576,64 @@ export async function handleOllamaChat(req, res, apiService, currentConfig, prov // Handle streaming if (ollamaRequest.stream) { - res.writeHead(200, { - 'Content-Type': 'application/json', - 'Transfer-Encoding': 'chunked', - 'Access-Control-Allow-Origin': '*', - 'Server': `ollama/${OLLAMA_VERSION}` - }); - - const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest); - - for await (const chunk of stream) { - try { - // Convert backend chunk to Ollama format - const ollamaChunk = ollamaConverter.convertStreamChunk(chunk, sourceProtocol, ollamaRequest.model, false); - res.write(JSON.stringify(ollamaChunk) + '\n'); - } catch (chunkError) { - logger.error('[Ollama] Error processing chunk:', chunkError); + let clientDisconnected = false; + let listenersRegistered = false; + + // 监听客户端断开连接(只注册一次) + const onClientClose = () => { + clientDisconnected = true; + logger.info('[Ollama] Client disconnected during streaming'); + }; + const onClientError = (err) => { + clientDisconnected = true; + logger.error('[Ollama] Response stream error:', err.message); + }; + + if (!listenersRegistered) { + res.on('close', onClientClose); + res.on('error', onClientError); + listenersRegistered = true; + } + + try { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Transfer-Encoding': 'chunked', + 'Access-Control-Allow-Origin': '*', + 'Server': `ollama/${OLLAMA_VERSION}` + }); + + const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest); + + for await (const chunk of stream) { + if (clientDisconnected) { + logger.info('[Ollama] Stopping stream due to client disconnect'); + break; + } + + try { + // Convert backend chunk to Ollama format + const ollamaChunk = ollamaConverter.convertStreamChunk(chunk, sourceProtocol, ollamaRequest.model, false); + if (!res.writableEnded) { + res.write(JSON.stringify(ollamaChunk) + '\n'); + } + } catch (chunkError) { + logger.error('[Ollama] Error processing chunk:', chunkError); + } + } + + // Send final chunk + if (!clientDisconnected && !res.writableEnded) { + const finalChunk = ollamaConverter.convertStreamChunk({}, sourceProtocol, ollamaRequest.model, true); + res.write(JSON.stringify(finalChunk) + '\n'); + res.end(); + } + } finally { + if (listenersRegistered) { + res.off('close', onClientClose); + res.off('error', onClientError); } } - - // Send final chunk - const finalChunk = ollamaConverter.convertStreamChunk({}, sourceProtocol, ollamaRequest.model, true); - res.write(JSON.stringify(finalChunk) + '\n'); - res.end(); } else { // Non-streaming response const backendResponse = await actualApiService.generateContent(openaiRequest.model, backendRequest); @@ -683,29 +718,64 @@ export async function handleOllamaGenerate(req, res, apiService, currentConfig, // Handle streaming if (ollamaRequest.stream) { - res.writeHead(200, { - 'Content-Type': 'application/json', - 'Transfer-Encoding': 'chunked', - 'Access-Control-Allow-Origin': '*', - 'Server': `ollama/${OLLAMA_VERSION}` - }); - - const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest); - - for await (const chunk of stream) { - try { - // Convert backend chunk to Ollama generate format - const ollamaChunk = ollamaConverter.toOllamaGenerateStreamChunk(chunk, ollamaRequest.model, false); - res.write(JSON.stringify(ollamaChunk) + '\n'); - } catch (chunkError) { - logger.error('[Ollama] Error processing chunk:', chunkError); + let clientDisconnected = false; + let listenersRegistered = false; + + // 监听客户端断开连接(只注册一次) + const onClientClose = () => { + clientDisconnected = true; + logger.info('[Ollama Generate] Client disconnected during streaming'); + }; + const onClientError = (err) => { + clientDisconnected = true; + logger.error('[Ollama Generate] Response stream error:', err.message); + }; + + if (!listenersRegistered) { + res.on('close', onClientClose); + res.on('error', onClientError); + listenersRegistered = true; + } + + try { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Transfer-Encoding': 'chunked', + 'Access-Control-Allow-Origin': '*', + 'Server': `ollama/${OLLAMA_VERSION}` + }); + + const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest); + + for await (const chunk of stream) { + if (clientDisconnected) { + logger.info('[Ollama Generate] Stopping stream due to client disconnect'); + break; + } + + try { + // Convert backend chunk to Ollama generate format + const ollamaChunk = ollamaConverter.toOllamaGenerateStreamChunk(chunk, ollamaRequest.model, false); + if (!res.writableEnded) { + res.write(JSON.stringify(ollamaChunk) + '\n'); + } + } catch (chunkError) { + logger.error('[Ollama] Error processing chunk:', chunkError); + } + } + + // Send final chunk + if (!clientDisconnected && !res.writableEnded) { + const finalChunk = ollamaConverter.toOllamaGenerateStreamChunk({}, ollamaRequest.model, true); + res.write(JSON.stringify(finalChunk) + '\n'); + res.end(); + } + } finally { + if (listenersRegistered) { + res.off('close', onClientClose); + res.off('error', onClientError); } } - - // Send final chunk - const finalChunk = ollamaConverter.toOllamaGenerateStreamChunk({}, ollamaRequest.model, true); - res.write(JSON.stringify(finalChunk) + '\n'); - res.end(); } else { // Non-streaming response const backendResponse = await actualApiService.generateContent(openaiRequest.model, backendRequest); diff --git a/src/plugins/api-potluck/middleware.js b/src/plugins/api-potluck/middleware.js index 0101c5d..c983b3b 100644 --- a/src/plugins/api-potluck/middleware.js +++ b/src/plugins/api-potluck/middleware.js @@ -147,6 +147,19 @@ export function sendPotluckError(res, error) { }; } - res.writeHead(error.statusCode, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify(response)); + // 检查响应流是否已关闭 + if (res.writableEnded || res.destroyed) { + logger.warn('[API Potluck] Response already ended, skipping error response'); + return; + } + + if (!res.headersSent) { + res.writeHead(error.statusCode, { 'Content-Type': 'application/json' }); + } + + try { + res.end(JSON.stringify(response)); + } catch (writeError) { + logger.error('[API Potluck] Failed to write error response:', writeError.message); + } } diff --git a/src/ui-modules/event-broadcast.js b/src/ui-modules/event-broadcast.js index dd1420f..af54ad4 100644 --- a/src/ui-modules/event-broadcast.js +++ b/src/ui-modules/event-broadcast.js @@ -36,7 +36,12 @@ export async function handleEvents(req, res) { 'Access-Control-Allow-Origin': '*' }); - res.write('\n'); + try { + res.write('\n'); + } catch (err) { + logger.error('[Event Broadcast] Failed to write initial data:', err.message); + return true; + } // Store the response object for broadcasting if (!global.eventClients) { @@ -46,7 +51,18 @@ export async function handleEvents(req, res) { // Keep connection alive const keepAlive = setInterval(() => { - res.write(':\n\n'); + if (!res.writableEnded && !res.destroyed) { + try { + res.write(':\n\n'); + } catch (err) { + logger.error('[Event Broadcast] Failed to write keepalive:', err.message); + clearInterval(keepAlive); + global.eventClients = global.eventClients.filter(r => r !== res); + } + } else { + clearInterval(keepAlive); + global.eventClients = global.eventClients.filter(r => r !== res); + } }, 30000); req.on('close', () => { diff --git a/src/ui-modules/oauth-api.js b/src/ui-modules/oauth-api.js index 8485016..7a6bf67 100644 --- a/src/ui-modules/oauth-api.js +++ b/src/ui-modules/oauth-api.js @@ -205,10 +205,18 @@ export async function handleBatchImportKiroTokens(req, res) { 'X-Accel-Buffering': 'no' }); - // 发送 SSE 事件的辅助函数 + // 发送 SSE 事件的辅助函数(带错误处理) const sendSSE = (event, data) => { - res.write(`event: ${event}\n`); - res.write(`data: ${JSON.stringify(data)}\n\n`); + if (!res.writableEnded && !res.destroyed) { + try { + res.write(`event: ${event}\n`); + res.write(`data: ${JSON.stringify(data)}\n\n`); + } catch (err) { + logger.error('[Kiro Batch Import] Failed to write SSE:', err.message); + return false; + } + } + return true; }; // 发送开始事件 @@ -241,11 +249,15 @@ export async function handleBatchImportKiroTokens(req, res) { } catch (error) { logger.error('[Kiro Batch Import] Error:', error); // 如果已经开始发送 SSE,则发送错误事件 - if (res.headersSent) { - res.write(`event: error\n`); - res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`); - res.end(); - } else { + if (res.headersSent && !res.writableEnded && !res.destroyed) { + try { + res.write(`event: error\n`); + res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`); + res.end(); + } catch (writeErr) { + logger.error('[Kiro Batch Import] Failed to write error:', writeErr.message); + } + } else if (!res.headersSent) { res.writeHead(500, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ success: false, diff --git a/src/utils/common.js b/src/utils/common.js index ccbde75..863ad16 100644 --- a/src/utils/common.js +++ b/src/utils/common.js @@ -285,6 +285,29 @@ export async function handleStreamRequest(res, service, model, requestBody, from const currentRetry = retryContext?.currentRetry ?? 0; const CONFIG = retryContext?.CONFIG; const isRetry = currentRetry > 0; + + // 使用共享的 clientDisconnected 状态(如果是重试,继承上层的状态) + let clientDisconnected = retryContext?.clientDisconnected || { value: false }; + if (!isRetry) { + clientDisconnected = { value: false }; // 使用对象引用,便于在递归中共享状态 + } + + // 监听客户端断开连接事件(命名函数,便于移除) + const onClientClose = () => { + clientDisconnected.value = true; + logger.info('[Stream] Client disconnected, stopping stream processing'); + }; + + const onClientError = (err) => { + clientDisconnected.value = true; + logger.error('[Stream] Response stream error:', err.message); + }; + + // 只在首次请求时注册事件监听器(避免重试时重复注册) + if (!isRetry) { + res.on('close', onClientClose); + res.on('error', onClientError); + } // 只在首次请求时发送响应头,重试时跳过(响应头已发送) if (!isRetry) { @@ -302,6 +325,12 @@ export async function handleStreamRequest(res, service, model, requestBody, from try { for await (const nativeChunk of nativeStream) { + // 检查客户端是否已断开连接 + if (clientDisconnected.value) { + logger.info('[Stream] Stopping iteration due to client disconnect'); + break; + } + // Extract text for logging purposes const chunkText = extractResponseText(nativeChunk, toProvider); if (chunkText && !Array.isArray(chunkText)) { @@ -336,6 +365,11 @@ export async function handleStreamRequest(res, service, model, requestBody, from const chunksToSend = Array.isArray(chunkToSend) ? chunkToSend : [chunkToSend]; for (const chunk of chunksToSend) { + // 再次检查客户端连接状态 + if (clientDisconnected.value) { + break; + } + // [FIX] 跟踪工具调用并在结束时修正 finish_reason // OpenAI 格式 if (chunk.choices?.[0]?.delta?.tool_calls || chunk.choices?.[0]?.finish_reason === 'tool_calls') { @@ -368,13 +402,29 @@ export async function handleStreamRequest(res, service, model, requestBody, from if (addEvent) { // fullOldResponseJson += chunk.type+"\n"; // fullResponseJson += chunk.type+"\n"; - res.write(`event: ${chunk.type}\n`); + if (!clientDisconnected.value && !res.writableEnded) { + try { + res.write(`event: ${chunk.type}\n`); + } catch (writeErr) { + logger.error('[Stream] Failed to write event:', writeErr.message); + clientDisconnected.value = true; + break; + } + } // logger.info(`event: ${chunk.type}\n`); } // fullOldResponseJson += JSON.stringify(chunk)+"\n"; // fullResponseJson += JSON.stringify(chunk)+"\n\n"; - res.write(`data: ${JSON.stringify(chunk)}\n\n`); + if (!clientDisconnected.value && !res.writableEnded) { + try { + res.write(`data: ${JSON.stringify(chunk)}\n\n`); + } catch (writeErr) { + logger.error('[Stream] Failed to write data:', writeErr.message); + clientDisconnected.value = true; + break; + } + } // logger.info(`data: ${JSON.stringify(chunk)}\n`); } } @@ -391,13 +441,26 @@ export async function handleStreamRequest(res, service, model, requestBody, from } catch (error) { logger.error('\n[Server] Error during stream processing:', error.stack); + // 如果客户端已断开,不需要发送错误响应 + if (clientDisconnected.value) { + logger.info('[Stream] Skipping error response due to client disconnect'); + responseClosed = true; + return; + } + // 如果已经发送了内容,不进行重试(避免响应数据损坏) if (fullResponseText.length > 0) { logger.info(`[Stream Retry] Cannot retry: ${fullResponseText.length} bytes already sent to client`); // 直接发送错误并结束 const errorPayload = createStreamErrorResponse(error, fromProvider); - res.write(errorPayload); - res.end(); + if (!res.writableEnded) { + try { + res.write(errorPayload); + res.end(); + } catch (writeErr) { + logger.error('[Stream] Failed to write error response:', writeErr.message); + } + } responseClosed = true; return; } @@ -458,7 +521,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from ...retryContext, CONFIG, currentRetry: currentRetry + 1, - maxRetries + maxRetries, + clientDisconnected // 传递断开状态 }; // 递归调用,使用新的服务 @@ -486,25 +550,49 @@ export async function handleStreamRequest(res, service, model, requestBody, from // 使用新方法创建符合 fromProvider 格式的流式错误响应 const errorPayload = createStreamErrorResponse(error, fromProvider); - res.write(errorPayload); - res.end(); + if (!clientDisconnected.value && !res.writableEnded) { + try { + res.write(errorPayload); + res.end(); + } catch (writeErr) { + logger.error('[Stream] Failed to write error response:', writeErr.message); + } + } responseClosed = true; } finally { - if (!responseClosed) { + // 只在首次请求时移除事件监听器(避免重试时误删) + if (!isRetry) { + res.off('close', onClientClose); + res.off('error', onClientError); + } + + // 只在非重试或重试失败时才发送结束标记 + // 如果是重试成功,递归调用会处理结束标记 + if (!responseClosed && !clientDisconnected.value && !isRetry) { // 根据客户端协议发送相应的流式结束标记 const clientProtocol = getProtocolPrefix(fromProvider); - if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI) { - res.write('data: [DONE]\n\n'); - } else if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES) { - res.write('event: done\n'); - res.write('data: {}\n\n'); - } else if (clientProtocol === MODEL_PROTOCOL_PREFIX.CLAUDE) { - res.write('event: message_stop\n'); - res.write('data: {"type":"message_stop"}\n\n'); + if (!res.writableEnded) { + try { + if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI) { + res.write('data: [DONE]\n\n'); + } else if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES) { + res.write('event: done\n'); + res.write('data: {}\n\n'); + } else if (clientProtocol === MODEL_PROTOCOL_PREFIX.CLAUDE) { + res.write('event: message_stop\n'); + res.write('data: {"type":"message_stop"}\n\n'); + } + res.end(); + } catch (writeErr) { + logger.error('[Stream] Failed to write completion marker:', writeErr.message); + } } - res.end(); } - await logConversation('output', fullResponseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME); + + // 只在首次请求时记录日志(避免重试时重复记录) + if (!isRetry) { + await logConversation('output', fullResponseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME); + } // fs.writeFile('oldResponseChunk'+Date.now()+'.json', fullOldResponseJson); // fs.writeFile('responseChunk'+Date.now()+'.json', fullResponseJson); } @@ -916,6 +1004,12 @@ export function handleError(res, error, provider = null) { } logger.error('[Server] Full error details:', error.stack); + // 检查响应流是否已关闭或结束 + if (res.writableEnded || res.destroyed) { + logger.warn('[Server] Response already ended or destroyed, skipping error response'); + return; + } + if (!res.headersSent) { res.writeHead(statusCode, { 'Content-Type': 'application/json' }); } @@ -928,7 +1022,12 @@ export function handleError(res, error, provider = null) { details: error.response?.data } }; - res.end(JSON.stringify(errorPayload)); + + try { + res.end(JSON.stringify(errorPayload)); + } catch (writeError) { + logger.error('[Server] Failed to write error response:', writeError.message); + } } /** diff --git a/src/utils/logger.js b/src/utils/logger.js index 4f2c3fe..ead4b29 100644 --- a/src/utils/logger.js +++ b/src/utils/logger.js @@ -239,10 +239,15 @@ class Logger { // 输出到文件 if (this.config.outputMode === 'file' || this.config.outputMode === 'all') { - if (this.logStream && !this.logStream.destroyed) { - // 检查文件大小并轮转 - this.checkAndRotateLogFile(); - this.logStream.write(message + '\n'); + if (this.logStream && !this.logStream.destroyed && this.logStream.writable) { + try { + // 检查文件大小并轮转 + this.checkAndRotateLogFile(); + this.logStream.write(message + '\n'); + } catch (err) { + // 如果写入失败,输出到控制台作为备份 + console.error('[Logger] Failed to write to log file:', err.message); + } } } }