Merge pull request #306 from majie776/main

fix(stream):修复流被客户端关闭的时候的写入错误导致服务crash的问题
This commit is contained in:
何夕2077 2026-02-05 20:31:27 +08:00 committed by GitHub
commit cbab1c3b5e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 292 additions and 77 deletions

View file

@ -576,29 +576,64 @@ export async function handleOllamaChat(req, res, apiService, currentConfig, prov
// Handle streaming
if (ollamaRequest.stream) {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked',
'Access-Control-Allow-Origin': '*',
'Server': `ollama/${OLLAMA_VERSION}`
});
const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest);
for await (const chunk of stream) {
try {
// Convert backend chunk to Ollama format
const ollamaChunk = ollamaConverter.convertStreamChunk(chunk, sourceProtocol, ollamaRequest.model, false);
res.write(JSON.stringify(ollamaChunk) + '\n');
} catch (chunkError) {
logger.error('[Ollama] Error processing chunk:', chunkError);
let clientDisconnected = false;
let listenersRegistered = false;
// 监听客户端断开连接(只注册一次)
const onClientClose = () => {
clientDisconnected = true;
logger.info('[Ollama] Client disconnected during streaming');
};
const onClientError = (err) => {
clientDisconnected = true;
logger.error('[Ollama] Response stream error:', err.message);
};
if (!listenersRegistered) {
res.on('close', onClientClose);
res.on('error', onClientError);
listenersRegistered = true;
}
try {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked',
'Access-Control-Allow-Origin': '*',
'Server': `ollama/${OLLAMA_VERSION}`
});
const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest);
for await (const chunk of stream) {
if (clientDisconnected) {
logger.info('[Ollama] Stopping stream due to client disconnect');
break;
}
try {
// Convert backend chunk to Ollama format
const ollamaChunk = ollamaConverter.convertStreamChunk(chunk, sourceProtocol, ollamaRequest.model, false);
if (!res.writableEnded) {
res.write(JSON.stringify(ollamaChunk) + '\n');
}
} catch (chunkError) {
logger.error('[Ollama] Error processing chunk:', chunkError);
}
}
// Send final chunk
if (!clientDisconnected && !res.writableEnded) {
const finalChunk = ollamaConverter.convertStreamChunk({}, sourceProtocol, ollamaRequest.model, true);
res.write(JSON.stringify(finalChunk) + '\n');
res.end();
}
} finally {
if (listenersRegistered) {
res.off('close', onClientClose);
res.off('error', onClientError);
}
}
// Send final chunk
const finalChunk = ollamaConverter.convertStreamChunk({}, sourceProtocol, ollamaRequest.model, true);
res.write(JSON.stringify(finalChunk) + '\n');
res.end();
} else {
// Non-streaming response
const backendResponse = await actualApiService.generateContent(openaiRequest.model, backendRequest);
@ -683,29 +718,64 @@ export async function handleOllamaGenerate(req, res, apiService, currentConfig,
// Handle streaming
if (ollamaRequest.stream) {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked',
'Access-Control-Allow-Origin': '*',
'Server': `ollama/${OLLAMA_VERSION}`
});
const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest);
for await (const chunk of stream) {
try {
// Convert backend chunk to Ollama generate format
const ollamaChunk = ollamaConverter.toOllamaGenerateStreamChunk(chunk, ollamaRequest.model, false);
res.write(JSON.stringify(ollamaChunk) + '\n');
} catch (chunkError) {
logger.error('[Ollama] Error processing chunk:', chunkError);
let clientDisconnected = false;
let listenersRegistered = false;
// 监听客户端断开连接(只注册一次)
const onClientClose = () => {
clientDisconnected = true;
logger.info('[Ollama Generate] Client disconnected during streaming');
};
const onClientError = (err) => {
clientDisconnected = true;
logger.error('[Ollama Generate] Response stream error:', err.message);
};
if (!listenersRegistered) {
res.on('close', onClientClose);
res.on('error', onClientError);
listenersRegistered = true;
}
try {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked',
'Access-Control-Allow-Origin': '*',
'Server': `ollama/${OLLAMA_VERSION}`
});
const stream = await actualApiService.generateContentStream(openaiRequest.model, backendRequest);
for await (const chunk of stream) {
if (clientDisconnected) {
logger.info('[Ollama Generate] Stopping stream due to client disconnect');
break;
}
try {
// Convert backend chunk to Ollama generate format
const ollamaChunk = ollamaConverter.toOllamaGenerateStreamChunk(chunk, ollamaRequest.model, false);
if (!res.writableEnded) {
res.write(JSON.stringify(ollamaChunk) + '\n');
}
} catch (chunkError) {
logger.error('[Ollama] Error processing chunk:', chunkError);
}
}
// Send final chunk
if (!clientDisconnected && !res.writableEnded) {
const finalChunk = ollamaConverter.toOllamaGenerateStreamChunk({}, ollamaRequest.model, true);
res.write(JSON.stringify(finalChunk) + '\n');
res.end();
}
} finally {
if (listenersRegistered) {
res.off('close', onClientClose);
res.off('error', onClientError);
}
}
// Send final chunk
const finalChunk = ollamaConverter.toOllamaGenerateStreamChunk({}, ollamaRequest.model, true);
res.write(JSON.stringify(finalChunk) + '\n');
res.end();
} else {
// Non-streaming response
const backendResponse = await actualApiService.generateContent(openaiRequest.model, backendRequest);

View file

@ -147,6 +147,19 @@ export function sendPotluckError(res, error) {
};
}
res.writeHead(error.statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(response));
// 检查响应流是否已关闭
if (res.writableEnded || res.destroyed) {
logger.warn('[API Potluck] Response already ended, skipping error response');
return;
}
if (!res.headersSent) {
res.writeHead(error.statusCode, { 'Content-Type': 'application/json' });
}
try {
res.end(JSON.stringify(response));
} catch (writeError) {
logger.error('[API Potluck] Failed to write error response:', writeError.message);
}
}

View file

@ -36,7 +36,12 @@ export async function handleEvents(req, res) {
'Access-Control-Allow-Origin': '*'
});
res.write('\n');
try {
res.write('\n');
} catch (err) {
logger.error('[Event Broadcast] Failed to write initial data:', err.message);
return true;
}
// Store the response object for broadcasting
if (!global.eventClients) {
@ -46,7 +51,18 @@ export async function handleEvents(req, res) {
// Keep connection alive
const keepAlive = setInterval(() => {
res.write(':\n\n');
if (!res.writableEnded && !res.destroyed) {
try {
res.write(':\n\n');
} catch (err) {
logger.error('[Event Broadcast] Failed to write keepalive:', err.message);
clearInterval(keepAlive);
global.eventClients = global.eventClients.filter(r => r !== res);
}
} else {
clearInterval(keepAlive);
global.eventClients = global.eventClients.filter(r => r !== res);
}
}, 30000);
req.on('close', () => {

View file

@ -205,10 +205,18 @@ export async function handleBatchImportKiroTokens(req, res) {
'X-Accel-Buffering': 'no'
});
// 发送 SSE 事件的辅助函数
// 发送 SSE 事件的辅助函数(带错误处理)
const sendSSE = (event, data) => {
res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
if (!res.writableEnded && !res.destroyed) {
try {
res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
} catch (err) {
logger.error('[Kiro Batch Import] Failed to write SSE:', err.message);
return false;
}
}
return true;
};
// 发送开始事件
@ -241,11 +249,15 @@ export async function handleBatchImportKiroTokens(req, res) {
} catch (error) {
logger.error('[Kiro Batch Import] Error:', error);
// 如果已经开始发送 SSE则发送错误事件
if (res.headersSent) {
res.write(`event: error\n`);
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
} else {
if (res.headersSent && !res.writableEnded && !res.destroyed) {
try {
res.write(`event: error\n`);
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
} catch (writeErr) {
logger.error('[Kiro Batch Import] Failed to write error:', writeErr.message);
}
} else if (!res.headersSent) {
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
success: false,

View file

@ -285,6 +285,29 @@ export async function handleStreamRequest(res, service, model, requestBody, from
const currentRetry = retryContext?.currentRetry ?? 0;
const CONFIG = retryContext?.CONFIG;
const isRetry = currentRetry > 0;
// 使用共享的 clientDisconnected 状态(如果是重试,继承上层的状态)
let clientDisconnected = retryContext?.clientDisconnected || { value: false };
if (!isRetry) {
clientDisconnected = { value: false }; // 使用对象引用,便于在递归中共享状态
}
// 监听客户端断开连接事件(命名函数,便于移除)
const onClientClose = () => {
clientDisconnected.value = true;
logger.info('[Stream] Client disconnected, stopping stream processing');
};
const onClientError = (err) => {
clientDisconnected.value = true;
logger.error('[Stream] Response stream error:', err.message);
};
// 只在首次请求时注册事件监听器(避免重试时重复注册)
if (!isRetry) {
res.on('close', onClientClose);
res.on('error', onClientError);
}
// 只在首次请求时发送响应头,重试时跳过(响应头已发送)
if (!isRetry) {
@ -302,6 +325,12 @@ export async function handleStreamRequest(res, service, model, requestBody, from
try {
for await (const nativeChunk of nativeStream) {
// 检查客户端是否已断开连接
if (clientDisconnected.value) {
logger.info('[Stream] Stopping iteration due to client disconnect');
break;
}
// Extract text for logging purposes
const chunkText = extractResponseText(nativeChunk, toProvider);
if (chunkText && !Array.isArray(chunkText)) {
@ -336,6 +365,11 @@ export async function handleStreamRequest(res, service, model, requestBody, from
const chunksToSend = Array.isArray(chunkToSend) ? chunkToSend : [chunkToSend];
for (const chunk of chunksToSend) {
// 再次检查客户端连接状态
if (clientDisconnected.value) {
break;
}
// [FIX] 跟踪工具调用并在结束时修正 finish_reason
// OpenAI 格式
if (chunk.choices?.[0]?.delta?.tool_calls || chunk.choices?.[0]?.finish_reason === 'tool_calls') {
@ -368,13 +402,29 @@ export async function handleStreamRequest(res, service, model, requestBody, from
if (addEvent) {
// fullOldResponseJson += chunk.type+"\n";
// fullResponseJson += chunk.type+"\n";
res.write(`event: ${chunk.type}\n`);
if (!clientDisconnected.value && !res.writableEnded) {
try {
res.write(`event: ${chunk.type}\n`);
} catch (writeErr) {
logger.error('[Stream] Failed to write event:', writeErr.message);
clientDisconnected.value = true;
break;
}
}
// logger.info(`event: ${chunk.type}\n`);
}
// fullOldResponseJson += JSON.stringify(chunk)+"\n";
// fullResponseJson += JSON.stringify(chunk)+"\n\n";
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
if (!clientDisconnected.value && !res.writableEnded) {
try {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
} catch (writeErr) {
logger.error('[Stream] Failed to write data:', writeErr.message);
clientDisconnected.value = true;
break;
}
}
// logger.info(`data: ${JSON.stringify(chunk)}\n`);
}
}
@ -391,13 +441,26 @@ export async function handleStreamRequest(res, service, model, requestBody, from
} catch (error) {
logger.error('\n[Server] Error during stream processing:', error.stack);
// 如果客户端已断开,不需要发送错误响应
if (clientDisconnected.value) {
logger.info('[Stream] Skipping error response due to client disconnect');
responseClosed = true;
return;
}
// 如果已经发送了内容,不进行重试(避免响应数据损坏)
if (fullResponseText.length > 0) {
logger.info(`[Stream Retry] Cannot retry: ${fullResponseText.length} bytes already sent to client`);
// 直接发送错误并结束
const errorPayload = createStreamErrorResponse(error, fromProvider);
res.write(errorPayload);
res.end();
if (!res.writableEnded) {
try {
res.write(errorPayload);
res.end();
} catch (writeErr) {
logger.error('[Stream] Failed to write error response:', writeErr.message);
}
}
responseClosed = true;
return;
}
@ -458,7 +521,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from
...retryContext,
CONFIG,
currentRetry: currentRetry + 1,
maxRetries
maxRetries,
clientDisconnected // 传递断开状态
};
// 递归调用,使用新的服务
@ -486,25 +550,49 @@ export async function handleStreamRequest(res, service, model, requestBody, from
// 使用新方法创建符合 fromProvider 格式的流式错误响应
const errorPayload = createStreamErrorResponse(error, fromProvider);
res.write(errorPayload);
res.end();
if (!clientDisconnected.value && !res.writableEnded) {
try {
res.write(errorPayload);
res.end();
} catch (writeErr) {
logger.error('[Stream] Failed to write error response:', writeErr.message);
}
}
responseClosed = true;
} finally {
if (!responseClosed) {
// 只在首次请求时移除事件监听器(避免重试时误删)
if (!isRetry) {
res.off('close', onClientClose);
res.off('error', onClientError);
}
// 只在非重试或重试失败时才发送结束标记
// 如果是重试成功,递归调用会处理结束标记
if (!responseClosed && !clientDisconnected.value && !isRetry) {
// 根据客户端协议发送相应的流式结束标记
const clientProtocol = getProtocolPrefix(fromProvider);
if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI) {
res.write('data: [DONE]\n\n');
} else if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES) {
res.write('event: done\n');
res.write('data: {}\n\n');
} else if (clientProtocol === MODEL_PROTOCOL_PREFIX.CLAUDE) {
res.write('event: message_stop\n');
res.write('data: {"type":"message_stop"}\n\n');
if (!res.writableEnded) {
try {
if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI) {
res.write('data: [DONE]\n\n');
} else if (clientProtocol === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES) {
res.write('event: done\n');
res.write('data: {}\n\n');
} else if (clientProtocol === MODEL_PROTOCOL_PREFIX.CLAUDE) {
res.write('event: message_stop\n');
res.write('data: {"type":"message_stop"}\n\n');
}
res.end();
} catch (writeErr) {
logger.error('[Stream] Failed to write completion marker:', writeErr.message);
}
}
res.end();
}
await logConversation('output', fullResponseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
// 只在首次请求时记录日志(避免重试时重复记录)
if (!isRetry) {
await logConversation('output', fullResponseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
}
// fs.writeFile('oldResponseChunk'+Date.now()+'.json', fullOldResponseJson);
// fs.writeFile('responseChunk'+Date.now()+'.json', fullResponseJson);
}
@ -916,6 +1004,12 @@ export function handleError(res, error, provider = null) {
}
logger.error('[Server] Full error details:', error.stack);
// 检查响应流是否已关闭或结束
if (res.writableEnded || res.destroyed) {
logger.warn('[Server] Response already ended or destroyed, skipping error response');
return;
}
if (!res.headersSent) {
res.writeHead(statusCode, { 'Content-Type': 'application/json' });
}
@ -928,7 +1022,12 @@ export function handleError(res, error, provider = null) {
details: error.response?.data
}
};
res.end(JSON.stringify(errorPayload));
try {
res.end(JSON.stringify(errorPayload));
} catch (writeError) {
logger.error('[Server] Failed to write error response:', writeError.message);
}
}
/**

View file

@ -239,10 +239,15 @@ class Logger {
// 输出到文件
if (this.config.outputMode === 'file' || this.config.outputMode === 'all') {
if (this.logStream && !this.logStream.destroyed) {
// 检查文件大小并轮转
this.checkAndRotateLogFile();
this.logStream.write(message + '\n');
if (this.logStream && !this.logStream.destroyed && this.logStream.writable) {
try {
// 检查文件大小并轮转
this.checkAndRotateLogFile();
this.logStream.write(message + '\n');
} catch (err) {
// 如果写入失败,输出到控制台作为备份
console.error('[Logger] Failed to write to log file:', err.message);
}
}
}
}