From 98caecb4634f0f36edad8d7b91670530d3b360d5 Mon Sep 17 00:00:00 2001 From: Cishoon Date: Mon, 9 Mar 2026 15:49:36 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=80=9A=E8=BF=87=20per-request=20reque?= =?UTF-8?q?stId=20=E5=BD=BB=E5=BA=95=E8=A7=A3=E5=86=B3=20Codex=E2=86=92Cla?= =?UTF-8?q?ude=20=E6=B5=81=E5=BC=8F=E8=BD=AC=E6=8D=A2=E7=9A=84=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E4=B8=B2=E6=B5=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原有 item_id→resId 映射方案中,response.output_item.added 事件不携带 response.id,建立映射时仍依赖共享的 lastClaudeStreamResponseId,并发场景下 映射关系可能从一开始就是错的。 修复:在 handleStreamRequest 中为每个请求生成唯一 requestId,通过调用链传入 toClaudeStreamChunk 作为 streamParams 的隔离 key,使并发流状态完全独立, 不再依赖任何共享状态做并发关联。 --- src/convert/convert.js | 4 +- src/converters/strategies/CodexConverter.js | 107 ++++++++------------ src/utils/common.js | 4 +- 3 files changed, 45 insertions(+), 70 deletions(-) diff --git a/src/convert/convert.js b/src/convert/convert.js index e846d16..bb74633 100644 --- a/src/convert/convert.js +++ b/src/convert/convert.js @@ -39,7 +39,7 @@ import { * @returns {object} 转换后的数据 * @throws {Error} 如果找不到合适的转换函数 */ -export function convertData(data, type, fromProvider, toProvider, model) { +export function convertData(data, type, fromProvider, toProvider, model, requestId) { try { // 获取协议前缀 const fromProtocol = getProtocolPrefix(fromProvider); @@ -67,7 +67,7 @@ export function convertData(data, type, fromProvider, toProvider, model) { return converter.convertResponse(data, toProtocol, model); case 'streamChunk': - return converter.convertStreamChunk(data, toProtocol, model); + return converter.convertStreamChunk(data, toProtocol, model, requestId); case 'modelList': return converter.convertModelList(data, toProtocol); diff --git a/src/converters/strategies/CodexConverter.js b/src/converters/strategies/CodexConverter.js index 67f224c..395a504 100644 --- a/src/converters/strategies/CodexConverter.js +++ b/src/converters/strategies/CodexConverter.js @@ -55,7 +55,7 @@ export class CodexConverter extends BaseConverter { /** * 转换流式响应块 */ - convertStreamChunk(chunk, targetProtocol, model) { + convertStreamChunk(chunk, targetProtocol, model, requestId) { switch (targetProtocol) { case MODEL_PROTOCOL_PREFIX.OPENAI: return this.toOpenAIStreamChunk(chunk, model); @@ -64,7 +64,7 @@ export class CodexConverter extends BaseConverter { case MODEL_PROTOCOL_PREFIX.GEMINI: return this.toGeminiStreamChunk(chunk, model); case MODEL_PROTOCOL_PREFIX.CLAUDE: - return this.toClaudeStreamChunk(chunk, model); + return this.toClaudeStreamChunk(chunk, model, requestId); case MODEL_PROTOCOL_PREFIX.CODEX: return chunk; // Codex to Codex default: @@ -1124,67 +1124,26 @@ export class CodexConverter extends BaseConverter { /** * Codex → Claude 流式响应转换 */ - toClaudeStreamChunk(chunk, model) { + toClaudeStreamChunk(chunk, model, requestId) { const type = chunk.type; - // 初始化 item_id → resId 映射表(用于并发流隔离) - if (!this._claudeItemToResId) { - this._claudeItemToResId = new Map(); - } + // 使用 requestId 作为流状态的隔离 key(并发安全)。 + // 每个请求在 handleStreamRequest 中生成唯一 requestId, + // 确保同一单例 converter 上的并发流状态完全独立。 + const stateKey = requestId || chunk.response?.id || 'default'; - // Codex 的多数增量事件不带 response.id,需要通过 item_id 映射或兜底逻辑归并到正确的流状态。 - let resId = chunk.response?.id; - if (!resId) { - // 优先通过 item_id 精确匹配到对应的 response(并发安全) - const itemId = chunk.item_id || chunk.item?.id; - if (itemId && this._claudeItemToResId.has(itemId)) { - resId = this._claudeItemToResId.get(itemId); - } else if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) { - resId = this.lastClaudeStreamResponseId; - } else if (this.streamParams.size === 1) { - resId = this.streamParams.keys().next().value; - } else { - // 兜底:选择最近更新的流,避免落到固定 "default" key 导致串流状态污染。 - let latestKey = null; - let latestUpdatedAt = -1; - for (const [key, streamState] of this.streamParams.entries()) { - const updatedAt = streamState?.lastUpdatedAt || 0; - if (updatedAt > latestUpdatedAt) { - latestUpdatedAt = updatedAt; - latestKey = key; - } - } - resId = latestKey || 'default'; - } - } - - if (!this.streamParams.has(resId)) { - this.streamParams.set(resId, { + // response.created 携带 response.id,用它来初始化该请求的流状态 + if (type === 'response.created') { + const resId = chunk.response.id; + this.streamParams.set(stateKey, { model: model, createdAt: Math.floor(Date.now() / 1000), responseID: resId, blockIndex: 0, - blockStarted: false, // track whether content_block_start has been sent for current block - currentBlockType: null, // 'thinking' or 'text' - lastUpdatedAt: Date.now() + blockStarted: false, + currentBlockType: null, }); - } - const state = this.streamParams.get(resId); - state.lastUpdatedAt = Date.now(); - - // 捕获 response.output_item.added 事件中的 item_id → resId 映射, - // 使后续 delta 事件能通过 item_id 精确关联到正确的流(并发安全)。 - if (type === 'response.output_item.added') { - const itemId = chunk.item?.id; - if (itemId && resId) { - this._claudeItemToResId.set(itemId, resId); - } - return null; // 此事件不产生 Claude 输出 - } - - if (type === 'response.created') { - state.responseID = chunk.response.id; - this.lastClaudeStreamResponseId = state.responseID; + const state = this.streamParams.get(stateKey); return { type: "message_start", message: { @@ -1198,6 +1157,30 @@ export class CodexConverter extends BaseConverter { }; } + if (!this.streamParams.has(stateKey)) { + // 如果还没有状态(比如没有收到 response.created 就收到了其他事件), + // 用 chunk 中能拿到的信息初始化 + this.streamParams.set(stateKey, { + model: model, + createdAt: Math.floor(Date.now() / 1000), + responseID: chunk.response?.id || stateKey, + blockIndex: 0, + blockStarted: false, + currentBlockType: null, + }); + } + const state = this.streamParams.get(stateKey); + + // response.output_item.added 不产生 Claude 输出 + if (type === 'response.output_item.added') { + return null; + } + + if (type === 'response.created') { + // 已在上方处理,不应到达此处 + return null; + } + if (type === 'response.reasoning_summary_text.delta') { const events = []; // If switching from a different block type, close the previous block first @@ -1304,18 +1287,8 @@ export class CodexConverter extends BaseConverter { }, { type: "message_stop" } ); - // 清理 item_id → resId 映射,避免内存泄漏 - if (this._claudeItemToResId) { - for (const [itemId, mappedResId] of this._claudeItemToResId.entries()) { - if (mappedResId === resId) { - this._claudeItemToResId.delete(itemId); - } - } - } - this.streamParams.delete(resId); - if (this.lastClaudeStreamResponseId === resId) { - this.lastClaudeStreamResponseId = null; - } + // 清理该请求的流状态 + this.streamParams.delete(stateKey); return events; } diff --git a/src/utils/common.js b/src/utils/common.js index 745bc6b..c031ea4 100644 --- a/src/utils/common.js +++ b/src/utils/common.js @@ -344,6 +344,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from requestBody.model = model; const nativeStream = await service.generateContentStream(model, requestBody); const addEvent = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.CLAUDE || getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES; + // 为每个请求生成唯一 ID,用于在单例 converter 中隔离并发流状态 + const streamRequestId = `req_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`; for await (const nativeChunk of nativeStream) { // 检查客户端是否已断开连接 @@ -360,7 +362,7 @@ export async function handleStreamRequest(res, service, model, requestBody, from // Convert the complete chunk object to the client's format (fromProvider), if necessary. const chunkToSend = needsConversion - ? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model) + ? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model, streamRequestId) : nativeChunk; // 监控钩子:流式响应分块