feat: 新增提供商账号池模式支持

实现账号池功能,支持为所有提供商配置多个账号,提供轮询、故障转移和配置降级能力
修改适配器和服务处理逻辑以支持账号池管理
添加 ProviderPoolManager 类管理账号池健康状态和选择策略
更新文档说明账号池配置和使用方法
This commit is contained in:
hex2077 2025-08-29 17:00:18 +08:00
parent 1223c1318e
commit bdcb4320f4
8 changed files with 416 additions and 72 deletions

View file

@ -20,7 +20,10 @@
</div>
> `AIClient2API` is a versatile and lightweight API proxy designed for ultimate flexibility and ease of use. It transforms various backend APIs, such as Google Gemini CLI OAuth, OpenAI, Claude, and Kiro, into a standard OpenAI format interface via a Node.js HTTP server. The project features a modern, modular architecture, supporting strategy and adapter patterns, complete with comprehensive test coverage and health check mechanisms. It's ready to use out-of-the-box: simply `npm install` and run. You can easily switch between model providers in the configuration file, allowing any OpenAI-compatible client or application to seamlessly use different large model capabilities through the same API address, eliminating the hassle of maintaining multiple configurations and dealing with incompatible interfaces for different services.
`AIClient2API` is a versatile and lightweight API proxy designed for ultimate flexibility and ease of use. It transforms various backend APIs, such as Google Gemini CLI OAuth, OpenAI, Claude, and Kiro, into a standard OpenAI format interface via a Node.js HTTP server. The project features a modern, modular architecture, supporting strategy and adapter patterns, complete with comprehensive test coverage and health check mechanisms. It's ready to use out-of-the-box: simply `npm install` and run. You can easily switch between model providers in the configuration file, allowing any OpenAI-compatible client or application to seamlessly use different large model capabilities through the same API address, eliminating the hassle of maintaining multiple configurations and dealing with incompatible interfaces for different services.
> [!NOTE]
> Added account pool mode, which supports multiple accounts for all providers, with built-in polling, failover (requires client retry), and configuration degradation. Requires adding PROVIDER_POOLS_FILE_PATH to config, see the configuration file: provider_pools.json for details.
---

View file

@ -25,6 +25,8 @@
> [!NOTE]
> 感谢阮一峰老师在[周刊359期](https://www.ruanyifeng.com/blog/2025/08/weekly-issue-359.html)的推荐。
>
> 新增账号池模式可支持所有provider配置多个账号自带轮询故障转移需要客户端重试和配置降级。需要在 config 新增配置 PROVIDER_POOLS_FILE_PATH 详见配置文件provider_pools.json
---

View file

@ -3,7 +3,7 @@
* 用于检查API服务器是否正常运行
*/
const http = require('http');
import http from 'http';
// 从环境变量获取主机和端口,如果没有设置则使用默认值
const HOST = process.env.HOST || 'localhost';

48
provider_pools.json Normal file
View file

@ -0,0 +1,48 @@
{
"openai-custom": [
{
"OPENAI_API_KEY": "sk-openai-key1",
"OPENAI_BASE_URL": "https://api.openai.com/v1",
"uuid": "2f579c65-d3c5-41b1-9985-9f6e3d7bf39c"
},
{
"OPENAI_API_KEY": "sk-openai-key2",
"OPENAI_BASE_URL": "https://api.openai.com/v1",
"uuid": "e284628d-302f-456d-91f3-6095386fb3b8"
}
],
"gemini-cli": [
{
"GEMINI_OAUTH_CREDS_FILE_PATH": "./credentials1.json",
"PROJECT_ID": "your-project-id-1",
"uuid": "ac200154-26b8-4f5f-8650-e8cc738b06e3"
},
{
"GEMINI_OAUTH_CREDS_FILE_PATH": "./credentials2.json",
"PROJECT_ID": "your-project-id-2",
"uuid": "4f8afcc2-a9bb-4b96-bb50-3b9667a71f54"
}
],
"claude-custom": [
{
"CLAUDE_API_KEY": "sk-claude-key1",
"CLAUDE_BASE_URL": "https://api.anthropic.com",
"uuid": "bb87047a-3b1d-4249-adbb-1087ecd58128"
},
{
"CLAUDE_API_KEY": "sk-claude-key2",
"CLAUDE_BASE_URL": "https://api.anthropic.com",
"uuid": "7c2002c6-122a-4db0-af06-8a0ff433801a"
}
],
"kiro-api": [
{
"KIRO_OAUTH_CREDS_FILE_PATH": "./kiro_creds1.json",
"uuid": "2c69d0ac-b86f-43d8-9d17-0d300afc5cfd"
},
{
"KIRO_OAUTH_CREDS_FILE_PATH": "./kiro_creds2.json",
"uuid": "7482abe6-8083-4288-bb7d-d8ecb7c461e2"
}
]
}

View file

@ -55,9 +55,9 @@ export class GeminiApiServiceAdapter extends ApiServiceAdapter {
constructor(config) {
super();
this.geminiApiService = new GeminiApiService(config);
this.geminiApiService.initialize().catch(error => {
console.error("Failed to initialize geminiApiService:", error);
});
// this.geminiApiService.initialize().catch(error => {
// console.error("Failed to initialize geminiApiService:", error);
// });
}
async generateContent(model, requestBody) {
@ -158,24 +158,36 @@ export class KiroApiServiceAdapter extends ApiServiceAdapter {
constructor(config) {
super();
this.kiroApiService = new KiroApiService(config);
this.kiroApiService.initialize().catch(error => {
console.error("Failed to initialize kiroApiService:", error);
});
// this.kiroApiService.initialize().catch(error => {
// console.error("Failed to initialize kiroApiService:", error);
// });
}
async generateContent(model, requestBody) {
// The adapter expects the requestBody to be in OpenAI format for Kiro API
if (!this.kiroApiService.isInitialized) {
console.warn("kiroApiService not initialized, attempting to re-initialize...");
await this.kiroApiService.initialize();
}
return this.kiroApiService.generateContent(model, requestBody);
}
async *generateContentStream(model, requestBody) {
// The adapter expects the requestBody to be in OpenAI format for Kiro API
if (!this.kiroApiService.isInitialized) {
console.warn("kiroApiService not initialized, attempting to re-initialize...");
await this.kiroApiService.initialize();
}
const stream = this.kiroApiService.generateContentStream(model, requestBody);
yield* stream;
}
async listModels() {
// Returns the native model list from the Kiro service
if (!this.kiroApiService.isInitialized) {
console.warn("kiroApiService not initialized, attempting to re-initialize...");
await this.kiroApiService.initialize();
}
return this.kiroApiService.listModels();
}
@ -193,24 +205,26 @@ export const serviceInstances = {};
// 服务适配器工厂
export function getServiceAdapter(config) {
console.log(`[Adapter] getServiceAdapter, provider: ${config.MODEL_PROVIDER}, uuid: ${config.uuid}`);
const provider = config.MODEL_PROVIDER;
if (!serviceInstances[provider]) {
const providerKey = config.uuid ? provider + config.uuid : provider;
if (!serviceInstances[providerKey]) {
switch (provider) {
case MODEL_PROVIDER.OPENAI_CUSTOM:
serviceInstances[provider] = new OpenAIApiServiceAdapter(config);
serviceInstances[providerKey] = new OpenAIApiServiceAdapter(config);
break;
case MODEL_PROVIDER.GEMINI_CLI:
serviceInstances[provider] = new GeminiApiServiceAdapter(config);
serviceInstances[providerKey] = new GeminiApiServiceAdapter(config);
break;
case MODEL_PROVIDER.CLAUDE_CUSTOM:
serviceInstances[provider] = new ClaudeApiServiceAdapter(config);
serviceInstances[providerKey] = new ClaudeApiServiceAdapter(config);
break;
case MODEL_PROVIDER.KIRO_API:
serviceInstances[provider] = new KiroApiServiceAdapter(config);
serviceInstances[providerKey] = new KiroApiServiceAdapter(config);
break;
default:
throw new Error(`Unsupported model provider: ${provider}`);
}
}
return serviceInstances[provider];
return serviceInstances[providerKey];
}

View file

@ -98,6 +98,7 @@
* --request-base-delay <number> 自动重试之间的基础延迟时间毫秒每次重试后延迟会增加 / Base delay in milliseconds between retries, increases with each retry (default: 1000)
* --cron-near-minutes <number> OAuth 令牌刷新任务计划的间隔时间分钟 / Interval for OAuth token refresh task in minutes (default: 15)
* --cron-refresh-token <boolean> 是否开启 OAuth 令牌自动刷新任务 / Whether to enable automatic OAuth token refresh task (default: true)
* --provider-pools-file <path> 提供商号池配置文件路径 / Path to provider pools configuration file (default: null)
*
*/
@ -109,7 +110,8 @@ import { promises as pfs } from 'fs';
import 'dotenv/config'; // Import dotenv and configure it
import deepmerge from 'deepmerge';
import { getServiceAdapter, serviceInstances} from './adapter.js';
import { getServiceAdapter, serviceInstances } from './adapter.js';
import { ProviderPoolManager } from './provider-pool-manager.js';
import {
INPUT_SYSTEM_PROMPT_FILE,
API_ACTIONS,
@ -161,7 +163,8 @@ async function initializeConfig(args = process.argv.slice(2), configFilePath = '
REQUEST_MAX_RETRIES: 3,
REQUEST_BASE_DELAY: 1000,
CRON_NEAR_MINUTES: 15,
CRON_REFRESH_TOKEN: true
CRON_REFRESH_TOKEN: true,
PROVIDER_POOLS_FILE_PATH: null // 新增号池配置文件路径
};
console.log('[Config] Using default configuration.');
}
@ -313,6 +316,13 @@ async function initializeConfig(args = process.argv.slice(2), configFilePath = '
} else {
console.warn(`[Config Warning] --cron-refresh-token flag requires a value.`);
}
} else if (args[i] === '--provider-pools-file') {
if (i + 1 < args.length) {
currentConfig.PROVIDER_POOLS_FILE_PATH = args[i + 1];
i++;
} else {
console.warn(`[Config Warning] --provider-pools-file flag requires a value.`);
}
}
}
@ -321,6 +331,20 @@ async function initializeConfig(args = process.argv.slice(2), configFilePath = '
}
currentConfig.SYSTEM_PROMPT_CONTENT = await getSystemPromptFileContent(currentConfig.SYSTEM_PROMPT_FILE_PATH);
// 加载号池配置
if (currentConfig.PROVIDER_POOLS_FILE_PATH) {
try {
const poolsData = await pfs.readFile(currentConfig.PROVIDER_POOLS_FILE_PATH, 'utf8');
currentConfig.providerPools = JSON.parse(poolsData);
console.log(`[Config] Loaded provider pools from ${currentConfig.PROVIDER_POOLS_FILE_PATH}`);
} catch (error) {
console.error(`[Config Error] Failed to load provider pools from ${currentConfig.PROVIDER_POOLS_FILE_PATH}: ${error.message}`);
currentConfig.providerPools = {};
}
} else {
currentConfig.providerPools = {};
}
// Set PROMPT_LOG_FILENAME based on the determined config
if (currentConfig.PROMPT_LOG_MODE === 'file') {
const now = new Date();
@ -366,21 +390,52 @@ async function getSystemPromptFileContent(filePath) {
}
}
// 存储 ProviderPoolManager 实例
let providerPoolManager = null;
async function initApiService(config) {
if (config.providerPools && Object.keys(config.providerPools).length > 0) {
providerPoolManager = new ProviderPoolManager(config.providerPools);
console.log('[Initialization] ProviderPoolManager initialized with configured pools.');
// 可以选择在这里触发一次健康检查
providerPoolManager.performHealthChecks();
} else {
console.log('[Initialization] No provider pools configured. Using single provider mode.');
}
// Initialize all known service adapters at startup
// 当存在号池时,这里不再提前初始化所有 provider 的实例,而是按需从号池中选择和初始化
// 而是通过 providerPoolManager.selectProvider 来动态选择配置并初始化服务
for (const provider of Object.values(MODEL_PROVIDER)) {
try {
console.log(`[Initialization] Initializing service adapter for ${provider}...`);
getServiceAdapter({ ...config, MODEL_PROVIDER: provider }); // This call populates serviceInstances
} catch (error) {
console.warn(`[Initialization Warning] Failed to initialize service adapter for ${provider}: ${error.message}`);
if (!config.providerPools || !config.providerPools[provider] || config.providerPools[provider].length === 0) {
try {
// 对于没有配置号池的提供者,仍然按原来的方式初始化一个单例
console.log(`[Initialization] Initializing single service adapter for ${provider}...`);
getServiceAdapter({ ...config, MODEL_PROVIDER: provider }); // This call populates serviceInstances
} catch (error) {
console.warn(`[Initialization Warning] Failed to initialize single service adapter for ${provider}: ${error.message}`);
}
}
}
return serviceInstances; // Return the collection of initialized service instances
}
async function getApiService(config) {
return getServiceAdapter(config);
let serviceConfig = config;
if (providerPoolManager && config.providerPools && config.providerPools[config.MODEL_PROVIDER]) {
// 如果有号池管理器,并且当前模型提供者类型有对应的号池,则从号池中选择一个提供者配置
const selectedProviderConfig = providerPoolManager.selectProvider(config.MODEL_PROVIDER);
if (selectedProviderConfig) {
// 合并选中的提供者配置到当前请求的 config 中
serviceConfig = deepmerge(config, selectedProviderConfig);
delete serviceConfig.providerPools; // 移除 providerPools 属性
config.uuid = serviceConfig.uuid;
console.log(`[API Service] Using pooled configuration for ${config.MODEL_PROVIDER}: ${serviceConfig.uuid}`);
} else {
console.warn(`[API Service] No healthy provider found in pool for ${config.MODEL_PROVIDER}. Falling back to main config.`);
}
}
return getServiceAdapter(serviceConfig);
}
/**
@ -405,7 +460,7 @@ function createRequestHandler(config) {
if (modelProviderHeader) {
currentConfig.MODEL_PROVIDER = modelProviderHeader;
console.log(`[Config] MODEL_PROVIDER overridden by header to: ${currentConfig.MODEL_PROVIDER}`);
delete req.headers['model-provider'];
//delete req.headers['model-provider']; // 保持不变,以便后端可以继续处理原始头
}
const requestUrl = new URL(req.url, `http://${req.headers.host}`);
@ -429,13 +484,29 @@ function createRequestHandler(config) {
}
}
const apiService = await getApiService(currentConfig);
// 获取或选择 API Service 实例
let apiService;
try {
apiService = await getApiService(currentConfig);
} catch (error) {
handleError(res, { statusCode: 500, message: `Failed to get API service: ${error.message}` });
if (providerPoolManager) {
// 如果是号池模式,并且获取服务失败,则标记当前使用的提供者为不健康
// 这里需要一种机制来知道是哪个具体的号池成员导致了失败。
// 暂时简单的假设是 currentConfig 中包含的凭据就是来自号池选择的。
providerPoolManager.markProviderUnhealthy(currentConfig.MODEL_PROVIDER, {
uuid: currentConfig.uuid
});
}
return;
}
const method = req.method;
if (method === 'OPTIONS') {
// 设置 CORS 头部,允许所有来源和方法
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-goog-api-key');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-goog-api-key, Model-Provider'); // 添加 Model-Provider
// OPTIONS 请求通常返回 204 No Content
res.writeHead(204);
@ -462,24 +533,24 @@ function createRequestHandler(config) {
// Route model list requests
if (method === 'GET') {
if (path === '/v1/models') {
return await handleModelListRequest(req, res, apiService, ENDPOINT_TYPE.OPENAI_MODEL_LIST, currentConfig);
return await handleModelListRequest(req, res, apiService, ENDPOINT_TYPE.OPENAI_MODEL_LIST, currentConfig, providerPoolManager, currentConfig.uuid);
}
if (path === '/v1beta/models') {
return await handleModelListRequest(req, res, apiService, ENDPOINT_TYPE.GEMINI_MODEL_LIST, currentConfig);
return await handleModelListRequest(req, res, apiService, ENDPOINT_TYPE.GEMINI_MODEL_LIST, currentConfig, providerPoolManager, currentConfig.uuid);
}
}
// Route content generation requests
if (method === 'POST') {
if (path === '/v1/chat/completions') {
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.OPENAI_CHAT, currentConfig, PROMPT_LOG_FILENAME);
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.OPENAI_CHAT, currentConfig, PROMPT_LOG_FILENAME, providerPoolManager, currentConfig.uuid);
}
const geminiUrlPattern = new RegExp(`/v1beta/models/(.+?):(${API_ACTIONS.GENERATE_CONTENT}|${API_ACTIONS.STREAM_GENERATE_CONTENT})`);
if (geminiUrlPattern.test(path)) {
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.GEMINI_CONTENT, currentConfig, PROMPT_LOG_FILENAME);
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.GEMINI_CONTENT, currentConfig, PROMPT_LOG_FILENAME, providerPoolManager, currentConfig.uuid);
}
if (path === '/v1/messages') {
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.CLAUDE_MESSAGE, currentConfig, PROMPT_LOG_FILENAME);
return await handleContentGenerationRequest(req, res, apiService, ENDPOINT_TYPE.CLAUDE_MESSAGE, currentConfig, PROMPT_LOG_FILENAME, providerPoolManager, currentConfig.uuid);
}
}
@ -503,13 +574,22 @@ async function startServer() {
const heartbeatAndRefreshToken = async () => {
console.log(`[Heartbeat] Server is running. Current time: ${new Date().toLocaleString()}`);
// 循环遍历所有已初始化的服务适配器,并尝试刷新令牌
if (providerPoolManager) {
await providerPoolManager.performHealthChecks(); // 定期执行健康检查
}
for (const providerKey in services) {
const serviceAdapter = services[providerKey];
try {
// For pooled providers, refreshToken should be handled by individual instances
// For single instances, this remains relevant
await serviceAdapter.refreshToken();
console.log(`[Token Refresh] Refreshed token for ${providerKey}`);
} catch (error) {
console.error(`[Token Refresh Error] Failed to refresh token for ${providerKey}: ${error.message}`);
// 如果是号池中的某个实例刷新失败,这里需要捕获并更新其状态
// 现有的 serviceInstances 存储的是每个配置对应的单例,而非池中的成员
// 这意味着如果一个池成员的 token 刷新失败,需要找到它并更新其在 poolManager 中的状态
// 暂时通过捕获错误日志来发现问题,更精细的控制需要在 refreshToken 中抛出更多信息
}
}
};

View file

@ -1,6 +1,7 @@
import { promises as fs } from 'fs';
import * as path from 'path';
import * as http from 'http'; // Add http for IncomingMessage and ServerResponse types
import * as crypto from 'crypto'; // Import crypto for MD5 hashing
import { ApiServiceAdapter } from './adapter.js'; // Import ApiServiceAdapter
import { convertData, getOpenAIStreamChunkStop } from './convert.js';
import { ProviderStrategyFactory } from './provider-strategies.js';
@ -198,7 +199,7 @@ export async function handleUnifiedResponse(res, responsePayload, isStream) {
}
}
export async function handleStreamRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME) {
export async function handleStreamRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid) {
let fullResponseText = '';
let responseClosed = false;
@ -241,11 +242,20 @@ export async function handleStreamRequest(res, service, model, requestBody, from
} catch (error) {
console.error('\n[Server] Error during stream processing:', error.stack);
if (providerPoolManager) {
console.log(`[Provider Pool] Marking ${toProvider} as unhealthy due to stream error`);
// 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康
providerPoolManager.markProviderUnhealthy(toProvider, {
uuid: pooluuid
});
}
if (!res.writableEnded) {
const errorPayload = { error: { message: "An error occurred during streaming.", details: error.message } };
res.end(JSON.stringify(errorPayload));
responseClosed = true;
}
} finally {
if (!responseClosed) {
res.end();
@ -254,21 +264,31 @@ export async function handleStreamRequest(res, service, model, requestBody, from
}
}
export async function handleUnaryRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME) {
// The service returns the response in its native format (toProvider).
const nativeResponse = await service.generateContent(model, requestBody);
const responseText = extractResponseText(nativeResponse, toProvider);
export async function handleUnaryRequest(res, service, model, requestBody, fromProvider, toProvider, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid) {
try{
// The service returns the response in its native format (toProvider).
const nativeResponse = await service.generateContent(model, requestBody);
const responseText = extractResponseText(nativeResponse, toProvider);
// Convert the response back to the client's format (fromProvider), if necessary.
let clientResponse = nativeResponse;
if (getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider)) {
console.log(`[Response Convert] Converting response from ${toProvider} to ${fromProvider}`);
clientResponse = convertData(nativeResponse, 'response', toProvider, fromProvider, model);
// Convert the response back to the client's format (fromProvider), if necessary.
let clientResponse = nativeResponse;
if (getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider)) {
console.log(`[Response Convert] Converting response from ${toProvider} to ${fromProvider}`);
clientResponse = convertData(nativeResponse, 'response', toProvider, fromProvider, model);
}
//console.log(`[Response] Sending response to client: ${JSON.stringify(clientResponse)}`);
await handleUnifiedResponse(res, JSON.stringify(clientResponse), false);
await logConversation('output', responseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
} catch (error) {
console.error('\n[Server] Error during unary processing:', error.stack);
if (providerPoolManager) {
// 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康
providerPoolManager.markProviderUnhealthy(toProvider, {
uuid: pooluuid
});
}
}
//console.log(`[Response] Sending response to client: ${JSON.stringify(clientResponse)}`);
await handleUnifiedResponse(res, JSON.stringify(clientResponse), false);
await logConversation('output', responseText, PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
}
/**
@ -281,35 +301,45 @@ export async function handleUnaryRequest(res, service, model, requestBody, fromP
* @param {string} endpointType The type of endpoint being called (e.g., OPENAI_MODEL_LIST).
* @param {Object} CONFIG - The server configuration object.
*/
export async function handleModelListRequest(req, res, service, endpointType, CONFIG) {
const clientProviderMap = {
[ENDPOINT_TYPE.OPENAI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.OPENAI,
[ENDPOINT_TYPE.GEMINI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.GEMINI,
};
export async function handleModelListRequest(req, res, service, endpointType, CONFIG, providerPoolManager, pooluuid) {
try{
const clientProviderMap = {
[ENDPOINT_TYPE.OPENAI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.OPENAI,
[ENDPOINT_TYPE.GEMINI_MODEL_LIST]: MODEL_PROTOCOL_PREFIX.GEMINI,
};
const fromProvider = clientProviderMap[endpointType];
const toProvider = CONFIG.MODEL_PROVIDER;
const fromProvider = clientProviderMap[endpointType];
const toProvider = CONFIG.MODEL_PROVIDER;
if (!fromProvider) {
throw new Error(`Unsupported endpoint type for model list: ${endpointType}`);
if (!fromProvider) {
throw new Error(`Unsupported endpoint type for model list: ${endpointType}`);
}
// 1. Get the model list in the backend's native format.
const nativeModelList = await service.listModels();
// 2. Convert the model list to the client's expected format, if necessary.
let clientModelList = nativeModelList;
if (getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider)) {
console.log(`[ModelList Convert] Converting model list from ${toProvider} to ${fromProvider}`);
clientModelList = convertData(nativeModelList, 'modelList', toProvider, fromProvider);
} else {
console.log(`[ModelList Convert] Model list format matches. No conversion needed.`);
}
console.log(`[ModelList Response] Sending model list to client: ${JSON.stringify(clientModelList)}`);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(clientModelList));
} catch (error) {
console.error('\n[Server] Error during model list processing:', error.stack);
if (providerPoolManager) {
// 如果是号池模式,并且请求处理失败,则标记当前使用的提供者为不健康
providerPoolManager.markProviderUnhealthy(toProvider, {
uuid: pooluuid
});
}
}
// 1. Get the model list in the backend's native format.
const nativeModelList = await service.listModels();
// 2. Convert the model list to the client's expected format, if necessary.
let clientModelList = nativeModelList;
if (getProtocolPrefix(fromProvider) !== getProtocolPrefix(toProvider)) {
console.log(`[ModelList Convert] Converting model list from ${toProvider} to ${fromProvider}`);
clientModelList = convertData(nativeModelList, 'modelList', toProvider, fromProvider);
} else {
console.log(`[ModelList Convert] Model list format matches. No conversion needed.`);
}
console.log(`[ModelList Response] Sending model list to client: ${JSON.stringify(clientModelList)}`);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(clientModelList));
}
/**
@ -323,7 +353,7 @@ export async function handleModelListRequest(req, res, service, endpointType, CO
* @param {Object} CONFIG - The server configuration object.
* @param {string} PROMPT_LOG_FILENAME - The prompt log filename.
*/
export async function handleContentGenerationRequest(req, res, service, endpointType, CONFIG, PROMPT_LOG_FILENAME) {
export async function handleContentGenerationRequest(req, res, service, endpointType, CONFIG, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid) {
const originalRequestBody = await getRequestBody(req);
if (!originalRequestBody) {
throw new Error("Request body is missing for content generation.");
@ -369,9 +399,9 @@ export async function handleContentGenerationRequest(req, res, service, endpoint
// 5. Call the appropriate stream or unary handler, passing the provider info.
if (isStream) {
await handleStreamRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
await handleStreamRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid);
} else {
await handleUnaryRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME);
await handleUnaryRequest(res, service, model, processedRequestBody, fromProvider, toProvider, CONFIG.PROMPT_LOG_MODE, PROMPT_LOG_FILENAME, providerPoolManager, pooluuid);
}
}
@ -546,3 +576,13 @@ export function extractSystemPromptFromRequestBody(requestBody, provider) {
}
return incomingSystemText;
}
/**
* Generates an MD5 hash for a given object by first converting it to a JSON string.
* @param {object} obj - The object to hash.
* @returns {string} The MD5 hash of the object's JSON string representation.
*/
export function getMD5Hash(obj) {
const jsonString = JSON.stringify(obj);
return crypto.createHash('md5').update(jsonString).digest('hex');
}

View file

@ -0,0 +1,157 @@
/**
* Manages a pool of API service providers, handling their health and selection.
*/
export class ProviderPoolManager {
constructor(providerPools, options = {}) {
this.providerPools = providerPools;
this.providerStatus = {}; // Tracks health and usage for each provider instance
this.roundRobinIndex = {}; // Tracks the current index for round-robin selection for each provider type
this.maxErrorCount = options.maxErrorCount || 1; // Default to 1 errors before marking unhealthy
this.healthCheckInterval = options.healthCheckInterval || 30 * 60 * 1000; // Default to 30 minutes
this.initializeProviderStatus();
}
/**
* Initializes the status for each provider in the pools.
* Initially, all providers are considered healthy and have zero usage.
*/
initializeProviderStatus() {
for (const providerType in this.providerPools) {
this.providerStatus[providerType] = [];
this.roundRobinIndex[providerType] = 0; // Initialize round-robin index for each type
this.providerPools[providerType].forEach((providerConfig, index) => {
this.providerStatus[providerType].push({
config: providerConfig,
uuid: providerConfig.uuid,
isHealthy: true,
lastUsed: null,
usageCount: 0,
errorCount: 0,
lastErrorTime: null, // New: Timestamp of the last error
});
});
}
console.log('[ProviderPoolManager] Initialized provider statuses: ok');
}
/**
* Selects a provider from the pool for a given provider type.
* Currently uses a simple round-robin for healthy providers.
* @param {string} providerType - The type of provider to select (e.g., 'gemini-cli', 'openai-custom').
* @returns {object|null} The selected provider's configuration, or null if no healthy provider is found.
*/
selectProvider(providerType) {
const availableProviders = this.providerStatus[providerType] || [];
const healthyProviders = availableProviders.filter(p => p.isHealthy);
if (healthyProviders.length === 0) {
console.warn(`[ProviderPoolManager] No healthy providers available for type: ${providerType}`);
return null;
}
let currentIndex = this.roundRobinIndex[providerType] || 0;
let selected = null;
// Iterate through healthy providers starting from the current index
for (let i = 0; i < healthyProviders.length; i++) {
const providerIndex = (currentIndex + i) % healthyProviders.length;
const potentialProvider = healthyProviders[providerIndex];
// For now, we simply select the next healthy provider in a round-robin fashion.
// More advanced logic (e.g., considering usage, recent errors, etc.) can be added here.
selected = potentialProvider;
this.roundRobinIndex[providerType] = (providerIndex + 1) % healthyProviders.length; // Update the index for the next call
break; // Found a provider, break the loop
}
if (selected) {
selected.lastUsed = new Date();
selected.usageCount++; // Increment usage count
console.log(`[ProviderPoolManager] Selected provider for ${providerType} (round-robin): ${JSON.stringify(selected.config)}`);
return selected.config;
}
return null;
}
/**
* Marks a provider as unhealthy (e.g., after an API error).
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
*/
markProviderUnhealthy(providerType, providerConfig) {
const pool = this.providerStatus[providerType];
if (pool) {
const provider = pool.find(p => p.uuid === providerConfig.uuid);
if (provider) {
provider.errorCount++;
provider.lastErrorTime = new Date(); // Update last error time
if (provider.errorCount >= this.maxErrorCount) {
provider.isHealthy = false;
console.warn(`[ProviderPoolManager] Marked provider as unhealthy: ${JSON.stringify(providerConfig)} for type ${providerType}. Total errors: ${provider.errorCount}`);
} else {
console.warn(`[ProviderPoolManager] Provider ${JSON.stringify(providerConfig)} for type ${providerType} error count: ${provider.errorCount}/${this.maxErrorCount}. Still healthy.`);
}
}
}
}
/**
* Marks a provider as healthy.
* @param {string} providerType - The type of the provider.
* @param {object} providerConfig - The configuration of the provider to mark.
*/
markProviderHealthy(providerType, providerConfig) {
const pool = this.providerStatus[providerType];
if (pool) {
const provider = pool.find(p => p.uuid === providerConfig.uuid);a
if (provider) {
provider.isHealthy = true;
provider.errorCount = 0; // Reset error count on health recovery
console.log(`[ProviderPoolManager] Marked provider as healthy: ${JSON.stringify(providerConfig)} for type ${providerType}`);
}
}
}
/**
* Performs health checks on all providers in the pool.
* This method would typically be called periodically (e.g., via cron job).
*/
async performHealthChecks() {
console.log('[ProviderPoolManager] Performing health checks on all providers...');
const now = new Date();
for (const providerType in this.providerStatus) {
for (const providerStatus of this.providerStatus[providerType]) {
const providerConfig = providerStatus.config;
// Only attempt to health check unhealthy providers after a certain interval
if (!providerStatus.isHealthy && providerStatus.lastErrorTime &&
(now.getTime() - providerStatus.lastErrorTime.getTime() < this.healthCheckInterval)) {
console.log(`[ProviderPoolManager] Skipping health check for ${JSON.stringify(providerConfig)} (${providerType}). Last error too recent.`);
continue;
}
try {
// TODO: Implement actual health check logic for each provider type
// For now, if a provider was unhealthy and enough time has passed,
// we optimistically mark it healthy and reset error count.
// A more robust system would involve actual API calls or pings here.
if (!providerStatus.isHealthy) {
// Only reset and mark healthy if it was unhealthy and we are attempting a check after interval
this.markProviderHealthy(providerType, providerConfig);
console.log(`[ProviderPoolManager] Health check for ${JSON.stringify(providerConfig)} (${providerType}): Marked Healthy (re-evaluation)`);
} else {
// For already healthy providers, just log or perform a lighter check if needed
console.log(`[ProviderPoolManager] Health check for ${JSON.stringify(providerConfig)} (${providerType}): Still Healthy`);
}
} catch (error) {
console.error(`[ProviderPoolManager] Health check for ${JSON.stringify(providerConfig)} (${providerType}) failed: ${error.message}`);
// If a health check fails, mark it unhealthy, which will update error count and lastErrorTime
this.markProviderUnhealthy(providerType, providerConfig);
}
}
}
}
}