fix(cli-installer): queue provider runtime checks per provider
This commit is contained in:
parent
b0790e6797
commit
48782c76a5
2 changed files with 413 additions and 7 deletions
345
src/main/ipc/cliInstaller.test.ts
Normal file
345
src/main/ipc/cliInstaller.test.ts
Normal file
|
|
@ -0,0 +1,345 @@
|
|||
import {
|
||||
CLI_INSTALLER_GET_PROVIDER_STATUS,
|
||||
CLI_INSTALLER_GET_STATUS,
|
||||
CLI_INSTALLER_INVALIDATE_STATUS,
|
||||
CLI_INSTALLER_VERIFY_PROVIDER_MODELS,
|
||||
} from '@preload/constants/ipcChannels';
|
||||
import { createDefaultCliExtensionCapabilities } from '@shared/utils/providerExtensionCapabilities';
|
||||
import { describe, expect, test, vi } from 'vitest';
|
||||
|
||||
import { initializeCliInstallerHandlers, registerCliInstallerHandlers } from './cliInstaller';
|
||||
|
||||
import type { CliInstallerService } from '@main/services';
|
||||
import type {
|
||||
CliInstallationStatus,
|
||||
CliProviderId,
|
||||
CliProviderStatus,
|
||||
IpcResult,
|
||||
} from '@shared/types';
|
||||
import type { IpcMain, IpcMainInvokeEvent } from 'electron';
|
||||
|
||||
vi.mock('@shared/utils/logger', () => ({
|
||||
createLogger: () => ({
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
}),
|
||||
}));
|
||||
|
||||
interface Deferred<T> {
|
||||
promise: Promise<T>;
|
||||
resolve: (value: T) => void;
|
||||
reject: (error: unknown) => void;
|
||||
}
|
||||
|
||||
type IpcHandler = (event: IpcMainInvokeEvent, ...args: unknown[]) => unknown;
|
||||
|
||||
function createDeferred<T>(): Deferred<T> {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (error: unknown) => void;
|
||||
const promise = new Promise<T>((promiseResolve, promiseReject) => {
|
||||
resolve = promiseResolve;
|
||||
reject = promiseReject;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
async function flushMicrotasks(): Promise<void> {
|
||||
for (let index = 0; index < 5; index += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
function createProviderStatus(providerId: CliProviderId): CliProviderStatus {
|
||||
return {
|
||||
providerId,
|
||||
displayName: providerId,
|
||||
supported: true,
|
||||
authenticated: true,
|
||||
authMethod: 'test',
|
||||
verificationState: 'verified',
|
||||
models: [],
|
||||
canLoginFromUi: false,
|
||||
capabilities: {
|
||||
teamLaunch: true,
|
||||
oneShot: true,
|
||||
extensions: createDefaultCliExtensionCapabilities(),
|
||||
},
|
||||
backend: null,
|
||||
};
|
||||
}
|
||||
|
||||
function createCliStatus(providers: CliProviderStatus[] = []): CliInstallationStatus {
|
||||
const authenticatedProvider = providers.find((provider) => provider.authenticated) ?? null;
|
||||
return {
|
||||
flavor: 'agent_teams_orchestrator',
|
||||
displayName: 'Agent Teams Runtime',
|
||||
supportsSelfUpdate: false,
|
||||
showVersionDetails: true,
|
||||
showBinaryPath: true,
|
||||
installed: true,
|
||||
installedVersion: '1.0.0',
|
||||
binaryPath: '/usr/local/bin/claude',
|
||||
launchError: null,
|
||||
latestVersion: null,
|
||||
updateAvailable: false,
|
||||
authLoggedIn: authenticatedProvider !== null,
|
||||
authStatusChecking: false,
|
||||
authMethod: authenticatedProvider?.authMethod ?? null,
|
||||
providers,
|
||||
};
|
||||
}
|
||||
|
||||
function createIpcMainHarness(): {
|
||||
ipcMain: IpcMain;
|
||||
invoke: <T>(channel: string, ...args: unknown[]) => Promise<T>;
|
||||
} {
|
||||
const handlers = new Map<string, IpcHandler>();
|
||||
const ipcMain = {
|
||||
handle: vi.fn((channel: string, handler: IpcHandler) => {
|
||||
handlers.set(channel, handler);
|
||||
}),
|
||||
removeHandler: vi.fn((channel: string) => {
|
||||
handlers.delete(channel);
|
||||
}),
|
||||
} as unknown as IpcMain;
|
||||
|
||||
return {
|
||||
ipcMain,
|
||||
invoke: async <T>(channel: string, ...args: unknown[]): Promise<T> => {
|
||||
const handler = handlers.get(channel);
|
||||
if (!handler) {
|
||||
throw new Error(`Missing IPC handler: ${channel}`);
|
||||
}
|
||||
return (await handler({} as IpcMainInvokeEvent, ...args)) as T;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function createInstallerService(overrides: Partial<CliInstallerService>): CliInstallerService {
|
||||
return {
|
||||
getStatus: vi.fn(() => Promise.resolve(createCliStatus())),
|
||||
getLatestStatusSnapshot: vi.fn(() => null),
|
||||
getProviderStatus: vi.fn(),
|
||||
install: vi.fn(() => Promise.resolve()),
|
||||
invalidateStatusCache: vi.fn(),
|
||||
verifyProviderModels: vi.fn(),
|
||||
...overrides,
|
||||
} as unknown as CliInstallerService;
|
||||
}
|
||||
|
||||
function setupHandlers(service: CliInstallerService): ReturnType<typeof createIpcMainHarness> {
|
||||
const harness = createIpcMainHarness();
|
||||
initializeCliInstallerHandlers(service);
|
||||
registerCliInstallerHandlers(harness.ipcMain);
|
||||
return harness;
|
||||
}
|
||||
|
||||
describe('cliInstaller IPC provider runtime scheduling', () => {
|
||||
test('runs different provider status requests concurrently up to the provider limit', async () => {
|
||||
const started: CliProviderId[] = [];
|
||||
const deferredByProvider = new Map<CliProviderId, Deferred<CliProviderStatus | null>>();
|
||||
const service = createInstallerService({
|
||||
getProviderStatus: vi.fn((providerId: CliProviderId) => {
|
||||
started.push(providerId);
|
||||
const deferred = createDeferred<CliProviderStatus | null>();
|
||||
deferredByProvider.set(providerId, deferred);
|
||||
return deferred.promise;
|
||||
}),
|
||||
});
|
||||
const { invoke } = setupHandlers(service);
|
||||
|
||||
const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map(
|
||||
(providerId) =>
|
||||
invoke<IpcResult<CliProviderStatus | null>>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId)
|
||||
);
|
||||
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode']);
|
||||
|
||||
deferredByProvider.get('anthropic')?.resolve(createProviderStatus('anthropic'));
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']);
|
||||
|
||||
deferredByProvider.get('codex')?.resolve(createProviderStatus('codex'));
|
||||
deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode'));
|
||||
deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini'));
|
||||
|
||||
const results = await Promise.all(requests);
|
||||
expect(results.every((result) => result.success)).toBe(true);
|
||||
});
|
||||
|
||||
test('dedupes concurrent status requests for the same provider', async () => {
|
||||
const deferred = createDeferred<CliProviderStatus | null>();
|
||||
const getProviderStatus = vi.fn(() => deferred.promise);
|
||||
const service = createInstallerService({ getProviderStatus });
|
||||
const { invoke } = setupHandlers(service);
|
||||
|
||||
const firstRequest = invoke<IpcResult<CliProviderStatus | null>>(
|
||||
CLI_INSTALLER_GET_PROVIDER_STATUS,
|
||||
'codex'
|
||||
);
|
||||
const secondRequest = invoke<IpcResult<CliProviderStatus | null>>(
|
||||
CLI_INSTALLER_GET_PROVIDER_STATUS,
|
||||
'codex'
|
||||
);
|
||||
|
||||
await flushMicrotasks();
|
||||
expect(getProviderStatus).toHaveBeenCalledTimes(1);
|
||||
|
||||
const providerStatus = createProviderStatus('codex');
|
||||
deferred.resolve(providerStatus);
|
||||
|
||||
await expect(Promise.all([firstRequest, secondRequest])).resolves.toEqual([
|
||||
{ success: true, data: providerStatus },
|
||||
{ success: true, data: providerStatus },
|
||||
]);
|
||||
});
|
||||
|
||||
test('keeps status and model verification sequential for the same provider', async () => {
|
||||
const started: string[] = [];
|
||||
const statusDeferred = createDeferred<CliProviderStatus | null>();
|
||||
const verifyDeferred = createDeferred<CliProviderStatus | null>();
|
||||
const service = createInstallerService({
|
||||
getProviderStatus: vi.fn(() => {
|
||||
started.push('status');
|
||||
return statusDeferred.promise;
|
||||
}),
|
||||
verifyProviderModels: vi.fn(() => {
|
||||
started.push('verify');
|
||||
return verifyDeferred.promise;
|
||||
}),
|
||||
});
|
||||
const { invoke } = setupHandlers(service);
|
||||
|
||||
const statusRequest = invoke<IpcResult<CliProviderStatus | null>>(
|
||||
CLI_INSTALLER_GET_PROVIDER_STATUS,
|
||||
'opencode'
|
||||
);
|
||||
const verifyRequest = invoke<IpcResult<CliProviderStatus | null>>(
|
||||
CLI_INSTALLER_VERIFY_PROVIDER_MODELS,
|
||||
'opencode'
|
||||
);
|
||||
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['status']);
|
||||
|
||||
statusDeferred.resolve(createProviderStatus('opencode'));
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['status', 'verify']);
|
||||
|
||||
verifyDeferred.resolve(createProviderStatus('opencode'));
|
||||
|
||||
const [statusResult, verifyResult] = await Promise.all([statusRequest, verifyRequest]);
|
||||
expect(statusResult.success).toBe(true);
|
||||
expect(verifyResult.success).toBe(true);
|
||||
});
|
||||
|
||||
test('does not strand queued provider requests if handlers are reinitialized', async () => {
|
||||
const started: CliProviderId[] = [];
|
||||
const deferredByProvider = new Map<CliProviderId, Deferred<CliProviderStatus | null>>();
|
||||
const originalService = createInstallerService({
|
||||
getProviderStatus: vi.fn((providerId: CliProviderId) => {
|
||||
started.push(providerId);
|
||||
const deferred = createDeferred<CliProviderStatus | null>();
|
||||
deferredByProvider.set(providerId, deferred);
|
||||
return deferred.promise;
|
||||
}),
|
||||
});
|
||||
const replacementService = createInstallerService({
|
||||
getProviderStatus: vi.fn(() => Promise.resolve(createProviderStatus('anthropic'))),
|
||||
});
|
||||
const { invoke } = setupHandlers(originalService);
|
||||
|
||||
const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map(
|
||||
(providerId) =>
|
||||
invoke<IpcResult<CliProviderStatus | null>>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId)
|
||||
);
|
||||
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode']);
|
||||
|
||||
initializeCliInstallerHandlers(replacementService);
|
||||
|
||||
deferredByProvider.get('anthropic')?.resolve(createProviderStatus('anthropic'));
|
||||
deferredByProvider.get('codex')?.resolve(createProviderStatus('codex'));
|
||||
deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode'));
|
||||
await flushMicrotasks();
|
||||
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']);
|
||||
expect(replacementService.getProviderStatus).not.toHaveBeenCalled();
|
||||
|
||||
deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini'));
|
||||
const results = await Promise.all(requests);
|
||||
expect(results.every((result) => result.success)).toBe(true);
|
||||
});
|
||||
|
||||
test('releases a provider runtime slot after a failed request', async () => {
|
||||
const started: CliProviderId[] = [];
|
||||
const deferredByProvider = new Map<CliProviderId, Deferred<CliProviderStatus | null>>();
|
||||
const service = createInstallerService({
|
||||
getProviderStatus: vi.fn((providerId: CliProviderId) => {
|
||||
started.push(providerId);
|
||||
const deferred = createDeferred<CliProviderStatus | null>();
|
||||
deferredByProvider.set(providerId, deferred);
|
||||
return deferred.promise;
|
||||
}),
|
||||
});
|
||||
const { invoke } = setupHandlers(service);
|
||||
|
||||
const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map(
|
||||
(providerId) =>
|
||||
invoke<IpcResult<CliProviderStatus | null>>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId)
|
||||
);
|
||||
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode']);
|
||||
|
||||
deferredByProvider.get('anthropic')?.reject(new Error('provider failed'));
|
||||
await flushMicrotasks();
|
||||
expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']);
|
||||
|
||||
deferredByProvider.get('codex')?.resolve(createProviderStatus('codex'));
|
||||
deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode'));
|
||||
deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini'));
|
||||
|
||||
const results = await Promise.all(requests);
|
||||
expect(results[0]).toEqual({ success: false, error: 'provider failed' });
|
||||
expect(results.slice(1).every((result) => result.success)).toBe(true);
|
||||
});
|
||||
|
||||
test('does not patch a fresh status cache with stale provider results after invalidation', async () => {
|
||||
const providerDeferred = createDeferred<CliProviderStatus | null>();
|
||||
const service = createInstallerService({
|
||||
getStatus: vi.fn(() => Promise.resolve(createCliStatus())),
|
||||
getProviderStatus: vi.fn(() => providerDeferred.promise),
|
||||
});
|
||||
const { invoke } = setupHandlers(service);
|
||||
|
||||
const firstStatus = await invoke<IpcResult<CliInstallationStatus>>(CLI_INSTALLER_GET_STATUS);
|
||||
expect(firstStatus.success).toBe(true);
|
||||
|
||||
const providerRequest = invoke<IpcResult<CliProviderStatus | null>>(
|
||||
CLI_INSTALLER_GET_PROVIDER_STATUS,
|
||||
'codex'
|
||||
);
|
||||
await flushMicrotasks();
|
||||
|
||||
const invalidateResult = await invoke<IpcResult<void>>(CLI_INSTALLER_INVALIDATE_STATUS);
|
||||
expect(invalidateResult.success).toBe(true);
|
||||
|
||||
const freshStatus = await invoke<IpcResult<CliInstallationStatus>>(CLI_INSTALLER_GET_STATUS);
|
||||
expect(freshStatus).toEqual({ success: true, data: createCliStatus() });
|
||||
|
||||
providerDeferred.resolve(createProviderStatus('codex'));
|
||||
await expect(providerRequest).resolves.toEqual({
|
||||
success: true,
|
||||
data: createProviderStatus('codex'),
|
||||
});
|
||||
|
||||
const cachedStatusResult =
|
||||
await invoke<IpcResult<CliInstallationStatus>>(CLI_INSTALLER_GET_STATUS);
|
||||
expect(cachedStatusResult).toEqual({ success: true, data: createCliStatus() });
|
||||
});
|
||||
});
|
||||
|
|
@ -38,13 +38,16 @@ const logger = createLogger('IPC:cliInstaller');
|
|||
let service: CliInstallerService;
|
||||
const statusInFlight = new Map<CliInstallerProviderStatusMode, Promise<CliInstallationStatus>>();
|
||||
const providerStatusInFlight = new Map<CliProviderId, Promise<CliProviderStatus | null>>();
|
||||
let providerRuntimeRequestTail: Promise<void> = Promise.resolve();
|
||||
const providerRuntimeRequestTails = new Map<CliProviderId, Promise<void>>();
|
||||
const providerRuntimeRequestQueue: Array<() => void> = [];
|
||||
let activeProviderRuntimeRequestCount = 0;
|
||||
const cachedStatus = new Map<
|
||||
CliInstallerProviderStatusMode,
|
||||
{ value: CliInstallationStatus; at: number }
|
||||
>();
|
||||
let statusCacheGeneration = 0;
|
||||
const STATUS_CACHE_TTL_MS = 5_000;
|
||||
const MAX_PARALLEL_PROVIDER_RUNTIME_REQUESTS = 3;
|
||||
const FRONTEND_MULTIMODEL_PROVIDER_IDS = new Set<CliProviderId>(['anthropic', 'codex', 'opencode']);
|
||||
|
||||
function isFrontendMultimodelProviderId(providerId: CliProviderId): boolean {
|
||||
|
|
@ -111,12 +114,64 @@ function canUseStatusForCacheKey(
|
|||
);
|
||||
}
|
||||
|
||||
function runProviderRuntimeRequest<T>(operation: () => Promise<T>): Promise<T> {
|
||||
const request = providerRuntimeRequestTail.then(operation, operation);
|
||||
providerRuntimeRequestTail = request.then(
|
||||
function resetProviderRuntimeRequestLimiter(): void {
|
||||
if (activeProviderRuntimeRequestCount === 0 && providerRuntimeRequestQueue.length === 0) {
|
||||
providerRuntimeRequestTails.clear();
|
||||
}
|
||||
}
|
||||
|
||||
function acquireProviderRuntimeRequestSlot(): Promise<void> {
|
||||
if (activeProviderRuntimeRequestCount < MAX_PARALLEL_PROVIDER_RUNTIME_REQUESTS) {
|
||||
activeProviderRuntimeRequestCount += 1;
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
providerRuntimeRequestQueue.push(() => {
|
||||
activeProviderRuntimeRequestCount += 1;
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function releaseProviderRuntimeRequestSlot(): void {
|
||||
activeProviderRuntimeRequestCount = Math.max(0, activeProviderRuntimeRequestCount - 1);
|
||||
const next = providerRuntimeRequestQueue.shift();
|
||||
if (next) {
|
||||
next();
|
||||
}
|
||||
}
|
||||
|
||||
async function runWithProviderRuntimeSlot<T>(operation: () => Promise<T>): Promise<T> {
|
||||
await acquireProviderRuntimeRequestSlot();
|
||||
try {
|
||||
return await operation();
|
||||
} finally {
|
||||
releaseProviderRuntimeRequestSlot();
|
||||
}
|
||||
}
|
||||
|
||||
function runProviderRuntimeRequest<T>(
|
||||
providerId: CliProviderId,
|
||||
operation: () => Promise<T>
|
||||
): Promise<T> {
|
||||
const previousProviderRequest = providerRuntimeRequestTails.get(providerId) ?? Promise.resolve();
|
||||
const request = previousProviderRequest.then(
|
||||
() => runWithProviderRuntimeSlot(operation),
|
||||
() => runWithProviderRuntimeSlot(operation)
|
||||
);
|
||||
const tail = request.then(
|
||||
() => undefined,
|
||||
() => undefined
|
||||
);
|
||||
|
||||
providerRuntimeRequestTails.set(providerId, tail);
|
||||
void tail.finally(() => {
|
||||
if (providerRuntimeRequestTails.get(providerId) === tail) {
|
||||
providerRuntimeRequestTails.delete(providerId);
|
||||
}
|
||||
});
|
||||
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
@ -125,7 +180,7 @@ function runProviderRuntimeRequest<T>(operation: () => Promise<T>): Promise<T> {
|
|||
*/
|
||||
export function initializeCliInstallerHandlers(installerService: CliInstallerService): void {
|
||||
service = installerService;
|
||||
providerRuntimeRequestTail = Promise.resolve();
|
||||
resetProviderRuntimeRequestLimiter();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -266,7 +321,10 @@ async function handleGetProviderStatus(
|
|||
}
|
||||
|
||||
const generation = statusCacheGeneration;
|
||||
const request = runProviderRuntimeRequest(() => service.getProviderStatus(providerId))
|
||||
const currentService = service;
|
||||
const request = runProviderRuntimeRequest(providerId, () =>
|
||||
currentService.getProviderStatus(providerId)
|
||||
)
|
||||
.then((status) => {
|
||||
if (generation === statusCacheGeneration) {
|
||||
patchCachedProviderStatus(status);
|
||||
|
|
@ -306,7 +364,10 @@ async function handleVerifyProviderModels(
|
|||
): Promise<IpcResult<CliProviderStatus | null>> {
|
||||
try {
|
||||
const generation = statusCacheGeneration;
|
||||
const status = await runProviderRuntimeRequest(() => service.verifyProviderModels(providerId));
|
||||
const currentService = service;
|
||||
const status = await runProviderRuntimeRequest(providerId, () =>
|
||||
currentService.verifyProviderModels(providerId)
|
||||
);
|
||||
if (generation === statusCacheGeneration) {
|
||||
patchCachedProviderStatus(status);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue