diff --git a/VERSION b/VERSION index d48d370..24ba9a3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.6.9 +2.7.0 diff --git a/src/handlers/ollama-handler.js b/src/handlers/ollama-handler.js index 48fff52..33afe42 100644 --- a/src/handlers/ollama-handler.js +++ b/src/handlers/ollama-handler.js @@ -533,8 +533,8 @@ export async function handleOllamaChat(req, res, apiService, currentConfig, prov // If apiService is null or provider is different, get the appropriate service from pool if (!apiService || detectedProvider !== currentConfig.MODEL_PROVIDER) { if (providerPoolManager) { - // Select provider from pool - const providerConfig = providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true }); + // Select provider from pool (now async) + const providerConfig = await providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true }); if (providerConfig) { actualConfig = { ...currentConfig, @@ -640,8 +640,8 @@ export async function handleOllamaGenerate(req, res, apiService, currentConfig, // If apiService is null or provider is different, get the appropriate service from pool if (!apiService || detectedProvider !== currentConfig.MODEL_PROVIDER) { if (providerPoolManager) { - // Select provider from pool - const providerConfig = providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true }); + // Select provider from pool (now async) + const providerConfig = await providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true }); if (providerConfig) { actualConfig = { ...currentConfig, diff --git a/src/providers/claude/claude-kiro.js b/src/providers/claude/claude-kiro.js index 1c966c8..3f6a353 100644 --- a/src/providers/claude/claude-kiro.js +++ b/src/providers/claude/claude-kiro.js @@ -11,7 +11,7 @@ import { countTokens } from '@anthropic-ai/tokenizer'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js'; import { getProviderPoolManager } from '../../services/service-manager.js'; -import { acquireFileLock } from '../../utils/file-lock.js'; +import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; const KIRO_THINKING = { MAX_BUDGET_TOKENS: 24576, @@ -61,10 +61,6 @@ const MODEL_MAPPING = Object.fromEntries( const KIRO_AUTH_TOKEN_FILE = "kiro-auth-token.json"; -// Token 刷新单例锁 - 按凭证文件路径索引,防止多个并发请求同时刷新同一个 token -// 这解决了文件锁导致的并发请求串行化问题 -const tokenRefreshPromises = new Map(); - /** * Kiro API Service - Node.js implementation based on the Python ki2api * Provides OpenAI-compatible API for Claude Sonnet 4 via Kiro/CodeWhisperer @@ -414,23 +410,8 @@ async initializeAuth(forceRefresh = false) { return; } - // 获取凭证文件路径,用于单例锁的 key + // 获取凭证文件路径,用于去重锁的 key const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE); - - // 单例刷新逻辑:如果已有刷新在进行中,等待它完成而不是重复刷新 - if (forceRefresh && tokenRefreshPromises.has(tokenFilePath)) { - console.log('[Kiro Auth] Token refresh already in progress for this credential, waiting...'); - try { - await tokenRefreshPromises.get(tokenFilePath); - // 刷新完成后,重新加载凭证(因为其他请求可能已经更新了 token) - await this._reloadCredentialsAfterRefresh(tokenFilePath); - console.log('[Kiro Auth] Reused token from concurrent refresh'); - return; - } catch (error) { - // 如果等待的刷新失败了,我们需要自己尝试刷新 - console.warn('[Kiro Auth] Concurrent refresh failed, will attempt own refresh:', error.message); - } - } // Helper to load credentials from a file const loadCredentialsFromFile = async (filePath) => { @@ -575,15 +556,19 @@ async initializeAuth(forceRefresh = false) { throw new Error('No refresh token available to refresh access token.'); } - // 创建刷新 Promise 并存入单例锁 Map - const refreshPromise = this._doTokenRefresh(saveCredentialsToFile, tokenFilePath); - tokenRefreshPromises.set(tokenFilePath, refreshPromise); + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + const dedupeKey = `kiro-token-refresh:${tokenFilePath}`; + await withDeduplication(dedupeKey, async () => { + await this._doTokenRefresh(saveCredentialsToFile, tokenFilePath); + }); - try { - await refreshPromise; - } finally { - // 刷新完成后清理单例锁 - tokenRefreshPromises.delete(tokenFilePath); + // 如果是等待其他请求完成的刷新,需要重新加载凭证 + // 因为 _doTokenRefresh 只更新了执行刷新的实例的内存状态 + // 注意:withDeduplication 会让所有等待者共享同一个 Promise + // 但只有第一个调用者的实例会执行 _doTokenRefresh 并更新自己的内存状态 + // 其他等待者需要从文件重新加载 + if (!this.accessToken || this.isExpiryDateNear()) { + await this._reloadCredentialsAfterRefresh(tokenFilePath); } } diff --git a/src/providers/gemini/antigravity-core.js b/src/providers/gemini/antigravity-core.js index 2150c86..aa5f4a4 100644 --- a/src/providers/gemini/antigravity-core.js +++ b/src/providers/gemini/antigravity-core.js @@ -14,7 +14,7 @@ import { getProviderModels } from '../provider-models.js'; import { handleGeminiAntigravityOAuth } from '../../auth/oauth-handlers.js'; import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js'; import { cleanJsonSchemaProperties } from '../../converters/utils.js'; -import { acquireFileLock } from '../../utils/file-lock.js'; +import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; // 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏 const httpAgent = new http.Agent({ @@ -795,12 +795,26 @@ export class AntigravityApiService { console.log('[Antigravity Auth] Authentication configured successfully from file.'); if (needsRefresh) { - console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...'); - const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); - this.authClient.setCredentials(newCredentials); - // 保存刷新后的凭证到文件(使用文件锁) - await this._saveCredentialsToFile(credPath, newCredentials); - console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`); + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + const dedupeKey = `antigravity-token-refresh:${credPath}`; + await withDeduplication(dedupeKey, async () => { + console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...'); + const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); + this.authClient.setCredentials(newCredentials); + // 保存刷新后的凭证到文件(使用文件锁) + await this._saveCredentialsToFile(credPath, newCredentials); + console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`); + }); + + // 如果是等待其他请求完成的刷新,需要重新加载凭证 + // 因为 withDeduplication 只让第一个调用者执行刷新并更新自己的内存状态 + // 其他等待者需要从文件重新加载 + if (this.isTokenExpiringSoon()) { + const refreshedData = await fs.readFile(credPath, "utf8"); + const refreshedCredentials = JSON.parse(refreshedData); + this.authClient.setCredentials(refreshedCredentials); + console.log('[Antigravity Auth] Credentials reloaded after concurrent refresh'); + } } } catch (error) { console.error('[Antigravity Auth] Error initializing authentication:', error.code); diff --git a/src/providers/gemini/gemini-core.js b/src/providers/gemini/gemini-core.js index 18d8736..026eeaf 100644 --- a/src/providers/gemini/gemini-core.js +++ b/src/providers/gemini/gemini-core.js @@ -10,7 +10,7 @@ import { API_ACTIONS, formatExpiryTime, isRetryableNetworkError } from '../../ut import { getProviderModels } from '../provider-models.js'; import { handleGeminiCliOAuth } from '../../auth/oauth-handlers.js'; import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js'; -import { acquireFileLock } from '../../utils/file-lock.js'; +import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; // 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏 const httpAgent = new http.Agent({ @@ -273,11 +273,24 @@ export class GeminiApiService { if (forceRefresh) { console.log('[Gemini Auth] Forcing token refresh...'); - const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); - this.authClient.setCredentials(newCredentials); - // Save refreshed credentials back to file (with file locking) - await this._saveCredentialsToFile(credPath, newCredentials); - console.log('[Gemini Auth] Token refreshed and saved successfully.'); + + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + const dedupeKey = `gemini-token-refresh:${credPath}`; + await withDeduplication(dedupeKey, async () => { + const { credentials: newCredentials } = await this.authClient.refreshAccessToken(); + this.authClient.setCredentials(newCredentials); + // Save refreshed credentials back to file (with file locking) + await this._saveCredentialsToFile(credPath, newCredentials); + console.log('[Gemini Auth] Token refreshed and saved successfully.'); + }); + + // 如果是等待其他请求完成的刷新,需要重新加载凭证 + if (this.isExpiryDateNear()) { + const refreshedData = await fs.readFile(credPath, "utf8"); + const refreshedCredentials = JSON.parse(refreshedData); + this.authClient.setCredentials(refreshedCredentials); + console.log('[Gemini Auth] Credentials reloaded after concurrent refresh'); + } } } catch (error) { console.error('[Gemini Auth] Error initializing authentication:', error.code); diff --git a/src/providers/openai/iflow-core.js b/src/providers/openai/iflow-core.js index 93083c5..9b64110 100644 --- a/src/providers/openai/iflow-core.js +++ b/src/providers/openai/iflow-core.js @@ -24,7 +24,7 @@ import * as path from 'path'; import * as os from 'os'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError } from '../../utils/common.js'; -import { acquireFileLock } from '../../utils/file-lock.js'; +import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; // iFlow API 端点 const IFLOW_API_BASE_URL = 'https://apis.iflow.cn/v1'; @@ -570,7 +570,23 @@ export class IFlowApiService { console.log('[iFlow] Token is expiring soon, attempting refresh...'); try { - await this._refreshOAuthTokens(); + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + const dedupeKey = `iflow-token-refresh:${this.tokenFilePath}`; + await withDeduplication(dedupeKey, async () => { + await this._refreshOAuthTokens(); + }); + + // 如果是等待其他请求完成的刷新,需要重新加载凭证 + if (this.isExpiryDateNear()) { + const refreshedStorage = await loadTokenFromFile(this.tokenFilePath); + if (refreshedStorage && refreshedStorage.apiKey) { + this.tokenStorage = refreshedStorage; + this.apiKey = refreshedStorage.apiKey; + this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`; + console.log('[iFlow] Credentials reloaded after concurrent refresh'); + } + } + return true; } catch (error) { console.error('[iFlow] Token refresh failed:', error.message); diff --git a/src/providers/openai/qwen-core.js b/src/providers/openai/qwen-core.js index 06e75a0..f5527e2 100644 --- a/src/providers/openai/qwen-core.js +++ b/src/providers/openai/qwen-core.js @@ -12,7 +12,7 @@ import { getProviderModels } from '../provider-models.js'; import { handleQwenOAuth } from '../../auth/oauth-handlers.js'; import { configureAxiosProxy } from '../../utils/proxy-utils.js'; import { isRetryableNetworkError } from '../../utils/common.js'; -import { acquireFileLock } from '../../utils/file-lock.js'; +import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js'; // --- Constants --- const QWEN_DIR = '.qwen'; @@ -789,41 +789,62 @@ class SharedTokenManager { } async performTokenRefresh(context, qwenClient, forceRefresh = false) { + const currentCredentials = qwenClient.getCredentials() || context.memoryCache.credentials; + if (!currentCredentials || !currentCredentials.refresh_token) { + throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available'); + } + + // 使用去重锁:多个并发刷新请求只执行一次,共享结果 + const dedupeKey = `qwen-token-refresh:${context.credentialFilePath}`; + try { - const currentCredentials = qwenClient.getCredentials() || context.memoryCache.credentials; - if (!currentCredentials || !currentCredentials.refresh_token) { - throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available'); - } + const credentials = await withDeduplication(dedupeKey, async () => { + await this.acquireLock(context); + try { + await this.checkAndReloadIfNeeded(context); - await this.acquireLock(context); - await this.checkAndReloadIfNeeded(context); + if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) { + qwenClient.setCredentials(context.memoryCache.credentials); + return context.memoryCache.credentials; + } - if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) { - qwenClient.setCredentials(context.memoryCache.credentials); - return context.memoryCache.credentials; - } + const response = await qwenClient.refreshAccessToken(); + if (!response || isErrorResponse(response)) { + throw new TokenManagerError(TokenError.REFRESH_FAILED, `Token refresh failed: ${response?.error}`); + } + if (!response.access_token) { + throw new TokenManagerError(TokenError.REFRESH_FAILED, 'No access token in refresh response'); + } + const newCredentials = { + access_token: response.access_token, + token_type: response.token_type, + refresh_token: response.refresh_token || currentCredentials.refresh_token, + resource_url: response.resource_url, + expiry_date: Date.now() + response.expires_in * 1000, + }; - const response = await qwenClient.refreshAccessToken(); - if (!response || isErrorResponse(response)) { - throw new TokenManagerError(TokenError.REFRESH_FAILED, `Token refresh failed: ${response?.error}`); + context.memoryCache.credentials = newCredentials; + qwenClient.setCredentials(newCredentials); + await this.saveCredentialsToFile(context, newCredentials); + console.log('[Qwen Auth] Token refresh response: ok'); + return newCredentials; + } finally { + await this.releaseLock(context); + } + }); + + // 如果是等待其他请求完成的刷新,需要重新加载凭证 + // 因为 withDeduplication 会让所有等待者共享同一个 Promise + // 但只有第一个调用者的实例会执行刷新并更新自己的内存状态 + // 其他等待者需要从文件重新加载 + if (!context.memoryCache.credentials || !this.isTokenValid(context.memoryCache.credentials)) { + await this.reloadCredentialsFromFile(context); + if (context.memoryCache.credentials) { + qwenClient.setCredentials(context.memoryCache.credentials); + } } - if (!response.access_token) { - throw new TokenManagerError(TokenError.REFRESH_FAILED, 'No access token in refresh response'); - } - const credentials = { - access_token: response.access_token, - token_type: response.token_type, - refresh_token: response.refresh_token || currentCredentials.refresh_token, - resource_url: response.resource_url, - expiry_date: Date.now() + response.expires_in * 1000, - }; - - context.memoryCache.credentials = credentials; - qwenClient.setCredentials(credentials); - await this.saveCredentialsToFile(context, credentials); - console.log('[Qwen Auth] Token refresh response: ok'); + return credentials; - } catch (error) { if (error instanceof TokenManagerError) throw error; @@ -845,8 +866,6 @@ class SharedTokenManager { `Unexpected error during token refresh: ${error.message}`, error, ); - } finally { - await this.releaseLock(context); } } diff --git a/src/providers/provider-pool-manager.js b/src/providers/provider-pool-manager.js index ec63455..e2a667f 100644 --- a/src/providers/provider-pool-manager.js +++ b/src/providers/provider-pool-manager.js @@ -43,6 +43,10 @@ export class ProviderPoolManager { // Model Fallback 映射配置 this.modelFallbackMapping = options.globalConfig?.modelFallbackMapping || {}; + // 并发控制:每个 providerType 的选择锁 + // 用于确保 selectProvider 的排序和更新操作是原子的 + this._selectionLocks = {}; + this.initializeProviderStatus(); } @@ -79,6 +83,7 @@ export class ProviderPoolManager { for (const providerType in this.providerPools) { this.providerStatus[providerType] = []; this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type + this._selectionLocks[providerType] = Promise.resolve(); // 初始化选择锁 this.providerPools[providerType].forEach((providerConfig) => { // Ensure initial health and usage stats are present in the config providerConfig.isHealthy = providerConfig.isHealthy !== undefined ? providerConfig.isHealthy : true; @@ -111,17 +116,40 @@ export class ProviderPoolManager { * Selects a provider from the pool for a given provider type. * Currently uses a simple round-robin for healthy providers. * If requestedModel is provided, providers that don't support the model will be excluded. + * + * 注意:此方法现在返回 Promise,使用链式锁确保并发安全。 + * * @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom'). * @param {string} [requestedModel] - Optional. The model name to filter providers by. - * @returns {object|null} The selected provider's configuration, or null if no healthy provider is found. + * @returns {Promise} The selected provider's configuration, or null if no healthy provider is found. */ selectProvider(providerType, requestedModel = null, options = {}) { // 参数校验 if (!providerType || typeof providerType !== 'string') { this._log('error', `Invalid providerType: ${providerType}`); - return null; + return Promise.resolve(null); } + // 使用链式锁确保同一 providerType 的选择操作串行执行 + // 这样可以避免并发场景下多个请求选择到同一个 provider + const currentLock = this._selectionLocks[providerType] || Promise.resolve(); + + const selectionPromise = currentLock.then(() => { + return this._doSelectProvider(providerType, requestedModel, options); + }); + + // 更新锁,确保下一个请求等待当前请求完成 + // 使用 catch 确保即使出错也不会阻塞后续请求 + this._selectionLocks[providerType] = selectionPromise.catch(() => {}); + + return selectionPromise; + } + + /** + * 实际执行 provider 选择的内部方法(同步执行,由锁保护) + * @private + */ + _doSelectProvider(providerType, requestedModel, options) { const availableProviders = this.providerStatus[providerType] || []; let availableAndHealthyProviders = availableProviders.filter(p => p.config.isHealthy && !p.config.isDisabled @@ -152,7 +180,7 @@ export class ProviderPoolManager { return null; } - // 改进:使用“最久未被使用”策略(LRU)代替取模轮询 + // 改进:使用"最久未被使用"策略(LRU)代替取模轮询 // 这样即使可用列表长度动态变化,也能确保每个账号被平均轮到 const selected = availableAndHealthyProviders.sort((a, b) => { const timeA = a.config.lastUsed ? new Date(a.config.lastUsed).getTime() : 0; @@ -164,14 +192,15 @@ export class ProviderPoolManager { })[0]; // 更新使用信息(除非明确跳过) + // 注意:这里的更新是同步的,在锁保护下执行,确保下一个请求能看到最新的 lastUsed if (!options.skipUsageCount) { selected.config.lastUsed = new Date().toISOString(); selected.config.usageCount++; - // 使用防抖保存 + // 使用防抖保存(文件 I/O 是异步的,但内存已经更新) this._debouncedSave(providerType); } - this._log('debug', `Selected provider for ${providerType} (round-robin): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`); + this._log('debug', `Selected provider for ${providerType} (LRU): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`); return selected.config; } @@ -185,7 +214,19 @@ export class ProviderPoolManager { * @param {boolean} [options.skipUsageCount] - Optional. If true, skip incrementing usage count. * @returns {object|null} An object containing the selected provider's configuration and the actual provider type used, or null if no healthy provider is found. */ - selectProviderWithFallback(providerType, requestedModel = null, options = {}) { + /** + * Selects a provider from the pool with fallback support. + * When the primary provider type has no healthy providers, it will try fallback types. + * + * 注意:此方法现在返回 Promise,因为内部调用的 selectProvider 是异步的。 + * + * @param {string} providerType - The primary type of provider to select. + * @param {string} [requestedModel] - Optional. The model name to filter providers by. + * @param {Object} [options] - Optional. Additional options. + * @param {boolean} [options.skipUsageCount] - Optional. If true, skip incrementing usage count. + * @returns {Promise} An object containing the selected provider's configuration and the actual provider type used, or null if no healthy provider is found. + */ + async selectProviderWithFallback(providerType, requestedModel = null, options = {}) { // 参数校验 if (!providerType || typeof providerType !== 'string') { this._log('error', `Invalid providerType: ${providerType}`); @@ -237,8 +278,8 @@ export class ProviderPoolManager { } } - // 尝试从当前类型选择提供商 - const selectedConfig = this.selectProvider(currentType, requestedModel, options); + // 尝试从当前类型选择提供商(现在是异步的) + const selectedConfig = await this.selectProvider(currentType, requestedModel, options); if (selectedConfig) { if (currentType !== providerType) { @@ -270,8 +311,8 @@ export class ProviderPoolManager { // 检查目标类型是否有配置的池 if (this.providerStatus[targetProviderType] && this.providerStatus[targetProviderType].length > 0) { - // 尝试从目标类型选择提供商(使用转换后的模型名) - const selectedConfig = this.selectProvider(targetProviderType, targetModel, options); + // 尝试从目标类型选择提供商(使用转换后的模型名,现在是异步的) + const selectedConfig = await this.selectProvider(targetProviderType, targetModel, options); if (selectedConfig) { this._log('info', `Fallback activated (Model Mapping): ${providerType} (${requestedModel}) -> ${targetProviderType} (${targetModel}) (uuid: ${selectedConfig.uuid})`); @@ -299,7 +340,7 @@ export class ProviderPoolManager { const supportedModels = getProviderModels(fallbackType); if (supportedModels.length > 0 && !supportedModels.includes(targetModel)) continue; - const fallbackSelectedConfig = this.selectProvider(fallbackType, targetModel, options); + const fallbackSelectedConfig = await this.selectProvider(fallbackType, targetModel, options); if (fallbackSelectedConfig) { this._log('info', `Fallback activated (Model Mapping -> Chain): ${providerType} (${requestedModel}) -> ${targetProviderType} -> ${fallbackType} (${targetModel}) (uuid: ${fallbackSelectedConfig.uuid})`); return { diff --git a/src/services/service-manager.js b/src/services/service-manager.js index 3bdba07..37933aa 100644 --- a/src/services/service-manager.js +++ b/src/services/service-manager.js @@ -248,7 +248,8 @@ export async function getApiService(config, requestedModel = null, options = {}) let serviceConfig = config; if (providerPoolManager && config.providerPools && config.providerPools[config.MODEL_PROVIDER]) { // 如果有号池管理器,并且当前模型提供者类型有对应的号池,则从号池中选择一个提供者配置 - const selectedProviderConfig = providerPoolManager.selectProvider(config.MODEL_PROVIDER, requestedModel, { skipUsageCount: true }); + // selectProvider 现在是异步的,使用链式锁确保并发安全 + const selectedProviderConfig = await providerPoolManager.selectProvider(config.MODEL_PROVIDER, requestedModel, { skipUsageCount: true }); if (selectedProviderConfig) { // 合并选中的提供者配置到当前请求的 config 中 serviceConfig = deepmerge(config, selectedProviderConfig); @@ -281,7 +282,8 @@ export async function getApiServiceWithFallback(config, requestedModel = null, o let actualModel = null; if (providerPoolManager && config.providerPools && config.providerPools[config.MODEL_PROVIDER]) { - const selectedResult = providerPoolManager.selectProviderWithFallback( + // selectProviderWithFallback 现在是异步的,使用链式锁确保并发安全 + const selectedResult = await providerPoolManager.selectProviderWithFallback( config.MODEL_PROVIDER, requestedModel, { skipUsageCount: true } diff --git a/src/utils/file-lock.js b/src/utils/file-lock.js index 3f427d4..46c5a5a 100644 --- a/src/utils/file-lock.js +++ b/src/utils/file-lock.js @@ -16,6 +16,10 @@ import * as path from 'path'; // 每个文件对应一个 Promise,新的锁请求会链接到当前 Promise 之后 const fileLockQueues = new Map(); +// 存储去重锁的进行中 Promise +// 用于合并相同 key 的并发请求,只执行一次操作 +const dedupePromises = new Map(); + /** * 获取文件锁,确保同一时间只有一个操作可以访问特定文件 * @@ -111,9 +115,106 @@ export function getLockedFileCount() { return fileLockQueues.size; } +/** + * 去重执行 - 合并相同 key 的并发请求,只执行一次操作 + * + * 与 withFileLock 的区别: + * - withFileLock(队列锁):10个并发请求 → 排队执行10次 + * - withDeduplication(去重锁):10个并发请求 → 只执行1次,共享结果 + * + * 使用场景: + * - Token 刷新:多个请求同时发现 token 过期,只需刷新一次 + * - 缓存填充:多个请求同时 cache miss,只需加载一次 + * - 任何"结果可共享"的昂贵操作 + * + * @param {string} key - 去重的唯一标识符 + * @param {Function} operation - 要执行的异步操作 + * @returns {Promise} 操作的返回值(所有等待者共享同一结果) + * + * @example + * // 多个并发调用只会执行一次 refreshToken + * const newToken = await withDeduplication('token-refresh', async () => { + * const response = await fetch('/refresh'); + * return response.json(); + * }); + */ +export async function withDeduplication(key, operation) { + // 如果已有相同 key 的操作在进行中,直接等待它的结果 + if (dedupePromises.has(key)) { + return dedupePromises.get(key); + } + + // 创建新的操作 Promise + const operationPromise = (async () => { + try { + return await operation(); + } finally { + // 操作完成后清理 + dedupePromises.delete(key); + } + })(); + + // 存入 Map,让后续请求可以共享 + dedupePromises.set(key, operationPromise); + + return operationPromise; +} + +/** + * 组合去重锁和文件锁 - 先去重再加文件锁 + * + * 典型场景:Token 刷新 + * 1. 去重层:10个并发刷新请求 → 合并为1次刷新操作 + * 2. 文件锁层:保护那1次刷新操作的文件写入不与其他操作冲突 + * + * @param {string} dedupeKey - 去重的唯一标识符 + * @param {string} filePath - 需要保护的文件路径 + * @param {Function} operation - 要执行的异步操作 + * @returns {Promise} 操作的返回值 + * + * @example + * // Token 刷新场景 + * const newToken = await withDeduplicationAndFileLock( + * 'token-refresh-' + credentialId, + * '/path/to/token.json', + * async () => { + * const response = await fetch('/refresh'); + * const data = await response.json(); + * await fs.writeFile('/path/to/token.json', JSON.stringify(data)); + * return data; + * } + * ); + */ +export async function withDeduplicationAndFileLock(dedupeKey, filePath, operation) { + return withDeduplication(dedupeKey, async () => { + return withFileLock(filePath, operation); + }); +} + +/** + * 检查是否有去重操作正在进行 + * @param {string} key - 去重的唯一标识符 + * @returns {boolean} 是否有操作在进行中 + */ +export function isDedupeInProgress(key) { + return dedupePromises.has(key); +} + +/** + * 获取当前进行中的去重操作数量(用于调试) + * @returns {number} 进行中的去重操作数量 + */ +export function getDedupeCount() { + return dedupePromises.size; +} + export default { acquireFileLock, withFileLock, isFileLocked, - getLockedFileCount + getLockedFileCount, + withDeduplication, + withDeduplicationAndFileLock, + isDedupeInProgress, + getDedupeCount }; \ No newline at end of file