diff --git a/src/providers/provider-pool-manager.js b/src/providers/provider-pool-manager.js index 353734f..30b6965 100644 --- a/src/providers/provider-pool-manager.js +++ b/src/providers/provider-pool-manager.js @@ -50,6 +50,7 @@ export class ProviderPoolManager { // 并发控制:每个 providerType 的选择锁 // 用于确保 selectProvider 的排序 and 更新操作是原子的 this._selectionLocks = {}; + this._isSelecting = {}; // 同步标志位锁 // --- V2: 读写分离 and 异步刷新队列 --- // 刷新并发控制配置 @@ -69,7 +70,10 @@ export class ProviderPoolManager { this.refreshBufferQueues = {}; // 按 providerType 分组的缓冲队列 this.refreshBufferTimers = {}; // 按 providerType 分组的定时器 this.bufferDelay = options.globalConfig?.REFRESH_BUFFER_DELAY ?? 5000; // 默认5秒缓冲延迟 - + + // 用于并发选点时的原子排序辅助(自增序列) + this._selectionSequence = 0; + this.initializeProviderStatus(); } @@ -401,34 +405,38 @@ export class ProviderPoolManager { * 分数越低,优先级越高 * @private */ - _calculateNodeScore(providerStatus) { + _calculateNodeScore(providerStatus, now = Date.now()) { const config = providerStatus.config; - const now = Date.now(); // 1. 基础健康分:不健康的排最后 - if (!config.isHealthy || config.isDisabled) return 1e16; + if (!config.isHealthy || config.isDisabled) return 1e18; // 2. 预热/刷新分:2分钟内刷新过且使用次数极少的节点视为“新鲜”,分数极低(最高优) - const isFresh = config.lastHealthCheckTime && - (now - new Date(config.lastHealthCheckTime).getTime() < 120000) && + const isFresh = config.lastHealthCheckTime && + (now - new Date(config.lastHealthCheckTime).getTime() < 120000) && (config.usageCount === 0); - if (isFresh) return -1e16; - + if (isFresh) return -2e18; // 极其优先 + // 3. 权重计算逻辑: - // 核心痛点:使用过一次的节点 lastUsed 变成巨大的毫秒时间戳,导致它永远比 lastUsed 为 null (0) 的节点分数高得多。 + // 改进点:使用 lastUsedTime + usageCount 惩罚 + selectionSequence 惩罚 + // selectionSequence 用于在同一毫秒内彻底打破平局 - // 改进思路: - // a) 统一量级:如果没用过,我们也给它一个相对于“现在”比较旧的时间戳,而不是 0。 - // b) 或者:使用偏移量而非绝对时间戳。 - - const lastUsedTime = config.lastUsed ? new Date(config.lastUsed).getTime() : (now - 3600000); // 没用过的视为 1 小时前用过 + const lastUsedTime = config.lastUsed ? new Date(config.lastUsed).getTime() : (now - 86400000); // 没用过的视为 24 小时前用过(更旧) const usageCount = config.usageCount || 0; - const checkScore = config.lastHealthCheckTime ? new Date(config.lastHealthCheckTime).getTime() : 0; - - // 分数计算(越小越优先): - // 使用时间戳(升序 -> 越旧越优先) + 使用次数惩罚 - // usageCount * 60000 表示每多用一次,相当于在时间排队上往后挪 1 分钟 - return lastUsedTime + (usageCount * 60000) - (checkScore / 1e9); + const lastSelectionSeq = config._lastSelectionSeq || 0; + + // 核心目标:选分最小的。 + // - lastUsedTime 越久,分越小。 + // - usageCount 越多,分越大。 + // - lastSelectionSeq 越大(最近选过),分越大。 + + // usageCount * 10000: 每多用一次,权重增加 10 秒 + // lastSelectionSeq * 1000: 即使毫秒时间相同,序列号也会让分数产生差异(增加 1 秒权重) + // 这样可以确保在毫秒级并发下,刚被选中的节点会立刻排到队列末尾 + const baseScore = lastUsedTime + (usageCount * 10000); + const sequenceScore = lastSelectionSeq * 1000; + + return baseScore + sequenceScore; } /** @@ -471,7 +479,10 @@ export class ProviderPoolManager { for (const providerType in this.providerPools) { this.providerStatus[providerType] = []; this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type - this._selectionLocks[providerType] = Promise.resolve(); // 初始化选择锁 + // 只有在锁不存在时才初始化,避免在运行中被重置导致并发问题 + if (!this._selectionLocks[providerType]) { + this._selectionLocks[providerType] = Promise.resolve(); + } this.providerPools[providerType].forEach((providerConfig) => { // Ensure initial health and usage stats are present in the config providerConfig.isHealthy = providerConfig.isHealthy !== undefined ? providerConfig.isHealthy : true; @@ -509,32 +520,33 @@ export class ProviderPoolManager { * Currently uses a simple round-robin for healthy providers. * If requestedModel is provided, providers that don't support the model will be excluded. * - * 注意:此方法现在返回 Promise,使用链式锁确保并发安全。 + * 注意:此方法现在返回 Promise,使用互斥锁确保并发安全。 * * @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom'). * @param {string} [requestedModel] - Optional. The model name to filter providers by. * @returns {Promise} The selected provider's configuration, or null if no healthy provider is found. */ - selectProvider(providerType, requestedModel = null, options = {}) { + async selectProvider(providerType, requestedModel = null, options = {}) { // 参数校验 if (!providerType || typeof providerType !== 'string') { this._log('error', `Invalid providerType: ${providerType}`); - return Promise.resolve(null); + return null; + } + + // 使用标志位 + 异步等待实现更强力的互斥锁 + // 这种方式能更好地处理同一微任务循环内的并发 + while (this._isSelecting[providerType]) { + await new Promise(resolve => setImmediate(resolve)); } - - // 使用链式锁确保同一 providerType 的选择操作串行执行 - // 这样可以避免并发场景下多个请求选择到同一个 provider - const currentLock = this._selectionLocks[providerType] || Promise.resolve(); - const selectionPromise = currentLock.then(() => { + this._isSelecting[providerType] = true; + + try { + // 在锁内部执行同步选择 return this._doSelectProvider(providerType, requestedModel, options); - }); - - // 更新锁,确保下一个请求等待当前请求完成 - // 使用 catch 确保即使出错也不会阻塞后续请求 - this._selectionLocks[providerType] = selectionPromise.catch(() => {}); - - return selectionPromise; + } finally { + this._isSelecting[providerType] = false; + } } /** @@ -547,6 +559,9 @@ export class ProviderPoolManager { // 检查并恢复已到恢复时间的提供商 this._checkAndRecoverScheduledProviders(providerType); + // 获取固定时间戳,确保排序过程中一致 + const now = Date.now(); + let availableAndHealthyProviders = availableProviders.filter(p => p.config.isHealthy && !p.config.isDisabled && !p.config.needsRefresh ); @@ -577,13 +592,26 @@ export class ProviderPoolManager { } // 改进:使用统一的评分策略进行选择 + // 传入当前时间戳 now 确保一致性 const selected = availableAndHealthyProviders.sort((a, b) => { - return this._calculateNodeScore(a) - this._calculateNodeScore(b); + const scoreA = this._calculateNodeScore(a, now); + const scoreB = this._calculateNodeScore(b, now); + if (scoreA !== scoreB) return scoreA - scoreB; + // 如果分值相同,使用 UUID 排序确保确定性 + return a.uuid < b.uuid ? -1 : 1; })[0]; // 始终更新 lastUsed(确保 LRU 策略生效,避免并发请求选到同一个 provider) // usageCount 只在请求成功后才增加(由 skipUsageCount 控制) selected.config.lastUsed = new Date().toISOString(); + + // 更新自增序列号,确保即使毫秒级并发,也能在下一轮排序中被区分开 + this._selectionSequence++; + selected.config._lastSelectionSeq = this._selectionSequence; + + // 强制打印选中日志,方便排查并发问题 + this._log('info', `[Concurrency Control] Atomic selection: ${selected.config.uuid} (Seq: ${this._selectionSequence})`); + if (!options.skipUsageCount) { selected.config.usageCount++; } diff --git a/tests/concurrent-test.js b/tests/concurrent-test.js new file mode 100644 index 0000000..f515667 --- /dev/null +++ b/tests/concurrent-test.js @@ -0,0 +1,454 @@ +/** + * 并发测试脚本 + * 用于测试 API 服务器在高并发场景下的性能和稳定性 + * + * 使用方法: + * node tests/concurrent-test.js [选项] + * + * 选项: + * --url API 服务器地址 (默认: http://localhost:3000) + * --api-key API 密钥 (默认: 123456) + * --concurrency 并发数 (默认: 10) + * --requests 总请求数 (默认: 100) + * --endpoint 测试端点 (默认: /v1/chat/completions) + * --model 模型名称 (默认: gpt-4) + * --stream 使用流式响应 (默认: false) + * --timeout 请求超时时间 (默认: 60000) + * --verbose 显示详细日志 + */ + +import http from 'http'; +import https from 'https'; + +// 解析命令行参数 +function parseArgs() { + const args = process.argv.slice(2); + const config = { + url: 'http://localhost:3000', + apiKey: '123456', + concurrency: 10, + totalRequests: 100, + rpm: 0, + endpoint: '/v1/chat/completions', + model: 'gpt-4', + stream: false, + timeout: 60000, + verbose: false + }; + + for (let i = 0; i < args.length; i++) { + switch (args[i]) { + case '--url': + config.url = args[++i]; + break; + case '--api-key': + config.apiKey = args[++i]; + break; + case '--concurrency': + config.concurrency = parseInt(args[++i], 10); + break; + case '--requests': + config.totalRequests = parseInt(args[++i], 10); + break; + case '--rpm': + config.rpm = parseInt(args[++i], 10); + break; + case '--endpoint': + config.endpoint = args[++i]; + break; + case '--model': + config.model = args[++i]; + break; + case '--stream': + config.stream = true; + break; + case '--timeout': + config.timeout = parseInt(args[++i], 10); + break; + case '--verbose': + config.verbose = true; + break; + case '--help': + console.log(` +并发测试脚本 - 测试 API 服务器性能 + +使用方法: + node tests/concurrent-test.js [选项] + +选项: + --url API 服务器地址 (默认: http://localhost:3000) + --api-key API 密钥 (默认: 123456) + --concurrency 并发数 (默认: 10) + --requests 总请求数 (默认: 100) + --endpoint 测试端点 (默认: /v1/chat/completions) + --model 模型名称 (默认: gpt-4) + --stream 使用流式响应 (默认: false) + --timeout 请求超时时间 (默认: 60000) + --verbose 显示详细日志 + --help 显示帮助信息 + `); + process.exit(0); + } + } + + return config; +} + +// 统计数据 +class Statistics { + constructor() { + this.completed = 0; + this.failed = 0; + this.responseTimes = []; + this.errors = {}; + this.startTime = null; + this.endTime = null; + } + + recordSuccess(responseTime) { + this.completed++; + this.responseTimes.push(responseTime); + } + + recordFailure(error) { + this.failed++; + const errorKey = error.message || String(error); + this.errors[errorKey] = (this.errors[errorKey] || 0) + 1; + } + + start() { + this.startTime = Date.now(); + } + + end() { + this.endTime = Date.now(); + } + + getReport() { + const totalTime = this.endTime - this.startTime; + const sortedTimes = [...this.responseTimes].sort((a, b) => a - b); + + const percentile = (p) => { + if (sortedTimes.length === 0) return 0; + const index = Math.ceil((p / 100) * sortedTimes.length) - 1; + return sortedTimes[Math.max(0, index)]; + }; + + const avg = sortedTimes.length > 0 + ? sortedTimes.reduce((a, b) => a + b, 0) / sortedTimes.length + : 0; + + return { + totalRequests: this.completed + this.failed, + completed: this.completed, + failed: this.failed, + successRate: ((this.completed / (this.completed + this.failed)) * 100).toFixed(2) + '%', + totalTime: totalTime, + requestsPerSecond: ((this.completed + this.failed) / (totalTime / 1000)).toFixed(2), + responseTime: { + min: sortedTimes.length > 0 ? sortedTimes[0] : 0, + max: sortedTimes.length > 0 ? sortedTimes[sortedTimes.length - 1] : 0, + avg: avg.toFixed(2), + p50: percentile(50), + p90: percentile(90), + p95: percentile(95), + p99: percentile(99) + }, + errors: this.errors + }; + } +} + +// 创建测试请求体 +function createRequestBody(config, requestId) { + // OpenAI Chat Completions 格式 + if (config.endpoint.includes('/chat/completions')) { + return JSON.stringify({ + model: config.model, + messages: [ + { + role: 'user', + content: `这是并发测试请求 #${requestId}。请简短回复"收到"。` + } + ], + stream: config.stream, + max_tokens: 50 + }); + } + + // OpenAI Responses 格式 + if (config.endpoint.includes('/responses')) { + return JSON.stringify({ + model: config.model, + input: `这是并发测试请求 #${requestId}。请简短回复"收到"。`, + stream: config.stream + }); + } + + // Claude Messages 格式 + if (config.endpoint.includes('/messages')) { + return JSON.stringify({ + model: config.model, + messages: [ + { + role: 'user', + content: `这是并发测试请求 #${requestId}。请简短回复"收到"。` + } + ], + stream: config.stream, + max_tokens: 50 + }); + } + + // 默认格式 + return JSON.stringify({ + model: config.model, + messages: [ + { + role: 'user', + content: `这是并发测试请求 #${requestId}。请简短回复"收到"。` + } + ], + stream: config.stream, + max_tokens: 50 + }); +} + +// 发送单个请求 +function sendRequest(config, requestId) { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + const url = new URL(config.endpoint, config.url); + const isHttps = url.protocol === 'https:'; + const client = isHttps ? https : http; + + const requestBody = createRequestBody(config, requestId); + + const options = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(requestBody), + 'Authorization': `Bearer ${config.apiKey}` + }, + timeout: config.timeout + }; + + const req = client.request(options, (res) => { + let data = ''; + + res.on('data', (chunk) => { + data += chunk; + }); + + res.on('end', () => { + const responseTime = Date.now() - startTime; + + if (res.statusCode >= 200 && res.statusCode < 300) { + resolve({ + success: true, + requestId, + statusCode: res.statusCode, + responseTime, + dataLength: data.length + }); + } else { + reject({ + success: false, + requestId, + statusCode: res.statusCode, + responseTime, + error: `HTTP ${res.statusCode}: ${data.substring(0, 200)}` + }); + } + }); + }); + + req.on('error', (error) => { + const responseTime = Date.now() - startTime; + reject({ + success: false, + requestId, + responseTime, + error: error.code === 'ECONNREFUSED' + ? `连接被拒绝 (${url.hostname}:${url.port || (isHttps ? 443 : 80)})` + : (error.message || error.code || 'Unknown error') + }); + }); + + req.on('timeout', () => { + req.destroy(); + const responseTime = Date.now() - startTime; + reject({ + success: false, + requestId, + responseTime, + error: '请求超时' + }); + }); + + req.write(requestBody); + req.end(); + }); +} + +// 并发控制器 +class ConcurrencyController { + constructor(concurrency) { + this.concurrency = concurrency; + this.running = 0; + this.queue = []; + } + + async run(task) { + return new Promise((resolve, reject) => { + this.queue.push({ task, resolve, reject }); + this.processQueue(); + }); + } + + async processQueue() { + while (this.running < this.concurrency && this.queue.length > 0) { + const { task, resolve, reject } = this.queue.shift(); + this.running++; + + task() + .then(resolve) + .catch(reject) + .finally(() => { + this.running--; + this.processQueue(); + }); + } + } +} + +// 进度条显示 +function showProgress(current, total, stats) { + const percentage = ((current / total) * 100).toFixed(1); + const barLength = 30; + const filled = Math.round((current / total) * barLength); + const bar = '█'.repeat(filled) + '░'.repeat(barLength - filled); + + process.stdout.write(`\r[${bar}] ${percentage}% (${current}/${total}) | 成功: ${stats.completed} | 失败: ${stats.failed}`); +} + +// 主函数 +async function main() { + const config = parseArgs(); + + console.log('╔════════════════════════════════════════════════════════════╗'); + console.log('║ API 并发测试脚本 ║'); + console.log('╠════════════════════════════════════════════════════════════╣'); + console.log(`║ 目标地址: ${config.url.padEnd(47)}║`); + console.log(`║ 测试端点: ${config.endpoint.padEnd(47)}║`); + console.log(`║ 并发数量: ${String(config.concurrency).padEnd(47)}║`); + console.log(`║ 总请求数: ${String(config.totalRequests).padEnd(47)}║`); + console.log(`║ 模型名称: ${config.model.padEnd(47)}║`); + console.log(`║ 流式响应: ${String(config.stream).padEnd(47)}║`); + console.log(`║ 超时时间: ${(config.timeout + 'ms').padEnd(47)}║`); + console.log('╚════════════════════════════════════════════════════════════╝'); + console.log(''); + + const stats = new Statistics(); + const controller = new ConcurrencyController(config.concurrency); + + console.log('开始测试...\n'); + stats.start(); + + const tasks = []; + for (let i = 1; i <= config.totalRequests; i++) { + const requestId = i; + + // 如果设置了 RPM,计算延迟时间 + if (config.rpm > 0) { + const delay = (60000 / config.rpm) * (i - 1); + tasks.push( + new Promise(resolve => setTimeout(resolve, delay)) + .then(() => controller.run(() => sendRequest(config, requestId))) + .then((result) => { + stats.recordSuccess(result.responseTime); + if (config.verbose) { + console.log(`\n[成功] 请求 #${result.requestId} - ${result.responseTime}ms - ${result.dataLength} bytes`); + } + }) + .catch((result) => { + stats.recordFailure(new Error(result.error)); + if (config.verbose) { + console.log(`\n[失败] 请求 #${result.requestId} - ${result.error}`); + } + }) + .finally(() => { + showProgress(stats.completed + stats.failed, config.totalRequests, stats); + }) + ); + } else { + tasks.push( + controller.run(() => sendRequest(config, requestId)) + .then((result) => { + stats.recordSuccess(result.responseTime); + if (config.verbose) { + console.log(`\n[成功] 请求 #${result.requestId} - ${result.responseTime}ms - ${result.dataLength} bytes`); + } + }) + .catch((result) => { + stats.recordFailure(new Error(result.error)); + if (config.verbose) { + console.log(`\n[失败] 请求 #${result.requestId} - ${result.error}`); + } + }) + .finally(() => { + showProgress(stats.completed + stats.failed, config.totalRequests, stats); + }) + ); + } + } + + await Promise.all(tasks); + stats.end(); + + console.log('\n\n'); + console.log('╔════════════════════════════════════════════════════════════╗'); + console.log('║ 测试结果报告 ║'); + console.log('╚════════════════════════════════════════════════════════════╝'); + + const report = stats.getReport(); + + console.log('\n📊 总体统计:'); + console.log(` 总请求数: ${report.totalRequests}`); + console.log(` 成功请求: ${report.completed}`); + console.log(` 失败请求: ${report.failed}`); + console.log(` 成功率: ${report.successRate}`); + console.log(` 总耗时: ${report.totalTime}ms`); + console.log(` 吞吐量: ${report.requestsPerSecond} req/s`); + + console.log('\n⏱️ 响应时间统计 (ms):'); + console.log(` 最小值: ${report.responseTime.min}`); + console.log(` 最大值: ${report.responseTime.max}`); + console.log(` 平均值: ${report.responseTime.avg}`); + console.log(` P50: ${report.responseTime.p50}`); + console.log(` P90: ${report.responseTime.p90}`); + console.log(` P95: ${report.responseTime.p95}`); + console.log(` P99: ${report.responseTime.p99}`); + + if (Object.keys(report.errors).length > 0) { + console.log('\n❌ 错误统计:'); + for (const [error, count] of Object.entries(report.errors)) { + console.log(` ${error}: ${count}次`); + } + } + + console.log('\n════════════════════════════════════════════════════════════════'); + + // 返回退出码 + process.exit(report.failed > 0 ? 1 : 0); +} + +// 运行主函数 +main().catch((error) => { + console.error('测试脚本执行失败:', error); + process.exit(1); +});