diff --git a/src/convert/convert.js b/src/convert/convert.js index e846d16..bb74633 100644 --- a/src/convert/convert.js +++ b/src/convert/convert.js @@ -39,7 +39,7 @@ import { * @returns {object} 转换后的数据 * @throws {Error} 如果找不到合适的转换函数 */ -export function convertData(data, type, fromProvider, toProvider, model) { +export function convertData(data, type, fromProvider, toProvider, model, requestId) { try { // 获取协议前缀 const fromProtocol = getProtocolPrefix(fromProvider); @@ -67,7 +67,7 @@ export function convertData(data, type, fromProvider, toProvider, model) { return converter.convertResponse(data, toProtocol, model); case 'streamChunk': - return converter.convertStreamChunk(data, toProtocol, model); + return converter.convertStreamChunk(data, toProtocol, model, requestId); case 'modelList': return converter.convertModelList(data, toProtocol); diff --git a/src/converters/strategies/CodexConverter.js b/src/converters/strategies/CodexConverter.js index 67f224c..395a504 100644 --- a/src/converters/strategies/CodexConverter.js +++ b/src/converters/strategies/CodexConverter.js @@ -55,7 +55,7 @@ export class CodexConverter extends BaseConverter { /** * 转换流式响应块 */ - convertStreamChunk(chunk, targetProtocol, model) { + convertStreamChunk(chunk, targetProtocol, model, requestId) { switch (targetProtocol) { case MODEL_PROTOCOL_PREFIX.OPENAI: return this.toOpenAIStreamChunk(chunk, model); @@ -64,7 +64,7 @@ export class CodexConverter extends BaseConverter { case MODEL_PROTOCOL_PREFIX.GEMINI: return this.toGeminiStreamChunk(chunk, model); case MODEL_PROTOCOL_PREFIX.CLAUDE: - return this.toClaudeStreamChunk(chunk, model); + return this.toClaudeStreamChunk(chunk, model, requestId); case MODEL_PROTOCOL_PREFIX.CODEX: return chunk; // Codex to Codex default: @@ -1124,67 +1124,26 @@ export class CodexConverter extends BaseConverter { /** * Codex → Claude 流式响应转换 */ - toClaudeStreamChunk(chunk, model) { + toClaudeStreamChunk(chunk, model, requestId) { const type = chunk.type; - // 初始化 item_id → resId 映射表(用于并发流隔离) - if (!this._claudeItemToResId) { - this._claudeItemToResId = new Map(); - } + // 使用 requestId 作为流状态的隔离 key(并发安全)。 + // 每个请求在 handleStreamRequest 中生成唯一 requestId, + // 确保同一单例 converter 上的并发流状态完全独立。 + const stateKey = requestId || chunk.response?.id || 'default'; - // Codex 的多数增量事件不带 response.id,需要通过 item_id 映射或兜底逻辑归并到正确的流状态。 - let resId = chunk.response?.id; - if (!resId) { - // 优先通过 item_id 精确匹配到对应的 response(并发安全) - const itemId = chunk.item_id || chunk.item?.id; - if (itemId && this._claudeItemToResId.has(itemId)) { - resId = this._claudeItemToResId.get(itemId); - } else if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) { - resId = this.lastClaudeStreamResponseId; - } else if (this.streamParams.size === 1) { - resId = this.streamParams.keys().next().value; - } else { - // 兜底:选择最近更新的流,避免落到固定 "default" key 导致串流状态污染。 - let latestKey = null; - let latestUpdatedAt = -1; - for (const [key, streamState] of this.streamParams.entries()) { - const updatedAt = streamState?.lastUpdatedAt || 0; - if (updatedAt > latestUpdatedAt) { - latestUpdatedAt = updatedAt; - latestKey = key; - } - } - resId = latestKey || 'default'; - } - } - - if (!this.streamParams.has(resId)) { - this.streamParams.set(resId, { + // response.created 携带 response.id,用它来初始化该请求的流状态 + if (type === 'response.created') { + const resId = chunk.response.id; + this.streamParams.set(stateKey, { model: model, createdAt: Math.floor(Date.now() / 1000), responseID: resId, blockIndex: 0, - blockStarted: false, // track whether content_block_start has been sent for current block - currentBlockType: null, // 'thinking' or 'text' - lastUpdatedAt: Date.now() + blockStarted: false, + currentBlockType: null, }); - } - const state = this.streamParams.get(resId); - state.lastUpdatedAt = Date.now(); - - // 捕获 response.output_item.added 事件中的 item_id → resId 映射, - // 使后续 delta 事件能通过 item_id 精确关联到正确的流(并发安全)。 - if (type === 'response.output_item.added') { - const itemId = chunk.item?.id; - if (itemId && resId) { - this._claudeItemToResId.set(itemId, resId); - } - return null; // 此事件不产生 Claude 输出 - } - - if (type === 'response.created') { - state.responseID = chunk.response.id; - this.lastClaudeStreamResponseId = state.responseID; + const state = this.streamParams.get(stateKey); return { type: "message_start", message: { @@ -1198,6 +1157,30 @@ export class CodexConverter extends BaseConverter { }; } + if (!this.streamParams.has(stateKey)) { + // 如果还没有状态(比如没有收到 response.created 就收到了其他事件), + // 用 chunk 中能拿到的信息初始化 + this.streamParams.set(stateKey, { + model: model, + createdAt: Math.floor(Date.now() / 1000), + responseID: chunk.response?.id || stateKey, + blockIndex: 0, + blockStarted: false, + currentBlockType: null, + }); + } + const state = this.streamParams.get(stateKey); + + // response.output_item.added 不产生 Claude 输出 + if (type === 'response.output_item.added') { + return null; + } + + if (type === 'response.created') { + // 已在上方处理,不应到达此处 + return null; + } + if (type === 'response.reasoning_summary_text.delta') { const events = []; // If switching from a different block type, close the previous block first @@ -1304,18 +1287,8 @@ export class CodexConverter extends BaseConverter { }, { type: "message_stop" } ); - // 清理 item_id → resId 映射,避免内存泄漏 - if (this._claudeItemToResId) { - for (const [itemId, mappedResId] of this._claudeItemToResId.entries()) { - if (mappedResId === resId) { - this._claudeItemToResId.delete(itemId); - } - } - } - this.streamParams.delete(resId); - if (this.lastClaudeStreamResponseId === resId) { - this.lastClaudeStreamResponseId = null; - } + // 清理该请求的流状态 + this.streamParams.delete(stateKey); return events; } diff --git a/src/utils/common.js b/src/utils/common.js index 745bc6b..c031ea4 100644 --- a/src/utils/common.js +++ b/src/utils/common.js @@ -344,6 +344,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from requestBody.model = model; const nativeStream = await service.generateContentStream(model, requestBody); const addEvent = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.CLAUDE || getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES; + // 为每个请求生成唯一 ID,用于在单例 converter 中隔离并发流状态 + const streamRequestId = `req_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`; for await (const nativeChunk of nativeStream) { // 检查客户端是否已断开连接 @@ -360,7 +362,7 @@ export async function handleStreamRequest(res, service, model, requestBody, from // Convert the complete chunk object to the client's format (fromProvider), if necessary. const chunkToSend = needsConversion - ? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model) + ? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model, streamRequestId) : nativeChunk; // 监控钩子:流式响应分块