Merge pull request #379 from Cishoon/fix/codex-claude-content-block-start
fix: 修复 Codex→Claude 流式转换并发串流导致 content_block_start 丢失的问题
This commit is contained in:
commit
27ac8c9eaf
3 changed files with 45 additions and 43 deletions
|
|
@ -39,7 +39,7 @@ import {
|
|||
* @returns {object} 转换后的数据
|
||||
* @throws {Error} 如果找不到合适的转换函数
|
||||
*/
|
||||
export function convertData(data, type, fromProvider, toProvider, model) {
|
||||
export function convertData(data, type, fromProvider, toProvider, model, requestId) {
|
||||
try {
|
||||
// 获取协议前缀
|
||||
const fromProtocol = getProtocolPrefix(fromProvider);
|
||||
|
|
@ -67,7 +67,7 @@ export function convertData(data, type, fromProvider, toProvider, model) {
|
|||
return converter.convertResponse(data, toProtocol, model);
|
||||
|
||||
case 'streamChunk':
|
||||
return converter.convertStreamChunk(data, toProtocol, model);
|
||||
return converter.convertStreamChunk(data, toProtocol, model, requestId);
|
||||
|
||||
case 'modelList':
|
||||
return converter.convertModelList(data, toProtocol);
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ export class CodexConverter extends BaseConverter {
|
|||
/**
|
||||
* 转换流式响应块
|
||||
*/
|
||||
convertStreamChunk(chunk, targetProtocol, model) {
|
||||
convertStreamChunk(chunk, targetProtocol, model, requestId) {
|
||||
switch (targetProtocol) {
|
||||
case MODEL_PROTOCOL_PREFIX.OPENAI:
|
||||
return this.toOpenAIStreamChunk(chunk, model);
|
||||
|
|
@ -64,7 +64,7 @@ export class CodexConverter extends BaseConverter {
|
|||
case MODEL_PROTOCOL_PREFIX.GEMINI:
|
||||
return this.toGeminiStreamChunk(chunk, model);
|
||||
case MODEL_PROTOCOL_PREFIX.CLAUDE:
|
||||
return this.toClaudeStreamChunk(chunk, model);
|
||||
return this.toClaudeStreamChunk(chunk, model, requestId);
|
||||
case MODEL_PROTOCOL_PREFIX.CODEX:
|
||||
return chunk; // Codex to Codex
|
||||
default:
|
||||
|
|
@ -1147,48 +1147,26 @@ export class CodexConverter extends BaseConverter {
|
|||
/**
|
||||
* Codex → Claude 流式响应转换
|
||||
*/
|
||||
toClaudeStreamChunk(chunk, model) {
|
||||
toClaudeStreamChunk(chunk, model, requestId) {
|
||||
const type = chunk.type;
|
||||
|
||||
// Codex 的多数增量事件不带 response.id,需要将其归并到最近活跃的流状态。
|
||||
let resId = chunk.response?.id;
|
||||
if (!resId) {
|
||||
if (this.lastClaudeStreamResponseId && this.streamParams.has(this.lastClaudeStreamResponseId)) {
|
||||
resId = this.lastClaudeStreamResponseId;
|
||||
} else if (this.streamParams.size === 1) {
|
||||
resId = this.streamParams.keys().next().value;
|
||||
} else {
|
||||
// 兜底:选择最近更新的流,避免落到固定 "default" key 导致串流状态污染。
|
||||
let latestKey = null;
|
||||
let latestUpdatedAt = -1;
|
||||
for (const [key, streamState] of this.streamParams.entries()) {
|
||||
const updatedAt = streamState?.lastUpdatedAt || 0;
|
||||
if (updatedAt > latestUpdatedAt) {
|
||||
latestUpdatedAt = updatedAt;
|
||||
latestKey = key;
|
||||
}
|
||||
}
|
||||
resId = latestKey || 'default';
|
||||
}
|
||||
}
|
||||
// 使用 requestId 作为流状态的隔离 key(并发安全)。
|
||||
// 每个请求在 handleStreamRequest 中生成唯一 requestId,
|
||||
// 确保同一单例 converter 上的并发流状态完全独立。
|
||||
const stateKey = requestId || chunk.response?.id || 'default';
|
||||
|
||||
if (!this.streamParams.has(resId)) {
|
||||
this.streamParams.set(resId, {
|
||||
// response.created 携带 response.id,用它来初始化该请求的流状态
|
||||
if (type === 'response.created') {
|
||||
const resId = chunk.response.id;
|
||||
this.streamParams.set(stateKey, {
|
||||
model: model,
|
||||
createdAt: Math.floor(Date.now() / 1000),
|
||||
responseID: resId,
|
||||
blockIndex: 0,
|
||||
blockStarted: false, // track whether content_block_start has been sent for current block
|
||||
currentBlockType: null, // 'thinking' or 'text'
|
||||
lastUpdatedAt: Date.now()
|
||||
blockStarted: false,
|
||||
currentBlockType: null,
|
||||
});
|
||||
}
|
||||
const state = this.streamParams.get(resId);
|
||||
state.lastUpdatedAt = Date.now();
|
||||
|
||||
if (type === 'response.created') {
|
||||
state.responseID = chunk.response.id;
|
||||
this.lastClaudeStreamResponseId = state.responseID;
|
||||
const state = this.streamParams.get(stateKey);
|
||||
return {
|
||||
type: "message_start",
|
||||
message: {
|
||||
|
|
@ -1202,6 +1180,30 @@ export class CodexConverter extends BaseConverter {
|
|||
};
|
||||
}
|
||||
|
||||
if (!this.streamParams.has(stateKey)) {
|
||||
// 如果还没有状态(比如没有收到 response.created 就收到了其他事件),
|
||||
// 用 chunk 中能拿到的信息初始化
|
||||
this.streamParams.set(stateKey, {
|
||||
model: model,
|
||||
createdAt: Math.floor(Date.now() / 1000),
|
||||
responseID: chunk.response?.id || stateKey,
|
||||
blockIndex: 0,
|
||||
blockStarted: false,
|
||||
currentBlockType: null,
|
||||
});
|
||||
}
|
||||
const state = this.streamParams.get(stateKey);
|
||||
|
||||
// response.output_item.added 不产生 Claude 输出
|
||||
if (type === 'response.output_item.added') {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === 'response.created') {
|
||||
// 已在上方处理,不应到达此处
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === 'response.reasoning_summary_text.delta') {
|
||||
const events = [];
|
||||
// If switching from a different block type, close the previous block first
|
||||
|
|
@ -1308,10 +1310,8 @@ export class CodexConverter extends BaseConverter {
|
|||
},
|
||||
{ type: "message_stop" }
|
||||
);
|
||||
this.streamParams.delete(resId);
|
||||
if (this.lastClaudeStreamResponseId === resId) {
|
||||
this.lastClaudeStreamResponseId = null;
|
||||
}
|
||||
// 清理该请求的流状态
|
||||
this.streamParams.delete(stateKey);
|
||||
return events;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -344,6 +344,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from
|
|||
requestBody.model = model;
|
||||
const nativeStream = await service.generateContentStream(model, requestBody);
|
||||
const addEvent = getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.CLAUDE || getProtocolPrefix(fromProvider) === MODEL_PROTOCOL_PREFIX.OPENAI_RESPONSES;
|
||||
// 为每个请求生成唯一 ID,用于在单例 converter 中隔离并发流状态
|
||||
const streamRequestId = `req_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
|
||||
|
||||
for await (const nativeChunk of nativeStream) {
|
||||
// 检查客户端是否已断开连接
|
||||
|
|
@ -360,7 +362,7 @@ export async function handleStreamRequest(res, service, model, requestBody, from
|
|||
|
||||
// Convert the complete chunk object to the client's format (fromProvider), if necessary.
|
||||
const chunkToSend = needsConversion
|
||||
? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model)
|
||||
? convertData(nativeChunk, 'streamChunk', toProvider, fromProvider, model, streamRequestId)
|
||||
: nativeChunk;
|
||||
|
||||
// 监控钩子:流式响应分块
|
||||
|
|
|
|||
Loading…
Reference in a new issue