diff --git a/src/providers/claude/claude-kiro.js b/src/providers/claude/claude-kiro.js index 1c966c8..edb3f89 100644 --- a/src/providers/claude/claude-kiro.js +++ b/src/providers/claude/claude-kiro.js @@ -65,6 +65,22 @@ const KIRO_AUTH_TOKEN_FILE = "kiro-auth-token.json"; // 这解决了文件锁导致的并发请求串行化问题 const tokenRefreshPromises = new Map(); +/** + * 自定义凭证错误类 + * 用于标识需要切换凭证的错误 + */ +class CredentialError extends Error { + constructor(message, options = {}) { + super(message); + this.name = 'CredentialError'; + this.shouldSwitchCredential = options.shouldSwitchCredential ?? false; + this.skipErrorCount = options.skipErrorCount ?? false; + this.credentialMarkedUnhealthy = options.credentialMarkedUnhealthy ?? false; + this.statusCode = options.statusCode; + this.originalError = options.originalError; + } +} + /** * Kiro API Service - Node.js implementation based on the Python ki2api * Provides OpenAI-compatible API for Claude Sonnet 4 via Kiro/CodeWhisperer @@ -1353,27 +1369,39 @@ async initializeAuth(forceRefresh = false) { } } + // Handle 402 (Payment Required / Quota Exceeded) - verify usage and mark as unhealthy with recovery time + if (status === 402) { + await this._handle402Error(error, 'callApi'); + } + // Handle 403 (Forbidden) - mark as unhealthy immediately, no retry if (status === 403) { console.log('[Kiro] Received 403. Marking credential as unhealthy...'); this._markCredentialUnhealthy('403 Forbidden', error); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; throw error; } - // Handle 429 (Too Many Requests) with exponential backoff - if (status === 429 && retryCount < maxRetries) { - const delay = baseDelay * Math.pow(2, retryCount); - console.log(`[Kiro] Received 429 (Too Many Requests). Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`); - await new Promise(resolve => setTimeout(resolve, delay)); - return this.callApi(method, model, body, isRetry, retryCount + 1); + // Handle 429 (Too Many Requests) - wait baseDelay then switch credential + if (status === 429) { + console.log(`[Kiro] Received 429 (Too Many Requests). Waiting ${baseDelay}ms before switching credential...`); + await new Promise(resolve => setTimeout(resolve, baseDelay)); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; + throw error; } - // Handle other retryable errors (5xx server errors) - if (status >= 500 && status < 600 && retryCount < maxRetries) { - const delay = baseDelay * Math.pow(2, retryCount); - console.log(`[Kiro] Received ${status} server error. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`); - await new Promise(resolve => setTimeout(resolve, delay)); - return this.callApi(method, model, body, isRetry, retryCount + 1); + // Handle 5xx server errors - wait baseDelay then switch credential + if (status >= 500 && status < 600) { + console.log(`[Kiro] Received ${status} server error. Waiting ${baseDelay}ms before switching credential...`); + await new Promise(resolve => setTimeout(resolve, baseDelay)); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; + throw error; } // Handle network errors (ECONNRESET, ETIMEDOUT, etc.) with exponential backoff @@ -1434,6 +1462,79 @@ async initializeAuth(forceRefresh = false) { } } + /** + * Helper method to mark the current credential as unhealthy with a scheduled recovery time + * Used for quota exhaustion (402) where quota resets at a specific time (e.g., 1st of next month) + * @param {string} reason - The reason for marking unhealthy + * @param {Error} [error] - Optional error object to attach the marker to + * @param {Date} [recoveryTime] - The time when the credential should be marked healthy again + * @returns {boolean} - Whether the credential was successfully marked as unhealthy + * @private + */ + _markCredentialUnhealthyWithRecovery(reason, error = null, recoveryTime = null) { + const poolManager = getProviderPoolManager(); + if (poolManager && this.uuid) { + console.log(`[Kiro] Marking credential ${this.uuid} as unhealthy with recovery time. Reason: ${reason}, Recovery: ${recoveryTime?.toISOString()}`); + poolManager.markProviderUnhealthyWithRecoveryTime(MODEL_PROVIDER.KIRO_API, { + uuid: this.uuid + }, reason, recoveryTime); + // Attach marker to error object to prevent duplicate marking in upper layers + if (error) { + error.credentialMarkedUnhealthy = true; + } + return true; + } else { + console.warn(`[Kiro] Cannot mark credential as unhealthy: poolManager=${!!poolManager}, uuid=${this.uuid}`); + return false; + } + } + + /** + * 计算下月1日 00:00:00 UTC 时间 + * @returns {Date} 下月1日的 Date 对象 + * @private + */ + _getNextMonthFirstDay() { + const now = new Date(); + return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1, 0, 0, 0, 0)); + } + + /** + * 处理 402 错误(配额耗尽) + * 验证用量限制并标记凭证为不健康,设置恢复时间为下月1日 + * @param {Error} error - 原始错误对象 + * @param {string} context - 错误发生的上下文(如 'callApi', 'stream') + * @throws {Error} 抛出带有切换凭证标记的错误 + * @private + */ + async _handle402Error(error, context = 'unknown') { + console.log(`[Kiro] Received 402 (Quota Exceeded) in ${context}. Verifying usage limits...`); + try { + // Verify usage limits to confirm quota exhaustion + const usageLimits = await this.getUsageLimits(); + const isQuotaExhausted = usageLimits?.usedCount >= usageLimits?.limitCount; + + if (isQuotaExhausted) { + console.log(`[Kiro] Quota confirmed exhausted: ${usageLimits?.usedCount}/${usageLimits?.limitCount}`); + // Calculate recovery time: 1st day of next month at 00:00:00 UTC + const nextMonth = this._getNextMonthFirstDay(); + this._markCredentialUnhealthyWithRecovery('402 Payment Required - Quota Exhausted', error, nextMonth); + } else { + console.log(`[Kiro] Quota not exhausted (${usageLimits?.usedCount}/${usageLimits?.limitCount}), but received 402. Marking unhealthy anyway.`); + this._markCredentialUnhealthy('402 Payment Required - Unexpected', error); + } + } catch (usageError) { + console.warn('[Kiro] Failed to verify usage limits:', usageError.message); + // If we can't verify, still mark as unhealthy with recovery time + const nextMonth = this._getNextMonthFirstDay(); + this._markCredentialUnhealthyWithRecovery('402 Payment Required - Quota Exceeded (unverified)', error, nextMonth); + } + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; + throw error; + } + _processApiResponse(response) { const rawResponseText = Buffer.isBuffer(response.data) ? response.data.toString('utf8') : String(response.data); //console.log(`[Kiro] Raw response length: ${rawResponseText.length}`); @@ -1748,28 +1849,39 @@ async initializeAuth(forceRefresh = false) { } } + // Handle 402 (Payment Required / Quota Exceeded) - verify usage and mark as unhealthy with recovery time + if (status === 402) { + await this._handle402Error(error, 'stream'); + } + // Handle 403 (Forbidden) - mark as unhealthy immediately, no retry if (status === 403) { console.log('[Kiro] Received 403 in stream. Marking credential as unhealthy...'); this._markCredentialUnhealthy('403 Forbidden', error); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; throw error; } - if (status === 429 && retryCount < maxRetries) { - const delay = baseDelay * Math.pow(2, retryCount); - console.log(`[Kiro] Received 429 in stream. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`); - await new Promise(resolve => setTimeout(resolve, delay)); - yield* this.streamApiReal(method, model, body, isRetry, retryCount + 1); - return; + // Handle 429 (Too Many Requests) - wait baseDelay then switch credential + if (status === 429) { + console.log(`[Kiro] Received 429 (Too Many Requests) in stream. Waiting ${baseDelay}ms before switching credential...`); + await new Promise(resolve => setTimeout(resolve, baseDelay)); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; + throw error; } - // Handle 5xx server errors with exponential backoff - if (status >= 500 && status < 600 && retryCount < maxRetries) { - const delay = baseDelay * Math.pow(2, retryCount); - console.log(`[Kiro] Received ${status} server error in stream. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`); - await new Promise(resolve => setTimeout(resolve, delay)); - yield* this.streamApiReal(method, model, body, isRetry, retryCount + 1); - return; + // Handle 5xx server errors - wait baseDelay then switch credential + if (status >= 500 && status < 600) { + console.log(`[Kiro] Received ${status} server error in stream. Waiting ${baseDelay}ms before switching credential...`); + await new Promise(resolve => setTimeout(resolve, baseDelay)); + // Mark error for credential switch without recording error count + error.shouldSwitchCredential = true; + error.skipErrorCount = true; + throw error; } // Handle network errors (ECONNRESET, ETIMEDOUT, etc.) with exponential backoff diff --git a/src/providers/provider-pool-manager.js b/src/providers/provider-pool-manager.js index ec63455..0db6022 100644 --- a/src/providers/provider-pool-manager.js +++ b/src/providers/provider-pool-manager.js @@ -123,6 +123,10 @@ export class ProviderPoolManager { } const availableProviders = this.providerStatus[providerType] || []; + + // 检查并恢复已到恢复时间的提供商 + this._checkAndRecoverScheduledProviders(providerType); + let availableAndHealthyProviders = availableProviders.filter(p => p.config.isHealthy && !p.config.isDisabled ); @@ -458,6 +462,44 @@ export class ProviderPoolManager { } } + /** + * Marks a provider as unhealthy with a scheduled recovery time. + * Used for quota exhaustion errors (402) where the quota will reset at a specific time. + * @param {string} providerType - The type of the provider. + * @param {object} providerConfig - The configuration of the provider to mark. + * @param {string} [errorMessage] - Optional error message to store. + * @param {Date|string} [recoveryTime] - Optional recovery time when the provider should be marked healthy again. + */ + markProviderUnhealthyWithRecoveryTime(providerType, providerConfig, errorMessage = null, recoveryTime = null) { + if (!providerConfig?.uuid) { + this._log('error', 'Invalid providerConfig in markProviderUnhealthyWithRecoveryTime'); + return; + } + + const provider = this._findProvider(providerType, providerConfig.uuid); + if (provider) { + provider.config.isHealthy = false; + provider.config.errorCount = this.maxErrorCount; // Set to max to indicate definitive failure + provider.config.lastErrorTime = new Date().toISOString(); + provider.config.lastUsed = new Date().toISOString(); + + if (errorMessage) { + provider.config.lastErrorMessage = errorMessage; + } + + // Set recovery time if provided + if (recoveryTime) { + const recoveryDate = recoveryTime instanceof Date ? recoveryTime : new Date(recoveryTime); + provider.config.scheduledRecoveryTime = recoveryDate.toISOString(); + this._log('warn', `Marked provider as unhealthy with recovery time: ${providerConfig.uuid} for type ${providerType}. Recovery at: ${recoveryDate.toISOString()}. Reason: ${errorMessage || 'Quota exhausted'}`); + } else { + this._log('warn', `Marked provider as unhealthy: ${providerConfig.uuid} for type ${providerType}. Reason: ${errorMessage || 'Quota exhausted'}`); + } + + this._debouncedSave(providerType); + } + } + /** * Marks a provider as healthy. * @param {string} providerType - The type of the provider. @@ -602,6 +644,41 @@ export class ProviderPoolManager { return null; } + /** + * 检查并恢复已到恢复时间的提供商 + * @param {string} [providerType] - 可选,指定要检查的提供商类型。如果不提供,检查所有类型 + * @private + */ + _checkAndRecoverScheduledProviders(providerType = null) { + const now = new Date(); + const typesToCheck = providerType ? [providerType] : Object.keys(this.providerStatus); + + for (const type of typesToCheck) { + const providers = this.providerStatus[type] || []; + for (const providerStatus of providers) { + const config = providerStatus.config; + + // 检查是否有 scheduledRecoveryTime 且已到恢复时间 + if (config.scheduledRecoveryTime && !config.isHealthy) { + const recoveryTime = new Date(config.scheduledRecoveryTime); + if (now >= recoveryTime) { + this._log('info', `Auto-recovering provider ${config.uuid} (${type}). Scheduled recovery time reached: ${recoveryTime.toISOString()}`); + + // 恢复健康状态 + config.isHealthy = true; + config.errorCount = 0; + config.lastErrorTime = null; + config.lastErrorMessage = null; + config.scheduledRecoveryTime = null; // 清除恢复时间 + + // 保存更改 + this._debouncedSave(type); + } + } + } + } + } + /** * Performs health checks on all providers in the pool. * This method would typically be called periodically (e.g., via cron job). @@ -610,10 +687,22 @@ export class ProviderPoolManager { this._log('info', 'Performing health checks on all providers...'); const now = new Date(); + // 首先检查并恢复已到恢复时间的提供商 + this._checkAndRecoverScheduledProviders(); + for (const providerType in this.providerStatus) { for (const providerStatus of this.providerStatus[providerType]) { const providerConfig = providerStatus.config; + // 如果提供商有 scheduledRecoveryTime 且未到恢复时间,跳过健康检查 + if (providerConfig.scheduledRecoveryTime && !providerConfig.isHealthy) { + const recoveryTime = new Date(providerConfig.scheduledRecoveryTime); + if (now < recoveryTime) { + this._log('debug', `Skipping health check for ${providerConfig.uuid} (${providerType}). Waiting for scheduled recovery at ${recoveryTime.toISOString()}`); + continue; + } + } + // Only attempt to health check unhealthy providers after a certain interval if (!providerStatus.config.isHealthy && providerStatus.config.lastErrorTime && (now.getTime() - new Date(providerStatus.config.lastErrorTime).getTime() < this.healthCheckInterval)) { diff --git a/src/utils/common.js b/src/utils/common.js index d3acc92..80e37ef 100644 --- a/src/utils/common.js +++ b/src/utils/common.js @@ -224,7 +224,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from let responseClosed = false; // 重试上下文:包含 CONFIG 和重试计数 - const maxRetries = retryContext?.maxRetries ?? 2; + // maxRetries: 凭证切换最大次数(跨凭证),默认 5 次 + const maxRetries = retryContext?.maxRetries ?? 5; const currentRetry = retryContext?.currentRetry ?? 0; const CONFIG = retryContext?.CONFIG; const isRetry = currentRetry > 0; @@ -307,11 +308,16 @@ export async function handleStreamRequest(res, service, model, requestBody, from // 获取状态码(用于日志记录,不再用于判断是否重试) const status = error.response?.status; + // 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况) + const skipErrorCount = error.skipErrorCount === true; + // 检查是否应该切换凭证(用于 429/5xx/402/403 等情况) + const shouldSwitchCredential = error.shouldSwitchCredential === true; + // 检查凭证是否已在底层被标记为不健康(避免重复标记) let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true; - // 如果底层未标记,则在此处标记 - if (!credentialMarkedUnhealthy && providerPoolManager && pooluuid) { + // 如果底层未标记,且不跳过错误计数,则在此处标记 + if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) { console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to stream error (status: ${status || 'unknown'})`); // 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康 providerPoolManager.markProviderUnhealthy(toProvider, { @@ -320,6 +326,13 @@ export async function handleStreamRequest(res, service, model, requestBody, from credentialMarkedUnhealthy = true; } else if (credentialMarkedUnhealthy) { console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`); + } else if (skipErrorCount) { + console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`); + } + + // 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试 + if (shouldSwitchCredential && !credentialMarkedUnhealthy) { + credentialMarkedUnhealthy = true; // 触发下面的重试逻辑 } // 凭证已被标记为不健康后,尝试切换到新凭证重试 @@ -386,7 +399,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from export async function handleUnaryRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid, customName, retryContext = null) { // 重试上下文:包含 CONFIG 和重试计数 - const maxRetries = retryContext?.maxRetries ?? 2; + // maxRetries: 凭证切换最大次数(跨凭证),默认 5 次 + const maxRetries = retryContext?.maxRetries ?? 5; const currentRetry = retryContext?.currentRetry ?? 0; const CONFIG = retryContext?.CONFIG; @@ -424,11 +438,16 @@ export async function handleUnaryRequest(res, service, model, requestBody, fromP // 获取状态码(用于日志记录,不再用于判断是否重试) const status = error.response?.status; + // 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况) + const skipErrorCount = error.skipErrorCount === true; + // 检查是否应该切换凭证(用于 429/5xx/402/403 等情况) + const shouldSwitchCredential = error.shouldSwitchCredential === true; + // 检查凭证是否已在底层被标记为不健康(避免重复标记) let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true; - // 如果底层未标记,则在此处标记 - if (!credentialMarkedUnhealthy && providerPoolManager && pooluuid) { + // 如果底层未标记,且不跳过错误计数,则在此处标记 + if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) { console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to unary error (status: ${status || 'unknown'})`); // 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康 providerPoolManager.markProviderUnhealthy(toProvider, { @@ -437,6 +456,13 @@ export async function handleUnaryRequest(res, service, model, requestBody, fromP credentialMarkedUnhealthy = true; } else if (credentialMarkedUnhealthy) { console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`); + } else if (skipErrorCount) { + console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`); + } + + // 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试 + if (shouldSwitchCredential && !credentialMarkedUnhealthy) { + credentialMarkedUnhealthy = true; // 触发下面的重试逻辑 } // 凭证已被标记为不健康后,尝试切换到新凭证重试