feat(文件锁): 添加文件锁机制防止凭证文件并发写入冲突

实现文件锁工具类,用于防止多个异步操作同时写入同一文件导致的竞争条件。修改各认证模块的凭证保存逻辑,在写入文件前获取锁,确保写入操作的原子性。

- 新增 file-lock.js 工具模块,提供 acquireFileLock 和 withFileLock 方法
- 修改所有认证模块的凭证保存逻辑,使用文件锁保护写入操作
- 添加适当的错误处理和锁释放机制,确保资源不会泄漏
This commit is contained in:
hex2077 2026-01-13 22:10:49 +08:00
parent 4a3bea4f7d
commit 58c66fcd4b
6 changed files with 200 additions and 16 deletions

View file

@ -11,6 +11,7 @@ import { countTokens } from '@anthropic-ai/tokenizer';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js';
import { getProviderPoolManager } from '../../services/service-manager.js';
import { acquireFileLock } from '../../utils/file-lock.js';
const KIRO_THINKING = {
MAX_BUDGET_TOKENS: 24576,
@ -426,8 +427,10 @@ async initializeAuth(forceRefresh = false) {
}
};
// Helper to save credentials to a file
// Helper to save credentials to a file (with file locking to prevent concurrent write corruption)
const saveCredentialsToFile = async (filePath, newData) => {
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(filePath);
try {
let existingData = {};
try {
@ -445,6 +448,9 @@ async initializeAuth(forceRefresh = false) {
console.info(`[Kiro Auth] Updated token file: ${filePath}`);
} catch (error) {
console.error(`[Kiro Auth] Failed to write token to file ${filePath}: ${error.message}`);
} finally {
// 确保锁被释放
releaseLock();
}
};

View file

@ -14,6 +14,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleGeminiAntigravityOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { cleanJsonSchemaProperties } from '../../converters/utils.js';
import { acquireFileLock } from '../../utils/file-lock.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -797,8 +798,8 @@ export class AntigravityApiService {
console.log('[Antigravity Auth] Token expiring soon or force refresh requested. Refreshing token...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// 保存刷新后的凭证到文件
await fs.writeFile(credPath, JSON.stringify(newCredentials, null, 2));
// 保存刷新后的凭证到文件(使用文件锁)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log(`[Antigravity Auth] Token refreshed and saved to ${credPath} successfully.`);
}
} catch (error) {
@ -874,6 +875,23 @@ export class AntigravityApiService {
return expiryTime <= (currentTime + refreshSkewMs);
}
/**
* 保存凭证到文件使用文件锁防止并发写入
* @param {string} filePath - 凭证文件路径
* @param {Object} credentials - 凭证数据
*/
async _saveCredentialsToFile(filePath, credentials) {
const releaseLock = await acquireFileLock(filePath);
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Antigravity Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Antigravity Auth] Failed to save credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
}
}
async discoverProjectAndModels() {
if (this.projectId) {
console.log(`[Antigravity] Using pre-configured Project ID: ${this.projectId}`);

View file

@ -10,6 +10,7 @@ import { API_ACTIONS, formatExpiryTime, isRetryableNetworkError } from '../../ut
import { getProviderModels } from '../provider-models.js';
import { handleGeminiCliOAuth } from '../../auth/oauth-handlers.js';
import { getProxyConfigForProvider, getGoogleAuthProxyConfig } from '../../utils/proxy-utils.js';
import { acquireFileLock } from '../../utils/file-lock.js';
// 配置 HTTP/HTTPS agent 限制连接池大小,避免资源泄漏
const httpAgent = new http.Agent({
@ -274,8 +275,8 @@ export class GeminiApiService {
console.log('[Gemini Auth] Forcing token refresh...');
const { credentials: newCredentials } = await this.authClient.refreshAccessToken();
this.authClient.setCredentials(newCredentials);
// Save refreshed credentials back to file
await fs.writeFile(credPath, JSON.stringify(newCredentials, null, 2));
// Save refreshed credentials back to file (with file locking)
await this._saveCredentialsToFile(credPath, newCredentials);
console.log('[Gemini Auth] Token refreshed and saved successfully.');
}
} catch (error) {
@ -621,6 +622,23 @@ export class GeminiApiService {
}
}
/**
* 保存凭证到文件使用文件锁防止并发写入
* @param {string} filePath - 凭证文件路径
* @param {Object} credentials - 凭证数据
*/
async _saveCredentialsToFile(filePath, credentials) {
const releaseLock = await acquireFileLock(filePath);
try {
await fs.writeFile(filePath, JSON.stringify(credentials, null, 2));
console.log(`[Gemini Auth] Credentials saved to ${filePath}`);
} catch (error) {
console.error(`[Gemini Auth] Failed to save credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
}
}
/**
* 获取模型配额信息
* @returns {Promise<Object>} 模型配额信息

View file

@ -24,6 +24,7 @@ import * as path from 'path';
import * as os from 'os';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock } from '../../utils/file-lock.js';
// iFlow API 端点
const IFLOW_API_BASE_URL = 'https://apis.iflow.cn/v1';
@ -132,11 +133,13 @@ async function loadTokenFromFile(filePath) {
* @param {IFlowTokenStorage} tokenStorage - Token 存储对象
*/
async function saveTokenToFile(filePath, tokenStorage) {
const absolutePath = path.isAbsolute(filePath)
? filePath
: path.join(process.cwd(), filePath);
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(absolutePath);
try {
const absolutePath = path.isAbsolute(filePath)
? filePath
: path.join(process.cwd(), filePath);
// 确保目录存在
const dir = path.dirname(absolutePath);
await fs.mkdir(dir, { recursive: true });
@ -157,6 +160,9 @@ async function saveTokenToFile(filePath, tokenStorage) {
console.log(`[iFlow] Token saved to: ${filePath} (refresh_token: ${json.refresh_token ? json.refresh_token.substring(0, 8) + '...' : 'EMPTY'})`);
} catch (error) {
throw new Error(`[iFlow] Failed to save token to file: ${error.message}`);
} finally {
// 确保锁被释放
releaseLock();
}
}

View file

@ -12,6 +12,7 @@ import { getProviderModels } from '../provider-models.js';
import { handleQwenOAuth } from '../../auth/oauth-handlers.js';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
import { acquireFileLock } from '../../utils/file-lock.js';
// --- Constants ---
const QWEN_DIR = '.qwen';
@ -388,9 +389,18 @@ export class QwenApiService {
async _cacheQwenCredentials(credentials) {
const filePath = this._getQwenCachedCredentialPath();
await fs.mkdir(path.dirname(filePath), { recursive: true });
const credString = JSON.stringify(credentials, null, 2);
await fs.writeFile(filePath, credString);
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(filePath);
try {
await fs.mkdir(path.dirname(filePath), { recursive: true });
const credString = JSON.stringify(credentials, null, 2);
await fs.writeFile(filePath, credString);
console.log(`[Qwen Auth] Credentials cached to ${filePath}`);
} catch (error) {
console.error(`[Qwen Auth] Failed to cache credentials to ${filePath}: ${error.message}`);
} finally {
releaseLock();
}
}
getCurrentEndpoint(resourceUrl) {
@ -841,10 +851,17 @@ class SharedTokenManager {
}
async saveCredentialsToFile(context, credentials) {
await fs.mkdir(path.dirname(context.credentialFilePath), { recursive: true, mode: 0o700 });
await fs.writeFile(context.credentialFilePath, JSON.stringify(credentials, null, 2), { mode: 0o600 });
const stats = await fs.stat(context.credentialFilePath);
context.memoryCache.fileModTime = stats.mtimeMs;
// 获取文件锁,防止并发写入
const releaseLock = await acquireFileLock(context.credentialFilePath);
try {
await fs.mkdir(path.dirname(context.credentialFilePath), { recursive: true, mode: 0o700 });
await fs.writeFile(context.credentialFilePath, JSON.stringify(credentials, null, 2), { mode: 0o600 });
const stats = await fs.stat(context.credentialFilePath);
context.memoryCache.fileModTime = stats.mtimeMs;
} finally {
// 确保锁被释放
releaseLock();
}
}
isTokenValid(credentials) {

119
src/utils/file-lock.js Normal file
View file

@ -0,0 +1,119 @@
import * as path from 'path';
/**
* 文件锁管理器 - 防止并发写入导致文件损坏
*
* 使用场景
* - 多个异步操作同时读写同一文件
* - 防止读--写竞争条件Race Condition
* - 防止写入交错导致文件内容损坏
*
* 注意这是进程内锁只能防止同一 Node.js 进程内的并发
* 如果需要跨进程文件锁请使用 proper-lockfile 等库
*/
// 存储每个文件的锁队列Promise 链)
// 每个文件对应一个 Promise新的锁请求会链接到当前 Promise 之后
const fileLockQueues = new Map();
/**
* 获取文件锁确保同一时间只有一个操作可以访问特定文件
*
* 实现原理使用 Promise 链实现队列机制
* - 每个文件维护一个 Promise
* - 新的锁请求会等待当前链完成然后创建新的链节点
* - 这确保了锁的获取是严格串行的避免竞态条件
*
* @param {string} filePath - 文件路径
* @returns {Promise<() => void>} 释放锁的函数
*
* @example
* const releaseLock = await acquireFileLock('/path/to/file.json');
* try {
* // 读取、修改、写入文件
* const data = await fs.readFile(filePath, 'utf8');
* const modified = JSON.parse(data);
* modified.key = 'new value';
* await fs.writeFile(filePath, JSON.stringify(modified, null, 2));
* } finally {
* releaseLock(); // 确保锁被释放
* }
*/
export async function acquireFileLock(filePath) {
const normalizedPath = path.resolve(filePath);
// 获取当前队列中的最后一个 Promise如果存在
const currentLock = fileLockQueues.get(normalizedPath) || Promise.resolve();
// 创建释放锁的 resolver
let releaseLock;
const newLockPromise = new Promise(resolve => {
releaseLock = resolve;
});
// 立即将新的 Promise 加入队列(在 await 之前!)
// 这是关键:确保后续请求会等待这个新的 Promise
fileLockQueues.set(normalizedPath, newLockPromise);
// 等待前一个锁释放
await currentLock;
// 返回释放锁的函数
return () => {
// 只有当当前锁仍是队列中的最后一个时才清理
// 否则保留队列让后续请求继续等待
if (fileLockQueues.get(normalizedPath) === newLockPromise) {
fileLockQueues.delete(normalizedPath);
}
releaseLock();
};
}
/**
* 使用文件锁执行操作的便捷函数
* @param {string} filePath - 文件路径
* @param {Function} operation - 要执行的异步操作
* @returns {Promise<any>} 操作的返回值
*
* @example
* const result = await withFileLock('/path/to/file.json', async () => {
* const data = await fs.readFile(filePath, 'utf8');
* const modified = JSON.parse(data);
* modified.key = 'new value';
* await fs.writeFile(filePath, JSON.stringify(modified, null, 2));
* return modified;
* });
*/
export async function withFileLock(filePath, operation) {
const releaseLock = await acquireFileLock(filePath);
try {
return await operation();
} finally {
releaseLock();
}
}
/**
* 检查文件是否被锁定有等待中的锁队列
* @param {string} filePath - 文件路径
* @returns {boolean} 是否被锁定
*/
export function isFileLocked(filePath) {
const normalizedPath = path.resolve(filePath);
return fileLockQueues.has(normalizedPath);
}
/**
* 获取当前被锁定的文件数量用于调试
* @returns {number} 被锁定的文件数量
*/
export function getLockedFileCount() {
return fileLockQueues.size;
}
export default {
acquireFileLock,
withFileLock,
isFileLocked,
getLockedFileCount
};