From 042d9c2004d4a4059415c80062f6c0ab3b808a46 Mon Sep 17 00:00:00 2001 From: Cishoon Date: Sun, 8 Mar 2026 14:31:31 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20Codex=E2=86=92Claud?= =?UTF-8?q?e=20=E6=B5=81=E5=BC=8F=E8=BD=AC=E6=8D=A2=E4=B8=AD=20content=5Fb?= =?UTF-8?q?lock=5Fstart=20=E8=A2=AB=E8=B7=B3=E8=BF=87=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodexConverter 是单例,streamParams 和 lastClaudeStreamResponseId 在并发请求间共享。 当多个 Codex→Claude 流同时活跃时,delta 事件(不携带 response.id)会通过共享的 lastClaudeStreamResponseId 解析到错误的流状态,导致 content_block_start 被跳过。 修复:利用 Codex SSE 事件中的 item_id,在 response.output_item.added 时建立 item_id→resId 映射,使 delta 事件能精确关联到自己所属的 response state。 --- src/converters/strategies/CodexConverter.js | 31 +++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/converters/strategies/CodexConverter.js b/src/converters/strategies/CodexConverter.js index 5d341e3..67f224c 100644 --- a/src/converters/strategies/CodexConverter.js +++ b/src/converters/strategies/CodexConverter.js @@ -1127,10 +1127,19 @@ export class CodexConverter extends BaseConverter { toClaudeStreamChunk(chunk, model) { const type = chunk.type; - // Codex 的多数增量事件不带 response.id,需要将其归并到最近活跃的流状态。 + // 初始化 item_id → resId 映射表(用于并发流隔离) + if (!this._claudeItemToResId) { + this._claudeItemToResId = new Map(); + } + + // Codex 的多数增量事件不带 response.id,需要通过 item_id 映射或兜底逻辑归并到正确的流状态。 let resId = chunk.response?.id; if (!resId) { - if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) { + // 优先通过 item_id 精确匹配到对应的 response(并发安全) + const itemId = chunk.item_id || chunk.item?.id; + if (itemId && this._claudeItemToResId.has(itemId)) { + resId = this._claudeItemToResId.get(itemId); + } else if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) { resId = this.lastClaudeStreamResponseId; } else if (this.streamParams.size === 1) { resId = this.streamParams.keys().next().value; @@ -1163,6 +1172,16 @@ export class CodexConverter extends BaseConverter { const state = this.streamParams.get(resId); state.lastUpdatedAt = Date.now(); + // 捕获 response.output_item.added 事件中的 item_id → resId 映射, + // 使后续 delta 事件能通过 item_id 精确关联到正确的流(并发安全)。 + if (type === 'response.output_item.added') { + const itemId = chunk.item?.id; + if (itemId && resId) { + this._claudeItemToResId.set(itemId, resId); + } + return null; // 此事件不产生 Claude 输出 + } + if (type === 'response.created') { state.responseID = chunk.response.id; this.lastClaudeStreamResponseId = state.responseID; @@ -1285,6 +1304,14 @@ export class CodexConverter extends BaseConverter { }, { type: "message_stop" } ); + // 清理 item_id → resId 映射,避免内存泄漏 + if (this._claudeItemToResId) { + for (const [itemId, mappedResId] of this._claudeItemToResId.entries()) { + if (mappedResId === resId) { + this._claudeItemToResId.delete(itemId); + } + } + } this.streamParams.delete(resId); if (this.lastClaudeStreamResponseId === resId) { this.lastClaudeStreamResponseId = null;