feat: 引入凭证缓存管理器,优化 OAuth Token 并发处理

- 新增 CredentialCacheManager 替代文件锁机制
- 所有 OAuth Provider 迁移到内存缓存
- Token 刷新策略优化:过期阻塞,即将过期后台刷新
- 健康检查增加 Token 预检和超时保护
- 进程退出时自动同步凭证到文件
This commit is contained in:
leonai 2026-01-15 21:49:54 +08:00
parent ec82841c26
commit 9f58db2c1f
9 changed files with 1334 additions and 425 deletions

View file

@ -11,7 +11,7 @@ import { countTokens } from '@anthropic-ai/tokenizer';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js';
import { getProviderPoolManager } from '../../services/service-manager.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
import { CredentialCacheManager } from '../../utils/credential-cache-manager.js';
const KIRO_THINKING = {
MAX_BUDGET_TOKENS: 24576,
@ -29,7 +29,8 @@ const KIRO_CONSTANTS = {
AMAZON_Q_URL: 'https://codewhisperer.{{region}}.amazonaws.com/SendMessageStreaming',
USAGE_LIMITS_URL: 'https://q.{{region}}.amazonaws.com/getUsageLimits',
DEFAULT_MODEL_NAME: 'claude-opus-4-5',
AXIOS_TIMEOUT: 120000, // 2 minutes timeout (increased from 2 minutes)
AXIOS_TIMEOUT: 120000, // 2 minutes timeout for normal requests
TOKEN_REFRESH_TIMEOUT: 15000, // 15 seconds timeout for token refresh (shorter to avoid blocking)
USER_AGENT: 'KiroIDE',
KIRO_VERSION: '0.7.5',
CONTENT_TYPE_JSON: 'application/json',
@ -429,7 +430,11 @@ async initializeAuth(forceRefresh = false) {
// 获取凭证文件路径,用于去重锁的 key
const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE);
// Helper to load credentials from a file
// 获取凭证缓存管理器
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'claude-kiro-oauth';
// Helper to load credentials from a file (fallback when cache miss)
const loadCredentialsFromFile = async (filePath) => {
try {
const fileContent = await fs.readFile(filePath, 'utf8');
@ -457,11 +462,21 @@ async initializeAuth(forceRefresh = false) {
}
};
// Helper to save credentials to a file (with file locking to prevent concurrent write corruption)
// Helper to save credentials - 使用内存缓存替代直接文件写入
const saveCredentialsToFile = async (filePath, newData) => {
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(filePath);
try {
// 优先更新内存缓存
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const entry = credentialCache.getCredentials(providerType, this.uuid);
if (entry) {
const mergedData = { ...entry.credentials, ...newData };
credentialCache.updateCredentials(providerType, this.uuid, mergedData, filePath);
console.info(`[Kiro Auth] Updated credentials in memory cache: ${this.uuid}`);
return;
}
}
// 如果没有缓存条目,使用内存锁方式写入
await credentialCache.withMemoryLock(`kiro-save:${filePath}`, async () => {
let existingData = {};
try {
const fileContent = await fs.readFile(filePath, 'utf8');
@ -488,12 +503,7 @@ async initializeAuth(forceRefresh = false) {
const mergedData = { ...existingData, ...newData };
await fs.writeFile(filePath, JSON.stringify(mergedData, null, 2), 'utf8');
console.info(`[Kiro Auth] Updated token file: ${filePath}`);
} catch (error) {
console.error(`[Kiro Auth] Failed to write token to file ${filePath}: ${error.message}`);
} finally {
// 确保锁被释放
releaseLock();
}
});
};
try {
@ -503,46 +513,49 @@ async initializeAuth(forceRefresh = false) {
if (this.base64Creds) {
Object.assign(mergedCredentials, this.base64Creds);
console.info('[Kiro Auth] Successfully loaded credentials from Base64 (constructor).');
// Clear base64Creds after use to prevent re-processing
this.base64Creds = null;
}
// Priority 2 & 3 合并: 从指定文件路径或目录加载凭证
// 读取指定的 credPath 文件以及目录下的其他 JSON 文件(排除当前文件)
const targetFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE);
const dirPath = path.dirname(targetFilePath);
const targetFileName = path.basename(targetFilePath);
console.debug(`[Kiro Auth] Attempting to load credentials from directory: ${dirPath}`);
try {
// 首先尝试读取目标文件
const targetCredentials = await loadCredentialsFromFile(targetFilePath);
if (targetCredentials) {
Object.assign(mergedCredentials, targetCredentials);
console.info(`[Kiro Auth] Successfully loaded OAuth credentials from ${targetFilePath}`);
// Priority 2: 尝试从内存缓存加载凭证
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
Object.assign(mergedCredentials, cachedEntry.credentials);
console.info(`[Kiro Auth] Successfully loaded credentials from memory cache: ${this.uuid}`);
}
// 然后读取目录下的其他 JSON 文件(排除目标文件本身)
const files = await fs.readdir(dirPath);
for (const file of files) {
if (file.endsWith('.json') && file !== targetFileName) {
const filePath = path.join(dirPath, file);
const credentials = await loadCredentialsFromFile(filePath);
if (credentials) {
// 保留已有的 expiresAt,避免被覆盖
credentials.expiresAt = mergedCredentials.expiresAt;
Object.assign(mergedCredentials, credentials);
console.debug(`[Kiro Auth] Loaded Client credentials from ${file}`);
} else {
// Priority 3: 从文件加载(缓存未命中时的回退)
const targetFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AUTH_TOKEN_FILE);
const dirPath = path.dirname(targetFilePath);
const targetFileName = path.basename(targetFilePath);
console.debug(`[Kiro Auth] Cache miss, loading credentials from directory: ${dirPath}`);
try {
const targetCredentials = await loadCredentialsFromFile(targetFilePath);
if (targetCredentials) {
Object.assign(mergedCredentials, targetCredentials);
console.info(`[Kiro Auth] Successfully loaded OAuth credentials from ${targetFilePath}`);
}
const files = await fs.readdir(dirPath);
for (const file of files) {
if (file.endsWith('.json') && file !== targetFileName) {
const filePath = path.join(dirPath, file);
const credentials = await loadCredentialsFromFile(filePath);
if (credentials) {
credentials.expiresAt = mergedCredentials.expiresAt;
Object.assign(mergedCredentials, credentials);
console.debug(`[Kiro Auth] Loaded Client credentials from ${file}`);
}
}
}
} catch (error) {
console.warn(`[Kiro Auth] Error loading credentials from directory ${dirPath}: ${error.message}`);
}
} catch (error) {
console.warn(`[Kiro Auth] Error loading credentials from directory ${dirPath}: ${error.message}`);
}
// console.log('[Kiro Auth] Merged credentials:', mergedCredentials);
// Apply loaded credentials, prioritizing existing values if they are not null/undefined
// Apply loaded credentials
this.accessToken = this.accessToken || mergedCredentials.accessToken;
this.refreshToken = this.refreshToken || mergedCredentials.refreshToken;
this.clientId = this.clientId || mergedCredentials.clientId;
@ -552,10 +565,9 @@ async initializeAuth(forceRefresh = false) {
this.profileArn = this.profileArn || mergedCredentials.profileArn;
this.region = this.region || mergedCredentials.region;
// Ensure region is set before using it in URLs
if (!this.region) {
console.warn('[Kiro Auth] Region not found in credentials. Using default region us-east-1 for URLs.');
this.region = 'us-east-1'; // Set default region
this.region = 'us-east-1';
}
this.refreshUrl = (this.config.KIRO_REFRESH_URL || KIRO_CONSTANTS.REFRESH_URL).replace("{{region}}", this.region);
@ -571,21 +583,16 @@ async initializeAuth(forceRefresh = false) {
if (!this.refreshToken) {
throw new Error('No refresh token available to refresh access token.');
}
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
// 使用内存锁替代文件锁进行去重
const dedupeKey = `kiro-token-refresh:${tokenFilePath}`;
await withDeduplication(dedupeKey, async () => {
await credentialCache.withDeduplication(dedupeKey, async () => {
await this._doTokenRefresh(saveCredentialsToFile, tokenFilePath);
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 _doTokenRefresh 只更新了执行刷新的实例的内存状态
// 注意withDeduplication 会让所有等待者共享同一个 Promise
// 但只有第一个调用者的实例会执行 _doTokenRefresh 并更新自己的内存状态
// 其他等待者需要从文件重新加载
if (!this.accessToken || this.isExpiryDateNear()) {
await this._reloadCredentialsAfterRefresh(tokenFilePath);
}
// 刷新完成后(无论是自己执行还是等待其他请求),都从缓存重新加载凭证
// 确保所有并发请求都能获取到最新的 token
await this._reloadCredentialsAfterRefresh(tokenFilePath);
}
if (!this.accessToken) {
@ -613,11 +620,13 @@ async initializeAuth(forceRefresh = false) {
}
let response = null;
// 使用更短的超时时间进行 token 刷新,避免阻塞其他请求
const refreshConfig = { timeout: KIRO_CONSTANTS.TOKEN_REFRESH_TIMEOUT };
if (this.authMethod === KIRO_CONSTANTS.AUTH_METHOD_SOCIAL) {
response = await this.axiosSocialRefreshInstance.post(refreshUrl, requestBody);
response = await this.axiosSocialRefreshInstance.post(refreshUrl, requestBody, refreshConfig);
console.log('[Kiro Auth] Token refresh social response: ok');
} else {
response = await this.axiosInstance.post(refreshUrl, requestBody);
response = await this.axiosInstance.post(refreshUrl, requestBody, refreshConfig);
console.log('[Kiro Auth] Token refresh idc response: ok');
}
@ -653,6 +662,25 @@ async initializeAuth(forceRefresh = false) {
* @param {string} tokenFilePath - 凭证文件路径
*/
async _reloadCredentialsAfterRefresh(tokenFilePath) {
// 优先从内存缓存加载
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'claude-kiro-oauth';
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
this.accessToken = cachedEntry.credentials.accessToken;
this.refreshToken = cachedEntry.credentials.refreshToken;
this.expiresAt = cachedEntry.credentials.expiresAt;
if (cachedEntry.credentials.profileArn) {
this.profileArn = cachedEntry.credentials.profileArn;
}
console.debug('[Kiro Auth] Credentials reloaded from memory cache after concurrent refresh');
return;
}
}
// 回退到文件加载
try {
const fileContent = await fs.readFile(tokenFilePath, 'utf8');
let credentials;
@ -675,7 +703,7 @@ async initializeAuth(forceRefresh = false) {
if (credentials.profileArn) {
this.profileArn = credentials.profileArn;
}
console.debug('[Kiro Auth] Credentials reloaded after concurrent refresh');
console.debug('[Kiro Auth] Credentials reloaded from file after concurrent refresh');
} catch (error) {
console.warn(`[Kiro Auth] Failed to reload credentials after refresh: ${error.message}`);
throw error;
@ -1564,11 +1592,16 @@ async initializeAuth(forceRefresh = false) {
async generateContent(model, requestBody) {
if (!this.isInitialized) await this.initialize();
// 检查 token 是否即将过期,如果是则先刷新
if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, refreshing before generateContent request...');
// Token 刷新策略:
// 1. 已过期 → 必须等待刷新
// 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求
if (this.isTokenExpired()) {
console.log('[Kiro] Token is expired, must refresh before generateContent request...');
await this.initializeAuth(true);
} else if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, triggering background refresh...');
this.triggerBackgroundRefresh();
}
const finalModel = MODEL_MAPPING[model] ? model : this.modelName;
@ -1902,11 +1935,16 @@ async initializeAuth(forceRefresh = false) {
// 真正的流式传输实现
async * generateContentStream(model, requestBody) {
if (!this.isInitialized) await this.initialize();
// 检查 token 是否即将过期,如果是则先刷新
if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, refreshing before generateContentStream request...');
// Token 刷新策略:
// 1. 已过期 → 必须等待刷新
// 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求
if (this.isTokenExpired()) {
console.log('[Kiro] Token is expired, must refresh before generateContentStream request...');
await this.initializeAuth(true);
} else if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, triggering background refresh...');
this.triggerBackgroundRefresh();
}
const finalModel = MODEL_MAPPING[model] ? model : this.modelName;
@ -2528,7 +2566,25 @@ async initializeAuth(forceRefresh = false) {
}
/**
* Checks if the given expiresAt timestamp is within 10 minutes from now.
* Checks if the token is completely expired (cannot be used at all).
* @returns {boolean} - True if token is expired, false otherwise.
*/
isTokenExpired() {
try {
if (!this.expiresAt) return true;
const expirationTime = new Date(this.expiresAt);
const currentTime = new Date();
// 给 30 秒缓冲,避免请求过程中过期
const bufferMs = 30 * 1000;
return expirationTime.getTime() <= (currentTime.getTime() + bufferMs);
} catch (error) {
console.error(`[Kiro] Error checking token expiry: ${error.message}`);
return true; // Treat as expired if parsing fails
}
}
/**
* Checks if the given expiresAt timestamp is within 10 minutes from now (needs refresh soon).
* @returns {boolean} - True if expiresAt is less than 10 minutes from now, false otherwise.
*/
isExpiryDateNear() {
@ -2545,6 +2601,29 @@ async initializeAuth(forceRefresh = false) {
}
}
/**
* 后台异步刷新 token不阻塞当前请求
*/
triggerBackgroundRefresh() {
const tokenFilePath = this.credsFilePath || path.join(this.credPath, KIRO_AH_TOKEN_FILE);
const dedupeKey = `kiro-token-refresh:${tokenFilePath}`;
const credentialCache = CredentialCacheManager.getInstance();
// 使用 withDeduplication 确保只有一个刷新任务在执行
credentialCache.withDeduplication(dedupeKey, async () => {
console.log('[Kiro] Background token refresh started...');
try {
await this.initializeAuth(true);
console.log('[Kiro] Background token refresh completed successfully');
} catch (error) {
console.error('[Kiro] Background token refresh failed:', error.message);
// 后台刷新失败不抛出错误,下次请求会重试
}
}).catch(() => {
// 忽略后台刷新错误,不影响当前请求
});
}
/**
* Count tokens for a message request (compatible with Anthropic API)
* POST /v1/messages/count_tokens
@ -2618,11 +2697,16 @@ async initializeAuth(forceRefresh = false) {
*/
async getUsageLimits() {
if (!this.isInitialized) await this.initialize();
// 检查 token 是否即将过期,如果是则先刷新
if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, refreshing before getUsageLimits request...');
// Token 刷新策略:
// 1. 已过期 → 必须等待刷新
// 2. 即将过期但还能用 → 后台异步刷新,不阻塞当前请求
if (this.isTokenExpired()) {
console.log('[Kiro] Token is expired, must refresh before getUsageLimits request...');
await this.initializeAuth(true);
} else if (this.isExpiryDateNear()) {
console.log('[Kiro] Token is near expiry, triggering background refresh...');
this.triggerBackgroundRefresh();
}
// 内部固定的资源类型

View file

@ -14,7 +14,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleGeminiAntigravityOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { cleanJsonSchemaProperties } from '../../converters/utils.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
import { CredentialCacheManager } from '../../utils/credential-cache-manager.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -707,13 +707,13 @@ export class AntigravityApiService {
constructor(config) {
// 检查是否需要使用代理
const proxyConfig = getGoogleAuthProxyConfig(config, 'gemini-antigravity');
// 配置 OAuth2Client 使用自定义的 HTTP agent
const oauth2Options = {
clientId: OAUTH_CLIENT_ID,
clientSecret: OAUTH_CLIENT_SECRET,
};
if (proxyConfig) {
oauth2Options.transporterOptions = proxyConfig;
console.log('[Antigravity] Using proxy for OAuth2Client');
@ -722,7 +722,7 @@ export class AntigravityApiService {
agent: httpsAgent,
};
}
this.authClient = new OAuth2Client(oauth2Options);
this.availableModels = [];
this.isInitialized = false;
@ -732,10 +732,11 @@ export class AntigravityApiService {
this.oauthCredsFilePath = config.ANTIGRAVITY_OAUTH_CREDS_FILE_PATH;
this.userAgent = DEFAULT_USER_AGENT; // 支持通用 USER_AGENT 配置
this.projectId = config.PROJECT_ID;
this.uuid = config.uuid; // 保存 uuid 用于缓存管理
// 多环境降级顺序 - 按照 Go 代码的顺序
this.baseURLs = this.getBaseURLFallbackOrder(config);
// 保存代理配置供后续使用
this.proxyConfig = getProxyConfigForProvider(config, 'gemini-antigravity');
}
@ -777,6 +778,9 @@ export class AntigravityApiService {
}
async initializeAuth(forceRefresh = false) {
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'gemini-antigravity';
// 检查是否需要刷新 Token
const needsRefresh = forceRefresh || this.isTokenExpiringSoon();
@ -789,31 +793,59 @@ export class AntigravityApiService {
const credPath = this.oauthCredsFilePath || path.join(os.homedir(), CREDENTIALS_DIR, CREDENTIALS_FILE);
try {
const data = await fs.readFile(credPath, "utf8");
const credentials = JSON.parse(data);
// 优先从内存缓存加载
let credentials = null;
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
credentials = cachedEntry.credentials;
console.log('[Antigravity Auth] Loaded credentials from memory cache');
}
}
// Fallback: 从文件加载
if (!credentials) {
const data = await fs.readFile(credPath, "utf8");
credentials = JSON.parse(data);
console.log('[Antigravity Auth] Loaded credentials from file');
}
this.authClient.setCredentials(credentials);
console.log('[Antigravity Auth] Authentication configured successfully from file.');
if (needsRefresh) {
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `antigravity-token-refresh:${credPath}`;
await withDeduplication(dedupeKey, async () => {
await credentialCache.withDeduplication(dedupeKey, async () => {
console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// 保存刷新后的凭证到文件(使用文件锁)
await this._saveCredentialsToFile(credPath, newCredentials);
// 保存刷新后的凭证(优先保存到内存缓存)
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
credentialCache.updateCredentials(providerType, this.uuid, newCredentials, credPath);
console.log(`[Antigravity Auth] Token refreshed and saved to memory cache: ${this.uuid}`);
} else {
await this._saveCredentialsToFile(credPath, newCredentials);
}
console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`);
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 withDeduplication 只让第一个调用者执行刷新并更新自己的内存状态
// 其他等待者需要从文件重新加载
// 其他等待者需要从缓存重新加载
if (this.isTokenExpiringSoon()) {
const refreshedData = await fs.readFile(credPath, "utf8");
const refreshedCredentials = JSON.parse(refreshedData);
this.authClient.setCredentials(refreshedCredentials);
console.log('[Antigravity Auth] Credentials reloaded after concurrent refresh');
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
this.authClient.setCredentials(cachedEntry.credentials);
console.log('[Antigravity Auth] Credentials reloaded from memory cache after concurrent refresh');
}
} else {
const refreshedData = await fs.readFile(credPath, "utf8");
const refreshedCredentials = JSON.parse(refreshedData);
this.authClient.setCredentials(refreshedCredentials);
console.log('[Antigravity Auth] Credentials reloaded from file after concurrent refresh');
}
}
}
} catch (error) {
@ -890,20 +922,30 @@ export class AntigravityApiService {
}
/**
* 保存凭证到文件使用文件锁防止并发写入
* 保存凭证到文件优先使用内存缓存
* @param {string} filePath - 凭证文件路径
* @param {Object} credentials - 凭证数据
*/
async _saveCredentialsToFile(filePath, credentials) {
const releaseLock = await acquireFileLock(filePath);
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Antigravity Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Antigravity Auth] Failed to save credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'gemini-antigravity';
// 优先保存到内存缓存
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
credentialCache.updateCredentials(providerType, this.uuid, credentials, filePath);
console.log(`[Antigravity Auth] Credentials saved to memory cache: ${this.uuid}`);
return;
}
// Fallback: 使用内存锁的文件写入
await credentialCache.withMemoryLock(`antigravity-save:${filePath}`, async () => {
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Antigravity Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Antigravity Auth] Failed to save credentials to ${filePath}: ${error.message}`);
}
});
}
async discoverProjectAndModels() {

View file

@ -10,7 +10,7 @@ import { API_ACTIONS, formatExpiryTime, isRetryableNetworkError } from '../../ut
import { getProviderModels } from '../provider-models.js';
import { handleGeminiCliOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
import { CredentialCacheManager } from '../../utils/credential-cache-manager.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -273,10 +273,11 @@ export class GeminiApiService {
if (forceRefresh) {
console.log('[Gemini Auth] Forcing token refresh...');
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `gemini-token-refresh:${credPath}`;
await withDeduplication(dedupeKey, async () => {
const credentialCache = CredentialCacheManager.getInstance();
await credentialCache.withDeduplication(dedupeKey, async () => {
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// Save refreshed credentials back to file (with file locking)
@ -636,20 +637,30 @@ export class GeminiApiService {
}
/**
* 保存凭证到文件使用文件锁防止并发写入
* 保存凭证到文件使用内存缓存优先
* @param {string} filePath - 凭证文件路径
* @param {Object} credentials - 凭证数据
*/
async _saveCredentialsToFile(filePath, credentials) {
const releaseLock = await acquireFileLock(filePath);
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Gemini Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Gemini Auth] Failed to save credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
// 优先更新内存缓存
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'gemini-cli-oauth';
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
credentialCache.updateCredentials(providerType, this.uuid, credentials, filePath);
console.log(`[Gemini Auth] Credentials saved to memory cache: ${this.uuid}`);
return;
}
// 回退到使用内存锁写入文件
await credentialCache.withMemoryLock(`gemini-save:${filePath}`, async () => {
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Gemini Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Gemini Auth] Failed to save credentials to ${filePath}: ${error.message}`);
}
});
}
/**

View file

@ -24,7 +24,7 @@ import * as path from 'path';
import * as os from 'os';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
import { CredentialCacheManager } from '../../utils/credential-cache-manager.js';
// iFlow API 端点
const IFLOW_API_BASE_URL = 'https://apis.iflow.cn/v1';
@ -132,38 +132,56 @@ async function loadTokenFromFile(filePath) {
* @param {string} filePath - Token 文件路径
* @param {IFlowTokenStorage} tokenStorage - Token 存储对象
*/
async function saveTokenToFile(filePath, tokenStorage) {
async function saveTokenToFile(filePath, tokenStorage, uuid = null) {
const absolutePath = path.isAbsolute(filePath)
? filePath
: path.join(process.cwd(), filePath);
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(absolutePath);
try {
// 确保目录存在
const dir = path.dirname(absolutePath);
await fs.mkdir(dir, { recursive: true });
// 写入文件
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'openai-iflow';
// 优先保存到内存缓存
if (uuid && credentialCache.hasCredentials(providerType, uuid)) {
const json = tokenStorage.toJSON();
// 验证关键字段是否存在
if (!json.refresh_token || json.refresh_token.trim() === '') {
console.error('[iFlow] WARNING: Attempting to save token file with empty refresh_token!');
console.error('[iFlow] WARNING: Attempting to save token with empty refresh_token!');
}
if (!json.apiKey || json.apiKey.trim() === '') {
console.error('[iFlow] WARNING: Attempting to save token file with empty apiKey!');
console.error('[iFlow] WARNING: Attempting to save token with empty apiKey!');
}
await fs.writeFile(absolutePath, JSON.stringify(json, null, 2), 'utf-8');
console.log(`[iFlow] Token saved to: ${filePath} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`);
} catch (error) {
throw new Error(`[iFlow] Failed to save token to file: ${error.message}`);
} finally {
// 确保锁被释放
releaseLock();
credentialCache.updateCredentials(providerType, uuid, json, absolutePath);
console.log(`[iFlow] Token saved to memory cache: ${uuid} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`);
return;
}
// Fallback: 使用内存锁的文件写入
await credentialCache.withMemoryLock(`iflow-save:${absolutePath}`, async () => {
try {
// 确保目录存在
const dir = path.dirname(absolutePath);
await fs.mkdir(dir, { recursive: true });
// 写入文件
const json = tokenStorage.toJSON();
// 验证关键字段是否存在
if (!json.refresh_token || json.refresh_token.trim() === '') {
console.error('[iFlow] WARNING: Attempting to save token file with empty refresh_token!');
}
if (!json.apiKey || json.apiKey.trim() === '') {
console.error('[iFlow] WARNING: Attempting to save token file with empty apiKey!');
}
await fs.writeFile(absolutePath, JSON.stringify(json, null, 2), 'utf-8');
console.log(`[iFlow] Token saved to: ${filePath} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`);
} catch (error) {
throw new Error(`[iFlow] Failed to save token to file: ${error.message}`);
}
});
}
// ==================== Token 刷新逻辑 ====================
@ -458,9 +476,10 @@ export class IFlowApiService {
this.apiKey = null;
this.baseUrl = config.IFLOW_BASE_URL || IFLOW_API_BASE_URL;
this.tokenFilePath = config.IFLOW_TOKEN_FILE_PATH || DEFAULT_TOKEN_FILE_PATH;
this.uuid = config.uuid; // 保存 uuid 用于缓存管理
this.isInitialized = false;
this.tokenStorage = null;
// 配置 HTTP/HTTPS agent
const httpAgent = new http.Agent({
keepAlive: true,
@ -484,10 +503,10 @@ export class IFlowApiService {
'User-Agent': IFLOW_USER_AGENT,
},
};
// 配置自定义代理
configureAxiosProxy(axiosConfig, config, 'openai-iflow');
this.axiosInstance = axios.create(axiosConfig);
}
@ -509,20 +528,39 @@ export class IFlowApiService {
* @param {boolean} forceRefresh - 是否强制刷新 Token
*/
async initializeAuth(forceRefresh = false) {
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'openai-iflow';
// 如果已有 API Key 且不强制刷新,直接返回
if (this.apiKey && !forceRefresh) return;
// 从 Token 文件加载 API Key
if (!this.tokenFilePath) {
throw new Error('[iFlow] IFLOW_TOKEN_FILE_PATH is required.');
}
try {
this.tokenStorage = await loadTokenFromFile(this.tokenFilePath);
// 优先从内存缓存加载
let tokenData = null;
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
tokenData = cachedEntry.credentials;
this.tokenStorage = IFlowTokenStorage.fromJSON(tokenData);
console.log('[iFlow Auth] Loaded credentials from memory cache');
}
}
// Fallback: 从文件加载
if (!this.tokenStorage) {
this.tokenStorage = await loadTokenFromFile(this.tokenFilePath);
console.log('[iFlow Auth] Loaded credentials from file');
}
if (this.tokenStorage && this.tokenStorage.apiKey) {
this.apiKey = this.tokenStorage.apiKey;
console.log('[iFlow Auth] Authentication configured successfully from file.');
if (forceRefresh) {
console.log('[iFlow Auth] Forcing token refresh...');
await this._refreshOAuthTokens();
@ -541,7 +579,7 @@ export class IFlowApiService {
throw new Error(`[iFlow Auth] Failed to load OAuth credentials.`);
}
}
// 更新 axios 实例的 Authorization header
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
}
@ -551,42 +589,55 @@ export class IFlowApiService {
* @returns {Promise<boolean>} - 是否执行了刷新
*/
async _checkAndRefreshTokenIfNeeded() {
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'openai-iflow';
if (!this.tokenStorage) {
return false;
}
// 检查是否有 refresh_token
if (!this.tokenStorage.refreshToken || this.tokenStorage.refreshToken.trim() === '') {
console.log('[iFlow] No refresh_token available, skipping token refresh check');
return false;
}
// 使用 isExpiryDateNear 检查过期时间
if (!this.isExpiryDateNear()) {
console.log('[iFlow] Token is valid, no refresh needed');
return false;
}
console.log('[iFlow] Token is expiring soon, attempting refresh...');
try {
// 使用去重锁:多个并发刷新请求只执行一次共享结果
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
const dedupeKey = `iflow-token-refresh:${this.tokenFilePath}`;
await withDeduplication(dedupeKey, async () => {
await credentialCache.withDeduplication(dedupeKey, async () => {
await this._refreshOAuthTokens();
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
if (this.isExpiryDateNear()) {
const refreshedStorage = await loadTokenFromFile(this.tokenFilePath);
if (refreshedStorage && refreshedStorage.apiKey) {
this.tokenStorage = refreshedStorage;
this.apiKey = refreshedStorage.apiKey;
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
console.log('[iFlow] Credentials reloaded after concurrent refresh');
if (this.uuid && credentialCache.hasCredentials(providerType, this.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, this.uuid);
if (cachedEntry && cachedEntry.credentials) {
this.tokenStorage = IFlowTokenStorage.fromJSON(cachedEntry.credentials);
this.apiKey = this.tokenStorage.apiKey;
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
console.log('[iFlow] Credentials reloaded from memory cache after concurrent refresh');
}
} else {
const refreshedStorage = await loadTokenFromFile(this.tokenFilePath);
if (refreshedStorage && refreshedStorage.apiKey) {
this.tokenStorage = refreshedStorage;
this.apiKey = refreshedStorage.apiKey;
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
console.log('[iFlow] Credentials reloaded from file after concurrent refresh');
}
}
}
return true;
} catch (error) {
console.error('[iFlow] Token refresh failed:', error.message);
@ -637,7 +688,7 @@ export class IFlowApiService {
this.axiosInstance.defaults.headers['Authorization'] = `Bearer ${this.apiKey}`;
// 保存到文件
await saveTokenToFile(this.tokenFilePath, this.tokenStorage);
await saveTokenToFile(this.tokenFilePath, this.tokenStorage, this.uuid);
console.log(`[iFlow] Token refresh successful, new: ${this._maskToken(tokenData.accessToken)}`);
}

View file

@ -12,7 +12,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleQwenOAuth } from '../../auth/oauth-handlers.js';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock, withDeduplication } from '../../utils/file-lock.js';
import { CredentialCacheManager } from '../../utils/credential-cache-manager.js';
// --- Constants ---
const QWEN_DIR = '.qwen';
@ -389,18 +389,18 @@ export class QwenApiService {
async _cacheQwenCredentials(credentials) {
const filePath = this._getQwenCachedCredentialPath();
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(filePath);
try {
await fs.mkdir(path.dirname(filePath), { recursive: true });
const credString = JSON.stringify(credentials, null, 2);
await fs.writeFile(filePath, credString);
console.log(`[Qwen Auth] Credentials cached to ${filePath}`);
} catch (error) {
console.error(`[Qwen Auth] Failed to cache credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
}
const credentialCache = CredentialCacheManager.getInstance();
// 使用内存锁替代文件锁,防止并发写入
await credentialCache.withMemoryLock(`qwen-cache:${filePath}`, async () => {
try {
await fs.mkdir(path.dirname(filePath), { recursive: true });
const credString = JSON.stringify(credentials, null, 2);
await fs.writeFile(filePath, credString);
console.log(`[Qwen Auth] Credentials cached to ${filePath}`);
} catch (error) {
console.error(`[Qwen Auth] Failed to cache credentials to ${filePath}: ${error.message}`);
}
});
}
getCurrentEndpoint(resourceUrl) {
@ -794,13 +794,27 @@ class SharedTokenManager {
throw new TokenManagerError(TokenError.NO_REFRESH_TOKEN, 'No refresh token available');
}
// 使用去重锁:多个并发刷新请求只执行一次,共享结果
// 获取凭证缓存管理器
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'openai-qwen-oauth';
// 使用内存锁进行去重
const dedupeKey = `qwen-token-refresh:${context.credentialFilePath}`;
try {
const credentials = await withDeduplication(dedupeKey, async () => {
const credentials = await credentialCache.withDeduplication(dedupeKey, async () => {
await this.acquireLock(context);
try {
// 优先从全局缓存检查
if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, context.uuid);
if (cachedEntry && cachedEntry.credentials && !forceRefresh && this.isTokenValid(cachedEntry.credentials)) {
context.memoryCache.credentials = cachedEntry.credentials;
qwenClient.setCredentials(cachedEntry.credentials);
return cachedEntry.credentials;
}
}
await this.checkAndReloadIfNeeded(context);
if (!forceRefresh && context.memoryCache.credentials && this.isTokenValid(context.memoryCache.credentials)) {
@ -832,22 +846,28 @@ class SharedTokenManager {
await this.releaseLock(context);
}
});
// 如果是等待其他请求完成的刷新,需要重新加载凭证
// 因为 withDeduplication 会让所有等待者共享同一个 Promise
// 但只有第一个调用者的实例会执行刷新并更新自己的内存状态
// 其他等待者需要从文件重新加载
if (!context.memoryCache.credentials || !this.isTokenValid(context.memoryCache.credentials)) {
await this.reloadCredentialsFromFile(context);
if (context.memoryCache.credentials) {
qwenClient.setCredentials(context.memoryCache.credentials);
// 优先从全局缓存加载
if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) {
const cachedEntry = credentialCache.getCredentials(providerType, context.uuid);
if (cachedEntry && cachedEntry.credentials) {
context.memoryCache.credentials = cachedEntry.credentials;
qwenClient.setCredentials(cachedEntry.credentials);
}
} else {
await this.reloadCredentialsFromFile(context);
if (context.memoryCache.credentials) {
qwenClient.setCredentials(context.memoryCache.credentials);
}
}
}
return credentials;
} catch (error) {
if (error instanceof TokenManagerError) throw error;
// 处理 CredentialsClearRequiredError - 清除凭证文件
if (error instanceof CredentialsClearRequiredError) {
try {
@ -856,7 +876,7 @@ class SharedTokenManager {
} catch (_) { /* ignore */ }
throw error; // 重新抛出以便上层处理
}
// 如果刷新令牌无效/过期,删除此上下文对应的凭证文件
if (error && (error.status === 400 || /expired|invalid/i.test(error.message || ''))) {
try { await fs.unlink(context.credentialFilePath); } catch (_) { /* ignore */ }
@ -870,17 +890,26 @@ class SharedTokenManager {
}
async saveCredentialsToFile(context, credentials) {
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(context.credentialFilePath);
try {
// 优先更新内存缓存
const credentialCache = CredentialCacheManager.getInstance();
const providerType = 'openai-qwen-oauth';
if (context.uuid && credentialCache.hasCredentials(providerType, context.uuid)) {
credentialCache.updateCredentials(providerType, context.uuid, credentials, context.credentialFilePath);
console.info(`[Qwen Auth] Updated credentials in memory cache: ${context.uuid}`);
// 同时更新本地内存缓存
context.memoryCache.credentials = credentials;
context.memoryCache.fileModTime = Date.now();
return;
}
// 回退到使用内存锁写入文件
await credentialCache.withMemoryLock(`qwen-save:${context.credentialFilePath}`, async () => {
await fs.mkdir(path.dirname(context.credentialFilePath), { recursive: true, mode: 0o700 });
await fs.writeFile(context.credentialFilePath, JSON.stringify(credentials, null, 2), { mode: 0o600 });
const stats = await fs.stat(context.credentialFilePath);
context.memoryCache.fileModTime = stats.mtimeMs;
} finally {
// 确保锁被释放
releaseLock();
}
});
}
isTokenValid(credentials) {

View file

@ -2,6 +2,7 @@ import * as fs from 'fs'; // Import fs module
import { getServiceAdapter } from './adapter.js';
import { MODEL_PROVIDER, getProtocolPrefix } from '../utils/common.js';
import { getProviderModels } from './provider-models.js';
import { CredentialCacheManager } from '../utils/credential-cache-manager.js';
import axios from 'axios';
/**
@ -194,15 +195,15 @@ export class ProviderPoolManager {
// 如果时间相同,使用使用次数辅助判断
return (a.config.usageCount || 0) - (b.config.usageCount || 0);
})[0];
// 更新使用信息(除非明确跳过)
// 注意:这里的更新是同步的,在锁保护下执行,确保下一个请求能看到最新的 lastUsed
// 始终更新 lastUsed确保 LRU 策略生效,避免并发请求选到同一个 provider
// usageCount 只在请求成功后才增加(由 skipUsageCount 控制)
selected.config.lastUsed = new Date().toISOString();
if (!options.skipUsageCount) {
selected.config.lastUsed = new Date().toISOString();
selected.config.usageCount++;
// 使用防抖保存(文件 I/O 是异步的,但内存已经更新)
this._debouncedSave(providerType);
}
// 使用防抖保存(文件 I/O 是异步的,但内存已经更新)
this._debouncedSave(providerType);
this._log('debug', `Selected provider for ${providerType} (LRU): ${selected.config.uuid}${requestedModel ? ` for model: ${requestedModel}` : ''}${options.skipUsageCount ? ' (skip usage count)' : ''}`);
@ -853,7 +854,7 @@ export class ProviderPoolManager {
// 确定健康检查使用的模型名称
const modelName = providerConfig.checkModelName ||
ProviderPoolManager.DEFAULT_HEALTH_CHECK_MODELS[providerType];
// 如果未启用健康检查且不是强制检查,返回 null
if (!providerConfig.checkHealth && !forceCheck) {
return null;
@ -864,13 +865,48 @@ export class ProviderPoolManager {
return { success: false, modelName: null, errorMessage: 'Unknown provider type for health check' };
}
// 使用内部服务适配器方式进行健康检查
// ========== 快速预检:从内存缓存检查 OAuth 凭证状态 ==========
// 对于 OAuth 类型的 provider先检查 token 是否过期,避免阻塞
const oauthProviderTypes = ['claude-kiro-oauth', 'gemini-cli-oauth', 'gemini-antigravity', 'openai-qwen-oauth', 'openai-iflow-oauth', 'claude-orchids-oauth'];
if (oauthProviderTypes.includes(providerType) && providerConfig.uuid) {
const credentialCache = CredentialCacheManager.getInstance();
const cachedEntry = credentialCache.getCredentials(providerType, providerConfig.uuid);
if (cachedEntry && cachedEntry.credentials) {
const { expiresAt, accessToken } = cachedEntry.credentials;
// 检查 token 是否存在
if (!accessToken) {
this._log('warn', `Health check fast-fail for ${providerConfig.uuid}: No access token in cache`);
return { success: false, modelName, errorMessage: 'No access token available' };
}
// 检查 token 是否已过期30秒缓冲
if (expiresAt) {
const expirationTime = new Date(expiresAt).getTime();
const now = Date.now();
const bufferMs = 30 * 1000;
if (expirationTime <= now + bufferMs) {
this._log('warn', `Health check fast-fail for ${providerConfig.uuid}: Token expired`);
return { success: false, modelName, errorMessage: 'Token expired' };
}
}
this._log('debug', `Health check pre-check passed for ${providerConfig.uuid}: Token valid in cache`);
}
// 注意:如果缓存中没有凭证,不要直接返回失败
// 让后续的实际健康检查去尝试初始化和加载凭证
// 这样可以支持刚重置健康状态或新添加的 provider
}
// ========== 实际 API 健康检查(带超时保护)==========
const proxyKeys = ['GEMINI', 'OPENAI', 'CLAUDE', 'QWEN', 'KIRO'];
const tempConfig = {
...providerConfig,
MODEL_PROVIDER: providerType
};
proxyKeys.forEach(key => {
const proxyKey = `USE_SYSTEM_PROXY_${key}`;
if (this.globalConfig[proxyKey] !== undefined) {
@ -879,19 +915,32 @@ export class ProviderPoolManager {
});
const serviceAdapter = getServiceAdapter(tempConfig);
// 获取所有可能的请求格式
const healthCheckRequests = this._buildHealthCheckRequests(providerType, modelName);
// 健康检查超时时间15秒避免长时间阻塞
const healthCheckTimeout = 15000;
// 重试机制:尝试不同的请求格式
const maxRetries = healthCheckRequests.length;
let lastError = null;
for (let i = 0; i < maxRetries; i++) {
const healthCheckRequest = healthCheckRequests[i];
try {
this._log('debug', `Health check attempt ${i + 1}/${maxRetries} for ${modelName}: ${JSON.stringify(healthCheckRequest)}`);
await serviceAdapter.generateContent(modelName, healthCheckRequest);
// 带超时的健康检查
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Health check timeout')), healthCheckTimeout)
);
await Promise.race([
serviceAdapter.generateContent(modelName, healthCheckRequest),
timeoutPromise
]);
return { success: true, modelName, errorMessage: null };
} catch (error) {
lastError = error;
@ -899,7 +948,7 @@ export class ProviderPoolManager {
// 继续尝试下一个格式
}
}
// 所有尝试都失败
this._log('error', `Health check failed for ${providerType} after ${maxRetries} attempts: ${lastError?.message}`);
return { success: false, modelName, errorMessage: lastError?.message || 'All health check attempts failed' };

View file

@ -1,5 +1,6 @@
import { getServiceAdapter, serviceInstances } from '../providers/adapter.js';
import { ProviderPoolManager } from '../providers/provider-pool-manager.js';
import { CredentialCacheManager } from '../utils/credential-cache-manager.js';
import deepmerge from 'deepmerge';
import * as fs from 'fs';
import { promises as pfs } from 'fs';
@ -166,7 +167,7 @@ async function scanProviderDirectory(dirPath, linkedPaths, newProviders, options
* @returns {Promise<Object>} The initialized services
*/
export async function initApiService(config) {
if (config.providerPools && Object.keys(config.providerPools).length > 0) {
providerPoolManager = new ProviderPoolManager(config.providerPools, {
globalConfig: config,
@ -175,6 +176,62 @@ export async function initApiService(config) {
});
console.log('[Initialization] ProviderPoolManager initialized with configured pools.');
// 健康检查将在服务器完全启动后执行
// 初始化凭证缓存管理器
const credentialCache = CredentialCacheManager.getInstance();
// 获取单实例锁,防止多实例并发运行
try {
await credentialCache.acquireInstanceLock();
} catch (lockError) {
console.error('[Initialization] Failed to acquire instance lock:', lockError.message);
console.error('[Initialization] Please ensure no other instance is running.');
process.exit(1);
}
await credentialCache.preloadAllCredentials(config.providerPools);
credentialCache.startPeriodicSync(config.CREDENTIAL_SYNC_INTERVAL || 5000);
// 注册进程退出钩子
const shutdownHandler = async () => {
await credentialCache.shutdown();
};
process.on('beforeExit', shutdownHandler);
process.on('SIGINT', async () => {
await shutdownHandler();
process.exit(0);
});
process.on('SIGTERM', async () => {
await shutdownHandler();
process.exit(0);
});
// 处理未捕获的异常,进行紧急同步
process.on('uncaughtException', async (error) => {
console.error('[FATAL] Uncaught exception:', error);
console.log('[CredentialCache] Attempting emergency sync before crash...');
try {
await credentialCache.syncToFile();
console.log('[CredentialCache] Emergency sync completed');
} catch (syncError) {
console.error('[CredentialCache] Emergency sync failed:', syncError.message);
}
await credentialCache.shutdown();
process.exit(1);
});
process.on('unhandledRejection', async (reason, promise) => {
console.error('[FATAL] Unhandled rejection at:', promise, 'reason:', reason);
console.log('[CredentialCache] Attempting emergency sync...');
try {
await credentialCache.syncToFile();
console.log('[CredentialCache] Emergency sync completed');
} catch (syncError) {
console.error('[CredentialCache] Emergency sync failed:', syncError.message);
}
});
console.log('[Initialization] CredentialCacheManager initialized.');
} else {
console.log('[Initialization] No provider pools configured. Using single provider mode.');
}

View file

@ -0,0 +1,806 @@
/**
* 凭证缓存管理器
* 纯内存操作,使用内存锁保证并发安全,定时同步到文件
* 优先考虑并发性能,支持单实例部署
*/
import { promises as fs } from 'fs';
import * as path from 'path';
import { PROVIDER_MAPPINGS } from './provider-utils.js';
import * as os from 'os';
/**
* 凭证条目结构
* @typedef {Object} CredentialEntry
* @property {string} providerType - 提供商类型
* @property {string} uuid - 节点唯一标识
* @property {string} credPath - 凭证文件路径
* @property {Object} credentials - 凭证数据
* @property {number} lastModified - 最后修改时间戳
* @property {number} lastAccessed - 最后访问时间戳
* @property {boolean} isDirty - 是否有未同步的修改
* @property {number} retryCount - 同步失败重试次数
*/
export class CredentialCacheManager {
static instance = null;
constructor() {
// 凭证缓存: Map<cacheKey, CredentialEntry>
// cacheKey = `${providerType}:${uuid}`
this.credentialCache = new Map();
// Promise链锁: Map<lockKey, Promise>
this.lockChains = new Map();
// 脏数据标记(需要同步到文件的 cacheKey
this.dirtyKeys = new Set();
// 同步定时器
this.syncTimer = null;
this.syncIntervalMs = 5000; // 默认5秒
// 是否已初始化
this.isInitialized = false;
// 是否正在同步
this.isSyncing = false;
// 最大重试次数
this.maxRetries = 5;
// 最大脏数据条目数
this.maxDirtyKeys = 1000;
// 死信队列 - 存储同步失败的凭证
this.deadLetterQueue = new Map();
// 实例锁文件句柄
this.instanceLockRelease = null;
// 提供商类型到凭证路径键的映射
this.providerCredPathKeys = {};
for (const mapping of PROVIDER_MAPPINGS) {
this.providerCredPathKeys[mapping.providerType] = mapping.credPathKey;
}
}
/**
* 获取单例实例
* @returns {CredentialCacheManager}
*/
static getInstance() {
if (!CredentialCacheManager.instance) {
CredentialCacheManager.instance = new CredentialCacheManager();
}
return CredentialCacheManager.instance;
}
/**
* 生成缓存键
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
* @returns {string}
*/
_getCacheKey(providerType, uuid) {
return `${providerType}:${uuid}`;
}
/**
* 获取单实例锁,防止多实例并发运行
* @throws {Error} 如果已有实例在运行
*/
async acquireInstanceLock() {
const lockPath = path.join(os.tmpdir(), 'credential-cache.lock');
const pidPath = path.join(os.tmpdir(), 'credential-cache.pid');
try {
// 尝试读取已存在的 PID 文件
try {
const existingPid = await fs.readFile(pidPath, 'utf8');
const pid = parseInt(existingPid.trim(), 10);
// 检查进程是否还在运行
try {
process.kill(pid, 0); // 0 信号仅检查进程存在性
throw new Error(`[CredentialCache] Another instance is running (PID: ${pid}). Please stop it first.`);
} catch (killError) {
if (killError.code === 'ESRCH') {
// 进程已死亡,可以继续
console.log(`[CredentialCache] Stale lock detected (PID: ${pid}), cleaning up...`);
} else {
throw killError;
}
}
} catch (readError) {
if (readError.code !== 'ENOENT') {
throw readError;
}
// PID 文件不存在,可以继续
}
// 写入当前进程 PID
await fs.writeFile(pidPath, String(process.pid), 'utf8');
console.log(`[CredentialCache] Instance lock acquired (PID: ${process.pid})`);
// 注册清理钩子
this.instanceLockRelease = async () => {
try {
await fs.unlink(pidPath);
console.log('[CredentialCache] Instance lock released');
} catch (error) {
if (error.code !== 'ENOENT') {
console.warn(`[CredentialCache] Failed to release lock: ${error.message}`);
}
}
};
} catch (error) {
console.error(`[CredentialCache] Failed to acquire instance lock: ${error.message}`);
throw error;
}
}
/**
* 预加载所有凭证到内存
* @param {Object} providerPools - 提供商池配置
*/
async preloadAllCredentials(providerPools) {
if (!providerPools || typeof providerPools !== 'object') {
console.log('[CredentialCache] No provider pools to preload');
return;
}
let loadedCount = 0;
let failedCount = 0;
for (const [providerType, providers] of Object.entries(providerPools)) {
if (!Array.isArray(providers)) continue;
const credPathKey = this.providerCredPathKeys[providerType];
if (!credPathKey) {
console.warn(`[CredentialCache] Unknown provider type: ${providerType}`);
continue;
}
for (const providerConfig of providers) {
const uuid = providerConfig.uuid;
const credPath = providerConfig[credPathKey];
if (!uuid || !credPath) {
continue;
}
try {
const credentials = await this._loadCredentialsFromFile(credPath);
if (credentials) {
const cacheKey = this._getCacheKey(providerType, uuid);
this.credentialCache.set(cacheKey, {
providerType,
uuid,
credPath,
credentials,
lastModified: Date.now(),
lastAccessed: Date.now(),
isDirty: false,
retryCount: 0
});
loadedCount++;
}
} catch (error) {
failedCount++;
console.warn(`[CredentialCache] Failed to load credentials for ${providerType}:${uuid}: ${error.message}`);
}
}
}
this.isInitialized = true;
console.log(`[CredentialCache] Preloaded ${loadedCount} credentials (${failedCount} failed)`);
}
/**
* 原子写入文件 (temp + rename 模式)
* @param {string} filePath - 目标文件路径
* @param {string} data - 要写入的数据
*/
async _atomicWriteFile(filePath, data) {
const tmpPath = `${filePath}.tmp.${Date.now()}.${process.pid}`;
try {
// 1. 写入临时文件
await fs.writeFile(tmpPath, data, 'utf8');
// 2. fsync 确保落盘 (需要文件句柄)
const fileHandle = await fs.open(tmpPath, 'r+');
try {
await fileHandle.sync();
} finally {
await fileHandle.close();
}
// 3. 原子重命名
await fs.rename(tmpPath, filePath);
return true;
} catch (error) {
// 清理临时文件
try {
await fs.unlink(tmpPath);
} catch (unlinkError) {
// 忽略清理失败
}
throw error;
}
}
/**
* 从文件加载凭证仅用于初始导入和手动重载
* 支持从损坏文件的备份恢复
* @param {string} filePath - 文件路径
* @returns {Promise<Object|null>}
*/
async _loadCredentialsFromFile(filePath) {
try {
// 处理相对路径
const absolutePath = path.isAbsolute(filePath)
? filePath
: path.join(process.cwd(), filePath);
const content = await fs.readFile(absolutePath, 'utf8');
return JSON.parse(content);
} catch (error) {
if (error.code === 'ENOENT') {
return null;
}
throw error;
}
}
/**
* 获取凭证从内存
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
* @returns {CredentialEntry|null}
*/
getCredentials(providerType, uuid) {
const cacheKey = this._getCacheKey(providerType, uuid);
const entry = this.credentialCache.get(cacheKey);
if (entry) {
entry.lastAccessed = Date.now();
return entry;
}
return null;
}
/**
* 检查凭证是否存在于缓存中
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
* @returns {boolean}
*/
hasCredentials(providerType, uuid) {
const cacheKey = this._getCacheKey(providerType, uuid);
return this.credentialCache.has(cacheKey);
}
/**
* 更新凭证仅更新内存标记为dirty
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
* @param {Object} newCredentials - 新凭证数据
* @param {string} [credPath] - 凭证文件路径可选用于新建条目
*/
updateCredentials(providerType, uuid, newCredentials, credPath = null) {
const cacheKey = this._getCacheKey(providerType, uuid);
let entry = this.credentialCache.get(cacheKey);
if (entry) {
// 更新现有条目
entry.credentials = newCredentials;
entry.lastModified = Date.now();
entry.isDirty = true;
entry.retryCount = 0; // 重置重试计数
} else if (credPath) {
// 创建新条目
entry = {
providerType,
uuid,
credPath,
credentials: newCredentials,
lastModified: Date.now(),
lastAccessed: Date.now(),
isDirty: true,
retryCount: 0
};
this.credentialCache.set(cacheKey, entry);
} else {
console.warn(`[CredentialCache] Cannot update non-existent entry without credPath: ${cacheKey}`);
return;
}
// 标记为需要同步
this.dirtyKeys.add(cacheKey);
// 检查脏数据是否过多
if (this.dirtyKeys.size > this.maxDirtyKeys) {
console.warn(`[CredentialCache] Dirty keys exceeded ${this.maxDirtyKeys}, triggering immediate sync`);
this.syncToFile().catch(err => console.error('[CredentialCache] Emergency sync failed:', err.message));
}
}
/**
* 删除凭证从内存中删除同时删除文件
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
*/
async deleteCredentials(providerType, uuid) {
const cacheKey = this._getCacheKey(providerType, uuid);
const entry = this.credentialCache.get(cacheKey);
if (!entry) {
return;
}
// 从内存中删除
this.credentialCache.delete(cacheKey);
this.dirtyKeys.delete(cacheKey);
// 删除文件
if (entry.credPath) {
try {
const absolutePath = path.isAbsolute(entry.credPath)
? entry.credPath
: path.join(process.cwd(), entry.credPath);
await fs.unlink(absolutePath);
console.log(`[CredentialCache] Deleted credential file: ${entry.credPath}`);
} catch (error) {
if (error.code !== 'ENOENT') {
console.error(`[CredentialCache] Failed to delete ${entry.credPath}: ${error.message}`);
}
}
}
}
/**
* Promise链式内存锁 - 保证串行执行
* 使用 Promise 构造器模式避免 Read-Check-Write 竞态
* @param {string} key - 锁的唯一标识
* @param {Function} operation - 要执行的操作
* @returns {Promise<T>}
*/
async withMemoryLock(key, operation) {
// 创建新的 Promise 用于链接后续操作
let resolveNext, rejectNext;
const nextPromise = new Promise((resolve, reject) => {
resolveNext = resolve;
rejectNext = reject;
});
// 原子化获取前序链并立即更新 Map (避免竞态)
const prevChain = this.lockChains.get(key);
this.lockChains.set(key, nextPromise);
// 链接到前序 Promise 执行操作
const executeOperation = async () => {
try {
// 等待前序操作完成(忽略前序的错误,继续执行当前操作)
if (prevChain) {
await prevChain.catch(() => {});
}
const result = await operation();
resolveNext(result);
} catch (error) {
rejectNext(error);
} finally {
// 只有当前 Promise 还在 Map 中时才删除
if (this.lockChains.get(key) === nextPromise) {
this.lockChains.delete(key);
}
}
};
// 立即开始执行(不阻塞)
executeOperation();
return nextPromise;
}
/**
* 去重执行 - 多个并发请求共享同一个操作结果
* @param {string} key - 去重键
* @param {Function} operation - 要执行的操作
* @returns {Promise<T>}
*/
async withDeduplication(key, operation) {
const dedupeKey = `dedupe:${key}`;
// 检查是否已有正在执行的操作
const existingPromise = this.lockChains.get(dedupeKey);
if (existingPromise) {
// 直接等待现有操作完成并返回结果
return existingPromise;
}
// 创建新操作 Promise 并立即存储(避免竞态)
const operationPromise = (async () => {
try {
return await operation();
} finally {
// 操作完成后清理
if (this.lockChains.get(dedupeKey) === operationPromise) {
this.lockChains.delete(dedupeKey);
}
}
})();
// 立即存储,确保后续请求能看到
this.lockChains.set(dedupeKey, operationPromise);
return operationPromise;
}
/**
* 启动定时同步
* @param {number} intervalMs - 同步间隔毫秒
*/
startPeriodicSync(intervalMs = 5000) {
this.syncIntervalMs = intervalMs;
if (this.syncTimer) {
clearInterval(this.syncTimer);
}
this.syncTimer = setInterval(() => {
this.syncToFile().catch(error => {
console.error('[CredentialCache] Periodic sync failed:', error.message);
});
}, this.syncIntervalMs);
// 确保定时器不阻止进程退出
if (this.syncTimer.unref) {
this.syncTimer.unref();
}
console.log(`[CredentialCache] Started periodic sync (interval: ${intervalMs}ms)`);
}
/**
* 停止定时同步
*/
stopPeriodicSync() {
if (this.syncTimer) {
clearInterval(this.syncTimer);
this.syncTimer = null;
console.log('[CredentialCache] Stopped periodic sync');
}
}
/**
* 同步脏数据到文件纯内存操作不使用文件锁
*/
async syncToFile() {
if (this.dirtyKeys.size === 0) {
return;
}
if (this.isSyncing) {
return; // 防止重入
}
this.isSyncing = true;
// 复制脏数据集合(不立即清空,同步成功后再清理)
const keysToSync = Array.from(this.dirtyKeys);
console.log(`[CredentialCache] Syncing ${keysToSync.length} credential(s) to file...`);
let successCount = 0;
let failedCount = 0;
const successKeys = new Set();
// 并发写入所有文件(提高性能)
await Promise.allSettled(
keysToSync.map(async (cacheKey) => {
const entry = this.credentialCache.get(cacheKey);
if (!entry || !entry.credPath) {
return;
}
// 检查重试次数
if (entry.retryCount >= this.maxRetries) {
// 达到最大重试次数,移入死信队列
console.error(`[CredentialCache] Max retries exceeded for ${cacheKey}, moving to dead letter queue`);
this.deadLetterQueue.set(cacheKey, {
entry: JSON.parse(JSON.stringify(entry)), // 深拷贝
failureReason: entry.lastError || 'Max retries exceeded',
firstFailureTime: entry.firstFailureTime || Date.now(),
timestamp: Date.now()
});
// 尝试导出到紧急备份
try {
const emergencyDir = path.join(process.cwd(), 'emergency_backup');
await fs.mkdir(emergencyDir, { recursive: true });
const emergencyPath = path.join(emergencyDir, `${cacheKey.replace(/:/g, '_')}.json`);
await fs.writeFile(emergencyPath, JSON.stringify(entry.credentials, null, 2), 'utf8');
console.log(`[CredentialCache] Credential backed up to: ${emergencyPath}`);
} catch (backupError) {
console.error(`[CredentialCache] Failed to backup credential: ${backupError.message}`);
}
this.dirtyKeys.delete(cacheKey);
entry.isDirty = false;
return;
}
try {
// 处理相对路径
const absolutePath = path.isAbsolute(entry.credPath)
? entry.credPath
: path.join(process.cwd(), entry.credPath);
// 确保目录存在
const dir = path.dirname(absolutePath);
await fs.mkdir(dir, { recursive: true });
// 使用原子写入
await this._atomicWriteFile(
absolutePath,
JSON.stringify(entry.credentials, null, 2)
);
entry.isDirty = false;
entry.retryCount = 0;
entry.lastError = null;
entry.firstFailureTime = null;
successKeys.add(cacheKey);
successCount++;
} catch (error) {
// 同步失败,增加重试计数并记录错误
entry.retryCount++;
entry.lastError = error.message;
if (!entry.firstFailureTime) {
entry.firstFailureTime = Date.now();
}
failedCount++;
console.error(`[CredentialCache] Failed to sync ${entry.credPath} (retry ${entry.retryCount}/${this.maxRetries}): ${error.message}`);
}
})
);
// 从脏数据集合中移除成功同步的键
for (const key of successKeys) {
this.dirtyKeys.delete(key);
}
this.isSyncing = false;
if (successCount > 0 || failedCount > 0) {
console.log(`[CredentialCache] Sync completed: ${successCount} success, ${failedCount} failed, ${this.dirtyKeys.size} pending`);
}
}
/**
* 立即同步指定凭证到文件
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
*/
async syncCredentialToFile(providerType, uuid) {
const cacheKey = this._getCacheKey(providerType, uuid);
const entry = this.credentialCache.get(cacheKey);
if (!entry || !entry.credPath) {
return;
}
try {
const absolutePath = path.isAbsolute(entry.credPath)
? entry.credPath
: path.join(process.cwd(), entry.credPath);
const dir = path.dirname(absolutePath);
await fs.mkdir(dir, { recursive: true });
// 使用原子写入
await this._atomicWriteFile(
absolutePath,
JSON.stringify(entry.credentials, null, 2)
);
entry.isDirty = false;
entry.retryCount = 0;
entry.lastError = null;
entry.firstFailureTime = null;
this.dirtyKeys.delete(cacheKey);
} catch (error) {
console.error(`[CredentialCache] Failed to sync ${entry.credPath}: ${error.message}`);
throw error;
}
}
/**
* 关闭时同步所有数据带超时
*/
async shutdown() {
console.log('[CredentialCache] Shutting down...');
// 停止定时同步
this.stopPeriodicSync();
// 同步所有脏数据(带动态超时)
if (this.dirtyKeys.size > 0) {
console.log(`[CredentialCache] Syncing ${this.dirtyKeys.size} dirty credential(s) before shutdown...`);
// 根据脏数据量动态调整超时时间
const timeoutMs = Math.max(10000, this.dirtyKeys.size * 50 + 5000);
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown sync timeout')), timeoutMs)
);
try {
await Promise.race([
this.syncToFile(),
timeoutPromise
]);
} catch (error) {
console.error(`[CredentialCache] Shutdown sync failed: ${error.message}`);
console.error(`[CredentialCache] ${this.dirtyKeys.size} credentials NOT saved`);
// 尝试紧急备份未保存的凭证
if (this.dirtyKeys.size > 0) {
console.log('[CredentialCache] Attempting emergency backup...');
try {
const emergencyDir = path.join(process.cwd(), 'emergency_backup');
await fs.mkdir(emergencyDir, { recursive: true });
for (const cacheKey of this.dirtyKeys) {
const entry = this.credentialCache.get(cacheKey);
if (entry && entry.credentials) {
const emergencyPath = path.join(emergencyDir, `${cacheKey.replace(/:/g, '_')}.json`);
await fs.writeFile(emergencyPath, JSON.stringify(entry.credentials, null, 2), 'utf8');
}
}
console.log(`[CredentialCache] Emergency backup completed to: ${emergencyDir}`);
} catch (backupError) {
console.error(`[CredentialCache] Emergency backup failed: ${backupError.message}`);
}
}
}
}
// 释放实例锁
if (this.instanceLockRelease) {
await this.instanceLockRelease();
}
// 输出死信队列状态
if (this.deadLetterQueue.size > 0) {
console.warn(`[CredentialCache] ${this.deadLetterQueue.size} credential(s) in dead letter queue`);
}
console.log('[CredentialCache] Shutdown complete');
}
/**
* 获取缓存统计信息
* @returns {Object}
*/
getStats() {
const stats = {
totalEntries: this.credentialCache.size,
dirtyEntries: this.dirtyKeys.size,
activeLocks: this.lockChains.size,
deadLetterQueueSize: this.deadLetterQueue.size,
isInitialized: this.isInitialized,
isSyncing: this.isSyncing,
syncInterval: this.syncIntervalMs,
byProvider: {}
};
for (const [cacheKey, entry] of this.credentialCache) {
const providerType = entry.providerType;
if (!stats.byProvider[providerType]) {
stats.byProvider[providerType] = {
count: 0,
dirty: 0,
maxRetries: 0
};
}
stats.byProvider[providerType].count++;
if (entry.isDirty) {
stats.byProvider[providerType].dirty++;
stats.byProvider[providerType].maxRetries = Math.max(
stats.byProvider[providerType].maxRetries,
entry.retryCount
);
}
}
return stats;
}
/**
* 获取死信队列内容
* @returns {Array}
*/
getDeadLetterQueue() {
return Array.from(this.deadLetterQueue.entries()).map(([key, value]) => ({
cacheKey: key,
...value
}));
}
/**
* 从死信队列恢复凭证
* @param {string} cacheKey - 缓存键
* @returns {boolean} 是否恢复成功
*/
recoverFromDeadLetter(cacheKey) {
const deadEntry = this.deadLetterQueue.get(cacheKey);
if (!deadEntry) {
return false;
}
// 恢复到缓存
const entry = deadEntry.entry;
entry.retryCount = 0;
entry.isDirty = true;
entry.lastError = null;
entry.firstFailureTime = null;
this.credentialCache.set(cacheKey, entry);
this.dirtyKeys.add(cacheKey);
// 从死信队列移除
this.deadLetterQueue.delete(cacheKey);
console.log(`[CredentialCache] Recovered credential from dead letter queue: ${cacheKey}`);
return true;
}
/**
* 清除所有缓存用于测试
*/
clear() {
this.credentialCache.clear();
this.lockChains.clear();
this.dirtyKeys.clear();
this.deadLetterQueue.clear();
this.isInitialized = false;
}
/**
* 重新加载指定凭证从文件
* @param {string} providerType - 提供商类型
* @param {string} uuid - 节点UUID
* @returns {Promise<CredentialEntry|null>}
*/
async reloadCredentials(providerType, uuid) {
const cacheKey = this._getCacheKey(providerType, uuid);
const entry = this.credentialCache.get(cacheKey);
if (!entry || !entry.credPath) {
return null;
}
try {
const credentials = await this._loadCredentialsFromFile(entry.credPath);
if (credentials) {
entry.credentials = credentials;
entry.lastModified = Date.now();
entry.isDirty = false;
entry.retryCount = 0;
this.dirtyKeys.delete(cacheKey);
return entry;
}
} catch (error) {
console.error(`[CredentialCache] Failed to reload ${entry.credPath}: ${error.message}`);
}
return null;
}
}
// 导出单例获取函数
export function getCredentialCacheManager() {
return CredentialCacheManager.getInstance();
}

View file

@ -1,220 +0,0 @@
import * as path from 'path';
/**
* 文件锁管理器 - 防止并发写入导致文件损坏
*
* 使用场景
* - 多个异步操作同时读写同一文件
* - 防止读--写竞争条件Race Condition
* - 防止写入交错导致文件内容损坏
*
* 注意这是进程内锁只能防止同一 Node.js 进程内的并发
* 如果需要跨进程文件锁请使用 proper-lockfile 等库
*/
// 存储每个文件的锁队列Promise 链)
// 每个文件对应一个 Promise新的锁请求会链接到当前 Promise 之后
const fileLockQueues = new Map();
// 存储去重锁的进行中 Promise
// 用于合并相同 key 的并发请求,只执行一次操作
const dedupePromises = new Map();
/**
* 获取文件锁确保同一时间只有一个操作可以访问特定文件
*
* 实现原理使用 Promise 链实现队列机制
* - 每个文件维护一个 Promise
* - 新的锁请求会等待当前链完成然后创建新的链节点
* - 这确保了锁的获取是严格串行的避免竞态条件
*
* @param {string} filePath - 文件路径
* @returns {Promise<() => void>} 释放锁的函数
*
* @example
* const releaseLock = await acquireFileLock('/path/to/file.json');
* try {
* // 读取、修改、写入文件
* const data = await fs.readFile(filePath, 'utf8');
* const modified = JSON.parse(data);
* modified.key = 'new value';
* await fs.writeFile(filePath, JSON.stringify(modified, null, 2));
* } finally {
* releaseLock(); // 确保锁被释放
* }
*/
export async function acquireFileLock(filePath) {
const normalizedPath = path.resolve(filePath);
// 获取当前队列中的最后一个 Promise如果存在
const currentLock = fileLockQueues.get(normalizedPath) || Promise.resolve();
// 创建释放锁的 resolver
let releaseLock;
const newLockPromise = new Promise(resolve => {
releaseLock = resolve;
});
// 立即将新的 Promise 加入队列(在 await 之前!)
// 这是关键:确保后续请求会等待这个新的 Promise
fileLockQueues.set(normalizedPath, newLockPromise);
// 等待前一个锁释放
await currentLock;
// 返回释放锁的函数
return () => {
// 只有当当前锁仍是队列中的最后一个时才清理
// 否则保留队列让后续请求继续等待
if (fileLockQueues.get(normalizedPath) === newLockPromise) {
fileLockQueues.delete(normalizedPath);
}
releaseLock();
};
}
/**
* 使用文件锁执行操作的便捷函数
* @param {string} filePath - 文件路径
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值
*
* @example
* const result = await withFileLock('/path/to/file.json', async () => {
* const data = await fs.readFile(filePath, 'utf8');
* const modified = JSON.parse(data);
* modified.key = 'new value';
* await fs.writeFile(filePath, JSON.stringify(modified, null, 2));
* return modified;
* });
*/
export async function withFileLock(filePath, operation) {
const releaseLock = await acquireFileLock(filePath);
try {
return await operation();
} finally {
releaseLock();
}
}
/**
* 检查文件是否被锁定有等待中的锁队列
* @param {string} filePath - 文件路径
* @returns {boolean} 是否被锁定
*/
export function isFileLocked(filePath) {
const normalizedPath = path.resolve(filePath);
return fileLockQueues.has(normalizedPath);
}
/**
* 获取当前被锁定的文件数量用于调试
* @returns {number} 被锁定的文件数量
*/
export function getLockedFileCount() {
return fileLockQueues.size;
}
/**
* 去重执行 - 合并相同 key 的并发请求只执行一次操作
*
* withFileLock 的区别
* - withFileLock队列锁10个并发请求 排队执行10次
* - withDeduplication去重锁10个并发请求 只执行1次共享结果
*
* 使用场景
* - Token 刷新多个请求同时发现 token 过期只需刷新一次
* - 缓存填充多个请求同时 cache miss只需加载一次
* - 任何"结果可共享"的昂贵操作
*
* @param {string} key - 去重的唯一标识符
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值所有等待者共享同一结果
*
* @example
* // 多个并发调用只会执行一次 refreshToken
* const newToken = await withDeduplication('token-refresh', async () => {
* const response = await fetch('/refresh');
* return response.json();
* });
*/
export async function withDeduplication(key, operation) {
// 如果已有相同 key 的操作在进行中,直接等待它的结果
if (dedupePromises.has(key)) {
return dedupePromises.get(key);
}
// 创建新的操作 Promise
const operationPromise = (async () => {
try {
return await operation();
} finally {
// 操作完成后清理
dedupePromises.delete(key);
}
})();
// 存入 Map让后续请求可以共享
dedupePromises.set(key, operationPromise);
return operationPromise;
}
/**
* 组合去重锁和文件锁 - 先去重再加文件锁
*
* 典型场景Token 刷新
* 1. 去重层10个并发刷新请求 合并为1次刷新操作
* 2. 文件锁层保护那1次刷新操作的文件写入不与其他操作冲突
*
* @param {string} dedupeKey - 去重的唯一标识符
* @param {string} filePath - 需要保护的文件路径
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值
*
* @example
* // Token 刷新场景
* const newToken = await withDeduplicationAndFileLock(
* 'token-refresh-' + credentialId,
* '/path/to/token.json',
* async () => {
* const response = await fetch('/refresh');
* const data = await response.json();
* await fs.writeFile('/path/to/token.json', JSON.stringify(data));
* return data;
* }
* );
*/
export async function withDeduplicationAndFileLock(dedupeKey, filePath, operation) {
return withDeduplication(dedupeKey, async () => {
return withFileLock(filePath, operation);
});
}
/**
* 检查是否有去重操作正在进行
* @param {string} key - 去重的唯一标识符
* @returns {boolean} 是否有操作在进行中
*/
export function isDedupeInProgress(key) {
return dedupePromises.has(key);
}
/**
* 获取当前进行中的去重操作数量用于调试
* @returns {number} 进行中的去重操作数量
*/
export function getDedupeCount() {
return dedupePromises.size;
}
export default {
acquireFileLock,
withFileLock,
isFileLocked,
getLockedFileCount,
withDeduplication,
withDeduplicationAndFileLock,
isDedupeInProgress,
getDedupeCount
};