perf: optimize stream parsing to avoid duplicates and improve efficiency

This commit is contained in:
Sanyela 2025-12-03 13:50:38 +08:00
parent 28de81203f
commit d7a2332e24

View file

@ -1013,28 +1013,63 @@ async initializeAuth(forceRefresh = false) {
}
/**
* 解析 AWS Event Stream 格式的单个事件
* 格式: :event-type xxx:content-type application/json:message-type event{"content":"..."}
* 解析 AWS Event Stream 格式提取所有完整的 JSON 事件
* 返回 { events: 解析出的事件数组, remaining: 未处理完的缓冲区 }
*/
parseAwsEventStreamChunk(chunk) {
const results = [];
// 匹配 JSON 内容,可能是 {"content":"..."} 或其他格式
const jsonMatches = chunk.match(/\{"[^"]+":"[^"]*"\}/g);
if (jsonMatches) {
for (const jsonStr of jsonMatches) {
try {
const parsed = JSON.parse(jsonStr);
if (parsed.content !== undefined) {
results.push({ type: 'content', data: parsed.content });
} else if (parsed.toolUse !== undefined) {
results.push({ type: 'toolUse', data: parsed.toolUse });
}
} catch (e) {
// 解析失败,跳过
parseAwsEventStreamBuffer(buffer) {
const events = [];
let remaining = buffer;
let searchStart = 0;
while (true) {
// 查找 {"content": 或 {"toolUse" 的起始位置
const contentStart = remaining.indexOf('{"content":', searchStart);
const toolUseStart = remaining.indexOf('{"toolUse":', searchStart);
let jsonStart = -1;
if (contentStart >= 0 && toolUseStart >= 0) {
jsonStart = Math.min(contentStart, toolUseStart);
} else if (contentStart >= 0) {
jsonStart = contentStart;
} else if (toolUseStart >= 0) {
jsonStart = toolUseStart;
}
if (jsonStart < 0) break;
// 查找对应的 } 结束位置
const jsonEnd = remaining.indexOf('}', jsonStart);
if (jsonEnd < 0) {
// 不完整的 JSON保留在缓冲区
remaining = remaining.substring(jsonStart);
break;
}
const jsonStr = remaining.substring(jsonStart, jsonEnd + 1);
try {
const parsed = JSON.parse(jsonStr);
if (parsed.content !== undefined) {
events.push({ type: 'content', data: parsed.content });
} else if (parsed.toolUse !== undefined) {
events.push({ type: 'toolUse', data: parsed.toolUse });
}
} catch (e) {
// JSON 解析失败,可能是不完整的,继续搜索
}
searchStart = jsonEnd + 1;
if (searchStart >= remaining.length) {
remaining = '';
break;
}
}
return results;
// 如果 searchStart 有进展,截取剩余部分
if (searchStart > 0 && remaining.length > 0) {
remaining = remaining.substring(searchStart);
}
return { events, remaining };
}
/**
@ -1058,31 +1093,32 @@ async initializeAuth(forceRefresh = false) {
try {
const response = await this.axiosInstance.post(requestUrl, requestData, {
headers,
responseType: 'stream' // 关键:使用流式响应
responseType: 'stream'
});
const stream = response.data;
let buffer = '';
const processedPositions = new Set(); // 避免重复处理
for await (const chunk of stream) {
const chunkStr = chunk.toString();
buffer += chunkStr;
buffer += chunk.toString();
// 尝试解析缓冲区中的事件
const events = this.parseAwsEventStreamChunk(buffer);
// 解析缓冲区中的事件
const { events, remaining } = this.parseAwsEventStreamBuffer(buffer);
buffer = remaining;
// 只 yield 新的事件
for (const event of events) {
if (event.type === 'content' && event.data) {
yield { type: 'content', content: event.data };
} else if (event.type === 'toolUse') {
yield { type: 'toolUse', toolUse: event.data };
const eventKey = `${event.type}:${event.data}`;
if (!processedPositions.has(eventKey)) {
processedPositions.add(eventKey);
if (event.type === 'content' && event.data) {
yield { type: 'content', content: event.data };
} else if (event.type === 'toolUse') {
yield { type: 'toolUse', toolUse: event.data };
}
}
}
// 清理已处理的部分(保留最后一部分以防不完整)
const lastBraceIndex = buffer.lastIndexOf('}');
if (lastBraceIndex > 0) {
buffer = buffer.substring(lastBraceIndex + 1);
}
}
} catch (error) {
if (error.response?.status === 403 && !isRetry) {