fix: 通过 per-request requestId 彻底解决 Codex→Claude 流式转换的并发串流问题

原有 item_id→resId 映射方案中,response.output_item.added 事件不携带
response.id,建立映射时仍依赖共享的 lastClaudeStreamResponseId,并发场景下
映射关系可能从一开始就是错的。

修复:在 handleStreamRequest 中为每个请求生成唯一 requestId,通过调用链传入
toClaudeStreamChunk 作为 streamParams 的隔离 key,使并发流状态完全独立,
不再依赖任何共享状态做并发关联。
This commit is contained in:
Cishoon 2026-03-09 15:49:36 +08:00
parent 042d9c2004
commit 98caecb463
3 changed files with 45 additions and 70 deletions

View file

@ -39,7 +39,7 @@ import {
* @returns {object} 转换后的数据
* @throws {Error} 如果找不到合适的转换函数
*/
export function convertData(data, type, fromProvider, toProvider, model) {
export function convertData(data, type, fromProvider, toProvider, model, requestId) {
try {
// 获取协议前缀
const fromProtocol = getProtocolPrefix(fromProvider);
@ -67,7 +67,7 @@ export function convertData(data, type, fromProvider, toProvider, model) {
return converter.convertResponse(data, toProtocol, model);
case 'streamChunk':
return converter.convertStreamChunk(data, toProtocol, model);
return converter.convertStreamChunk(data, toProtocol, model, requestId);
case 'modelList':
return converter.convertModelList(data, toProtocol);

View file

@ -55,7 +55,7 @@ export class CodexConverter extends BaseConverter {
/**
* 转换流式响应块
*/
convertStreamChunk(chunk, targetProtocol, model) {
convertStreamChunk(chunk, targetProtocol, model, requestId) {
switch (targetProtocol) {
case MODEL_PROTOCOL_PREFIX.OPENAI:
return this.toOpenAIStreamChunk(chunk, model);
@ -64,7 +64,7 @@ export class CodexConverter extends BaseConverter {
case MODEL_PROTOCOL_PREFIX.GEMINI:
return this.toGeminiStreamChunk(chunk, model);
case MODEL_PROTOCOL_PREFIX.CLAUDE:
return this.toClaudeStreamChunk(chunk, model);
return this.toClaudeStreamChunk(chunk, model, requestId);
case MODEL_PROTOCOL_PREFIX.CODEX:
return chunk; // Codex to Codex
default:
@ -1124,67 +1124,26 @@ export class CodexConverter extends BaseConverter {
/**
* Codex Claude 流式响应转换
*/
toClaudeStreamChunk(chunk, model) {
toClaudeStreamChunk(chunk, model, requestId) {
const type = chunk.type;
// 初始化 item_id → resId 映射表(用于并发流隔离)
if (!this._claudeItemToResId) {
this._claudeItemToResId = new Map();
}
// 使用 requestId 作为流状态的隔离 key并发安全
// 每个请求在 handleStreamRequest 中生成唯一 requestId
// 确保同一单例 converter 上的并发流状态完全独立。
const stateKey = requestId || chunk.response?.id || 'default';
// Codex 的多数增量事件不带 response.id需要通过 item_id 映射或兜底逻辑归并到正确的流状态。
let resId = chunk.response?.id;
if (!resId) {
// 优先通过 item_id 精确匹配到对应的 response并发安全
const itemId = chunk.item_id || chunk.item?.id;
if (itemId && this._claudeItemToResId.has(itemId)) {
resId = this._claudeItemToResId.get(itemId);
} else if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) {
resId = this.lastClaudeStreamResponseId;
} else if (this.streamParams.size === 1) {
resId = this.streamParams.keys().next().value;
} else {
// 兜底:选择最近更新的流,避免落到固定 "default" key 导致串流状态污染。
let latestKey = null;
let latestUpdatedAt = -1;
for (const [key, streamState] of this.streamParams.entries()) {
const updatedAt = streamState?.lastUpdatedAt || 0;
if (updatedAt > latestUpdatedAt) {
latestUpdatedAt = updatedAt;
latestKey = key;
}
}
resId = latestKey || 'default';
}
}
if (!this.streamParams.has(resId)) {
this.streamParams.set(resId, {
// response.created 携带 response.id用它来初始化该请求的流状态
if (type === 'response.created') {
const resId = chunk.response.id;
this.streamParams.set(stateKey, {
model: model,
createdAt: Math.floor(Date.now() / 1000),
responseID: resId,
blockIndex: 0,
blockStarted: false, // track whether content_block_start has been sent for current block
currentBlockType: null, // 'thinking' or 'text'
lastUpdatedAt: Date.now()
blockStarted: false,
currentBlockType: null,
});
}
const state = this.streamParams.get(resId);
state.lastUpdatedAt = Date.now();
// 捕获 response.output_item.added 事件中的 item_id → resId 映射,
// 使后续 delta 事件能通过 item_id 精确关联到正确的流(并发安全)。
if (type === 'response.output_item.added') {
const itemId = chunk.item?.id;
if (itemId && resId) {
this._claudeItemToResId.set(itemId, resId);
}
return null; // 此事件不产生 Claude 输出
}
if (type === 'response.created') {
state.responseID = chunk.response.id;
this.lastClaudeStreamResponseId = state.responseID;
const state = this.streamParams.get(stateKey);
return {
type: "message_start",
message: {
@ -1198,6 +1157,30 @@ export class CodexConverter extends BaseConverter {
};
}
if (!this.streamParams.has(stateKey)) {
// 如果还没有状态(比如没有收到 response.created 就收到了其他事件),
// 用 chunk 中能拿到的信息初始化
this.streamParams.set(stateKey, {
model: model,
createdAt: Math.floor(Date.now() / 1000),
responseID: chunk.response?.id || stateKey,
blockIndex: 0,
blockStarted: false,
currentBlockType: null,
});
}
const state = this.streamParams.get(stateKey);
// response.output_item.added 不产生 Claude 输出
if (type === 'response.output_item.added') {
return null;
}
if (type === 'response.created') {
// 已在上方处理,不应到达此处
return null;
}
if (type === 'response.reasoning_summary_text.delta') {
const events = [];
// If switching from a different block type, close the previous block first
@ -1304,18 +1287,8 @@ export class CodexConverter extends BaseConverter {
},
{ type: "message_stop" }
);
// 清理 item_id → resId 映射,避免内存泄漏
if (this._claudeItemToResId) {
for (const [itemId, mappedResId] of this._claudeItemToResId.entries()) {
if (mappedResId === resId) {
this._claudeItemToResId.delete(itemId);
}
}
}
this.streamParams.delete(resId);
if (this.lastClaudeStreamResponseId === resId) {
this.lastClaudeStreamResponseId = null;
}
// 清理该请求的流状态
this.streamParams.delete(stateKey);
return events;
}

View file

@ -344,6 +344,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from
requestBody.model = model;
const nativeStream = await service.generateContentStream(model, requestBody);
const addEvent = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.CLAUDE || getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES;
// 为每个请求生成唯一 ID用于在单例 converter 中隔离并发流状态
const streamRequestId = `req_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
for await (const nativeChunk of nativeStream) {
// 检查客户端是否已断开连接
@ -360,7 +362,7 @@ export async function handleStreamRequest(res, service, model, requestBody, from
// Convert the complete chunk object to the client's format (fromProvider), if necessary.
const chunkToSend = needsConversion
? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model)
? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model, streamRequestId)
: nativeChunk;
// 监控钩子:流式响应分块