refactor(stream): 重构流式响应处理器与资源池调度算法

针对 claude-kiro 模块的事件流解析器进行架构升级,引入双模式
正则匹配策略实现向后兼容。provider-pool-manager 模块通过引入
延迟写入队列机制重构持久化层,消除冗余的迭代逻辑。

核心变更:
- 实现 SSE 标准格式与遗留格式的自适应解析器
- 重构 JSON 边界检测算法,增强空白字符处理
- 引入基于 Set 的批量写入队列替代即时持久化
- 移除供应商选择中的多余循环,采用直接索引计算
- 调整异常传播策略,将错误向上层抛出而非内部消化

技术细节:通过定时器合并多次保存请求,将 I/O 操作从 O(n)
降低到 O(1);时间戳标准化逻辑由多分支条件简化为三元表达式;
转义字符处理采用负向后查找断言避免误替换。
This commit is contained in:
hex2077 2025-10-20 21:16:06 +08:00
parent 5d4283edc6
commit db3c63ffe4
2 changed files with 102 additions and 42 deletions

View file

@ -669,14 +669,31 @@ async initializeAuth(forceRefresh = false) {
let fullContent = '';
const toolCalls = [];
let currentToolCallDict = null;
// console.log(`rawStr=${rawStr}`);
const eventBlockRegex = /event({.*?(?=event{|$))/gs;
// 改进的 SSE 事件解析:匹配 :message-typeevent 后面的 JSON 数据
// 使用更精确的正则来匹配 SSE 格式的事件
const sseEventRegex = /:message-typeevent(\{[^]*?(?=:event-type|$))/g;
const legacyEventRegex = /event(\{.*?(?=event\{|$))/gs;
// 首先尝试使用 SSE 格式解析
let matches = [...rawStr.matchAll(sseEventRegex)];
// 如果 SSE 格式没有匹配到,回退到旧的格式
if (matches.length === 0) {
matches = [...rawStr.matchAll(legacyEventRegex)];
}
for (const match of rawStr.matchAll(eventBlockRegex)) {
for (const match of matches) {
const potentialJsonBlock = match[1];
if (!potentialJsonBlock || potentialJsonBlock.trim().length === 0) {
continue;
}
// 尝试找到完整的 JSON 对象
let searchPos = 0;
while ((searchPos = potentialJsonBlock.indexOf('}', searchPos + 1)) !== -1) {
const jsonCandidate = potentialJsonBlock.substring(0, searchPos + 1);
const jsonCandidate = potentialJsonBlock.substring(0, searchPos + 1).trim();
try {
const eventData = JSON.parse(jsonCandidate);
@ -700,22 +717,30 @@ async initializeAuth(forceRefresh = false) {
const args = JSON.parse(currentToolCallDict.function.arguments);
currentToolCallDict.function.arguments = JSON.stringify(args);
} catch (e) {
console.warn(`Tool call arguments not valid JSON: ${currentToolCallDict.function.arguments}`);
console.warn(`[Kiro] Tool call arguments not valid JSON: ${currentToolCallDict.function.arguments}`);
}
toolCalls.push(currentToolCallDict);
currentToolCallDict = null;
}
} else if (!eventData.followupPrompt && eventData.content) {
const decodedContent = eventData.content.replace(/\\n/g, '\n');
// 处理内容,移除转义字符
let decodedContent = eventData.content;
// 处理常见的转义序列
decodedContent = decodedContent.replace(/(?<!\\)\\n/g, '\n');
// decodedContent = decodedContent.replace(/(?<!\\)\\t/g, '\t');
// decodedContent = decodedContent.replace(/\\"/g, '"');
// decodedContent = decodedContent.replace(/\\\\/g, '\\');
fullContent += decodedContent;
}
break;
} catch (e) {
// 解析失败,说明这个 '}' 是内容的一部分,继续寻找下一个 '}'。
// JSON 解析失败,继续寻找下一个可能的结束位置
continue;
}
}
}
// 如果还有未完成的工具调用,添加到列表中
if (currentToolCallDict) {
toolCalls.push(currentToolCallDict);
}
@ -875,11 +900,12 @@ async initializeAuth(forceRefresh = false) {
}
} catch (error) {
console.error('[Kiro] Error in streaming generation:', error);
throw new Error(`Error processing response: ${error.message}`);
// For Claude, we yield an array of events for streaming error
// Ensure error message is passed as content, not toolCalls
for (const chunkJson of this.buildClaudeResponse(`Error: ${error.message}`, true, 'assistant', model, null)) {
yield chunkJson;
}
// for (const chunkJson of this.buildClaudeResponse(`Error: ${error.message}`, true, 'assistant', model, null)) {
// yield chunkJson;
// }
}
}

View file

@ -12,6 +12,12 @@ export class ProviderPoolManager {
this.roundRobinIndex = {}; // Tracks the current index for round-robin selection for each provider type
this.maxErrorCount = options.maxErrorCount || 3; // Default to 1 errors before marking unhealthy
this.healthCheckInterval = options.healthCheckInterval || 30 * 60 * 1000; // Default to 30 minutes
// 优化1: 添加防抖机制,避免频繁的文件 I/O 操作
this.saveDebounceTime = options.saveDebounceTime || 1000; // 默认1秒防抖
this.saveTimer = null;
this.pendingSaves = new Set(); // 记录待保存的 providerType
this.initializeProviderStatus();
}
@ -29,14 +35,11 @@ export class ProviderPoolManager {
providerConfig.lastUsed = providerConfig.lastUsed !== undefined ? providerConfig.lastUsed : null;
providerConfig.usageCount = providerConfig.usageCount !== undefined ? providerConfig.usageCount : 0;
providerConfig.errorCount = providerConfig.errorCount !== undefined ? providerConfig.errorCount : 0;
if (providerConfig.lastErrorTime && typeof providerConfig.lastErrorTime === 'string') {
// Keep as string (ISOString)
providerConfig.lastErrorTime = providerConfig.lastErrorTime;
} else if (providerConfig.lastErrorTime === undefined) {
providerConfig.lastErrorTime = null;
} else if (providerConfig.lastErrorTime instanceof Date) {
providerConfig.lastErrorTime = providerConfig.lastErrorTime.toISOString();
}
// 优化2: 简化 lastErrorTime 处理逻辑
providerConfig.lastErrorTime = providerConfig.lastErrorTime instanceof Date
? providerConfig.lastErrorTime.toISOString()
: (providerConfig.lastErrorTime || null);
this.providerStatus[providerType].push({
config: providerConfig,
@ -62,31 +65,24 @@ export class ProviderPoolManager {
return null;
}
let currentIndex = this.roundRobinIndex[providerType] || 0;
let selected = null;
// Iterate through healthy providers starting from the current index
for (let i = 0; i < healthyProviders.length; i++) {
const providerIndex = (currentIndex + i) % healthyProviders.length;
const potentialProvider = healthyProviders[providerIndex];
// For now, we simply select the next healthy provider in a round-robin fashion.
// More advanced logic (e.g., considering usage, recent errors, etc.) can be added here.
selected = potentialProvider;
this.roundRobinIndex[providerType] = (providerIndex + 1) % healthyProviders.length; // Update the index for the next call
break; // Found a provider, break the loop
}
// 优化3: 简化轮询逻辑,移除不必要的循环
const currentIndex = this.roundRobinIndex[providerType] || 0;
const providerIndex = currentIndex % healthyProviders.length;
const selected = healthyProviders[providerIndex];
if (selected) {
selected.config.lastUsed = new Date().toISOString();
selected.config.usageCount++; // Increment usage count
// 更新下次轮询的索引
this.roundRobinIndex[providerType] = (providerIndex + 1) % healthyProviders.length;
// 更新使用信息
selected.config.lastUsed = new Date().toISOString();
selected.config.usageCount++;
console.log(`[ProviderPoolManager] Selected provider for ${providerType} (round-robin): ${JSON.stringify(selected.config)}`);
this._saveProviderPoolsToJson(providerType); // Persist changes
return selected.config;
}
return null;
console.log(`[ProviderPoolManager] Selected provider for ${providerType} (round-robin): ${JSON.stringify(selected.config)}`);
// 优化1: 使用防抖保存
this._debouncedSave(providerType);
return selected.config;
}
/**
@ -108,7 +104,9 @@ export class ProviderPoolManager {
} else {
console.warn(`[ProviderPoolManager] Provider ${JSON.stringify(providerConfig)} for type ${providerType} error count: ${provider.config.errorCount}/${this.maxErrorCount}. Still healthy.`);
}
this._saveProviderPoolsToJson(providerType); // Persist changes
// 优化1: 使用防抖保存
this._debouncedSave(providerType);
}
}
}
@ -127,7 +125,9 @@ export class ProviderPoolManager {
provider.config.errorCount = 0; // Reset error count on health recovery
provider.config.lastErrorTime = null; // Reset lastErrorTime when healthy
console.log(`[ProviderPoolManager] Marked provider as healthy: ${JSON.stringify(providerConfig)} for type ${providerType}`);
this._saveProviderPoolsToJson(providerType); // Persist changes
// 优化1: 使用防抖保存
this._debouncedSave(providerType);
}
}
}
@ -254,6 +254,40 @@ export class ProviderPoolManager {
}
}
/**
* 优化1: 添加防抖保存方法
* 延迟保存操作避免频繁的文件 I/O
* @private
*/
_debouncedSave(providerType) {
// 将待保存的 providerType 添加到集合中
this.pendingSaves.add(providerType);
// 清除之前的定时器
if (this.saveTimer) {
clearTimeout(this.saveTimer);
}
// 设置新的定时器
this.saveTimer = setTimeout(() => {
this._flushPendingSaves();
}, this.saveDebounceTime);
}
/**
* 优化1: 批量保存所有待保存的 providerType
* @private
*/
async _flushPendingSaves() {
const typesToSave = Array.from(this.pendingSaves);
this.pendingSaves.clear();
this.saveTimer = null;
for (const providerType of typesToSave) {
await this._saveProviderPoolsToJson(providerType);
}
}
/**
* Saves the current provider pools configuration to the JSON file.
* @private