Merge pull request #241 from leonaii/main

feat(kiro): 改进错误处理逻辑,添加配额耗尽自动恢复机制
This commit is contained in:
何夕2077 2026-01-14 20:29:20 +08:00 committed by GitHub
commit 33656869a5
3 changed files with 258 additions and 31 deletions

View file

@ -61,6 +61,22 @@ const MODEL_MAPPING = Object.fromEntries(
const KIRO_AUTH_TOKEN_FILE = "kiro-auth-token.json";
/**
* 自定义凭证错误类
* 用于标识需要切换凭证的错误
*/
class CredentialError extends Error {
constructor(message, options = {}) {
super(message);
this.name = 'CredentialError';
this.shouldSwitchCredential = options.shouldSwitchCredential ?? false;
this.skipErrorCount = options.skipErrorCount ?? false;
this.credentialMarkedUnhealthy = options.credentialMarkedUnhealthy ?? false;
this.statusCode = options.statusCode;
this.originalError = options.originalError;
}
}
/**
* Kiro API Service - Node.js implementation based on the Python ki2api
* Provides OpenAI-compatible API for Claude Sonnet 4 via Kiro/CodeWhisperer
@ -1338,27 +1354,39 @@ async initializeAuth(forceRefresh = false) {
}
}
// Handle 402 (Payment Required / Quota Exceeded) - verify usage and mark as unhealthy with recovery time
if (status === 402) {
await this._handle402Error(error, 'callApi');
}
// Handle 403 (Forbidden) - mark as unhealthy immediately, no retry
if (status === 403) {
console.log('[Kiro] Received 403. Marking credential as unhealthy...');
this._markCredentialUnhealthy('403 Forbidden', error);
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
// Handle 429 (Too Many Requests) with exponential backoff
if (status === 429 && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Kiro] Received 429 (Too Many Requests). Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
return this.callApi(method, model, body, isRetry, retryCount + 1);
// Handle 429 (Too Many Requests) - wait baseDelay then switch credential
if (status === 429) {
console.log(`[Kiro] Received 429 (Too Many Requests). Waiting ${baseDelay}ms before switching credential...`);
await new Promise(resolve => setTimeout(resolve, baseDelay));
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
// Handle other retryable errors (5xx server errors)
if (status >= 500 && status < 600 && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Kiro] Received ${status} server error. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
return this.callApi(method, model, body, isRetry, retryCount + 1);
// Handle 5xx server errors - wait baseDelay then switch credential
if (status >= 500 && status < 600) {
console.log(`[Kiro] Received ${status} server error. Waiting ${baseDelay}ms before switching credential...`);
await new Promise(resolve => setTimeout(resolve, baseDelay));
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
// Handle network errors (ECONNRESET, ETIMEDOUT, etc.) with exponential backoff
@ -1419,6 +1447,79 @@ async initializeAuth(forceRefresh = false) {
}
}
/**
* Helper method to mark the current credential as unhealthy with a scheduled recovery time
* Used for quota exhaustion (402) where quota resets at a specific time (e.g., 1st of next month)
* @param {string} reason - The reason for marking unhealthy
* @param {Error} [error] - Optional error object to attach the marker to
* @param {Date} [recoveryTime] - The time when the credential should be marked healthy again
* @returns {boolean} - Whether the credential was successfully marked as unhealthy
* @private
*/
_markCredentialUnhealthyWithRecovery(reason, error = null, recoveryTime = null) {
const poolManager = getProviderPoolManager();
if (poolManager && this.uuid) {
console.log(`[Kiro] Marking credential ${this.uuid} as unhealthy with recovery time. Reason: ${reason}, Recovery: ${recoveryTime?.toISOString()}`);
poolManager.markProviderUnhealthyWithRecoveryTime(MODEL_PROVIDER.KIRO_API, {
uuid: this.uuid
}, reason, recoveryTime);
// Attach marker to error object to prevent duplicate marking in upper layers
if (error) {
error.credentialMarkedUnhealthy = true;
}
return true;
} else {
console.warn(`[Kiro] Cannot mark credential as unhealthy: poolManager=${!!poolManager}, uuid=${this.uuid}`);
return false;
}
}
/**
* 计算下月1日 00:00:00 UTC 时间
* @returns {Date} 下月1日的 Date 对象
* @private
*/
_getNextMonthFirstDay() {
const now = new Date();
return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1, 0, 0, 0, 0));
}
/**
* 处理 402 错误配额耗尽
* 验证用量限制并标记凭证为不健康设置恢复时间为下月1日
* @param {Error} error - 原始错误对象
* @param {string} context - 错误发生的上下文 'callApi', 'stream'
* @throws {Error} 抛出带有切换凭证标记的错误
* @private
*/
async _handle402Error(error, context = 'unknown') {
console.log(`[Kiro] Received 402 (Quota Exceeded) in ${context}. Verifying usage limits...`);
try {
// Verify usage limits to confirm quota exhaustion
const usageLimits = await this.getUsageLimits();
const isQuotaExhausted = usageLimits?.usedCount >= usageLimits?.limitCount;
if (isQuotaExhausted) {
console.log(`[Kiro] Quota confirmed exhausted: ${usageLimits?.usedCount}/${usageLimits?.limitCount}`);
// Calculate recovery time: 1st day of next month at 00:00:00 UTC
const nextMonth = this._getNextMonthFirstDay();
this._markCredentialUnhealthyWithRecovery('402 Payment Required - Quota Exhausted', error, nextMonth);
} else {
console.log(`[Kiro] Quota not exhausted (${usageLimits?.usedCount}/${usageLimits?.limitCount}), but received 402. Marking unhealthy anyway.`);
this._markCredentialUnhealthy('402 Payment Required - Unexpected', error);
}
} catch (usageError) {
console.warn('[Kiro] Failed to verify usage limits:', usageError.message);
// If we can't verify, still mark as unhealthy with recovery time
const nextMonth = this._getNextMonthFirstDay();
this._markCredentialUnhealthyWithRecovery('402 Payment Required - Quota Exceeded (unverified)', error, nextMonth);
}
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
_processApiResponse(response) {
const rawResponseText = Buffer.isBuffer(response.data) ? response.data.toString('utf8') : String(response.data);
//console.log(`[Kiro] Raw response length: ${rawResponseText.length}`);
@ -1733,28 +1834,39 @@ async initializeAuth(forceRefresh = false) {
}
}
// Handle 402 (Payment Required / Quota Exceeded) - verify usage and mark as unhealthy with recovery time
if (status === 402) {
await this._handle402Error(error, 'stream');
}
// Handle 403 (Forbidden) - mark as unhealthy immediately, no retry
if (status === 403) {
console.log('[Kiro] Received 403 in stream. Marking credential as unhealthy...');
this._markCredentialUnhealthy('403 Forbidden', error);
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
if (status === 429 && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Kiro] Received 429 in stream. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
yield* this.streamApiReal(method, model, body, isRetry, retryCount + 1);
return;
// Handle 429 (Too Many Requests) - wait baseDelay then switch credential
if (status === 429) {
console.log(`[Kiro] Received 429 (Too Many Requests) in stream. Waiting ${baseDelay}ms before switching credential...`);
await new Promise(resolve => setTimeout(resolve, baseDelay));
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
// Handle 5xx server errors with exponential backoff
if (status >= 500 && status < 600 && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Kiro] Received ${status} server error in stream. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
yield* this.streamApiReal(method, model, body, isRetry, retryCount + 1);
return;
// Handle 5xx server errors - wait baseDelay then switch credential
if (status >= 500 && status < 600) {
console.log(`[Kiro] Received ${status} server error in stream. Waiting ${baseDelay}ms before switching credential...`);
await new Promise(resolve => setTimeout(resolve, baseDelay));
// Mark error for credential switch without recording error count
error.shouldSwitchCredential = true;
error.skipErrorCount = true;
throw error;
}
// Handle network errors (ECONNRESET, ETIMEDOUT, etc.) with exponential backoff

View file

@ -151,6 +151,10 @@ export class ProviderPoolManager {
*/
_doSelectProvider(providerType, requestedModel, options) {
const availableProviders = this.providerStatus[providerType] || [];
// 检查并恢复已到恢复时间的提供商
this._checkAndRecoverScheduledProviders(providerType);
let availableAndHealthyProviders = availableProviders.filter(p =>
p.config.isHealthy && !p.config.isDisabled
);
@ -499,6 +503,44 @@ export class ProviderPoolManager {
}
}
/**
* Marks a provider as unhealthy with a scheduled recovery time.
* Used for quota exhaustion errors (402) where the quota will reset at a specific time.
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
* @param {string} [errorMessage] - Optional error message to store.
* @param {Date|string} [recoveryTime] - Optional recovery time when the provider should be marked healthy again.
*/
markProviderUnhealthyWithRecoveryTime(providerType, providerConfig, errorMessage = null, recoveryTime = null) {
if (!providerConfig?.uuid) {
this._log('error', 'Invalid providerConfig in markProviderUnhealthyWithRecoveryTime');
return;
}
const provider = this._findProvider(providerType, providerConfig.uuid);
if (provider) {
provider.config.isHealthy = false;
provider.config.errorCount = this.maxErrorCount; // Set to max to indicate definitive failure
provider.config.lastErrorTime = new Date().toISOString();
provider.config.lastUsed = new Date().toISOString();
if (errorMessage) {
provider.config.lastErrorMessage = errorMessage;
}
// Set recovery time if provided
if (recoveryTime) {
const recoveryDate = recoveryTime instanceof Date ? recoveryTime : new Date(recoveryTime);
provider.config.scheduledRecoveryTime = recoveryDate.toISOString();
this._log('warn', `Marked provider as unhealthy with recovery time: ${providerConfig.uuid} for type ${providerType}. Recovery at: ${recoveryDate.toISOString()}. Reason: ${errorMessage || 'Quota exhausted'}`);
} else {
this._log('warn', `Marked provider as unhealthy: ${providerConfig.uuid} for type ${providerType}. Reason: ${errorMessage || 'Quota exhausted'}`);
}
this._debouncedSave(providerType);
}
}
/**
* Marks a provider as healthy.
* @param {string} providerType - The type of the provider.
@ -643,6 +685,41 @@ export class ProviderPoolManager {
return null;
}
/**
* 检查并恢复已到恢复时间的提供商
* @param {string} [providerType] - 可选指定要检查的提供商类型如果不提供检查所有类型
* @private
*/
_checkAndRecoverScheduledProviders(providerType = null) {
const now = new Date();
const typesToCheck = providerType ? [providerType] : Object.keys(this.providerStatus);
for (const type of typesToCheck) {
const providers = this.providerStatus[type] || [];
for (const providerStatus of providers) {
const config = providerStatus.config;
// 检查是否有 scheduledRecoveryTime 且已到恢复时间
if (config.scheduledRecoveryTime && !config.isHealthy) {
const recoveryTime = new Date(config.scheduledRecoveryTime);
if (now >= recoveryTime) {
this._log('info', `Auto-recovering provider ${config.uuid} (${type}). Scheduled recovery time reached: ${recoveryTime.toISOString()}`);
// 恢复健康状态
config.isHealthy = true;
config.errorCount = 0;
config.lastErrorTime = null;
config.lastErrorMessage = null;
config.scheduledRecoveryTime = null; // 清除恢复时间
// 保存更改
this._debouncedSave(type);
}
}
}
}
}
/**
* Performs health checks on all providers in the pool.
* This method would typically be called periodically (e.g., via cron job).
@ -651,10 +728,22 @@ export class ProviderPoolManager {
this._log('info', 'Performing health checks on all providers...');
const now = new Date();
// 首先检查并恢复已到恢复时间的提供商
this._checkAndRecoverScheduledProviders();
for (const providerType in this.providerStatus) {
for (const providerStatus of this.providerStatus[providerType]) {
const providerConfig = providerStatus.config;
// 如果提供商有 scheduledRecoveryTime 且未到恢复时间,跳过健康检查
if (providerConfig.scheduledRecoveryTime && !providerConfig.isHealthy) {
const recoveryTime = new Date(providerConfig.scheduledRecoveryTime);
if (now < recoveryTime) {
this._log('debug', `Skipping health check for ${providerConfig.uuid} (${providerType}). Waiting for scheduled recovery at ${recoveryTime.toISOString()}`);
continue;
}
}
// Only attempt to health check unhealthy providers after a certain interval
if (!providerStatus.config.isHealthy && providerStatus.config.lastErrorTime &&
(now.getTime() - new Date(providerStatus.config.lastErrorTime).getTime() < this.healthCheckInterval)) {

View file

@ -224,7 +224,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from
let responseClosed = false;
// 重试上下文:包含 CONFIG 和重试计数
const maxRetries = retryContext?.maxRetries ?? 2;
// maxRetries: 凭证切换最大次数(跨凭证),默认 5 次
const maxRetries = retryContext?.maxRetries ?? 5;
const currentRetry = retryContext?.currentRetry ?? 0;
const CONFIG = retryContext?.CONFIG;
const isRetry = currentRetry > 0;
@ -307,11 +308,16 @@ export async function handleStreamRequest(res, service, model, requestBody, from
// 获取状态码(用于日志记录,不再用于判断是否重试)
const status = error.response?.status;
// 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况)
const skipErrorCount = error.skipErrorCount === true;
// 检查是否应该切换凭证(用于 429/5xx/402/403 等情况)
const shouldSwitchCredential = error.shouldSwitchCredential === true;
// 检查凭证是否已在底层被标记为不健康(避免重复标记)
let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true;
// 如果底层未标记,则在此处标记
if (!credentialMarkedUnhealthy && providerPoolManager && pooluuid) {
// 如果底层未标记,且不跳过错误计数,则在此处标记
if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) {
console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to stream error (status: ${status || 'unknown'})`);
// 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康
providerPoolManager.markProviderUnhealthy(toProvider, {
@ -320,6 +326,13 @@ export async function handleStreamRequest(res, service, model, requestBody, from
credentialMarkedUnhealthy = true;
} else if (credentialMarkedUnhealthy) {
console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`);
} else if (skipErrorCount) {
console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`);
}
// 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试
if (shouldSwitchCredential && !credentialMarkedUnhealthy) {
credentialMarkedUnhealthy = true; // 触发下面的重试逻辑
}
// 凭证已被标记为不健康后,尝试切换到新凭证重试
@ -386,7 +399,8 @@ export async function handleStreamRequest(res, service, model, requestBody, from
export async function handleUnaryRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid, customName, retryContext = null) {
// 重试上下文:包含 CONFIG 和重试计数
const maxRetries = retryContext?.maxRetries ?? 2;
// maxRetries: 凭证切换最大次数(跨凭证),默认 5 次
const maxRetries = retryContext?.maxRetries ?? 5;
const currentRetry = retryContext?.currentRetry ?? 0;
const CONFIG = retryContext?.CONFIG;
@ -424,11 +438,16 @@ export async function handleUnaryRequest(res, service, model, requestBody, fromP
// 获取状态码(用于日志记录,不再用于判断是否重试)
const status = error.response?.status;
// 检查是否应该跳过错误计数(用于 429/5xx 等需要直接切换凭证的情况)
const skipErrorCount = error.skipErrorCount === true;
// 检查是否应该切换凭证(用于 429/5xx/402/403 等情况)
const shouldSwitchCredential = error.shouldSwitchCredential === true;
// 检查凭证是否已在底层被标记为不健康(避免重复标记)
let credentialMarkedUnhealthy = error.credentialMarkedUnhealthy === true;
// 如果底层未标记,则在此处标记
if (!credentialMarkedUnhealthy && providerPoolManager && pooluuid) {
// 如果底层未标记,且不跳过错误计数,则在此处标记
if (!credentialMarkedUnhealthy && !skipErrorCount && providerPoolManager && pooluuid) {
console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to unary error (status: ${status || 'unknown'})`);
// 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康
providerPoolManager.markProviderUnhealthy(toProvider, {
@ -437,6 +456,13 @@ export async function handleUnaryRequest(res, service, model, requestBody, fromP
credentialMarkedUnhealthy = true;
} else if (credentialMarkedUnhealthy) {
console.log(`[Provider Pool] Credential ${pooluuid} already marked as unhealthy by lower layer, skipping duplicate marking`);
} else if (skipErrorCount) {
console.log(`[Provider Pool] Skipping error count for ${toProvider} (${pooluuid}) - will switch credential without marking unhealthy`);
}
// 如果需要切换凭证(无论是否标记不健康),都设置标记以触发重试
if (shouldSwitchCredential && !credentialMarkedUnhealthy) {
credentialMarkedUnhealthy = true; // 触发下面的重试逻辑
}
// 凭证已被标记为不健康后,尝试切换到新凭证重试