Merge branch 'justlovemaki:main' into main

This commit is contained in:
Meo 2026-01-14 16:11:10 +08:00 committed by GitHub
commit 1ec7ec3fcc
10 changed files with 286 additions and 95 deletions

View file

@ -1 +1 @@
2.6.9
2.7.0

View file

@ -533,8 +533,8 @@ export async function handleOllamaChat(req, res, apiService, currentConfig, prov
// If apiService is null or provider is different, get the appropriate service from pool
if (!apiService || detectedProvider !== currentConfig.MODEL_PROVIDER) {
if (providerPoolManager) {
// Select provider from pool
const providerConfig = providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true });
// Select provider from pool (now async)
const providerConfig = await providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true });
if (providerConfig) {
actualConfig = {
...currentConfig,
@ -640,8 +640,8 @@ export async function handleOllamaGenerate(req, res, apiService, currentConfig,
// If apiService is null or provider is different, get the appropriate service from pool
if (!apiService || detectedProvider !== currentConfig.MODEL_PROVIDER) {
if (providerPoolManager) {
// Select provider from pool
const providerConfig = providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true });
// Select provider from pool (now async)
const providerConfig = await providerPoolManager.selectProvider(detectedProvider, modelName, { skipUsageCount: true });
if (providerConfig) {
actualConfig = {
...currentConfig,

View file

@ -11,7 +11,7 @@ import { countTokens } from '@anthropic-ai/tokenizer';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js';
import { getProviderPoolManager } from '../../services/service-manager.js';
import { acquireFileLock } from '../../utils/file-lock.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
const KIRO_THINKING = {
MAX_BUDGET_TOKENS: 24576,
@ -61,10 +61,6 @@ const MODEL_MAPPING = Object.fromEntries(
const KIRO_AUTH_TOKEN_FILE = "kiro-auth-token.json";
// Token 刷新单例锁 - 按凭证文件路径索引,防止多个并发请求同时刷新同一个 token
// 这解决了文件锁导致的并发请求串行化问题
const tokenRefreshPromises = new Map();
/**
* Kiro API Service - Node.js implementation based on the Python ki2api
* Provides OpenAI-compatible API for Claude Sonnet 4 via Kiro/CodeWhisperer
@ -414,23 +410,8 @@ async initializeAuth(forceRefresh = false) {
return;
}
// 获取凭证文件路径,用于单例锁的 key
// 获取凭证文件路径,用于去重锁的 key
const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE);
// 单例刷新逻辑:如果已有刷新在进行中,等待它完成而不是重复刷新
if (forceRefresh && tokenRefreshPromises.has(tokenFilePath)) {
console.log('[Kiro Auth] Token refresh already in progress for this credential, waiting...');
try {
await tokenRefreshPromises.get(tokenFilePath);
// 刷新完成后,重新加载凭证(因为其他请求可能已经更新了 token
await this._reloadCredentialsAfterRefresh(tokenFilePath);
console.log('[Kiro Auth] Reused token from concurrent refresh');
return;
} catch (error) {
// 如果等待的刷新失败了,我们需要自己尝试刷新
console.warn('[Kiro Auth] Concurrent refresh failed, will attempt own refresh:', error.message);
}
}
// Helper to load credentials from a file
const loadCredentialsFromFile = async (filePath) => {
@ -575,15 +556,19 @@ async initializeAuth(forceRefresh = false) {
throw new Error('No refresh token available to refresh access token.');
}
// 创建刷新 Promise 并存入单例锁 Map
const refreshPromise = this._doTokenRefresh(saveCredentialsToFile, tokenFilePath);
tokenRefreshPromises.set(tokenFilePath, refreshPromise);
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `kiro-token-refresh:${tokenFilePath}`;
await withDeduplication(dedupeKey, async () => {
await this._doTokenRefresh(saveCredentialsToFile, tokenFilePath);
});
try {
await refreshPromise;
} finally {
// 刷新完成后清理单例锁
tokenRefreshPromises.delete(tokenFilePath);
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 _doTokenRefresh 只更新了执行刷新的实例的内存状态
// 注意withDeduplication 会让所有等待者共享同一个 Promise
// 但只有第一个调用者的实例会执行 _doTokenRefresh 并更新自己的内存状态
// 其他等待者需要从文件重新加载
if (!this.accessToken || this.isExpiryDateNear()) {
await this._reloadCredentialsAfterRefresh(tokenFilePath);
}
}

View file

@ -14,7 +14,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleGeminiAntigravityOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { cleanJsonSchemaProperties } from '../../converters/utils.js';
import { acquireFileLock } from '../../utils/file-lock.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -795,12 +795,26 @@ export class AntigravityApiService {
console.log('[Antigravity Auth] Authentication configured successfully from file.');
if (needsRefresh) {
console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// 保存刷新后的凭证到文件(使用文件锁)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`);
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `antigravity-token-refresh:${credPath}`;
await withDeduplication(dedupeKey, async () => {
console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// 保存刷新后的凭证到文件(使用文件锁)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`);
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 withDeduplication 只让第一个调用者执行刷新并更新自己的内存状态
// 其他等待者需要从文件重新加载
if (this.isTokenExpiringSoon()) {
const refreshedData = await fs.readFile(credPath, "utf8");
const refreshedCredentials = JSON.parse(refreshedData);
this.authClient.setCredentials(refreshedCredentials);
console.log('[Antigravity Auth] Credentials reloaded after concurrent refresh');
}
}
} catch (error) {
console.error('[Antigravity Auth] Error initializing authentication:', error.code);

View file

@ -10,7 +10,7 @@ import { API_ACTIONS, formatExpiryTime, isRetryableNetworkError } from '../../ut
import { getProviderModels } from '../provider-models.js';
import { handleGeminiCliOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { acquireFileLock } from '../../utils/file-lock.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -273,11 +273,24 @@ export class GeminiApiService {
if (forceRefresh) {
console.log('[Gemini Auth] Forcing token refresh...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// Save refreshed credentials back to file (with file locking)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log('[Gemini Auth] Token refreshed and saved successfully.');
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `gemini-token-refresh:${credPath}`;
await withDeduplication(dedupeKey, async () => {
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// Save refreshed credentials back to file (with file locking)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log('[Gemini Auth] Token refreshed and saved successfully.');
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
if (this.isExpiryDateNear()) {
const refreshedData = await fs.readFile(credPath, "utf8");
const refreshedCredentials = JSON.parse(refreshedData);
this.authClient.setCredentials(refreshedCredentials);
console.log('[Gemini Auth] Credentials reloaded after concurrent refresh');
}
}
} catch (error) {
console.error('[Gemini Auth] Error initializing authentication:', error.code);

View file

@ -24,7 +24,7 @@ import * as path from 'path';
import * as os from 'os';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock } from '../../utils/file-lock.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
// iFlow API 端点
const IFLOW_API_BASE_URL = 'https://apis.iflow.cn/v1';
@ -570,7 +570,23 @@ export class IFlowApiService {
console.log('[iFlow] Token is expiring soon, attempting refresh...');
try {
await this._refreshOAuthTokens();
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `iflow-token-refresh:${this.tokenFilePath}`;
await withDeduplication(dedupeKey, async () => {
await this._refreshOAuthTokens();
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
if (this.isExpiryDateNear()) {
const refreshedStorage = await loadTokenFromFile(this.tokenFilePath);
if (refreshedStorage && refreshedStorage.apiKey) {
this.tokenStorage = refreshedStorage;
this.apiKey = refreshedStorage.apiKey;
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
console.log('[iFlow] Credentials reloaded after concurrent refresh');
}
}
return true;
} catch (error) {
console.error('[iFlow] Token refresh failed:', error.message);

View file

@ -12,7 +12,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleQwenOAuth } from '../../auth/oauth-handlers.js';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock } from '../../utils/file-lock.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
// --- Constants ---
const QWEN_DIR = '.qwen';
@ -789,41 +789,62 @@ class SharedTokenManager {
}
async performTokenRefresh(context, qwenClient, forceRefresh = false) {
const currentCredentials = qwenClient.getCredentials() || context.memoryCache.credentials;
if (!currentCredentials || !currentCredentials.refresh_token) {
throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available');
}
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `qwen-token-refresh:${context.credentialFilePath}`;
try {
const currentCredentials = qwenClient.getCredentials() || context.memoryCache.credentials;
if (!currentCredentials || !currentCredentials.refresh_token) {
throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available');
}
const credentials = await withDeduplication(dedupeKey, async () => {
await this.acquireLock(context);
try {
await this.checkAndReloadIfNeeded(context);
await this.acquireLock(context);
await this.checkAndReloadIfNeeded(context);
if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) {
qwenClient.setCredentials(context.memoryCache.credentials);
return context.memoryCache.credentials;
}
if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) {
qwenClient.setCredentials(context.memoryCache.credentials);
return context.memoryCache.credentials;
}
const response = await qwenClient.refreshAccessToken();
if (!response || isErrorResponse(response)) {
throw new TokenManagerError(TokenError.REFRESH_FAILED, `Token refresh failed: ${response?.error}`);
}
if (!response.access_token) {
throw new TokenManagerError(TokenError.REFRESH_FAILED, 'No access token in refresh response');
}
const newCredentials = {
access_token: response.access_token,
token_type: response.token_type,
refresh_token: response.refresh_token || currentCredentials.refresh_token,
resource_url: response.resource_url,
expiry_date: Date.now() + response.expires_in * 1000,
};
const response = await qwenClient.refreshAccessToken();
if (!response || isErrorResponse(response)) {
throw new TokenManagerError(TokenError.REFRESH_FAILED, `Token refresh failed: ${response?.error}`);
context.memoryCache.credentials = newCredentials;
qwenClient.setCredentials(newCredentials);
await this.saveCredentialsToFile(context, newCredentials);
console.log('[Qwen Auth] Token refresh response: ok');
return newCredentials;
} finally {
await this.releaseLock(context);
}
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 withDeduplication 会让所有等待者共享同一个 Promise
// 但只有第一个调用者的实例会执行刷新并更新自己的内存状态
// 其他等待者需要从文件重新加载
if (!context.memoryCache.credentials || !this.isTokenValid(context.memoryCache.credentials)) {
await this.reloadCredentialsFromFile(context);
if (context.memoryCache.credentials) {
qwenClient.setCredentials(context.memoryCache.credentials);
}
}
if (!response.access_token) {
throw new TokenManagerError(TokenError.REFRESH_FAILED, 'No access token in refresh response');
}
const credentials = {
access_token: response.access_token,
token_type: response.token_type,
refresh_token: response.refresh_token || currentCredentials.refresh_token,
resource_url: response.resource_url,
expiry_date: Date.now() + response.expires_in * 1000,
};
context.memoryCache.credentials = credentials;
qwenClient.setCredentials(credentials);
await this.saveCredentialsToFile(context, credentials);
console.log('[Qwen Auth] Token refresh response: ok');
return credentials;
} catch (error) {
if (error instanceof TokenManagerError) throw error;
@ -845,8 +866,6 @@ class SharedTokenManager {
`Unexpected error during token refresh: ${error.message}`,
error,
);
} finally {
await this.releaseLock(context);
}
}

View file

@ -43,6 +43,10 @@ export class ProviderPoolManager {
// Model Fallback 映射配置
this.modelFallbackMapping = options.globalConfig?.modelFallbackMapping || {};
// 并发控制:每个 providerType 的选择锁
// 用于确保 selectProvider 的排序和更新操作是原子的
this._selectionLocks = {};
this.initializeProviderStatus();
}
@ -79,6 +83,7 @@ export class ProviderPoolManager {
for (const providerType in this.providerPools) {
this.providerStatus[providerType] = [];
this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type
this._selectionLocks[providerType] = Promise.resolve(); // 初始化选择锁
this.providerPools[providerType].forEach((providerConfig) => {
// Ensure initial health and usage stats are present in the config
providerConfig.isHealthy = providerConfig.isHealthy !== undefined ? providerConfig.isHealthy : true;
@ -111,17 +116,40 @@ export class ProviderPoolManager {
* Selects a provider from the pool for a given provider type.
* Currently uses a simple round-robin for healthy providers.
* If requestedModel is provided, providers that don't support the model will be excluded.
*
* 注意此方法现在返回 Promise使用链式锁确保并发安全
*
* @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom').
* @param {string} [requestedModel] - Optional. The model name to filter providers by.
* @returns {object|null} The selected provider's configuration, or null if no healthy provider is found.
* @returns {Promise<object|null>} The selected provider's configuration, or null if no healthy provider is found.
*/
selectProvider(providerType, requestedModel = null, options = {}) {
// 参数校验
if (!providerType || typeof providerType !== 'string') {
this._log('error', `Invalid providerType: ${providerType}`);
return null;
return Promise.resolve(null);
}
// 使用链式锁确保同一 providerType 的选择操作串行执行
// 这样可以避免并发场景下多个请求选择到同一个 provider
const currentLock = this._selectionLocks[providerType] || Promise.resolve();
const selectionPromise = currentLock.then(() => {
return this._doSelectProvider(providerType, requestedModel, options);
});
// 更新锁,确保下一个请求等待当前请求完成
// 使用 catch 确保即使出错也不会阻塞后续请求
this._selectionLocks[providerType] = selectionPromise.catch(() => {});
return selectionPromise;
}
/**
* 实际执行 provider 选择的内部方法同步执行由锁保护
* @private
*/
_doSelectProvider(providerType, requestedModel, options) {
const availableProviders = this.providerStatus[providerType] || [];
let availableAndHealthyProviders = availableProviders.filter(p =>
p.config.isHealthy && !p.config.isDisabled
@ -152,7 +180,7 @@ export class ProviderPoolManager {
return null;
}
// 改进:使用“最久未被使用”策略LRU代替取模轮询
// 改进:使用"最久未被使用"策略LRU代替取模轮询
// 这样即使可用列表长度动态变化,也能确保每个账号被平均轮到
const selected = availableAndHealthyProviders.sort((a, b) => {
const timeA = a.config.lastUsed ? new Date(a.config.lastUsed).getTime() : 0;
@ -164,14 +192,15 @@ export class ProviderPoolManager {
})[0];
// 更新使用信息(除非明确跳过)
// 注意:这里的更新是同步的,在锁保护下执行,确保下一个请求能看到最新的 lastUsed
if (!options.skipUsageCount) {
selected.config.lastUsed = new Date().toISOString();
selected.config.usageCount++;
// 使用防抖保存
// 使用防抖保存(文件 I/O 是异步的,但内存已经更新)
this._debouncedSave(providerType);
}
this._log('debug', `Selected provider for ${providerType} (round-robin): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`);
this._log('debug', `Selected provider for ${providerType} (LRU): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`);
return selected.config;
}
@ -185,7 +214,19 @@ export class ProviderPoolManager {
* @param {boolean} [options.skipUsageCount] - Optional. If true, skip incrementing usage count.
* @returns {object|null} An object containing the selected provider's configuration and the actual provider type used, or null if no healthy provider is found.
*/
selectProviderWithFallback(providerType, requestedModel = null, options = {}) {
/**
* Selects a provider from the pool with fallback support.
* When the primary provider type has no healthy providers, it will try fallback types.
*
* 注意此方法现在返回 Promise因为内部调用的 selectProvider 是异步的
*
* @param {string} providerType - The primary type of provider to select.
* @param {string} [requestedModel] - Optional. The model name to filter providers by.
* @param {Object} [options] - Optional. Additional options.
* @param {boolean} [options.skipUsageCount] - Optional. If true, skip incrementing usage count.
* @returns {Promise<object|null>} An object containing the selected provider's configuration and the actual provider type used, or null if no healthy provider is found.
*/
async selectProviderWithFallback(providerType, requestedModel = null, options = {}) {
// 参数校验
if (!providerType || typeof providerType !== 'string') {
this._log('error', `Invalid providerType: ${providerType}`);
@ -237,8 +278,8 @@ export class ProviderPoolManager {
}
}
// 尝试从当前类型选择提供商
const selectedConfig = this.selectProvider(currentType, requestedModel, options);
// 尝试从当前类型选择提供商(现在是异步的)
const selectedConfig = await this.selectProvider(currentType, requestedModel, options);
if (selectedConfig) {
if (currentType !== providerType) {
@ -270,8 +311,8 @@ export class ProviderPoolManager {
// 检查目标类型是否有配置的池
if (this.providerStatus[targetProviderType] && this.providerStatus[targetProviderType].length > 0) {
// 尝试从目标类型选择提供商(使用转换后的模型名
const selectedConfig = this.selectProvider(targetProviderType, targetModel, options);
// 尝试从目标类型选择提供商(使用转换后的模型名,现在是异步的
const selectedConfig = await this.selectProvider(targetProviderType, targetModel, options);
if (selectedConfig) {
this._log('info', `Fallback activated (Model Mapping): ${providerType} (${requestedModel}) -> ${targetProviderType} (${targetModel}) (uuid: ${selectedConfig.uuid})`);
@ -299,7 +340,7 @@ export class ProviderPoolManager {
const supportedModels = getProviderModels(fallbackType);
if (supportedModels.length > 0 && !supportedModels.includes(targetModel)) continue;
const fallbackSelectedConfig = this.selectProvider(fallbackType, targetModel, options);
const fallbackSelectedConfig = await this.selectProvider(fallbackType, targetModel, options);
if (fallbackSelectedConfig) {
this._log('info', `Fallback activated (Model Mapping -> Chain): ${providerType} (${requestedModel}) -> ${targetProviderType} -> ${fallbackType} (${targetModel}) (uuid: ${fallbackSelectedConfig.uuid})`);
return {

View file

@ -248,7 +248,8 @@ export async function getApiService(config, requestedModel = null, options = {})
let serviceConfig = config;
if (providerPoolManager && config.providerPools && config.providerPools[config.MODEL_PROVIDER]) {
// 如果有号池管理器,并且当前模型提供者类型有对应的号池,则从号池中选择一个提供者配置
const selectedProviderConfig = providerPoolManager.selectProvider(config.MODEL_PROVIDER, requestedModel, { skipUsageCount: true });
// selectProvider 现在是异步的,使用链式锁确保并发安全
const selectedProviderConfig = await providerPoolManager.selectProvider(config.MODEL_PROVIDER, requestedModel, { skipUsageCount: true });
if (selectedProviderConfig) {
// 合并选中的提供者配置到当前请求的 config 中
serviceConfig = deepmerge(config, selectedProviderConfig);
@ -281,7 +282,8 @@ export async function getApiServiceWithFallback(config, requestedModel = null, o
let actualModel = null;
if (providerPoolManager && config.providerPools && config.providerPools[config.MODEL_PROVIDER]) {
const selectedResult = providerPoolManager.selectProviderWithFallback(
// selectProviderWithFallback 现在是异步的,使用链式锁确保并发安全
const selectedResult = await providerPoolManager.selectProviderWithFallback(
config.MODEL_PROVIDER,
requestedModel,
{ skipUsageCount: true }

View file

@ -16,6 +16,10 @@ import * as path from 'path';
// 每个文件对应一个 Promise新的锁请求会链接到当前 Promise 之后
const fileLockQueues = new Map();
// 存储去重锁的进行中 Promise
// 用于合并相同 key 的并发请求,只执行一次操作
const dedupePromises = new Map();
/**
* 获取文件锁确保同一时间只有一个操作可以访问特定文件
*
@ -111,9 +115,106 @@ export function getLockedFileCount() {
return fileLockQueues.size;
}
/**
* 去重执行 - 合并相同 key 的并发请求只执行一次操作
*
* withFileLock 的区别
* - withFileLock队列锁10个并发请求 排队执行10次
* - withDeduplication去重锁10个并发请求 只执行1次共享结果
*
* 使用场景
* - Token 刷新多个请求同时发现 token 过期只需刷新一次
* - 缓存填充多个请求同时 cache miss只需加载一次
* - 任何"结果可共享"的昂贵操作
*
* @param {string} key - 去重的唯一标识符
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值所有等待者共享同一结果
*
* @example
* // 多个并发调用只会执行一次 refreshToken
* const newToken = await withDeduplication('token-refresh', async () => {
* const response = await fetch('/refresh');
* return response.json();
* });
*/
export async function withDeduplication(key, operation) {
// 如果已有相同 key 的操作在进行中,直接等待它的结果
if (dedupePromises.has(key)) {
return dedupePromises.get(key);
}
// 创建新的操作 Promise
const operationPromise = (async () => {
try {
return await operation();
} finally {
// 操作完成后清理
dedupePromises.delete(key);
}
})();
// 存入 Map让后续请求可以共享
dedupePromises.set(key, operationPromise);
return operationPromise;
}
/**
* 组合去重锁和文件锁 - 先去重再加文件锁
*
* 典型场景Token 刷新
* 1. 去重层10个并发刷新请求 合并为1次刷新操作
* 2. 文件锁层保护那1次刷新操作的文件写入不与其他操作冲突
*
* @param {string} dedupeKey - 去重的唯一标识符
* @param {string} filePath - 需要保护的文件路径
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值
*
* @example
* // Token 刷新场景
* const newToken = await withDeduplicationAndFileLock(
* 'token-refresh-' + credentialId,
* '/path/to/token.json',
* async () => {
* const response = await fetch('/refresh');
* const data = await response.json();
* await fs.writeFile('/path/to/token.json', JSON.stringify(data));
* return data;
* }
* );
*/
export async function withDeduplicationAndFileLock(dedupeKey, filePath, operation) {
return withDeduplication(dedupeKey, async () => {
return withFileLock(filePath, operation);
});
}
/**
* 检查是否有去重操作正在进行
* @param {string} key - 去重的唯一标识符
* @returns {boolean} 是否有操作在进行中
*/
export function isDedupeInProgress(key) {
return dedupePromises.has(key);
}
/**
* 获取当前进行中的去重操作数量用于调试
* @returns {number} 进行中的去重操作数量
*/
export function getDedupeCount() {
return dedupePromises.size;
}
export default {
acquireFileLock,
withFileLock,
isFileLocked,
getLockedFileCount
getLockedFileCount,
withDeduplication,
withDeduplicationAndFileLock,
isDedupeInProgress,
getDedupeCount
};