From 9f58db2c1f4799bc8e411dfef536a26de7845a2f Mon Sep 17 00:00:00 2001 From: leonai <731962175@qq.com> Date: Thu, 15 Jan 2026 21:49:54 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=BC=95=E5=85=A5=E5=87=AD=E8=AF=81?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AE=A1=E7=90=86=E5=99=A8=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20OAuth=20Token=20=E5=B9=B6=E5=8F=91=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 CredentialCacheManager 替代文件锁机制 - 所有 OAuth Provider 迁移到内存缓存 - Token 刷新策略优化:过期阻塞,即将过期后台刷新 - 健康检查增加 Token 预检和超时保护 - 进程退出时自动同步凭证到文件 --- src/providers/claude/claude-kiro.js | 232 ++++--- src/providers/gemini/antigravity-core.js | 92 ++- src/providers/gemini/gemini-core.js | 35 +- src/providers/openai/iflow-core.js | 143 ++-- src/providers/openai/qwen-core.js | 95 ++- src/providers/provider-pool-manager.js | 77 ++- src/services/service-manager.js | 59 +- src/utils/credential-cache-manager.js | 806 +++++++++++++++++++++++ src/utils/file-lock.js | 220 ------- 9 files changed, 1334 insertions(+), 425 deletions(-) create mode 100644 src/utils/credential-cache-manager.js delete mode 100644 src/utils/file-lock.js diff --git a/src/providers/claude/claude-kiro.js b/src/providers/claude/claude-kiro.js index fd14aca..e4d7e18 100644 --- a/src/providers/claude/claude-kiro.js +++ b/src/providers/claude/claude-kiro.js @@ -11,7 +11,7 @@ import { countTokens } from '@anthropic-ai/tokenizer'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js'; import { getProviderPoolManager } from '../../services/service-manager.js'; -import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; +import { CredentialCacheManager } from '../../utils/credential-cache-manager.js'; const KIRO_THINKING = { MAX_BUDGET_TOKENS: 24576, @@ -29,7 +29,8 @@ const KIRO_CONSTANTS = { AMAZON_Q_URL: 'https://codewhisperer.{{region}}.amazonaws.com/SendMessageStreaming', USAGE_LIMITS_URL: 'https://q.{{region}}.amazonaws.com/getUsageLimits', DEFAULT_MODEL_NAME: 'claude-opus-4-5', - AXIOS_TIMEOUT: 120000, // 2 minutes timeout (increased from 2 minutes) + AXIOS_TIMEOUT: 120000, // 2 minutes timeout for normal requests + TOKEN_REFRESH_TIMEOUT: 15000, // 15 seconds timeout for token refresh (shorter to avoid blocking) USER_AGENT: 'KiroIDE', KIRO_VERSION: '0.7.5', CONTENT_TYPE_JSON: 'application/json', @@ -429,7 +430,11 @@ async initializeAuth(forceRefresh = false) { // 获取凭证文件路径,用于去重锁的 key const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE); - // Helper to load credentials from a file + // 获取凭证缓存管理器 + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'claude-kiro-oauth'; + + // Helper to load credentials from a file (fallback when cache miss) const loadCredentialsFromFile = async (filePath) => { try { const fileContent = await fs.readFile(filePath, 'utf8'); @@ -457,11 +462,21 @@ async initializeAuth(forceRefresh = false) { } }; - // Helper to save credentials to a file (with file locking to prevent concurrent write corruption) + // Helper to save credentials - 使用内存缓存替代直接文件写入 const saveCredentialsToFile = async (filePath, newData) => { - // 获取文件锁,防止并发写入 - const releaseLock = await acquireFileLock(filePath); - try { + // 优先更新内存缓存 + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const entry = credentialCache.getCredentials(providerType, this.uuid); + if (entry) { + const mergedData = { ...entry.credentials, ...newData }; + credentialCache.updateCredentials(providerType, this.uuid, mergedData, filePath); + console.info(`[Kiro Auth] Updated credentials in memory cache: ${this.uuid}`); + return; + } + } + + // 如果没有缓存条目,使用内存锁方式写入 + await credentialCache.withMemoryLock(`kiro-save:${filePath}`, async () => { let existingData = {}; try { const fileContent = await fs.readFile(filePath, 'utf8'); @@ -488,12 +503,7 @@ async initializeAuth(forceRefresh = false) { const mergedData = { ...existingData, ...newData }; await fs.writeFile(filePath, JSON.stringify(mergedData, null, 2), 'utf8'); console.info(`[Kiro Auth] Updated token file: ${filePath}`); - } catch (error) { - console.error(`[Kiro Auth] Failed to write token to file ${filePath}: ${error.message}`); - } finally { - // 确保锁被释放 - releaseLock(); - } + }); }; try { @@ -503,46 +513,49 @@ async initializeAuth(forceRefresh = false) { if (this.base64Creds) { Object.assign(mergedCredentials, this.base64Creds); console.info('[Kiro Auth] Successfully loaded credentials from Base64 (constructor).'); - // Clear base64Creds after use to prevent re-processing this.base64Creds = null; } - // Priority 2 & 3 合并: 从指定文件路径或目录加载凭证 - // 读取指定的 credPath 文件以及目录下的其他 JSON 文件(排除当前文件) - const targetFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE); - const dirPath = path.dirname(targetFilePath); - const targetFileName = path.basename(targetFilePath); - - console.debug(`[Kiro Auth] Attempting to load credentials from directory: ${dirPath}`); - - try { - // 首先尝试读取目标文件 - const targetCredentials = await loadCredentialsFromFile(targetFilePath); - if (targetCredentials) { - Object.assign(mergedCredentials, targetCredentials); - console.info(`[Kiro Auth] Successfully loaded OAuth credentials from ${targetFilePath}`); + // Priority 2: 尝试从内存缓存加载凭证 + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + Object.assign(mergedCredentials, cachedEntry.credentials); + console.info(`[Kiro Auth] Successfully loaded credentials from memory cache: ${this.uuid}`); } - - // 然后读取目录下的其他 JSON 文件(排除目标文件本身) - const files = await fs.readdir(dirPath); - for (const file of files) { - if (file.endsWith('.json') && file !== targetFileName) { - const filePath = path.join(dirPath, file); - const credentials = await loadCredentialsFromFile(filePath); - if (credentials) { - // 保留已有的 expiresAt,避免被覆盖 - credentials.expiresAt = mergedCredentials.expiresAt; - Object.assign(mergedCredentials, credentials); - console.debug(`[Kiro Auth] Loaded Client credentials from ${file}`); + } else { + // Priority 3: 从文件加载(缓存未命中时的回退) + const targetFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE); + const dirPath = path.dirname(targetFilePath); + const targetFileName = path.basename(targetFilePath); + + console.debug(`[Kiro Auth] Cache miss, loading credentials from directory: ${dirPath}`); + + try { + const targetCredentials = await loadCredentialsFromFile(targetFilePath); + if (targetCredentials) { + Object.assign(mergedCredentials, targetCredentials); + console.info(`[Kiro Auth] Successfully loaded OAuth credentials from ${targetFilePath}`); + } + + const files = await fs.readdir(dirPath); + for (const file of files) { + if (file.endsWith('.json') && file !== targetFileName) { + const filePath = path.join(dirPath, file); + const credentials = await loadCredentialsFromFile(filePath); + if (credentials) { + credentials.expiresAt = mergedCredentials.expiresAt; + Object.assign(mergedCredentials, credentials); + console.debug(`[Kiro Auth] Loaded Client credentials from ${file}`); + } } } + } catch (error) { + console.warn(`[Kiro Auth] Error loading credentials from directory ${dirPath}: ${error.message}`); } - } catch (error) { - console.warn(`[Kiro Auth] Error loading credentials from directory ${dirPath}: ${error.message}`); } - // console.log('[Kiro Auth] Merged credentials:', mergedCredentials); - // Apply loaded credentials, prioritizing existing values if they are not null/undefined + // Apply loaded credentials this.accessToken = this.accessToken || mergedCredentials.accessToken; this.refreshToken = this.refreshToken || mergedCredentials.refreshToken; this.clientId = this.clientId || mergedCredentials.clientId; @@ -552,10 +565,9 @@ async initializeAuth(forceRefresh = false) { this.profileArn = this.profileArn || mergedCredentials.profileArn; this.region = this.region || mergedCredentials.region; - // Ensure region is set before using it in URLs if (!this.region) { console.warn('[Kiro Auth] Region not found in credentials. Using default region us-east-1 for URLs.'); - this.region = 'us-east-1'; // Set default region + this.region = 'us-east-1'; } this.refreshUrl = (this.config.KIRO_REFRESH_URL || KIRO_CONSTANTS.REFRESH_URL).replace("{{region}}", this.region); @@ -571,21 +583,16 @@ async initializeAuth(forceRefresh = false) { if (!this.refreshToken) { throw new Error('No refresh token available to refresh access token.'); } - - // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + + // 使用内存锁替代文件锁进行去重 const dedupeKey = `kiro-token-refresh:${tokenFilePath}`; - await withDeduplication(dedupeKey, async () => { + await credentialCache.withDeduplication(dedupeKey, async () => { await this._doTokenRefresh(saveCredentialsToFile, tokenFilePath); }); - - // 如果是等待其他请求完成的刷新,需要重新加载凭证 - // 因为 _doTokenRefresh 只更新了执行刷新的实例的内存状态 - // 注意:withDeduplication 会让所有等待者共享同一个 Promise - // 但只有第一个调用者的实例会执行 _doTokenRefresh 并更新自己的内存状态 - // 其他等待者需要从文件重新加载 - if (!this.accessToken || this.isExpiryDateNear()) { - await this._reloadCredentialsAfterRefresh(tokenFilePath); - } + + // 刷新完成后(无论是自己执行还是等待其他请求),都从缓存重新加载凭证 + // 确保所有并发请求都能获取到最新的 token + await this._reloadCredentialsAfterRefresh(tokenFilePath); } if (!this.accessToken) { @@ -613,11 +620,13 @@ async initializeAuth(forceRefresh = false) { } let response = null; + // 使用更短的超时时间进行 token 刷新,避免阻塞其他请求 + const refreshConfig = { timeout: KIRO_CONSTANTS.TOKEN_REFRESH_TIMEOUT }; if (this.authMethod === KIRO_CONSTANTS.AUTH_METHOD_SOCIAL) { - response = await this.axiosSocialRefreshInstance.post(refreshUrl, requestBody); + response = await this.axiosSocialRefreshInstance.post(refreshUrl, requestBody, refreshConfig); console.log('[Kiro Auth] Token refresh social response: ok'); } else { - response = await this.axiosInstance.post(refreshUrl, requestBody); + response = await this.axiosInstance.post(refreshUrl, requestBody, refreshConfig); console.log('[Kiro Auth] Token refresh idc response: ok'); } @@ -653,6 +662,25 @@ async initializeAuth(forceRefresh = false) { * @param {string} tokenFilePath - 凭证文件路径 */ async _reloadCredentialsAfterRefresh(tokenFilePath) { + // 优先从内存缓存加载 + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'claude-kiro-oauth'; + + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + this.accessToken = cachedEntry.credentials.accessToken; + this.refreshToken = cachedEntry.credentials.refreshToken; + this.expiresAt = cachedEntry.credentials.expiresAt; + if (cachedEntry.credentials.profileArn) { + this.profileArn = cachedEntry.credentials.profileArn; + } + console.debug('[Kiro Auth] Credentials reloaded from memory cache after concurrent refresh'); + return; + } + } + + // 回退到文件加载 try { const fileContent = await fs.readFile(tokenFilePath, 'utf8'); let credentials; @@ -675,7 +703,7 @@ async initializeAuth(forceRefresh = false) { if (credentials.profileArn) { this.profileArn = credentials.profileArn; } - console.debug('[Kiro Auth] Credentials reloaded after concurrent refresh'); + console.debug('[Kiro Auth] Credentials reloaded from file after concurrent refresh'); } catch (error) { console.warn(`[Kiro Auth] Failed to reload credentials after refresh: ${error.message}`); throw error; @@ -1564,11 +1592,16 @@ async initializeAuth(forceRefresh = false) { async generateContent(model, requestBody) { if (!this.isInitialized) await this.initialize(); - - // 检查 token 是否即将过期,如果是则先刷新 - if (this.isExpiryDateNear()) { - console.log('[Kiro] Token is near expiry, refreshing before generateContent request...'); + + // Token 刷新策略: + // 1. 已过期 → 必须等待刷新 + // 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求 + if (this.isTokenExpired()) { + console.log('[Kiro] Token is expired, must refresh before generateContent request...'); await this.initializeAuth(true); + } else if (this.isExpiryDateNear()) { + console.log('[Kiro] Token is near expiry, triggering background refresh...'); + this.triggerBackgroundRefresh(); } const finalModel = MODEL_MAPPING[model] ? model : this.modelName; @@ -1902,11 +1935,16 @@ async initializeAuth(forceRefresh = false) { // 真正的流式传输实现 async * generateContentStream(model, requestBody) { if (!this.isInitialized) await this.initialize(); - - // 检查 token 是否即将过期,如果是则先刷新 - if (this.isExpiryDateNear()) { - console.log('[Kiro] Token is near expiry, refreshing before generateContentStream request...'); + + // Token 刷新策略: + // 1. 已过期 → 必须等待刷新 + // 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求 + if (this.isTokenExpired()) { + console.log('[Kiro] Token is expired, must refresh before generateContentStream request...'); await this.initializeAuth(true); + } else if (this.isExpiryDateNear()) { + console.log('[Kiro] Token is near expiry, triggering background refresh...'); + this.triggerBackgroundRefresh(); } const finalModel = MODEL_MAPPING[model] ? model : this.modelName; @@ -2528,7 +2566,25 @@ async initializeAuth(forceRefresh = false) { } /** - * Checks if the given expiresAt timestamp is within 10 minutes from now. + * Checks if the token is completely expired (cannot be used at all). + * @returns {boolean} - True if token is expired, false otherwise. + */ + isTokenExpired() { + try { + if (!this.expiresAt) return true; + const expirationTime = new Date(this.expiresAt); + const currentTime = new Date(); + // 给 30 秒缓冲,避免请求过程中过期 + const bufferMs = 30 * 1000; + return expirationTime.getTime() <= (currentTime.getTime() + bufferMs); + } catch (error) { + console.error(`[Kiro] Error checking token expiry: ${error.message}`); + return true; // Treat as expired if parsing fails + } + } + + /** + * Checks if the given expiresAt timestamp is within 10 minutes from now (needs refresh soon). * @returns {boolean} - True if expiresAt is less than 10 minutes from now, false otherwise. */ isExpiryDateNear() { @@ -2545,6 +2601,29 @@ async initializeAuth(forceRefresh = false) { } } + /** + * 后台异步刷新 token(不阻塞当前请求) + */ + triggerBackgroundRefresh() { + const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AH_TOKEN_FILE); + const dedupeKey = `kiro-token-refresh:${tokenFilePath}`; + const credentialCache = CredentialCacheManager.getInstance(); + + // 使用 withDeduplication 确保只有一个刷新任务在执行 + credentialCache.withDeduplication(dedupeKey, async () => { + console.log('[Kiro] Background token refresh started...'); + try { + await this.initializeAuth(true); + console.log('[Kiro] Background token refresh completed successfully'); + } catch (error) { + console.error('[Kiro] Background token refresh failed:', error.message); + // 后台刷新失败不抛出错误,下次请求会重试 + } + }).catch(() => { + // 忽略后台刷新错误,不影响当前请求 + }); + } + /** * Count tokens for a message request (compatible with Anthropic API) * POST /v1/messages/count_tokens @@ -2618,11 +2697,16 @@ async initializeAuth(forceRefresh = false) { */ async getUsageLimits() { if (!this.isInitialized) await this.initialize(); - - // 检查 token 是否即将过期,如果是则先刷新 - if (this.isExpiryDateNear()) { - console.log('[Kiro] Token is near expiry, refreshing before getUsageLimits request...'); + + // Token 刷新策略: + // 1. 已过期 → 必须等待刷新 + // 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求 + if (this.isTokenExpired()) { + console.log('[Kiro] Token is expired, must refresh before getUsageLimits request...'); await this.initializeAuth(true); + } else if (this.isExpiryDateNear()) { + console.log('[Kiro] Token is near expiry, triggering background refresh...'); + this.triggerBackgroundRefresh(); } // 内部固定的资源类型 diff --git a/src/providers/gemini/antigravity-core.js b/src/providers/gemini/antigravity-core.js index aa5f4a4..48309a3 100644 --- a/src/providers/gemini/antigravity-core.js +++ b/src/providers/gemini/antigravity-core.js @@ -14,7 +14,7 @@ import { getProviderModels } from '../provider-models.js'; import { handleGeminiAntigravityOAuth } from '../../auth/oauth-handlers.js'; import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js'; import { cleanJsonSchemaProperties } from '../../converters/utils.js'; -import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; +import { CredentialCacheManager } from '../../utils/credential-cache-manager.js'; // 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏 const httpAgent = new http.Agent({ @@ -707,13 +707,13 @@ export class AntigravityApiService { constructor(config) { // 检查是否需要使用代理 const proxyConfig = getGoogleAuthProxyConfig(config, 'gemini-antigravity'); - + // 配置 OAuth2Client 使用自定义的 HTTP agent const oauth2Options = { clientId: OAUTH_CLIENT_ID, clientSecret: OAUTH_CLIENT_SECRET, }; - + if (proxyConfig) { oauth2Options.transporterOptions = proxyConfig; console.log('[Antigravity] Using proxy for OAuth2Client'); @@ -722,7 +722,7 @@ export class AntigravityApiService { agent: httpsAgent, }; } - + this.authClient = new OAuth2Client(oauth2Options); this.availableModels = []; this.isInitialized = false; @@ -732,10 +732,11 @@ export class AntigravityApiService { this.oauthCredsFilePath = config.ANTIGRAVITY_OAUTH_CREDS_FILE_PATH; this.userAgent = DEFAULT_USER_AGENT; // 支持通用 USER_AGENT 配置 this.projectId = config.PROJECT_ID; + this.uuid = config.uuid; // 保存 uuid 用于缓存管理 // 多环境降级顺序 - 按照 Go 代码的顺序 this.baseURLs = this.getBaseURLFallbackOrder(config); - + // 保存代理配置供后续使用 this.proxyConfig = getProxyConfigForProvider(config, 'gemini-antigravity'); } @@ -777,6 +778,9 @@ export class AntigravityApiService { } async initializeAuth(forceRefresh = false) { + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'gemini-antigravity'; + // 检查是否需要刷新 Token const needsRefresh = forceRefresh || this.isTokenExpiringSoon(); @@ -789,31 +793,59 @@ export class AntigravityApiService { const credPath = this.oauthCredsFilePath || path.join(os.homedir(), CREDENTIALS_DIR, CREDENTIALS_FILE); try { - const data = await fs.readFile(credPath, "utf8"); - const credentials = JSON.parse(data); + // 优先从内存缓存加载 + let credentials = null; + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + credentials = cachedEntry.credentials; + console.log('[Antigravity Auth] Loaded credentials from memory cache'); + } + } + + // Fallback: 从文件加载 + if (!credentials) { + const data = await fs.readFile(credPath, "utf8"); + credentials = JSON.parse(data); + console.log('[Antigravity Auth] Loaded credentials from file'); + } + this.authClient.setCredentials(credentials); console.log('[Antigravity Auth] Authentication configured successfully from file.'); if (needsRefresh) { // 使用去重锁:多个并发刷新请求只执行一次,共享结果 const dedupeKey = `antigravity-token-refresh:${credPath}`; - await withDeduplication(dedupeKey, async () => { + await credentialCache.withDeduplication(dedupeKey, async () => { console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...'); const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); this.authClient.setCredentials(newCredentials); - // 保存刷新后的凭证到文件(使用文件锁) - await this._saveCredentialsToFile(credPath, newCredentials); + // 保存刷新后的凭证(优先保存到内存缓存) + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + credentialCache.updateCredentials(providerType, this.uuid, newCredentials, credPath); + console.log(`[Antigravity Auth] Token refreshed and saved to memory cache: ${this.uuid}`); + } else { + await this._saveCredentialsToFile(credPath, newCredentials); + } console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`); }); - + // 如果是等待其他请求完成的刷新,需要重新加载凭证 // 因为 withDeduplication 只让第一个调用者执行刷新并更新自己的内存状态 - // 其他等待者需要从文件重新加载 + // 其他等待者需要从缓存重新加载 if (this.isTokenExpiringSoon()) { - const refreshedData = await fs.readFile(credPath, "utf8"); - const refreshedCredentials = JSON.parse(refreshedData); - this.authClient.setCredentials(refreshedCredentials); - console.log('[Antigravity Auth] Credentials reloaded after concurrent refresh'); + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + this.authClient.setCredentials(cachedEntry.credentials); + console.log('[Antigravity Auth] Credentials reloaded from memory cache after concurrent refresh'); + } + } else { + const refreshedData = await fs.readFile(credPath, "utf8"); + const refreshedCredentials = JSON.parse(refreshedData); + this.authClient.setCredentials(refreshedCredentials); + console.log('[Antigravity Auth] Credentials reloaded from file after concurrent refresh'); + } } } } catch (error) { @@ -890,20 +922,30 @@ export class AntigravityApiService { } /** - * 保存凭证到文件(使用文件锁防止并发写入) + * 保存凭证到文件(优先使用内存缓存) * @param {string} filePath - 凭证文件路径 * @param {Object} credentials - 凭证数据 */ async _saveCredentialsToFile(filePath, credentials) { - const releaseLock = await acquireFileLock(filePath); - try { - await fs.writeFile(filePath, JSON.stringify(credentials, null, 2)); - console.log(`[Antigravity Auth] Credentials saved to ${filePath}`); - } catch (error) { - console.error(`[Antigravity Auth] Failed to save credentials to ${filePath}: ${error.message}`); - } finally { - releaseLock(); + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'gemini-antigravity'; + + // 优先保存到内存缓存 + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + credentialCache.updateCredentials(providerType, this.uuid, credentials, filePath); + console.log(`[Antigravity Auth] Credentials saved to memory cache: ${this.uuid}`); + return; } + + // Fallback: 使用内存锁的文件写入 + await credentialCache.withMemoryLock(`antigravity-save:${filePath}`, async () => { + try { + await fs.writeFile(filePath, JSON.stringify(credentials, null, 2)); + console.log(`[Antigravity Auth] Credentials saved to ${filePath}`); + } catch (error) { + console.error(`[Antigravity Auth] Failed to save credentials to ${filePath}: ${error.message}`); + } + }); } async discoverProjectAndModels() { diff --git a/src/providers/gemini/gemini-core.js b/src/providers/gemini/gemini-core.js index 026eeaf..5ec6eca 100644 --- a/src/providers/gemini/gemini-core.js +++ b/src/providers/gemini/gemini-core.js @@ -10,7 +10,7 @@ import { API_ACTIONS, formatExpiryTime, isRetryableNetworkError } from '../../ut import { getProviderModels } from '../provider-models.js'; import { handleGeminiCliOAuth } from '../../auth/oauth-handlers.js'; import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js'; -import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; +import { CredentialCacheManager } from '../../utils/credential-cache-manager.js'; // 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏 const httpAgent = new http.Agent({ @@ -273,10 +273,11 @@ export class GeminiApiService { if (forceRefresh) { console.log('[Gemini Auth] Forcing token refresh...'); - + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 const dedupeKey = `gemini-token-refresh:${credPath}`; - await withDeduplication(dedupeKey, async () => { + const credentialCache = CredentialCacheManager.getInstance(); + await credentialCache.withDeduplication(dedupeKey, async () => { const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); this.authClient.setCredentials(newCredentials); // Save refreshed credentials back to file (with file locking) @@ -636,20 +637,30 @@ export class GeminiApiService { } /** - * 保存凭证到文件(使用文件锁防止并发写入) + * 保存凭证到文件(使用内存缓存优先) * @param {string} filePath - 凭证文件路径 * @param {Object} credentials - 凭证数据 */ async _saveCredentialsToFile(filePath, credentials) { - const releaseLock = await acquireFileLock(filePath); - try { - await fs.writeFile(filePath, JSON.stringify(credentials, null, 2)); - console.log(`[Gemini Auth] Credentials saved to ${filePath}`); - } catch (error) { - console.error(`[Gemini Auth] Failed to save credentials to ${filePath}: ${error.message}`); - } finally { - releaseLock(); + // 优先更新内存缓存 + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'gemini-cli-oauth'; + + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + credentialCache.updateCredentials(providerType, this.uuid, credentials, filePath); + console.log(`[Gemini Auth] Credentials saved to memory cache: ${this.uuid}`); + return; } + + // 回退到使用内存锁写入文件 + await credentialCache.withMemoryLock(`gemini-save:${filePath}`, async () => { + try { + await fs.writeFile(filePath, JSON.stringify(credentials, null, 2)); + console.log(`[Gemini Auth] Credentials saved to ${filePath}`); + } catch (error) { + console.error(`[Gemini Auth] Failed to save credentials to ${filePath}: ${error.message}`); + } + }); } /** diff --git a/src/providers/openai/iflow-core.js b/src/providers/openai/iflow-core.js index 9b64110..6b75305 100644 --- a/src/providers/openai/iflow-core.js +++ b/src/providers/openai/iflow-core.js @@ -24,7 +24,7 @@ import * as path from 'path'; import * as os from 'os'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError } from '../../utils/common.js'; -import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; +import { CredentialCacheManager } from '../../utils/credential-cache-manager.js'; // iFlow API 端点 const IFLOW_API_BASE_URL = 'https://apis.iflow.cn/v1'; @@ -132,38 +132,56 @@ async function loadTokenFromFile(filePath) { * @param {string} filePath - Token 文件路径 * @param {IFlowTokenStorage} tokenStorage - Token 存储对象 */ -async function saveTokenToFile(filePath, tokenStorage) { +async function saveTokenToFile(filePath, tokenStorage, uuid = null) { const absolutePath = path.isAbsolute(filePath) ? filePath : path.join(process.cwd(), filePath); - - // 获取文件锁,防止并发写入 - const releaseLock = await acquireFileLock(absolutePath); - try { - // 确保目录存在 - const dir = path.dirname(absolutePath); - await fs.mkdir(dir, { recursive: true }); - - // 写入文件 + + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'openai-iflow'; + + // 优先保存到内存缓存 + if (uuid && credentialCache.hasCredentials(providerType, uuid)) { const json = tokenStorage.toJSON(); - + // 验证关键字段是否存在 if (!json.refresh_token || json.refresh_token.trim() === '') { - console.error('[iFlow] WARNING: Attempting to save token file with empty refresh_token!'); + console.error('[iFlow] WARNING: Attempting to save token with empty refresh_token!'); } if (!json.apiKey || json.apiKey.trim() === '') { - console.error('[iFlow] WARNING: Attempting to save token file with empty apiKey!'); + console.error('[iFlow] WARNING: Attempting to save token with empty apiKey!'); } - - await fs.writeFile(absolutePath, JSON.stringify(json, null, 2), 'utf-8'); - - console.log(`[iFlow] Token saved to: ${filePath} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`); - } catch (error) { - throw new Error(`[iFlow] Failed to save token to file: ${error.message}`); - } finally { - // 确保锁被释放 - releaseLock(); + + credentialCache.updateCredentials(providerType, uuid, json, absolutePath); + console.log(`[iFlow] Token saved to memory cache: ${uuid} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`); + return; } + + // Fallback: 使用内存锁的文件写入 + await credentialCache.withMemoryLock(`iflow-save:${absolutePath}`, async () => { + try { + // 确保目录存在 + const dir = path.dirname(absolutePath); + await fs.mkdir(dir, { recursive: true }); + + // 写入文件 + const json = tokenStorage.toJSON(); + + // 验证关键字段是否存在 + if (!json.refresh_token || json.refresh_token.trim() === '') { + console.error('[iFlow] WARNING: Attempting to save token file with empty refresh_token!'); + } + if (!json.apiKey || json.apiKey.trim() === '') { + console.error('[iFlow] WARNING: Attempting to save token file with empty apiKey!'); + } + + await fs.writeFile(absolutePath, JSON.stringify(json, null, 2), 'utf-8'); + + console.log(`[iFlow] Token saved to: ${filePath} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`); + } catch (error) { + throw new Error(`[iFlow] Failed to save token to file: ${error.message}`); + } + }); } // ==================== Token 刷新逻辑 ==================== @@ -458,9 +476,10 @@ export class IFlowApiService { this.apiKey = null; this.baseUrl = config.IFLOW_BASE_URL || IFLOW_API_BASE_URL; this.tokenFilePath = config.IFLOW_TOKEN_FILE_PATH || DEFAULT_TOKEN_FILE_PATH; + this.uuid = config.uuid; // 保存 uuid 用于缓存管理 this.isInitialized = false; this.tokenStorage = null; - + // 配置 HTTP/HTTPS agent const httpAgent = new http.Agent({ keepAlive: true, @@ -484,10 +503,10 @@ export class IFlowApiService { 'User-Agent': IFLOW_USER_AGENT, }, }; - + // 配置自定义代理 configureAxiosProxy(axiosConfig, config, 'openai-iflow'); - + this.axiosInstance = axios.create(axiosConfig); } @@ -509,20 +528,39 @@ export class IFlowApiService { * @param {boolean} forceRefresh - 是否强制刷新 Token */ async initializeAuth(forceRefresh = false) { + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'openai-iflow'; + // 如果已有 API Key 且不强制刷新,直接返回 if (this.apiKey && !forceRefresh) return; - + // 从 Token 文件加载 API Key if (!this.tokenFilePath) { throw new Error('[iFlow] IFLOW_TOKEN_FILE_PATH is required.'); } - + try { - this.tokenStorage = await loadTokenFromFile(this.tokenFilePath); + // 优先从内存缓存加载 + let tokenData = null; + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + tokenData = cachedEntry.credentials; + this.tokenStorage = IFlowTokenStorage.fromJSON(tokenData); + console.log('[iFlow Auth] Loaded credentials from memory cache'); + } + } + + // Fallback: 从文件加载 + if (!this.tokenStorage) { + this.tokenStorage = await loadTokenFromFile(this.tokenFilePath); + console.log('[iFlow Auth] Loaded credentials from file'); + } + if (this.tokenStorage && this.tokenStorage.apiKey) { this.apiKey = this.tokenStorage.apiKey; console.log('[iFlow Auth] Authentication configured successfully from file.'); - + if (forceRefresh) { console.log('[iFlow Auth] Forcing token refresh...'); await this._refreshOAuthTokens(); @@ -541,7 +579,7 @@ export class IFlowApiService { throw new Error(`[iFlow Auth] Failed to load OAuth credentials.`); } } - + // 更新 axios 实例的 Authorization header this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; } @@ -551,42 +589,55 @@ export class IFlowApiService { * @returns {Promise} - 是否执行了刷新 */ async _checkAndRefreshTokenIfNeeded() { + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'openai-iflow'; + if (!this.tokenStorage) { return false; } - + // 检查是否有 refresh_token if (!this.tokenStorage.refreshToken || this.tokenStorage.refreshToken.trim() === '') { console.log('[iFlow] No refresh_token available, skipping token refresh check'); return false; } - + // 使用 isExpiryDateNear 检查过期时间 if (!this.isExpiryDateNear()) { console.log('[iFlow] Token is valid, no refresh needed'); return false; } - + console.log('[iFlow] Token is expiring soon, attempting refresh...'); - + try { - // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 const dedupeKey = `iflow-token-refresh:${this.tokenFilePath}`; - await withDeduplication(dedupeKey, async () => { + await credentialCache.withDeduplication(dedupeKey, async () => { await this._refreshOAuthTokens(); }); - + // 如果是等待其他请求完成的刷新,需要重新加载凭证 if (this.isExpiryDateNear()) { - const refreshedStorage = await loadTokenFromFile(this.tokenFilePath); - if (refreshedStorage && refreshedStorage.apiKey) { - this.tokenStorage = refreshedStorage; - this.apiKey = refreshedStorage.apiKey; - this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; - console.log('[iFlow] Credentials reloaded after concurrent refresh'); + if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, this.uuid); + if (cachedEntry && cachedEntry.credentials) { + this.tokenStorage = IFlowTokenStorage.fromJSON(cachedEntry.credentials); + this.apiKey = this.tokenStorage.apiKey; + this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; + console.log('[iFlow] Credentials reloaded from memory cache after concurrent refresh'); + } + } else { + const refreshedStorage = await loadTokenFromFile(this.tokenFilePath); + if (refreshedStorage && refreshedStorage.apiKey) { + this.tokenStorage = refreshedStorage; + this.apiKey = refreshedStorage.apiKey; + this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; + console.log('[iFlow] Credentials reloaded from file after concurrent refresh'); + } } } - + return true; } catch (error) { console.error('[iFlow] Token refresh failed:', error.message); @@ -637,7 +688,7 @@ export class IFlowApiService { this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; // 保存到文件 - await saveTokenToFile(this.tokenFilePath, this.tokenStorage); + await saveTokenToFile(this.tokenFilePath, this.tokenStorage, this.uuid); console.log(`[iFlow] Token refresh successful, new: ${this._maskToken(tokenData.accessToken)}`); } diff --git a/src/providers/openai/qwen-core.js b/src/providers/openai/qwen-core.js index f5527e2..08d81aa 100644 --- a/src/providers/openai/qwen-core.js +++ b/src/providers/openai/qwen-core.js @@ -12,7 +12,7 @@ import { getProviderModels } from '../provider-models.js'; import { handleQwenOAuth } from '../../auth/oauth-handlers.js'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError } from '../../utils/common.js'; -import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; +import { CredentialCacheManager } from '../../utils/credential-cache-manager.js'; // --- Constants --- const QWEN_DIR = '.qwen'; @@ -389,18 +389,18 @@ export class QwenApiService { async _cacheQwenCredentials(credentials) { const filePath = this._getQwenCachedCredentialPath(); - // 获取文件锁,防止并发写入 - const releaseLock = await acquireFileLock(filePath); - try { - await fs.mkdir(path.dirname(filePath), { recursive: true }); - const credString = JSON.stringify(credentials, null, 2); - await fs.writeFile(filePath, credString); - console.log(`[Qwen Auth] Credentials cached to ${filePath}`); - } catch (error) { - console.error(`[Qwen Auth] Failed to cache credentials to ${filePath}: ${error.message}`); - } finally { - releaseLock(); - } + const credentialCache = CredentialCacheManager.getInstance(); + // 使用内存锁替代文件锁,防止并发写入 + await credentialCache.withMemoryLock(`qwen-cache:${filePath}`, async () => { + try { + await fs.mkdir(path.dirname(filePath), { recursive: true }); + const credString = JSON.stringify(credentials, null, 2); + await fs.writeFile(filePath, credString); + console.log(`[Qwen Auth] Credentials cached to ${filePath}`); + } catch (error) { + console.error(`[Qwen Auth] Failed to cache credentials to ${filePath}: ${error.message}`); + } + }); } getCurrentEndpoint(resourceUrl) { @@ -794,13 +794,27 @@ class SharedTokenManager { throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available'); } - // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + // 获取凭证缓存管理器 + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'openai-qwen-oauth'; + + // 使用内存锁进行去重 const dedupeKey = `qwen-token-refresh:${context.credentialFilePath}`; - + try { - const credentials = await withDeduplication(dedupeKey, async () => { + const credentials = await credentialCache.withDeduplication(dedupeKey, async () => { await this.acquireLock(context); try { + // 优先从全局缓存检查 + if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, context.uuid); + if (cachedEntry && cachedEntry.credentials && !forceRefresh && this.isTokenValid(cachedEntry.credentials)) { + context.memoryCache.credentials = cachedEntry.credentials; + qwenClient.setCredentials(cachedEntry.credentials); + return cachedEntry.credentials; + } + } + await this.checkAndReloadIfNeeded(context); if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) { @@ -832,22 +846,28 @@ class SharedTokenManager { await this.releaseLock(context); } }); - + // 如果是等待其他请求完成的刷新,需要重新加载凭证 - // 因为 withDeduplication 会让所有等待者共享同一个 Promise - // 但只有第一个调用者的实例会执行刷新并更新自己的内存状态 - // 其他等待者需要从文件重新加载 if (!context.memoryCache.credentials || !this.isTokenValid(context.memoryCache.credentials)) { - await this.reloadCredentialsFromFile(context); - if (context.memoryCache.credentials) { - qwenClient.setCredentials(context.memoryCache.credentials); + // 优先从全局缓存加载 + if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) { + const cachedEntry = credentialCache.getCredentials(providerType, context.uuid); + if (cachedEntry && cachedEntry.credentials) { + context.memoryCache.credentials = cachedEntry.credentials; + qwenClient.setCredentials(cachedEntry.credentials); + } + } else { + await this.reloadCredentialsFromFile(context); + if (context.memoryCache.credentials) { + qwenClient.setCredentials(context.memoryCache.credentials); + } } } - + return credentials; } catch (error) { if (error instanceof TokenManagerError) throw error; - + // 处理 CredentialsClearRequiredError - 清除凭证文件 if (error instanceof CredentialsClearRequiredError) { try { @@ -856,7 +876,7 @@ class SharedTokenManager { } catch (_) { /* ignore */ } throw error; // 重新抛出以便上层处理 } - + // 如果刷新令牌无效/过期,删除此上下文对应的凭证文件 if (error && (error.status === 400 || /expired|invalid/i.test(error.message || ''))) { try { await fs.unlink(context.credentialFilePath); } catch (_) { /* ignore */ } @@ -870,17 +890,26 @@ class SharedTokenManager { } async saveCredentialsToFile(context, credentials) { - // 获取文件锁,防止并发写入 - const releaseLock = await acquireFileLock(context.credentialFilePath); - try { + // 优先更新内存缓存 + const credentialCache = CredentialCacheManager.getInstance(); + const providerType = 'openai-qwen-oauth'; + + if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) { + credentialCache.updateCredentials(providerType, context.uuid, credentials, context.credentialFilePath); + console.info(`[Qwen Auth] Updated credentials in memory cache: ${context.uuid}`); + // 同时更新本地内存缓存 + context.memoryCache.credentials = credentials; + context.memoryCache.fileModTime = Date.now(); + return; + } + + // 回退到使用内存锁写入文件 + await credentialCache.withMemoryLock(`qwen-save:${context.credentialFilePath}`, async () => { await fs.mkdir(path.dirname(context.credentialFilePath), { recursive: true, mode: 0o700 }); await fs.writeFile(context.credentialFilePath, JSON.stringify(credentials, null, 2), { mode: 0o600 }); const stats = await fs.stat(context.credentialFilePath); context.memoryCache.fileModTime = stats.mtimeMs; - } finally { - // 确保锁被释放 - releaseLock(); - } + }); } isTokenValid(credentials) { diff --git a/src/providers/provider-pool-manager.js b/src/providers/provider-pool-manager.js index c2604a7..9449576 100644 --- a/src/providers/provider-pool-manager.js +++ b/src/providers/provider-pool-manager.js @@ -2,6 +2,7 @@ import * as fs from 'fs'; // Import fs module import { getServiceAdapter } from './adapter.js'; import { MODEL_PROVIDER, getProtocolPrefix } from '../utils/common.js'; import { getProviderModels } from './provider-models.js'; +import { CredentialCacheManager } from '../utils/credential-cache-manager.js'; import axios from 'axios'; /** @@ -194,15 +195,15 @@ export class ProviderPoolManager { // 如果时间相同,使用使用次数辅助判断 return (a.config.usageCount || 0) - (b.config.usageCount || 0); })[0]; - - // 更新使用信息(除非明确跳过) - // 注意:这里的更新是同步的,在锁保护下执行,确保下一个请求能看到最新的 lastUsed + + // 始终更新 lastUsed(确保 LRU 策略生效,避免并发请求选到同一个 provider) + // usageCount 只在请求成功后才增加(由 skipUsageCount 控制) + selected.config.lastUsed = new Date().toISOString(); if (!options.skipUsageCount) { - selected.config.lastUsed = new Date().toISOString(); selected.config.usageCount++; - // 使用防抖保存(文件 I/O 是异步的,但内存已经更新) - this._debouncedSave(providerType); } + // 使用防抖保存(文件 I/O 是异步的,但内存已经更新) + this._debouncedSave(providerType); this._log('debug', `Selected provider for ${providerType} (LRU): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`); @@ -853,7 +854,7 @@ export class ProviderPoolManager { // 确定健康检查使用的模型名称 const modelName = providerConfig.checkModelName || ProviderPoolManager.DEFAULT_HEALTH_CHECK_MODELS[providerType]; - + // 如果未启用健康检查且不是强制检查,返回 null if (!providerConfig.checkHealth && !forceCheck) { return null; @@ -864,13 +865,48 @@ export class ProviderPoolManager { return { success: false, modelName: null, errorMessage: 'Unknown provider type for health check' }; } - // 使用内部服务适配器方式进行健康检查 + // ========== 快速预检:从内存缓存检查 OAuth 凭证状态 ========== + // 对于 OAuth 类型的 provider,先检查 token 是否过期,避免阻塞 + const oauthProviderTypes = ['claude-kiro-oauth', 'gemini-cli-oauth', 'gemini-antigravity', 'openai-qwen-oauth', 'openai-iflow-oauth', 'claude-orchids-oauth']; + if (oauthProviderTypes.includes(providerType) && providerConfig.uuid) { + const credentialCache = CredentialCacheManager.getInstance(); + const cachedEntry = credentialCache.getCredentials(providerType, providerConfig.uuid); + + if (cachedEntry && cachedEntry.credentials) { + const { expiresAt, accessToken } = cachedEntry.credentials; + + // 检查 token 是否存在 + if (!accessToken) { + this._log('warn', `Health check fast-fail for ${providerConfig.uuid}: No access token in cache`); + return { success: false, modelName, errorMessage: 'No access token available' }; + } + + // 检查 token 是否已过期(30秒缓冲) + if (expiresAt) { + const expirationTime = new Date(expiresAt).getTime(); + const now = Date.now(); + const bufferMs = 30 * 1000; + + if (expirationTime <= now + bufferMs) { + this._log('warn', `Health check fast-fail for ${providerConfig.uuid}: Token expired`); + return { success: false, modelName, errorMessage: 'Token expired' }; + } + } + + this._log('debug', `Health check pre-check passed for ${providerConfig.uuid}: Token valid in cache`); + } + // 注意:如果缓存中没有凭证,不要直接返回失败 + // 让后续的实际健康检查去尝试初始化和加载凭证 + // 这样可以支持刚重置健康状态或新添加的 provider + } + + // ========== 实际 API 健康检查(带超时保护)========== const proxyKeys = ['GEMINI', 'OPENAI', 'CLAUDE', 'QWEN', 'KIRO']; const tempConfig = { ...providerConfig, MODEL_PROVIDER: providerType }; - + proxyKeys.forEach(key => { const proxyKey = `USE_SYSTEM_PROXY_${key}`; if (this.globalConfig[proxyKey] !== undefined) { @@ -879,19 +915,32 @@ export class ProviderPoolManager { }); const serviceAdapter = getServiceAdapter(tempConfig); - + // 获取所有可能的请求格式 const healthCheckRequests = this._buildHealthCheckRequests(providerType, modelName); - + + // 健康检查超时时间(15秒,避免长时间阻塞) + const healthCheckTimeout = 15000; + // 重试机制:尝试不同的请求格式 const maxRetries = healthCheckRequests.length; let lastError = null; - + for (let i = 0; i < maxRetries; i++) { const healthCheckRequest = healthCheckRequests[i]; try { this._log('debug', `Health check attempt ${i + 1}/${maxRetries} for ${modelName}: ${JSON.stringify(healthCheckRequest)}`); - await serviceAdapter.generateContent(modelName, healthCheckRequest); + + // 带超时的健康检查 + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Health check timeout')), healthCheckTimeout) + ); + + await Promise.race([ + serviceAdapter.generateContent(modelName, healthCheckRequest), + timeoutPromise + ]); + return { success: true, modelName, errorMessage: null }; } catch (error) { lastError = error; @@ -899,7 +948,7 @@ export class ProviderPoolManager { // 继续尝试下一个格式 } } - + // 所有尝试都失败 this._log('error', `Health check failed for ${providerType} after ${maxRetries} attempts: ${lastError?.message}`); return { success: false, modelName, errorMessage: lastError?.message || 'All health check attempts failed' }; diff --git a/src/services/service-manager.js b/src/services/service-manager.js index 37933aa..3a50a16 100644 --- a/src/services/service-manager.js +++ b/src/services/service-manager.js @@ -1,5 +1,6 @@ import { getServiceAdapter, serviceInstances } from '../providers/adapter.js'; import { ProviderPoolManager } from '../providers/provider-pool-manager.js'; +import { CredentialCacheManager } from '../utils/credential-cache-manager.js'; import deepmerge from 'deepmerge'; import * as fs from 'fs'; import { promises as pfs } from 'fs'; @@ -166,7 +167,7 @@ async function scanProviderDirectory(dirPath, linkedPaths, newProviders, options * @returns {Promise} The initialized services */ export async function initApiService(config) { - + if (config.providerPools && Object.keys(config.providerPools).length > 0) { providerPoolManager = new ProviderPoolManager(config.providerPools, { globalConfig: config, @@ -175,6 +176,62 @@ export async function initApiService(config) { }); console.log('[Initialization] ProviderPoolManager initialized with configured pools.'); // 健康检查将在服务器完全启动后执行 + + // 初始化凭证缓存管理器 + const credentialCache = CredentialCacheManager.getInstance(); + + // 获取单实例锁,防止多实例并发运行 + try { + await credentialCache.acquireInstanceLock(); + } catch (lockError) { + console.error('[Initialization] Failed to acquire instance lock:', lockError.message); + console.error('[Initialization] Please ensure no other instance is running.'); + process.exit(1); + } + + await credentialCache.preloadAllCredentials(config.providerPools); + credentialCache.startPeriodicSync(config.CREDENTIAL_SYNC_INTERVAL || 5000); + + // 注册进程退出钩子 + const shutdownHandler = async () => { + await credentialCache.shutdown(); + }; + process.on('beforeExit', shutdownHandler); + process.on('SIGINT', async () => { + await shutdownHandler(); + process.exit(0); + }); + process.on('SIGTERM', async () => { + await shutdownHandler(); + process.exit(0); + }); + + // 处理未捕获的异常,进行紧急同步 + process.on('uncaughtException', async (error) => { + console.error('[FATAL] Uncaught exception:', error); + console.log('[CredentialCache] Attempting emergency sync before crash...'); + try { + await credentialCache.syncToFile(); + console.log('[CredentialCache] Emergency sync completed'); + } catch (syncError) { + console.error('[CredentialCache] Emergency sync failed:', syncError.message); + } + await credentialCache.shutdown(); + process.exit(1); + }); + + process.on('unhandledRejection', async (reason, promise) => { + console.error('[FATAL] Unhandled rejection at:', promise, 'reason:', reason); + console.log('[CredentialCache] Attempting emergency sync...'); + try { + await credentialCache.syncToFile(); + console.log('[CredentialCache] Emergency sync completed'); + } catch (syncError) { + console.error('[CredentialCache] Emergency sync failed:', syncError.message); + } + }); + + console.log('[Initialization] CredentialCacheManager initialized.'); } else { console.log('[Initialization] No provider pools configured. Using single provider mode.'); } diff --git a/src/utils/credential-cache-manager.js b/src/utils/credential-cache-manager.js new file mode 100644 index 0000000..bb44e67 --- /dev/null +++ b/src/utils/credential-cache-manager.js @@ -0,0 +1,806 @@ +/** + * 凭证缓存管理器 + * 纯内存操作,使用内存锁保证并发安全,定时同步到文件 + * 优先考虑并发性能,支持单实例部署 + */ + +import { promises as fs } from 'fs'; +import * as path from 'path'; +import { PROVIDER_MAPPINGS } from './provider-utils.js'; +import * as os from 'os'; + +/** + * 凭证条目结构 + * @typedef {Object} CredentialEntry + * @property {string} providerType - 提供商类型 + * @property {string} uuid - 节点唯一标识 + * @property {string} credPath - 凭证文件路径 + * @property {Object} credentials - 凭证数据 + * @property {number} lastModified - 最后修改时间戳 + * @property {number} lastAccessed - 最后访问时间戳 + * @property {boolean} isDirty - 是否有未同步的修改 + * @property {number} retryCount - 同步失败重试次数 + */ + +export class CredentialCacheManager { + static instance = null; + + constructor() { + // 凭证缓存: Map + // cacheKey = `${providerType}:${uuid}` + this.credentialCache = new Map(); + + // Promise链锁: Map + this.lockChains = new Map(); + + // 脏数据标记(需要同步到文件的 cacheKey) + this.dirtyKeys = new Set(); + + // 同步定时器 + this.syncTimer = null; + this.syncIntervalMs = 5000; // 默认5秒 + + // 是否已初始化 + this.isInitialized = false; + + // 是否正在同步 + this.isSyncing = false; + + // 最大重试次数 + this.maxRetries = 5; + + // 最大脏数据条目数 + this.maxDirtyKeys = 1000; + + // 死信队列 - 存储同步失败的凭证 + this.deadLetterQueue = new Map(); + + // 实例锁文件句柄 + this.instanceLockRelease = null; + + // 提供商类型到凭证路径键的映射 + this.providerCredPathKeys = {}; + for (const mapping of PROVIDER_MAPPINGS) { + this.providerCredPathKeys[mapping.providerType] = mapping.credPathKey; + } + } + + /** + * 获取单例实例 + * @returns {CredentialCacheManager} + */ + static getInstance() { + if (!CredentialCacheManager.instance) { + CredentialCacheManager.instance = new CredentialCacheManager(); + } + return CredentialCacheManager.instance; + } + + /** + * 生成缓存键 + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + * @returns {string} + */ + _getCacheKey(providerType, uuid) { + return `${providerType}:${uuid}`; + } + + /** + * 获取单实例锁,防止多实例并发运行 + * @throws {Error} 如果已有实例在运行 + */ + async acquireInstanceLock() { + const lockPath = path.join(os.tmpdir(), 'credential-cache.lock'); + const pidPath = path.join(os.tmpdir(), 'credential-cache.pid'); + + try { + // 尝试读取已存在的 PID 文件 + try { + const existingPid = await fs.readFile(pidPath, 'utf8'); + const pid = parseInt(existingPid.trim(), 10); + + // 检查进程是否还在运行 + try { + process.kill(pid, 0); // 0 信号仅检查进程存在性 + throw new Error(`[CredentialCache] Another instance is running (PID: ${pid}). Please stop it first.`); + } catch (killError) { + if (killError.code === 'ESRCH') { + // 进程已死亡,可以继续 + console.log(`[CredentialCache] Stale lock detected (PID: ${pid}), cleaning up...`); + } else { + throw killError; + } + } + } catch (readError) { + if (readError.code !== 'ENOENT') { + throw readError; + } + // PID 文件不存在,可以继续 + } + + // 写入当前进程 PID + await fs.writeFile(pidPath, String(process.pid), 'utf8'); + console.log(`[CredentialCache] Instance lock acquired (PID: ${process.pid})`); + + // 注册清理钩子 + this.instanceLockRelease = async () => { + try { + await fs.unlink(pidPath); + console.log('[CredentialCache] Instance lock released'); + } catch (error) { + if (error.code !== 'ENOENT') { + console.warn(`[CredentialCache] Failed to release lock: ${error.message}`); + } + } + }; + } catch (error) { + console.error(`[CredentialCache] Failed to acquire instance lock: ${error.message}`); + throw error; + } + } + + /** + * 预加载所有凭证到内存 + * @param {Object} providerPools - 提供商池配置 + */ + async preloadAllCredentials(providerPools) { + if (!providerPools || typeof providerPools !== 'object') { + console.log('[CredentialCache] No provider pools to preload'); + return; + } + + let loadedCount = 0; + let failedCount = 0; + + for (const [providerType, providers] of Object.entries(providerPools)) { + if (!Array.isArray(providers)) continue; + + const credPathKey = this.providerCredPathKeys[providerType]; + if (!credPathKey) { + console.warn(`[CredentialCache] Unknown provider type: ${providerType}`); + continue; + } + + for (const providerConfig of providers) { + const uuid = providerConfig.uuid; + const credPath = providerConfig[credPathKey]; + + if (!uuid || !credPath) { + continue; + } + + try { + const credentials = await this._loadCredentialsFromFile(credPath); + if (credentials) { + const cacheKey = this._getCacheKey(providerType, uuid); + this.credentialCache.set(cacheKey, { + providerType, + uuid, + credPath, + credentials, + lastModified: Date.now(), + lastAccessed: Date.now(), + isDirty: false, + retryCount: 0 + }); + loadedCount++; + } + } catch (error) { + failedCount++; + console.warn(`[CredentialCache] Failed to load credentials for ${providerType}:${uuid}: ${error.message}`); + } + } + } + + this.isInitialized = true; + console.log(`[CredentialCache] Preloaded ${loadedCount} credentials (${failedCount} failed)`); + } + + /** + * 原子写入文件 (temp + rename 模式) + * @param {string} filePath - 目标文件路径 + * @param {string} data - 要写入的数据 + */ + async _atomicWriteFile(filePath, data) { + const tmpPath = `${filePath}.tmp.${Date.now()}.${process.pid}`; + + try { + // 1. 写入临时文件 + await fs.writeFile(tmpPath, data, 'utf8'); + + // 2. fsync 确保落盘 (需要文件句柄) + const fileHandle = await fs.open(tmpPath, 'r+'); + try { + await fileHandle.sync(); + } finally { + await fileHandle.close(); + } + + // 3. 原子重命名 + await fs.rename(tmpPath, filePath); + + return true; + } catch (error) { + // 清理临时文件 + try { + await fs.unlink(tmpPath); + } catch (unlinkError) { + // 忽略清理失败 + } + throw error; + } + } + /** + * 从文件加载凭证(仅用于初始导入和手动重载) + * 支持从损坏文件的备份恢复 + * @param {string} filePath - 文件路径 + * @returns {Promise} + */ + async _loadCredentialsFromFile(filePath) { + try { + // 处理相对路径 + const absolutePath = path.isAbsolute(filePath) + ? filePath + : path.join(process.cwd(), filePath); + + const content = await fs.readFile(absolutePath, 'utf8'); + return JSON.parse(content); + } catch (error) { + if (error.code === 'ENOENT') { + return null; + } + throw error; + } + } + + /** + * 获取凭证(从内存) + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + * @returns {CredentialEntry|null} + */ + getCredentials(providerType, uuid) { + const cacheKey = this._getCacheKey(providerType, uuid); + const entry = this.credentialCache.get(cacheKey); + + if (entry) { + entry.lastAccessed = Date.now(); + return entry; + } + + return null; + } + + /** + * 检查凭证是否存在于缓存中 + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + * @returns {boolean} + */ + hasCredentials(providerType, uuid) { + const cacheKey = this._getCacheKey(providerType, uuid); + return this.credentialCache.has(cacheKey); + } + + /** + * 更新凭证(仅更新内存,标记为dirty) + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + * @param {Object} newCredentials - 新凭证数据 + * @param {string} [credPath] - 凭证文件路径(可选,用于新建条目) + */ + updateCredentials(providerType, uuid, newCredentials, credPath = null) { + const cacheKey = this._getCacheKey(providerType, uuid); + let entry = this.credentialCache.get(cacheKey); + + if (entry) { + // 更新现有条目 + entry.credentials = newCredentials; + entry.lastModified = Date.now(); + entry.isDirty = true; + entry.retryCount = 0; // 重置重试计数 + } else if (credPath) { + // 创建新条目 + entry = { + providerType, + uuid, + credPath, + credentials: newCredentials, + lastModified: Date.now(), + lastAccessed: Date.now(), + isDirty: true, + retryCount: 0 + }; + this.credentialCache.set(cacheKey, entry); + } else { + console.warn(`[CredentialCache] Cannot update non-existent entry without credPath: ${cacheKey}`); + return; + } + + // 标记为需要同步 + this.dirtyKeys.add(cacheKey); + + // 检查脏数据是否过多 + if (this.dirtyKeys.size > this.maxDirtyKeys) { + console.warn(`[CredentialCache] Dirty keys exceeded ${this.maxDirtyKeys}, triggering immediate sync`); + this.syncToFile().catch(err => console.error('[CredentialCache] Emergency sync failed:', err.message)); + } + } + + /** + * 删除凭证(从内存中删除,同时删除文件) + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + */ + async deleteCredentials(providerType, uuid) { + const cacheKey = this._getCacheKey(providerType, uuid); + const entry = this.credentialCache.get(cacheKey); + + if (!entry) { + return; + } + + // 从内存中删除 + this.credentialCache.delete(cacheKey); + this.dirtyKeys.delete(cacheKey); + + // 删除文件 + if (entry.credPath) { + try { + const absolutePath = path.isAbsolute(entry.credPath) + ? entry.credPath + : path.join(process.cwd(), entry.credPath); + await fs.unlink(absolutePath); + console.log(`[CredentialCache] Deleted credential file: ${entry.credPath}`); + } catch (error) { + if (error.code !== 'ENOENT') { + console.error(`[CredentialCache] Failed to delete ${entry.credPath}: ${error.message}`); + } + } + } + } + + /** + * Promise链式内存锁 - 保证串行执行 + * 使用 Promise 构造器模式避免 Read-Check-Write 竞态 + * @param {string} key - 锁的唯一标识 + * @param {Function} operation - 要执行的操作 + * @returns {Promise} + */ + async withMemoryLock(key, operation) { + // 创建新的 Promise 用于链接后续操作 + let resolveNext, rejectNext; + const nextPromise = new Promise((resolve, reject) => { + resolveNext = resolve; + rejectNext = reject; + }); + + // 原子化获取前序链并立即更新 Map (避免竞态) + const prevChain = this.lockChains.get(key); + this.lockChains.set(key, nextPromise); + + // 链接到前序 Promise 执行操作 + const executeOperation = async () => { + try { + // 等待前序操作完成(忽略前序的错误,继续执行当前操作) + if (prevChain) { + await prevChain.catch(() => {}); + } + const result = await operation(); + resolveNext(result); + } catch (error) { + rejectNext(error); + } finally { + // 只有当前 Promise 还在 Map 中时才删除 + if (this.lockChains.get(key) === nextPromise) { + this.lockChains.delete(key); + } + } + }; + + // 立即开始执行(不阻塞) + executeOperation(); + + return nextPromise; + } + + /** + * 去重执行 - 多个并发请求共享同一个操作结果 + * @param {string} key - 去重键 + * @param {Function} operation - 要执行的操作 + * @returns {Promise} + */ + async withDeduplication(key, operation) { + const dedupeKey = `dedupe:${key}`; + + // 检查是否已有正在执行的操作 + const existingPromise = this.lockChains.get(dedupeKey); + if (existingPromise) { + // 直接等待现有操作完成并返回结果 + return existingPromise; + } + + // 创建新操作 Promise 并立即存储(避免竞态) + const operationPromise = (async () => { + try { + return await operation(); + } finally { + // 操作完成后清理 + if (this.lockChains.get(dedupeKey) === operationPromise) { + this.lockChains.delete(dedupeKey); + } + } + })(); + + // 立即存储,确保后续请求能看到 + this.lockChains.set(dedupeKey, operationPromise); + + return operationPromise; + } + + /** + * 启动定时同步 + * @param {number} intervalMs - 同步间隔(毫秒) + */ + startPeriodicSync(intervalMs = 5000) { + this.syncIntervalMs = intervalMs; + + if (this.syncTimer) { + clearInterval(this.syncTimer); + } + + this.syncTimer = setInterval(() => { + this.syncToFile().catch(error => { + console.error('[CredentialCache] Periodic sync failed:', error.message); + }); + }, this.syncIntervalMs); + + // 确保定时器不阻止进程退出 + if (this.syncTimer.unref) { + this.syncTimer.unref(); + } + + console.log(`[CredentialCache] Started periodic sync (interval: ${intervalMs}ms)`); + } + + /** + * 停止定时同步 + */ + stopPeriodicSync() { + if (this.syncTimer) { + clearInterval(this.syncTimer); + this.syncTimer = null; + console.log('[CredentialCache] Stopped periodic sync'); + } + } + + /** + * 同步脏数据到文件(纯内存操作,不使用文件锁) + */ + async syncToFile() { + if (this.dirtyKeys.size === 0) { + return; + } + + if (this.isSyncing) { + return; // 防止重入 + } + + this.isSyncing = true; + + // 复制脏数据集合(不立即清空,同步成功后再清理) + const keysToSync = Array.from(this.dirtyKeys); + + console.log(`[CredentialCache] Syncing ${keysToSync.length} credential(s) to file...`); + + let successCount = 0; + let failedCount = 0; + const successKeys = new Set(); + + // 并发写入所有文件(提高性能) + await Promise.allSettled( + keysToSync.map(async (cacheKey) => { + const entry = this.credentialCache.get(cacheKey); + if (!entry || !entry.credPath) { + return; + } + + // 检查重试次数 + if (entry.retryCount >= this.maxRetries) { + // 达到最大重试次数,移入死信队列 + console.error(`[CredentialCache] Max retries exceeded for ${cacheKey}, moving to dead letter queue`); + + this.deadLetterQueue.set(cacheKey, { + entry: JSON.parse(JSON.stringify(entry)), // 深拷贝 + failureReason: entry.lastError || 'Max retries exceeded', + firstFailureTime: entry.firstFailureTime || Date.now(), + timestamp: Date.now() + }); + + // 尝试导出到紧急备份 + try { + const emergencyDir = path.join(process.cwd(), 'emergency_backup'); + await fs.mkdir(emergencyDir, { recursive: true }); + const emergencyPath = path.join(emergencyDir, `${cacheKey.replace(/:/g, '_')}.json`); + await fs.writeFile(emergencyPath, JSON.stringify(entry.credentials, null, 2), 'utf8'); + console.log(`[CredentialCache] Credential backed up to: ${emergencyPath}`); + } catch (backupError) { + console.error(`[CredentialCache] Failed to backup credential: ${backupError.message}`); + } + + this.dirtyKeys.delete(cacheKey); + entry.isDirty = false; + return; + } + + try { + // 处理相对路径 + const absolutePath = path.isAbsolute(entry.credPath) + ? entry.credPath + : path.join(process.cwd(), entry.credPath); + + // 确保目录存在 + const dir = path.dirname(absolutePath); + await fs.mkdir(dir, { recursive: true }); + + // 使用原子写入 + await this._atomicWriteFile( + absolutePath, + JSON.stringify(entry.credentials, null, 2) + ); + + entry.isDirty = false; + entry.retryCount = 0; + entry.lastError = null; + entry.firstFailureTime = null; + successKeys.add(cacheKey); + successCount++; + } catch (error) { + // 同步失败,增加重试计数并记录错误 + entry.retryCount++; + entry.lastError = error.message; + if (!entry.firstFailureTime) { + entry.firstFailureTime = Date.now(); + } + failedCount++; + console.error(`[CredentialCache] Failed to sync ${entry.credPath} (retry ${entry.retryCount}/${this.maxRetries}): ${error.message}`); + } + }) + ); + + // 从脏数据集合中移除成功同步的键 + for (const key of successKeys) { + this.dirtyKeys.delete(key); + } + + this.isSyncing = false; + + if (successCount > 0 || failedCount > 0) { + console.log(`[CredentialCache] Sync completed: ${successCount} success, ${failedCount} failed, ${this.dirtyKeys.size} pending`); + } + } + + /** + * 立即同步指定凭证到文件 + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + */ + async syncCredentialToFile(providerType, uuid) { + const cacheKey = this._getCacheKey(providerType, uuid); + const entry = this.credentialCache.get(cacheKey); + + if (!entry || !entry.credPath) { + return; + } + + try { + const absolutePath = path.isAbsolute(entry.credPath) + ? entry.credPath + : path.join(process.cwd(), entry.credPath); + + const dir = path.dirname(absolutePath); + await fs.mkdir(dir, { recursive: true }); + + // 使用原子写入 + await this._atomicWriteFile( + absolutePath, + JSON.stringify(entry.credentials, null, 2) + ); + + entry.isDirty = false; + entry.retryCount = 0; + entry.lastError = null; + entry.firstFailureTime = null; + this.dirtyKeys.delete(cacheKey); + } catch (error) { + console.error(`[CredentialCache] Failed to sync ${entry.credPath}: ${error.message}`); + throw error; + } + } + + /** + * 关闭时同步所有数据(带超时) + */ + async shutdown() { + console.log('[CredentialCache] Shutting down...'); + + // 停止定时同步 + this.stopPeriodicSync(); + + // 同步所有脏数据(带动态超时) + if (this.dirtyKeys.size > 0) { + console.log(`[CredentialCache] Syncing ${this.dirtyKeys.size} dirty credential(s) before shutdown...`); + + // 根据脏数据量动态调整超时时间 + const timeoutMs = Math.max(10000, this.dirtyKeys.size * 50 + 5000); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Shutdown sync timeout')), timeoutMs) + ); + + try { + await Promise.race([ + this.syncToFile(), + timeoutPromise + ]); + } catch (error) { + console.error(`[CredentialCache] Shutdown sync failed: ${error.message}`); + console.error(`[CredentialCache] ${this.dirtyKeys.size} credentials NOT saved`); + + // 尝试紧急备份未保存的凭证 + if (this.dirtyKeys.size > 0) { + console.log('[CredentialCache] Attempting emergency backup...'); + try { + const emergencyDir = path.join(process.cwd(), 'emergency_backup'); + await fs.mkdir(emergencyDir, { recursive: true }); + for (const cacheKey of this.dirtyKeys) { + const entry = this.credentialCache.get(cacheKey); + if (entry && entry.credentials) { + const emergencyPath = path.join(emergencyDir, `${cacheKey.replace(/:/g, '_')}.json`); + await fs.writeFile(emergencyPath, JSON.stringify(entry.credentials, null, 2), 'utf8'); + } + } + console.log(`[CredentialCache] Emergency backup completed to: ${emergencyDir}`); + } catch (backupError) { + console.error(`[CredentialCache] Emergency backup failed: ${backupError.message}`); + } + } + } + } + + // 释放实例锁 + if (this.instanceLockRelease) { + await this.instanceLockRelease(); + } + + // 输出死信队列状态 + if (this.deadLetterQueue.size > 0) { + console.warn(`[CredentialCache] ${this.deadLetterQueue.size} credential(s) in dead letter queue`); + } + + console.log('[CredentialCache] Shutdown complete'); + } + + /** + * 获取缓存统计信息 + * @returns {Object} + */ + getStats() { + const stats = { + totalEntries: this.credentialCache.size, + dirtyEntries: this.dirtyKeys.size, + activeLocks: this.lockChains.size, + deadLetterQueueSize: this.deadLetterQueue.size, + isInitialized: this.isInitialized, + isSyncing: this.isSyncing, + syncInterval: this.syncIntervalMs, + byProvider: {} + }; + + for (const [cacheKey, entry] of this.credentialCache) { + const providerType = entry.providerType; + if (!stats.byProvider[providerType]) { + stats.byProvider[providerType] = { + count: 0, + dirty: 0, + maxRetries: 0 + }; + } + stats.byProvider[providerType].count++; + if (entry.isDirty) { + stats.byProvider[providerType].dirty++; + stats.byProvider[providerType].maxRetries = Math.max( + stats.byProvider[providerType].maxRetries, + entry.retryCount + ); + } + } + + return stats; + } + + /** + * 获取死信队列内容 + * @returns {Array} + */ + getDeadLetterQueue() { + return Array.from(this.deadLetterQueue.entries()).map(([key, value]) => ({ + cacheKey: key, + ...value + })); + } + + /** + * 从死信队列恢复凭证 + * @param {string} cacheKey - 缓存键 + * @returns {boolean} 是否恢复成功 + */ + recoverFromDeadLetter(cacheKey) { + const deadEntry = this.deadLetterQueue.get(cacheKey); + if (!deadEntry) { + return false; + } + + // 恢复到缓存 + const entry = deadEntry.entry; + entry.retryCount = 0; + entry.isDirty = true; + entry.lastError = null; + entry.firstFailureTime = null; + this.credentialCache.set(cacheKey, entry); + this.dirtyKeys.add(cacheKey); + + // 从死信队列移除 + this.deadLetterQueue.delete(cacheKey); + + console.log(`[CredentialCache] Recovered credential from dead letter queue: ${cacheKey}`); + return true; + } + + /** + * 清除所有缓存(用于测试) + */ + clear() { + this.credentialCache.clear(); + this.lockChains.clear(); + this.dirtyKeys.clear(); + this.deadLetterQueue.clear(); + this.isInitialized = false; + } + + /** + * 重新加载指定凭证(从文件) + * @param {string} providerType - 提供商类型 + * @param {string} uuid - 节点UUID + * @returns {Promise} + */ + async reloadCredentials(providerType, uuid) { + const cacheKey = this._getCacheKey(providerType, uuid); + const entry = this.credentialCache.get(cacheKey); + + if (!entry || !entry.credPath) { + return null; + } + + try { + const credentials = await this._loadCredentialsFromFile(entry.credPath); + if (credentials) { + entry.credentials = credentials; + entry.lastModified = Date.now(); + entry.isDirty = false; + entry.retryCount = 0; + this.dirtyKeys.delete(cacheKey); + return entry; + } + } catch (error) { + console.error(`[CredentialCache] Failed to reload ${entry.credPath}: ${error.message}`); + } + + return null; + } +} + +// 导出单例获取函数 +export function getCredentialCacheManager() { + return CredentialCacheManager.getInstance(); +} diff --git a/src/utils/file-lock.js b/src/utils/file-lock.js deleted file mode 100644 index 46c5a5a..0000000 --- a/src/utils/file-lock.js +++ /dev/null @@ -1,220 +0,0 @@ -import * as path from 'path'; - -/** - * 文件锁管理器 - 防止并发写入导致文件损坏 - * - * 使用场景: - * - 多个异步操作同时读写同一文件 - * - 防止读-改-写竞争条件(Race Condition) - * - 防止写入交错导致文件内容损坏 - * - * 注意:这是进程内锁,只能防止同一 Node.js 进程内的并发。 - * 如果需要跨进程文件锁,请使用 proper-lockfile 等库。 - */ - -// 存储每个文件的锁队列(Promise 链) -// 每个文件对应一个 Promise,新的锁请求会链接到当前 Promise 之后 -const fileLockQueues = new Map(); - -// 存储去重锁的进行中 Promise -// 用于合并相同 key 的并发请求,只执行一次操作 -const dedupePromises = new Map(); - -/** - * 获取文件锁,确保同一时间只有一个操作可以访问特定文件 - * - * 实现原理:使用 Promise 链实现队列机制 - * - 每个文件维护一个 Promise 链 - * - 新的锁请求会等待当前链完成,然后创建新的链节点 - * - 这确保了锁的获取是严格串行的,避免竞态条件 - * - * @param {string} filePath - 文件路径 - * @returns {Promise<() => void>} 释放锁的函数 - * - * @example - * const releaseLock = await acquireFileLock('/path/to/file.json'); - * try { - * // 读取、修改、写入文件 - * const data = await fs.readFile(filePath, 'utf8'); - * const modified = JSON.parse(data); - * modified.key = 'new value'; - * await fs.writeFile(filePath, JSON.stringify(modified, null, 2)); - * } finally { - * releaseLock(); // 确保锁被释放 - * } - */ -export async function acquireFileLock(filePath) { - const normalizedPath = path.resolve(filePath); - - // 获取当前队列中的最后一个 Promise(如果存在) - const currentLock = fileLockQueues.get(normalizedPath) || Promise.resolve(); - - // 创建释放锁的 resolver - let releaseLock; - const newLockPromise = new Promise(resolve => { - releaseLock = resolve; - }); - - // 立即将新的 Promise 加入队列(在 await 之前!) - // 这是关键:确保后续请求会等待这个新的 Promise - fileLockQueues.set(normalizedPath, newLockPromise); - - // 等待前一个锁释放 - await currentLock; - - // 返回释放锁的函数 - return () => { - // 只有当当前锁仍是队列中的最后一个时才清理 - // 否则保留队列让后续请求继续等待 - if (fileLockQueues.get(normalizedPath) === newLockPromise) { - fileLockQueues.delete(normalizedPath); - } - releaseLock(); - }; -} - -/** - * 使用文件锁执行操作的便捷函数 - * @param {string} filePath - 文件路径 - * @param {Function} operation - 要执行的异步操作 - * @returns {Promise} 操作的返回值 - * - * @example - * const result = await withFileLock('/path/to/file.json', async () => { - * const data = await fs.readFile(filePath, 'utf8'); - * const modified = JSON.parse(data); - * modified.key = 'new value'; - * await fs.writeFile(filePath, JSON.stringify(modified, null, 2)); - * return modified; - * }); - */ -export async function withFileLock(filePath, operation) { - const releaseLock = await acquireFileLock(filePath); - try { - return await operation(); - } finally { - releaseLock(); - } -} - -/** - * 检查文件是否被锁定(有等待中的锁队列) - * @param {string} filePath - 文件路径 - * @returns {boolean} 是否被锁定 - */ -export function isFileLocked(filePath) { - const normalizedPath = path.resolve(filePath); - return fileLockQueues.has(normalizedPath); -} - -/** - * 获取当前被锁定的文件数量(用于调试) - * @returns {number} 被锁定的文件数量 - */ -export function getLockedFileCount() { - return fileLockQueues.size; -} - -/** - * 去重执行 - 合并相同 key 的并发请求,只执行一次操作 - * - * 与 withFileLock 的区别: - * - withFileLock(队列锁):10个并发请求 → 排队执行10次 - * - withDeduplication(去重锁):10个并发请求 → 只执行1次,共享结果 - * - * 使用场景: - * - Token 刷新:多个请求同时发现 token 过期,只需刷新一次 - * - 缓存填充:多个请求同时 cache miss,只需加载一次 - * - 任何"结果可共享"的昂贵操作 - * - * @param {string} key - 去重的唯一标识符 - * @param {Function} operation - 要执行的异步操作 - * @returns {Promise} 操作的返回值(所有等待者共享同一结果) - * - * @example - * // 多个并发调用只会执行一次 refreshToken - * const newToken = await withDeduplication('token-refresh', async () => { - * const response = await fetch('/refresh'); - * return response.json(); - * }); - */ -export async function withDeduplication(key, operation) { - // 如果已有相同 key 的操作在进行中,直接等待它的结果 - if (dedupePromises.has(key)) { - return dedupePromises.get(key); - } - - // 创建新的操作 Promise - const operationPromise = (async () => { - try { - return await operation(); - } finally { - // 操作完成后清理 - dedupePromises.delete(key); - } - })(); - - // 存入 Map,让后续请求可以共享 - dedupePromises.set(key, operationPromise); - - return operationPromise; -} - -/** - * 组合去重锁和文件锁 - 先去重再加文件锁 - * - * 典型场景:Token 刷新 - * 1. 去重层:10个并发刷新请求 → 合并为1次刷新操作 - * 2. 文件锁层:保护那1次刷新操作的文件写入不与其他操作冲突 - * - * @param {string} dedupeKey - 去重的唯一标识符 - * @param {string} filePath - 需要保护的文件路径 - * @param {Function} operation - 要执行的异步操作 - * @returns {Promise} 操作的返回值 - * - * @example - * // Token 刷新场景 - * const newToken = await withDeduplicationAndFileLock( - * 'token-refresh-' + credentialId, - * '/path/to/token.json', - * async () => { - * const response = await fetch('/refresh'); - * const data = await response.json(); - * await fs.writeFile('/path/to/token.json', JSON.stringify(data)); - * return data; - * } - * ); - */ -export async function withDeduplicationAndFileLock(dedupeKey, filePath, operation) { - return withDeduplication(dedupeKey, async () => { - return withFileLock(filePath, operation); - }); -} - -/** - * 检查是否有去重操作正在进行 - * @param {string} key - 去重的唯一标识符 - * @returns {boolean} 是否有操作在进行中 - */ -export function isDedupeInProgress(key) { - return dedupePromises.has(key); -} - -/** - * 获取当前进行中的去重操作数量(用于调试) - * @returns {number} 进行中的去重操作数量 - */ -export function getDedupeCount() { - return dedupePromises.size; -} - -export default { - acquireFileLock, - withFileLock, - isFileLocked, - getLockedFileCount, - withDeduplication, - withDeduplicationAndFileLock, - isDedupeInProgress, - getDedupeCount -}; \ No newline at end of file