fix(provider-pool): 修复并发选点时的竞争条件并改进评分算法

- 将链式 Promise 锁改为标志位锁,解决同一微任务循环内的并发问题
- 引入自增序列号确保毫秒级并发下的原子排序,避免节点重复选择
- 优化节点评分算法,平衡 lastUsedTime、usageCount 和 selectionSeq
- 增加并发测试脚本,支持压力测试和性能统计
This commit is contained in:
hex2077 2026-01-24 16:30:57 +08:00
parent 33d867fa73
commit 13ed2087d2
2 changed files with 519 additions and 37 deletions

View file

@ -50,6 +50,7 @@ export class ProviderPoolManager {
// 并发控制:每个 providerType 的选择锁
// 用于确保 selectProvider 的排序和更新操作是原子的
this._selectionLocks = {};
this._isSelecting = {}; // 同步标志位锁
// --- V2: 读写分离和异步刷新队列 ---
// 刷新并发控制配置
@ -69,7 +70,10 @@ export class ProviderPoolManager {
this.refreshBufferQueues = {}; // 按 providerType 分组的缓冲队列
this.refreshBufferTimers = {}; // 按 providerType 分组的定时器
this.bufferDelay = options.globalConfig?.REFRESH_BUFFER_DELAY ?? 5000; // 默认5秒缓冲延迟
// 用于并发选点时的原子排序辅助(自增序列)
this._selectionSequence = 0;
this.initializeProviderStatus();
}
@ -401,34 +405,38 @@ export class ProviderPoolManager {
* 分数越低优先级越高
* @private
*/
_calculateNodeScore(providerStatus) {
_calculateNodeScore(providerStatus, now = Date.now()) {
const config = providerStatus.config;
const now = Date.now();
// 1. 基础健康分:不健康的排最后
if (!config.isHealthy || config.isDisabled) return 1e16;
if (!config.isHealthy || config.isDisabled) return 1e18;
// 2. 预热/刷新分:2分钟内刷新过且使用次数极少的节点视为“新鲜”,分数极低(最高优先级)
const isFresh = config.lastHealthCheckTime &&
(now - new Date(config.lastHealthCheckTime).getTime() < 120000) &&
const isFresh = config.lastHealthCheckTime &&
(now - new Date(config.lastHealthCheckTime).getTime() < 120000) &&
(config.usageCount === 0);
if (isFresh) return -1e16;
if (isFresh) return -2e18; // 极其优先
// 3. 权重计算逻辑:
// 核心痛点:使用过一次的节点 lastUsed 变成巨大的毫秒时间戳,导致它永远比 lastUsed 为 null (0) 的节点分数高得多。
// 改进点:使用 lastUsedTime + usageCount 惩罚 + selectionSequence 惩罚
// selectionSequence 用于在同一毫秒内彻底打破平局
// 改进思路:
// a) 统一量级:如果没用过,我们也给它一个相对于“现在”比较旧的时间戳,而不是 0。
// b) 或者:使用偏移量而非绝对时间戳。
const lastUsedTime = config.lastUsed ? new Date(config.lastUsed).getTime() : (now - 3600000); // 没用过的视为 1 小时前用过
const lastUsedTime = config.lastUsed ? new Date(config.lastUsed).getTime() : (now - 86400000); // 没用过的视为 24 小时前用过(更旧)
const usageCount = config.usageCount || 0;
const checkScore = config.lastHealthCheckTime ? new Date(config.lastHealthCheckTime).getTime() : 0;
// 分数计算(越小越优先):
// 使用时间戳(升序 -> 越旧越优先) + 使用次数惩罚
// usageCount * 60000 表示每多用一次,相当于在时间排队上往后挪 1 分钟
return lastUsedTime + (usageCount * 60000) - (checkScore / 1e9);
const lastSelectionSeq = config._lastSelectionSeq || 0;
// 核心目标:选分最小的。
// - lastUsedTime 越久,分越小。
// - usageCount 越多,分越大。
// - lastSelectionSeq 越大(最近选过),分越大。
// usageCount * 10000: 每多用一次,权重增加 10 秒
// lastSelectionSeq * 1000: 即使毫秒时间相同,序列号也会让分数产生差异(增加 1 秒权重)
// 这样可以确保在毫秒级并发下,刚被选中的节点会立刻排到队列末尾
const baseScore = lastUsedTime + (usageCount * 10000);
const sequenceScore = lastSelectionSeq * 1000;
return baseScore + sequenceScore;
}
/**
@ -471,7 +479,10 @@ export class ProviderPoolManager {
for (const providerType in this.providerPools) {
this.providerStatus[providerType] = [];
this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type
this._selectionLocks[providerType] = Promise.resolve(); // 初始化选择锁
// 只有在锁不存在时才初始化,避免在运行中被重置导致并发问题
if (!this._selectionLocks[providerType]) {
this._selectionLocks[providerType] = Promise.resolve();
}
this.providerPools[providerType].forEach((providerConfig) => {
// Ensure initial health and usage stats are present in the config
providerConfig.isHealthy = providerConfig.isHealthy !== undefined ? providerConfig.isHealthy : true;
@ -509,32 +520,33 @@ export class ProviderPoolManager {
* Currently uses a simple round-robin for healthy providers.
* If requestedModel is provided, providers that don't support the model will be excluded.
*
* 注意:此方法现在返回 Promise,使用链式锁确保并发安全
* 注意:此方法现在返回 Promise,使用互斥锁确保并发安全
*
* @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom').
* @param {string} [requestedModel] - Optional. The model name to filter providers by.
* @returns {Promise<object|null>} The selected provider's configuration, or null if no healthy provider is found.
*/
selectProvider(providerType, requestedModel = null, options = {}) {
async selectProvider(providerType, requestedModel = null, options = {}) {
// 参数校验
if (!providerType || typeof providerType !== 'string') {
this._log('error', `Invalid providerType: ${providerType}`);
return Promise.resolve(null);
return null;
}
// 使用标志位 + 异步等待实现更强力的互斥锁
// 这种方式能更好地处理同一微任务循环内的并发
while (this._isSelecting[providerType]) {
await new Promise(resolve => setImmediate(resolve));
}
// 使用链式锁确保同一 providerType 的选择操作串行执行
// 这样可以避免并发场景下多个请求选择到同一个 provider
const currentLock = this._selectionLocks[providerType] || Promise.resolve();
const selectionPromise = currentLock.then(() => {
this._isSelecting[providerType] = true;
try {
// 在锁内部执行同步选择
return this._doSelectProvider(providerType, requestedModel, options);
});
// 更新锁,确保下一个请求等待当前请求完成
// 使用 catch 确保即使出错也不会阻塞后续请求
this._selectionLocks[providerType] = selectionPromise.catch(() => {});
return selectionPromise;
} finally {
this._isSelecting[providerType] = false;
}
}
/**
@ -547,6 +559,9 @@ export class ProviderPoolManager {
// 检查并恢复已到恢复时间的提供商
this._checkAndRecoverScheduledProviders(providerType);
// 获取固定时间戳,确保排序过程中一致
const now = Date.now();
let availableAndHealthyProviders = availableProviders.filter(p =>
p.config.isHealthy && !p.config.isDisabled && !p.config.needsRefresh
);
@ -577,13 +592,26 @@ export class ProviderPoolManager {
}
// 改进:使用统一的评分策略进行选择
// 传入当前时间戳 now 确保一致性
const selected = availableAndHealthyProviders.sort((a, b) => {
return this._calculateNodeScore(a) - this._calculateNodeScore(b);
const scoreA = this._calculateNodeScore(a, now);
const scoreB = this._calculateNodeScore(b, now);
if (scoreA !== scoreB) return scoreA - scoreB;
// 如果分值相同,使用 UUID 排序确保确定性
return a.uuid < b.uuid ? -1 : 1;
})[0];
// 始终更新 lastUsed,确保 LRU 策略生效,避免并发请求选到同一个 provider
// usageCount 只在请求成功后才增加(由 skipUsageCount 控制)
selected.config.lastUsed = new Date().toISOString();
// 更新自增序列号,确保即使毫秒级并发,也能在下一轮排序中被区分开
this._selectionSequence++;
selected.config._lastSelectionSeq = this._selectionSequence;
// 强制打印选中日志,方便排查并发问题
this._log('info', `[Concurrency Control] Atomic selection: ${selected.config.uuid} (Seq: ${this._selectionSequence})`);
if (!options.skipUsageCount) {
selected.config.usageCount++;
}

454
tests/concurrent-test.js Normal file
View file

@ -0,0 +1,454 @@
/**
* 并发测试脚本
* 用于测试 API 服务器在高并发场景下的性能和稳定性
*
* 使用方法:
* node tests/concurrent-test.js [选项]
*
* 选项:
* --url <url> API 服务器地址 (默认: http://localhost:3000)
* --api-key <key> API 密钥 (默认: 123456)
* --concurrency <n> 并发数 (默认: 10)
* --requests <n> 总请求数 (默认: 100)
* --rpm <n> 每分钟发起的请求数上限 (默认: 0,表示不限速)
* --endpoint <path> 测试端点 (默认: /v1/chat/completions)
* --model <model> 模型名称 (默认: gpt-4)
* --stream 使用流式响应 (默认: false)
* --timeout <ms> 请求超时时间 (默认: 60000)
* --verbose 显示详细日志
* --help 显示帮助信息
*/
import http from 'http';
import https from 'https';
// Parse command-line arguments.
/**
 * Build the load-test configuration from `process.argv`.
 *
 * Unknown flags are ignored. `--help` prints the usage text and exits the
 * process with code 0.
 *
 * @returns {{url: string, apiKey: string, concurrency: number,
 *   totalRequests: number, rpm: number, endpoint: string, model: string,
 *   stream: boolean, timeout: number, verbose: boolean}} The effective
 *   configuration (defaults overridden by the supplied flags).
 * @throws {Error} When a flag that needs a value is the last argument, or when
 *   a numeric flag is not a non-negative decimal integer within its range.
 */
function parseArgs() {
  const args = process.argv.slice(2);
  const config = {
    url: 'http://localhost:3000',
    apiKey: '123456',
    concurrency: 10,
    totalRequests: 100,
    rpm: 0,
    endpoint: '/v1/chat/completions',
    model: 'gpt-4',
    stream: false,
    timeout: 60000,
    verbose: false,
  };

  let cursor = 0;

  // Consume the argument that follows `flag`; a missing value is an error
  // instead of silently storing `undefined` in the configuration.
  const takeValue = (flag) => {
    cursor += 1;
    const value = args[cursor];
    if (value === undefined) {
      throw new Error(`选项 ${flag} 缺少参数值`);
    }
    return value;
  };

  // Consume an integer value. A NaN or zero concurrency would otherwise make
  // the scheduler never start a request, so invalid numbers fail fast here.
  const takeInteger = (flag, min) => {
    const raw = takeValue(flag);
    const parsed = /^\d+$/.test(raw) ? Number(raw) : Number.NaN;
    if (!Number.isSafeInteger(parsed) || parsed < min) {
      throw new Error(`选项 ${flag} 的值无效: "${raw}" (需要 >= ${min} 的整数)`);
    }
    return parsed;
  };

  for (; cursor < args.length; cursor += 1) {
    const flag = args[cursor];
    switch (flag) {
      case '--url':
        config.url = takeValue(flag);
        break;
      case '--api-key':
        config.apiKey = takeValue(flag);
        break;
      case '--concurrency':
        config.concurrency = takeInteger(flag, 1);
        break;
      case '--requests':
        config.totalRequests = takeInteger(flag, 1);
        break;
      case '--rpm':
        config.rpm = takeInteger(flag, 0);
        break;
      case '--endpoint':
        config.endpoint = takeValue(flag);
        break;
      case '--model':
        config.model = takeValue(flag);
        break;
      case '--stream':
        config.stream = true;
        break;
      case '--timeout':
        config.timeout = takeInteger(flag, 0);
        break;
      case '--verbose':
        config.verbose = true;
        break;
      case '--help':
        console.log(`
并发测试脚本 - 测试 API 服务器性能
使用方法:
node tests/concurrent-test.js [选项]
选项:
--url <url> API 服务器地址 (默认: http://localhost:3000)
--api-key <key> API 密钥 (默认: 123456)
--concurrency <n> 并发数 (默认: 10)
--requests <n> 总请求数 (默认: 100)
--rpm <n> 每分钟发起的请求数上限 (默认: 0,表示不限速)
--endpoint <path> 测试端点 (默认: /v1/chat/completions)
--model <model> 模型名称 (默认: gpt-4)
--stream 使用流式响应 (默认: false)
--timeout <ms> 请求超时时间 (默认: 60000)
--verbose 显示详细日志
--help 显示帮助信息
`);
        process.exit(0);
    }
  }
  return config;
}
// Run statistics.
/**
 * Accumulates per-request outcomes and produces the summary report.
 *
 * Response times are collected only for successful requests; failures are
 * counted and grouped by their error message.
 */
class Statistics {
  constructor() {
    this.completed = 0;
    this.failed = 0;
    this.responseTimes = [];
    this.errors = {};
    this.startTime = null;
    this.endTime = null;
  }

  /**
   * Record one successful request.
   * @param {number} responseTime - Elapsed time of the request in milliseconds.
   */
  recordSuccess(responseTime) {
    this.completed++;
    this.responseTimes.push(responseTime);
  }

  /**
   * Record one failed request, grouped by `error.message` (or by the
   * stringified value when there is no message).
   * @param {*} error - The failure; `null`/`undefined` are tolerated.
   */
  recordFailure(error) {
    this.failed++;
    // Optional chaining keeps a null/undefined rejection from throwing here.
    const errorKey = error?.message || String(error);
    this.errors[errorKey] = (this.errors[errorKey] || 0) + 1;
  }

  /** Mark the start of the measured run. */
  start() {
    this.startTime = Date.now();
  }

  /** Mark the end of the measured run. */
  end() {
    this.endTime = Date.now();
  }

  /**
   * Summarize everything recorded so far.
   *
   * Percentiles use the nearest-rank method over successful response times.
   * Ratios fall back to zero when there is nothing to divide by (no requests,
   * or no measurable elapsed time) instead of reporting NaN/Infinity.
   *
   * @returns {object} Totals, success rate, throughput, response-time
   *   statistics and the error histogram.
   */
  getReport() {
    const totalRequests = this.completed + this.failed;

    // If end() has not been called yet, measure up to "now"; if start() was
    // never called there is no duration to report.
    const totalTime = this.startTime === null
      ? 0
      : (this.endTime ?? Date.now()) - this.startTime;

    const sortedTimes = [...this.responseTimes].sort((a, b) => a - b);
    const sampleCount = sortedTimes.length;

    const percentile = (p) => {
      if (sampleCount === 0) return 0;
      const index = Math.ceil((p / 100) * sampleCount) - 1;
      return sortedTimes[Math.max(0, index)];
    };

    const avg = sampleCount > 0
      ? sortedTimes.reduce((sum, value) => sum + value, 0) / sampleCount
      : 0;

    const successRate = totalRequests > 0
      ? (this.completed / totalRequests) * 100
      : 0;
    const requestsPerSecond = totalTime > 0
      ? totalRequests / (totalTime / 1000)
      : 0;

    return {
      totalRequests,
      completed: this.completed,
      failed: this.failed,
      successRate: `${successRate.toFixed(2)}%`,
      totalTime,
      requestsPerSecond: requestsPerSecond.toFixed(2),
      responseTime: {
        min: sampleCount > 0 ? sortedTimes[0] : 0,
        max: sampleCount > 0 ? sortedTimes[sampleCount - 1] : 0,
        avg: avg.toFixed(2),
        p50: percentile(50),
        p90: percentile(90),
        p95: percentile(95),
        p99: percentile(99),
      },
      errors: this.errors,
    };
  }
}
// Build the JSON request body for a test request.
/**
 * Serialize the payload for one test request.
 *
 * Endpoints containing `/responses` (and not `/chat/completions`, which is
 * checked first) get the Responses-style `{ model, input, stream }` payload.
 * Every other endpoint gets the chat-style
 * `{ model, messages, stream, max_tokens }` payload.
 *
 * @param {{endpoint: string, model: string, stream: boolean}} config - Test configuration.
 * @param {number} requestId - Sequence number embedded in the prompt text.
 * @returns {string} The JSON-encoded request body.
 */
function createRequestBody(config, requestId) {
  const { endpoint, model, stream } = config;
  const wantsResponsesShape =
    !endpoint.includes('/chat/completions') && endpoint.includes('/responses');
  const prompt = `这是并发测试请求 #${requestId}。请简短回复"收到"。`;

  const payload = wantsResponsesShape
    ? { model, input: prompt, stream }
    : {
        model,
        messages: [{ role: 'user', content: prompt }],
        stream,
        max_tokens: 50,
      };

  return JSON.stringify(payload);
}
// Send a single request.
/**
 * POST one test request and measure how long it takes.
 *
 * @param {{url: string, endpoint: string, apiKey: string, timeout: number}} config
 *   Test configuration; also forwarded to `createRequestBody`.
 * @param {number} requestId - Sequence number of this request.
 * @returns {Promise<{success: true, requestId: number, statusCode: number,
 *   responseTime: number, dataLength: number}>} Resolves for 2xx responses;
 *   `dataLength` is the number of body bytes received.
 *   Rejects with an `Error` for non-2xx responses, transport errors and
 *   timeouts. The error carries `success: false`, `requestId`, `responseTime`,
 *   `error` (same text as `message`) and, when a response was received,
 *   `statusCode`, so callers that read those fields keep working.
 */
function sendRequest(config, requestId) {
  return new Promise((resolve, reject) => {
    const startTime = Date.now();
    const url = new URL(config.endpoint, config.url);
    const isHttps = url.protocol === 'https:';
    const client = isHttps ? https : http;
    const port = url.port || (isHttps ? 443 : 80);
    const requestBody = createRequestBody(config, requestId);

    // Reject with a real Error (stack trace, instanceof checks) while keeping
    // the fields the previous plain-object rejection exposed.
    const fail = (message, extra = {}) => {
      reject(Object.assign(new Error(message), {
        success: false,
        requestId,
        responseTime: Date.now() - startTime,
        error: message,
        ...extra,
      }));
    };

    const options = {
      hostname: url.hostname,
      port,
      path: url.pathname + url.search,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(requestBody),
        'Authorization': `Bearer ${config.apiKey}`,
      },
      timeout: config.timeout,
    };

    const req = client.request(options, (res) => {
      const { statusCode } = res;
      const isSuccess = statusCode >= 200 && statusCode < 300;
      // Count raw bytes rather than decoded string length, so the figure is
      // really "bytes" and multi-byte characters split across chunks cannot
      // skew it. The body itself is only kept for the error preview.
      let receivedBytes = 0;
      const errorChunks = [];

      res.on('data', (chunk) => {
        receivedBytes += chunk.length;
        if (!isSuccess) errorChunks.push(chunk);
      });

      // A connection dropped mid-response surfaces as an 'error' on the
      // response; without a listener it would be an uncaught exception.
      res.on('error', (error) => {
        fail(error.message || error.code || 'Unknown error', { statusCode });
      });

      res.on('end', () => {
        if (isSuccess) {
          resolve({
            success: true,
            requestId,
            statusCode,
            responseTime: Date.now() - startTime,
            dataLength: receivedBytes,
          });
          return;
        }
        const preview = Buffer.concat(errorChunks).toString('utf8').substring(0, 200);
        fail(`HTTP ${statusCode}: ${preview}`, { statusCode });
      });
    });

    req.on('error', (error) => {
      fail(
        error.code === 'ECONNREFUSED'
          ? `连接被拒绝 (${url.hostname}:${port})`
          : (error.message || error.code || 'Unknown error'),
      );
    });

    req.on('timeout', () => {
      // 'timeout' only signals idleness; the request must be torn down
      // manually. The 'error' emitted by destroy() arrives after this
      // rejection and is ignored because the promise is already settled.
      req.destroy();
      fail('请求超时');
    });

    req.write(requestBody);
    req.end();
  });
}
// Concurrency limiter.
/**
 * Runs queued tasks with at most `concurrency` of them in flight at once.
 * Tasks start in the order they were submitted.
 */
class ConcurrencyController {
  /**
   * @param {number} concurrency - Maximum number of simultaneously running tasks (>= 1).
   * @throws {RangeError} When `concurrency` is not a number >= 1. Such a value
   *   would otherwise leave every submitted task queued forever.
   */
  constructor(concurrency) {
    if (!(concurrency >= 1)) {
      throw new RangeError(`concurrency must be a number >= 1, got ${concurrency}`);
    }
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  /**
   * Queue a task and start it as soon as a slot is free.
   * @param {function(): (Promise<*>|*)} task - Function producing the work.
   * @returns {Promise<*>} Settles with the task's outcome. A task that throws
   *   synchronously rejects this promise.
   */
  run(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.processQueue();
    });
  }

  /**
   * Start queued tasks until the concurrency limit is reached or the queue is
   * empty. Called automatically by `run` and whenever a task finishes.
   */
  processQueue() {
    while (this.running < this.concurrency && this.queue.length > 0) {
      const { task, resolve, reject } = this.queue.shift();
      this.running += 1;
      // Invoking the task inside a Promise executor turns a synchronous throw
      // (or a non-promise return value) into a normal settlement, so the slot
      // is always released and the caller's promise always settles.
      new Promise((settle) => settle(task()))
        .then(resolve, reject)
        .finally(() => {
          this.running -= 1;
          this.processQueue();
        });
    }
  }
}
// Progress bar.
/**
 * Redraw the single-line progress bar on stdout (carriage return, no newline).
 *
 * @param {number} current - Number of finished requests.
 * @param {number} total - Total number of requests.
 * @param {{completed: number, failed: number}} stats - Running success/failure counters.
 */
function showProgress(current, total, stats) {
  const BAR_WIDTH = 30;
  const ratio = current / total;
  const doneCells = Math.round(ratio * BAR_WIDTH);
  const bar = `${'█'.repeat(doneCells)}${'░'.repeat(BAR_WIDTH - doneCells)}`;
  const percent = (ratio * 100).toFixed(1);
  const counters = `成功: ${stats.completed} | 失败: ${stats.failed}`;
  process.stdout.write(`\r[${bar}] ${percent}% (${current}/${total}) | ${counters}`);
}
// Main routine.
/**
 * Run the whole load test: parse the CLI options, print the banner, fire
 * `totalRequests` requests through the concurrency limiter (optionally paced
 * by `rpm`), then print the report.
 *
 * Terminates the process via `process.exit`: code 1 when at least one request
 * failed, code 0 otherwise.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const config = parseArgs();
  console.log('╔════════════════════════════════════════════════════════════╗');
  console.log('║ API 并发测试脚本 ║');
  console.log('╠════════════════════════════════════════════════════════════╣');
  console.log(`║ 目标地址: ${config.url.padEnd(47)}`);
  console.log(`║ 测试端点: ${config.endpoint.padEnd(47)}`);
  console.log(`║ 并发数量: ${String(config.concurrency).padEnd(47)}`);
  console.log(`║ 总请求数: ${String(config.totalRequests).padEnd(47)}`);
  console.log(`║ 模型名称: ${config.model.padEnd(47)}`);
  console.log(`║ 流式响应: ${String(config.stream).padEnd(47)}`);
  console.log(`║ 超时时间: ${`${config.timeout}ms`.padEnd(47)}`);
  console.log('╚════════════════════════════════════════════════════════════╝');
  console.log('');

  const stats = new Statistics();
  const controller = new ConcurrencyController(config.concurrency);
  console.log('开始测试...\n');
  stats.start();

  const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  // With an RPM limit, request i is released (i - 1) intervals after the start.
  const paced = config.rpm > 0;
  const interval = paced ? 60000 / config.rpm : 0;

  const onSuccess = (result) => {
    stats.recordSuccess(result.responseTime);
    if (config.verbose) {
      console.log(`\n[成功] 请求 #${result.requestId} - ${result.responseTime}ms - ${result.dataLength} bytes`);
    }
  };

  const tasks = [];
  for (let i = 1; i <= config.totalRequests; i++) {
    const requestId = i;
    const launch = () => controller.run(() => sendRequest(config, requestId));
    // Unpaced requests are queued synchronously so submission order is kept.
    const started = paced ? wait(interval * (i - 1)).then(launch) : launch();

    tasks.push(
      started
        .then(onSuccess)
        .catch((failure) => {
          // A rejection may be a request failure carrying `.error`, or any
          // other Error (for example an invalid URL); never record "undefined".
          const message = failure?.error ?? failure?.message ?? String(failure);
          stats.recordFailure(new Error(message));
          if (config.verbose) {
            console.log(`\n[失败] 请求 #${failure?.requestId ?? requestId} - ${message}`);
          }
        })
        .finally(() => {
          showProgress(stats.completed + stats.failed, config.totalRequests, stats);
        }),
    );
  }

  await Promise.all(tasks);
  stats.end();

  console.log('\n\n');
  console.log('╔════════════════════════════════════════════════════════════╗');
  console.log('║ 测试结果报告 ║');
  console.log('╚════════════════════════════════════════════════════════════╝');

  const report = stats.getReport();
  console.log('\n📊 总体统计:');
  console.log(` 总请求数: ${report.totalRequests}`);
  console.log(` 成功请求: ${report.completed}`);
  console.log(` 失败请求: ${report.failed}`);
  console.log(` 成功率: ${report.successRate}`);
  console.log(` 总耗时: ${report.totalTime}ms`);
  console.log(` 吞吐量: ${report.requestsPerSecond} req/s`);
  console.log('\n⏱ 响应时间统计 (ms):');
  console.log(` 最小值: ${report.responseTime.min}`);
  console.log(` 最大值: ${report.responseTime.max}`);
  console.log(` 平均值: ${report.responseTime.avg}`);
  console.log(` P50: ${report.responseTime.p50}`);
  console.log(` P90: ${report.responseTime.p90}`);
  console.log(` P95: ${report.responseTime.p95}`);
  console.log(` P99: ${report.responseTime.p99}`);

  const errorEntries = Object.entries(report.errors);
  if (errorEntries.length > 0) {
    console.log('\n❌ 错误统计:');
    for (const [error, count] of errorEntries) {
      console.log(` ${error}: ${count}`);
    }
  }
  console.log('\n════════════════════════════════════════════════════════════════');

  // Exit code reflects whether any request failed.
  process.exit(report.failed > 0 ? 1 : 0);
}
// Entry point: run the test; if main() itself rejects, log it and exit with code 1.
const reportFatalError = (fatal) => {
  console.error('测试脚本执行失败:', fatal);
  process.exit(1);
};

main().catch(reportFatalError);