AIClient-2-API/src/provider-pool-manager.js
hex2077 203f0a4e65 feat(antigravity-core): 添加PROJECT_ID配置支持
refactor(provider-pool-manager): 修改markProviderHealthy调用方式
2025-12-01 13:20:00 +08:00

488 lines
No EOL
21 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import * as fs from 'fs'; // Import fs module
import { getServiceAdapter } from './adapter.js';
import { MODEL_PROVIDER } from './common.js';
/**
* Manages a pool of API service providers, handling their health and selection.
*/
export class ProviderPoolManager {
// 默认健康检查模型配置
static DEFAULT_HEALTH_CHECK_MODELS = {
'gemini-cli': 'gemini-2.5-flash',
'openai-custom': 'gpt-3.5-turbo',
'claude-custom': 'claude-3-7-sonnet-20250219',
'kiro-api': 'claude-3-7-sonnet-20250219',
'qwen-api': 'qwen3-coder-flash',
'openai-custom-responses': 'gpt-5-low'
};
constructor(providerPools, options = {}) {
this.providerPools = providerPools;
this.globalConfig = options.globalConfig || {}; // 存储全局配置
this.providerStatus = {}; // Tracks health and usage for each provider instance
this.roundRobinIndex = {}; // Tracks the current index for round-robin selection for each provider type
this.maxErrorCount = options.maxErrorCount || 3; // Default to 3 errors before marking unhealthy
this.healthCheckInterval = options.healthCheckInterval || 10 * 60 * 1000; // Default to 10 minutes
// 日志级别控制
this.logLevel = options.logLevel || 'info'; // 'debug', 'info', 'warn', 'error'
// 添加防抖机制,避免频繁的文件 I/O 操作
this.saveDebounceTime = options.saveDebounceTime || 1000; // 默认1秒防抖
this.saveTimer = null;
this.pendingSaves = new Set(); // 记录待保存的 providerType
this.initializeProviderStatus();
}
/**
* 日志输出方法,支持日志级别控制
* @private
*/
_log(level, message) {
const levels = { debug: 0, info: 1, warn: 2, error: 3 };
if (levels[level] >= levels[this.logLevel]) {
const logMethod = level === 'debug' ? 'log' : level;
console[logMethod](`[ProviderPoolManager] ${message}`);
}
}
/**
* 查找指定的 provider
* @private
*/
_findProvider(providerType, uuid) {
if (!providerType || !uuid) {
this._log('error', `Invalid parameters: providerType=${providerType}, uuid=${uuid}`);
return null;
}
const pool = this.providerStatus[providerType];
return pool?.find(p => p.uuid === uuid) || null;
}
/**
* Initializes the status for each provider in the pools.
* Initially, all providers are considered healthy and have zero usage.
*/
initializeProviderStatus() {
for (const providerType in this.providerPools) {
this.providerStatus[providerType] = [];
this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type
this.providerPools[providerType].forEach((providerConfig) => {
// Ensure initial health and usage stats are present in the config
providerConfig.isHealthy = providerConfig.isHealthy !== undefined ? providerConfig.isHealthy : true;
providerConfig.isDisabled = providerConfig.isDisabled !== undefined ? providerConfig.isDisabled : false;
providerConfig.lastUsed = providerConfig.lastUsed !== undefined ? providerConfig.lastUsed : null;
providerConfig.usageCount = providerConfig.usageCount !== undefined ? providerConfig.usageCount : 0;
providerConfig.errorCount = providerConfig.errorCount !== undefined ? providerConfig.errorCount : 0;
// 优化2: 简化 lastErrorTime 处理逻辑
providerConfig.lastErrorTime = providerConfig.lastErrorTime instanceof Date
? providerConfig.lastErrorTime.toISOString()
: (providerConfig.lastErrorTime || null);
this.providerStatus[providerType].push({
config: providerConfig,
uuid: providerConfig.uuid, // Still keep uuid at the top level for easy access
});
});
}
this._log('info', 'Initialized provider statuses: ok');
}
/**
* Selects a provider from the pool for a given provider type.
* Currently uses a simple round-robin for healthy providers.
* If requestedModel is provided, providers that don't support the model will be excluded.
* @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom').
* @param {string} [requestedModel] - Optional. The model name to filter providers by.
* @returns {object|null} The selected provider's configuration, or null if no healthy provider is found.
*/
selectProvider(providerType, requestedModel = null) {
// 参数校验
if (!providerType || typeof providerType !== 'string') {
this._log('error', `Invalid providerType: ${providerType}`);
return null;
}
const availableProviders = this.providerStatus[providerType] || [];
let availableAndHealthyProviders = availableProviders.filter(p =>
p.config.isHealthy && !p.config.isDisabled
);
// 如果指定了模型,则排除不支持该模型的提供商
if (requestedModel) {
const modelFilteredProviders = availableAndHealthyProviders.filter(p => {
// 如果提供商没有配置 notSupportedModels则认为它支持所有模型
if (!p.config.notSupportedModels || !Array.isArray(p.config.notSupportedModels)) {
return true;
}
// 检查 notSupportedModels 数组中是否包含请求的模型,如果包含则排除
return !p.config.notSupportedModels.includes(requestedModel);
});
if (modelFilteredProviders.length === 0) {
this._log('warn', `No available providers for type: ${providerType} that support model: ${requestedModel}`);
return null;
}
availableAndHealthyProviders = modelFilteredProviders;
this._log('debug', `Filtered ${modelFilteredProviders.length} providers supporting model: ${requestedModel}`);
}
if (availableAndHealthyProviders.length === 0) {
this._log('warn', `No available and healthy providers for type: ${providerType}`);
return null;
}
// 为每个提供商类型和模型组合维护独立的轮询索引
// 使用组合键providerType 或 providerType:model
const indexKey = requestedModel ? `${providerType}:${requestedModel}` : providerType;
const currentIndex = this.roundRobinIndex[indexKey] || 0;
// 使用取模确保索引始终在有效范围内,即使列表长度变化
const providerIndex = currentIndex % availableAndHealthyProviders.length;
const selected = availableAndHealthyProviders[providerIndex];
// 更新下次轮询的索引
this.roundRobinIndex[indexKey] = (currentIndex + 1) % availableAndHealthyProviders.length;
// 更新使用信息
selected.config.lastUsed = new Date().toISOString();
selected.config.usageCount++;
this._log('debug', `Selected provider for ${providerType} (round-robin): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}`);
// 使用防抖保存
this._debouncedSave(providerType);
return selected.config;
}
/**
* Marks a provider as unhealthy (e.g., after an API error).
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
*/
markProviderUnhealthy(providerType, providerConfig) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in markProviderUnhealthy');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.errorCount++;
provider.config.lastErrorTime = new Date().toISOString();
if (provider.config.errorCount >= this.maxErrorCount) {
provider.config.isHealthy = false;
this._log('warn', `Marked provider as unhealthy: ${providerConfig.uuid} for type ${providerType}. Total errors: ${provider.config.errorCount}`);
} else {
this._log('warn', `Provider ${providerConfig.uuid} for type ${providerType} error count: ${provider.config.errorCount}/${this.maxErrorCount}. Still healthy.`);
}
this._debouncedSave(providerType);
}
}
/**
* Marks a provider as healthy.
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
* @param {boolean} isInit - Whether to reset usage count (optional, default: false).
*/
markProviderHealthy(providerType, providerConfig, resetUsageCount = false) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in markProviderHealthy');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.isHealthy = true;
provider.config.errorCount = 0;
provider.config.lastErrorTime = null;
// 只有在明确要求重置使用计数时才重置
if (resetUsageCount) {
provider.config.usageCount = 0;
}
this._log('info', `Marked provider as healthy: ${provider.config.uuid} for type ${providerType}${resetUsageCount ? ' (usage count reset)' : ''}`);
this._debouncedSave(providerType);
}
}
/**
* 重置提供商的计数器(错误计数和使用计数)
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
*/
resetProviderCounters(providerType, providerConfig) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in resetProviderCounters');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.errorCount = 0;
provider.config.usageCount = 0;
this._log('info', `Reset provider counters: ${provider.config.uuid} for type ${providerType}`);
this._debouncedSave(providerType);
}
}
/**
* 禁用指定提供商
* @param {string} providerType - 提供商类型
* @param {object} providerConfig - 提供商配置
*/
disableProvider(providerType, providerConfig) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in disableProvider');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.isDisabled = true;
this._log('info', `Disabled provider: ${providerConfig.uuid} for type ${providerType}`);
this._debouncedSave(providerType);
}
}
/**
* 启用指定提供商
* @param {string} providerType - 提供商类型
* @param {object} providerConfig - 提供商配置
*/
enableProvider(providerType, providerConfig) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in enableProvider');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.isDisabled = false;
this._log('info', `Enabled provider: ${providerConfig.uuid} for type ${providerType}`);
this._debouncedSave(providerType);
}
}
/**
* Performs health checks on all providers in the pool.
* This method would typically be called periodically (e.g., via cron job).
*/
async performHealthChecks(isInit = false) {
this._log('info', 'Performing health checks on all providers...');
const now = new Date();
for (const providerType in this.providerStatus) {
for (const providerStatus of this.providerStatus[providerType]) {
const providerConfig = providerStatus.config;
// Only attempt to health check unhealthy providers after a certain interval
if (!providerStatus.config.isHealthy && providerStatus.config.lastErrorTime &&
(now.getTime() - new Date(providerStatus.config.lastErrorTime).getTime() < this.healthCheckInterval)) {
this._log('debug', `Skipping health check for ${providerConfig.uuid} (${providerType}). Last error too recent.`);
continue;
}
try {
// Perform actual health check based on provider type
const isHealthy = await this._checkProviderHealth(providerType, providerConfig);
if (isHealthy === null) {
this._log('debug', `Health check for ${providerConfig.uuid} (${providerType}) skipped: Check not implemented.`);
this.resetProviderCounters(providerType, providerConfig);
continue;
}
if (isHealthy) {
if (!providerStatus.config.isHealthy) {
// Provider was unhealthy but is now healthy
// 恢复健康时不重置使用计数,保持原有值
this.markProviderHealthy(providerType, providerConfig, true);
this._log('info', `Health check for ${providerConfig.uuid} (${providerType}): Marked Healthy (actual check)`);
} else {
// Provider was already healthy and still is
// 只在初始化时重置使用计数
this.markProviderHealthy(providerType, providerConfig, true);
this._log('debug', `Health check for ${providerConfig.uuid} (${providerType}): Still Healthy`);
}
} else {
// Provider is not healthy
this._log('warn', `Health check for ${providerConfig.uuid} (${providerType}) failed: Provider is not responding correctly.`);
this.markProviderUnhealthy(providerType, providerConfig);
}
} catch (error) {
this._log('error', `Health check for ${providerConfig.uuid} (${providerType}) failed: ${error.message}`);
// If a health check fails, mark it unhealthy, which will update error count and lastErrorTime
this.markProviderUnhealthy(providerType, providerConfig);
}
}
}
}
/**
* 构建健康检查请求
* @private
*/
_buildHealthCheckRequest(providerType, modelName) {
const baseMessage = { role: 'user', content: 'Hello, are you ok?' };
// Gemini 使用不同的请求格式
if (providerType === 'gemini-cli') {
return {
contents: [{
role: 'user',
parts: [{ text: baseMessage.content }]
}]
};
}
// OpenAI Custom Responses 使用特殊格式
if (providerType === 'openai-custom-responses') {
return {
input: [baseMessage],
model: modelName
};
}
// 其他提供商OpenAI、Claude、Kiro、Qwen使用标准格式
return {
messages: [baseMessage],
model: modelName
};
}
/**
* Performs an actual health check for a specific provider.
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to check.
* @returns {Promise<boolean|null>} - True if healthy, false if unhealthy, null if check not implemented.
*/
async _checkProviderHealth(providerType, providerConfig) {
try {
// 如果未启用健康检查,返回 null
if (!providerConfig.checkHealth) {
return null;
}
// 合并全局配置和 provider 配置(简化代理配置)
const proxyKeys = ['GEMINI', 'OPENAI', 'CLAUDE', 'QWEN', 'KIRO'];
const tempConfig = {
...providerConfig,
MODEL_PROVIDER: providerType
};
// 动态添加代理配置
proxyKeys.forEach(key => {
const proxyKey = `USE_SYSTEM_PROXY_${key}`;
if (this.globalConfig[proxyKey] !== undefined) {
tempConfig[proxyKey] = this.globalConfig[proxyKey];
}
});
const serviceAdapter = getServiceAdapter(tempConfig);
// 确定健康检查使用的模型名称
const modelName = providerConfig.checkModelName ||
ProviderPoolManager.DEFAULT_HEALTH_CHECK_MODELS[providerType];
if (!modelName) {
this._log('warn', `Unknown provider type for health check: ${providerType}`);
return false;
}
// 构建健康检查请求
const healthCheckRequest = this._buildHealthCheckRequest(providerType, modelName);
this._log('debug', `Health check request for ${modelName}: ${JSON.stringify(healthCheckRequest)}`);
await serviceAdapter.generateContent(modelName, healthCheckRequest);
return true;
} catch (error) {
this._log('error', `Health check failed for ${providerType}: ${error.message}`);
return false;
}
}
/**
* 优化1: 添加防抖保存方法
* 延迟保存操作,避免频繁的文件 I/O
* @private
*/
_debouncedSave(providerType) {
// 将待保存的 providerType 添加到集合中
this.pendingSaves.add(providerType);
// 清除之前的定时器
if (this.saveTimer) {
clearTimeout(this.saveTimer);
}
// 设置新的定时器
this.saveTimer = setTimeout(() => {
this._flushPendingSaves();
}, this.saveDebounceTime);
}
/**
* 批量保存所有待保存的 providerType优化为单次文件写入
* @private
*/
async _flushPendingSaves() {
const typesToSave = Array.from(this.pendingSaves);
if (typesToSave.length === 0) return;
this.pendingSaves.clear();
this.saveTimer = null;
try {
const filePath = this.globalConfig.PROVIDER_POOLS_FILE_PATH || 'provider_pools.json';
let currentPools = {};
// 一次性读取文件
try {
const fileContent = await fs.promises.readFile(filePath, 'utf8');
currentPools = JSON.parse(fileContent);
} catch (readError) {
if (readError.code === 'ENOENT') {
this._log('info', 'provider_pools.json does not exist, creating new file.');
} else {
throw readError;
}
}
// 更新所有待保存的 providerType
for (const providerType of typesToSave) {
if (this.providerStatus[providerType]) {
currentPools[providerType] = this.providerStatus[providerType].map(p => {
// Convert Date objects to ISOString if they exist
const config = { ...p.config };
if (config.lastUsed instanceof Date) {
config.lastUsed = config.lastUsed.toISOString();
}
if (config.lastErrorTime instanceof Date) {
config.lastErrorTime = config.lastErrorTime.toISOString();
}
return config;
});
} else {
this._log('warn', `Attempted to save unknown providerType: ${providerType}`);
}
}
// 一次性写入文件
await fs.promises.writeFile(filePath, JSON.stringify(currentPools, null, 2), 'utf8');
this._log('info', `provider_pools.json updated successfully for types: ${typesToSave.join(', ')}`);
} catch (error) {
this._log('error', `Failed to write provider_pools.json: ${error.message}`);
}
}
}