From 48782c76a54e52198a5146fd2e4a8a2e78bec751 Mon Sep 17 00:00:00 2001 From: 777genius Date: Sun, 31 May 2026 20:18:43 +0300 Subject: [PATCH] fix(cli-installer): queue provider runtime checks per provider --- src/main/ipc/cliInstaller.test.ts | 345 ++++++++++++++++++++++++++++++ src/main/ipc/cliInstaller.ts | 75 ++++++- 2 files changed, 413 insertions(+), 7 deletions(-) create mode 100644 src/main/ipc/cliInstaller.test.ts diff --git a/src/main/ipc/cliInstaller.test.ts b/src/main/ipc/cliInstaller.test.ts new file mode 100644 index 00000000..f081fcfe --- /dev/null +++ b/src/main/ipc/cliInstaller.test.ts @@ -0,0 +1,345 @@ +import { + CLI_INSTALLER_GET_PROVIDER_STATUS, + CLI_INSTALLER_GET_STATUS, + CLI_INSTALLER_INVALIDATE_STATUS, + CLI_INSTALLER_VERIFY_PROVIDER_MODELS, +} from '@preload/constants/ipcChannels'; +import { createDefaultCliExtensionCapabilities } from '@shared/utils/providerExtensionCapabilities'; +import { describe, expect, test, vi } from 'vitest'; + +import { initializeCliInstallerHandlers, registerCliInstallerHandlers } from './cliInstaller'; + +import type { CliInstallerService } from '@main/services'; +import type { + CliInstallationStatus, + CliProviderId, + CliProviderStatus, + IpcResult, +} from '@shared/types'; +import type { IpcMain, IpcMainInvokeEvent } from 'electron'; + +vi.mock('@shared/utils/logger', () => ({ + createLogger: () => ({ + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), +})); + +interface Deferred { + promise: Promise; + resolve: (value: T) => void; + reject: (error: unknown) => void; +} + +type IpcHandler = (event: IpcMainInvokeEvent, ...args: unknown[]) => unknown; + +function createDeferred(): Deferred { + let resolve!: (value: T) => void; + let reject!: (error: unknown) => void; + const promise = new Promise((promiseResolve, promiseReject) => { + resolve = promiseResolve; + reject = promiseReject; + }); + return { promise, resolve, reject }; +} + +async function flushMicrotasks(): Promise { + for (let index = 0; index < 5; index += 1) { + await Promise.resolve(); + } +} + +function createProviderStatus(providerId: CliProviderId): CliProviderStatus { + return { + providerId, + displayName: providerId, + supported: true, + authenticated: true, + authMethod: 'test', + verificationState: 'verified', + models: [], + canLoginFromUi: false, + capabilities: { + teamLaunch: true, + oneShot: true, + extensions: createDefaultCliExtensionCapabilities(), + }, + backend: null, + }; +} + +function createCliStatus(providers: CliProviderStatus[] = []): CliInstallationStatus { + const authenticatedProvider = providers.find((provider) => provider.authenticated) ?? null; + return { + flavor: 'agent_teams_orchestrator', + displayName: 'Agent Teams Runtime', + supportsSelfUpdate: false, + showVersionDetails: true, + showBinaryPath: true, + installed: true, + installedVersion: '1.0.0', + binaryPath: '/usr/local/bin/claude', + launchError: null, + latestVersion: null, + updateAvailable: false, + authLoggedIn: authenticatedProvider !== null, + authStatusChecking: false, + authMethod: authenticatedProvider?.authMethod ?? null, + providers, + }; +} + +function createIpcMainHarness(): { + ipcMain: IpcMain; + invoke: (channel: string, ...args: unknown[]) => Promise; +} { + const handlers = new Map(); + const ipcMain = { + handle: vi.fn((channel: string, handler: IpcHandler) => { + handlers.set(channel, handler); + }), + removeHandler: vi.fn((channel: string) => { + handlers.delete(channel); + }), + } as unknown as IpcMain; + + return { + ipcMain, + invoke: async (channel: string, ...args: unknown[]): Promise => { + const handler = handlers.get(channel); + if (!handler) { + throw new Error(`Missing IPC handler: ${channel}`); + } + return (await handler({} as IpcMainInvokeEvent, ...args)) as T; + }, + }; +} + +function createInstallerService(overrides: Partial): CliInstallerService { + return { + getStatus: vi.fn(() => Promise.resolve(createCliStatus())), + getLatestStatusSnapshot: vi.fn(() => null), + getProviderStatus: vi.fn(), + install: vi.fn(() => Promise.resolve()), + invalidateStatusCache: vi.fn(), + verifyProviderModels: vi.fn(), + ...overrides, + } as unknown as CliInstallerService; +} + +function setupHandlers(service: CliInstallerService): ReturnType { + const harness = createIpcMainHarness(); + initializeCliInstallerHandlers(service); + registerCliInstallerHandlers(harness.ipcMain); + return harness; +} + +describe('cliInstaller IPC provider runtime scheduling', () => { + test('runs different provider status requests concurrently up to the provider limit', async () => { + const started: CliProviderId[] = []; + const deferredByProvider = new Map>(); + const service = createInstallerService({ + getProviderStatus: vi.fn((providerId: CliProviderId) => { + started.push(providerId); + const deferred = createDeferred(); + deferredByProvider.set(providerId, deferred); + return deferred.promise; + }), + }); + const { invoke } = setupHandlers(service); + + const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map( + (providerId) => + invoke>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId) + ); + + await flushMicrotasks(); + expect(started).toEqual(['anthropic', 'codex', 'opencode']); + + deferredByProvider.get('anthropic')?.resolve(createProviderStatus('anthropic')); + await flushMicrotasks(); + expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']); + + deferredByProvider.get('codex')?.resolve(createProviderStatus('codex')); + deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode')); + deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini')); + + const results = await Promise.all(requests); + expect(results.every((result) => result.success)).toBe(true); + }); + + test('dedupes concurrent status requests for the same provider', async () => { + const deferred = createDeferred(); + const getProviderStatus = vi.fn(() => deferred.promise); + const service = createInstallerService({ getProviderStatus }); + const { invoke } = setupHandlers(service); + + const firstRequest = invoke>( + CLI_INSTALLER_GET_PROVIDER_STATUS, + 'codex' + ); + const secondRequest = invoke>( + CLI_INSTALLER_GET_PROVIDER_STATUS, + 'codex' + ); + + await flushMicrotasks(); + expect(getProviderStatus).toHaveBeenCalledTimes(1); + + const providerStatus = createProviderStatus('codex'); + deferred.resolve(providerStatus); + + await expect(Promise.all([firstRequest, secondRequest])).resolves.toEqual([ + { success: true, data: providerStatus }, + { success: true, data: providerStatus }, + ]); + }); + + test('keeps status and model verification sequential for the same provider', async () => { + const started: string[] = []; + const statusDeferred = createDeferred(); + const verifyDeferred = createDeferred(); + const service = createInstallerService({ + getProviderStatus: vi.fn(() => { + started.push('status'); + return statusDeferred.promise; + }), + verifyProviderModels: vi.fn(() => { + started.push('verify'); + return verifyDeferred.promise; + }), + }); + const { invoke } = setupHandlers(service); + + const statusRequest = invoke>( + CLI_INSTALLER_GET_PROVIDER_STATUS, + 'opencode' + ); + const verifyRequest = invoke>( + CLI_INSTALLER_VERIFY_PROVIDER_MODELS, + 'opencode' + ); + + await flushMicrotasks(); + expect(started).toEqual(['status']); + + statusDeferred.resolve(createProviderStatus('opencode')); + await flushMicrotasks(); + expect(started).toEqual(['status', 'verify']); + + verifyDeferred.resolve(createProviderStatus('opencode')); + + const [statusResult, verifyResult] = await Promise.all([statusRequest, verifyRequest]); + expect(statusResult.success).toBe(true); + expect(verifyResult.success).toBe(true); + }); + + test('does not strand queued provider requests if handlers are reinitialized', async () => { + const started: CliProviderId[] = []; + const deferredByProvider = new Map>(); + const originalService = createInstallerService({ + getProviderStatus: vi.fn((providerId: CliProviderId) => { + started.push(providerId); + const deferred = createDeferred(); + deferredByProvider.set(providerId, deferred); + return deferred.promise; + }), + }); + const replacementService = createInstallerService({ + getProviderStatus: vi.fn(() => Promise.resolve(createProviderStatus('anthropic'))), + }); + const { invoke } = setupHandlers(originalService); + + const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map( + (providerId) => + invoke>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId) + ); + + await flushMicrotasks(); + expect(started).toEqual(['anthropic', 'codex', 'opencode']); + + initializeCliInstallerHandlers(replacementService); + + deferredByProvider.get('anthropic')?.resolve(createProviderStatus('anthropic')); + deferredByProvider.get('codex')?.resolve(createProviderStatus('codex')); + deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode')); + await flushMicrotasks(); + + expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']); + expect(replacementService.getProviderStatus).not.toHaveBeenCalled(); + + deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini')); + const results = await Promise.all(requests); + expect(results.every((result) => result.success)).toBe(true); + }); + + test('releases a provider runtime slot after a failed request', async () => { + const started: CliProviderId[] = []; + const deferredByProvider = new Map>(); + const service = createInstallerService({ + getProviderStatus: vi.fn((providerId: CliProviderId) => { + started.push(providerId); + const deferred = createDeferred(); + deferredByProvider.set(providerId, deferred); + return deferred.promise; + }), + }); + const { invoke } = setupHandlers(service); + + const requests = (['anthropic', 'codex', 'opencode', 'gemini'] as CliProviderId[]).map( + (providerId) => + invoke>(CLI_INSTALLER_GET_PROVIDER_STATUS, providerId) + ); + + await flushMicrotasks(); + expect(started).toEqual(['anthropic', 'codex', 'opencode']); + + deferredByProvider.get('anthropic')?.reject(new Error('provider failed')); + await flushMicrotasks(); + expect(started).toEqual(['anthropic', 'codex', 'opencode', 'gemini']); + + deferredByProvider.get('codex')?.resolve(createProviderStatus('codex')); + deferredByProvider.get('opencode')?.resolve(createProviderStatus('opencode')); + deferredByProvider.get('gemini')?.resolve(createProviderStatus('gemini')); + + const results = await Promise.all(requests); + expect(results[0]).toEqual({ success: false, error: 'provider failed' }); + expect(results.slice(1).every((result) => result.success)).toBe(true); + }); + + test('does not patch a fresh status cache with stale provider results after invalidation', async () => { + const providerDeferred = createDeferred(); + const service = createInstallerService({ + getStatus: vi.fn(() => Promise.resolve(createCliStatus())), + getProviderStatus: vi.fn(() => providerDeferred.promise), + }); + const { invoke } = setupHandlers(service); + + const firstStatus = await invoke>(CLI_INSTALLER_GET_STATUS); + expect(firstStatus.success).toBe(true); + + const providerRequest = invoke>( + CLI_INSTALLER_GET_PROVIDER_STATUS, + 'codex' + ); + await flushMicrotasks(); + + const invalidateResult = await invoke>(CLI_INSTALLER_INVALIDATE_STATUS); + expect(invalidateResult.success).toBe(true); + + const freshStatus = await invoke>(CLI_INSTALLER_GET_STATUS); + expect(freshStatus).toEqual({ success: true, data: createCliStatus() }); + + providerDeferred.resolve(createProviderStatus('codex')); + await expect(providerRequest).resolves.toEqual({ + success: true, + data: createProviderStatus('codex'), + }); + + const cachedStatusResult = + await invoke>(CLI_INSTALLER_GET_STATUS); + expect(cachedStatusResult).toEqual({ success: true, data: createCliStatus() }); + }); +}); diff --git a/src/main/ipc/cliInstaller.ts b/src/main/ipc/cliInstaller.ts index 9775cc43..0b11a907 100644 --- a/src/main/ipc/cliInstaller.ts +++ b/src/main/ipc/cliInstaller.ts @@ -38,13 +38,16 @@ const logger = createLogger('IPC:cliInstaller'); let service: CliInstallerService; const statusInFlight = new Map>(); const providerStatusInFlight = new Map>(); -let providerRuntimeRequestTail: Promise = Promise.resolve(); +const providerRuntimeRequestTails = new Map>(); +const providerRuntimeRequestQueue: Array<() => void> = []; +let activeProviderRuntimeRequestCount = 0; const cachedStatus = new Map< CliInstallerProviderStatusMode, { value: CliInstallationStatus; at: number } >(); let statusCacheGeneration = 0; const STATUS_CACHE_TTL_MS = 5_000; +const MAX_PARALLEL_PROVIDER_RUNTIME_REQUESTS = 3; const FRONTEND_MULTIMODEL_PROVIDER_IDS = new Set(['anthropic', 'codex', 'opencode']); function isFrontendMultimodelProviderId(providerId: CliProviderId): boolean { @@ -111,12 +114,64 @@ function canUseStatusForCacheKey( ); } -function runProviderRuntimeRequest(operation: () => Promise): Promise { - const request = providerRuntimeRequestTail.then(operation, operation); - providerRuntimeRequestTail = request.then( +function resetProviderRuntimeRequestLimiter(): void { + if (activeProviderRuntimeRequestCount === 0 && providerRuntimeRequestQueue.length === 0) { + providerRuntimeRequestTails.clear(); + } +} + +function acquireProviderRuntimeRequestSlot(): Promise { + if (activeProviderRuntimeRequestCount < MAX_PARALLEL_PROVIDER_RUNTIME_REQUESTS) { + activeProviderRuntimeRequestCount += 1; + return Promise.resolve(); + } + + return new Promise((resolve) => { + providerRuntimeRequestQueue.push(() => { + activeProviderRuntimeRequestCount += 1; + resolve(); + }); + }); +} + +function releaseProviderRuntimeRequestSlot(): void { + activeProviderRuntimeRequestCount = Math.max(0, activeProviderRuntimeRequestCount - 1); + const next = providerRuntimeRequestQueue.shift(); + if (next) { + next(); + } +} + +async function runWithProviderRuntimeSlot(operation: () => Promise): Promise { + await acquireProviderRuntimeRequestSlot(); + try { + return await operation(); + } finally { + releaseProviderRuntimeRequestSlot(); + } +} + +function runProviderRuntimeRequest( + providerId: CliProviderId, + operation: () => Promise +): Promise { + const previousProviderRequest = providerRuntimeRequestTails.get(providerId) ?? Promise.resolve(); + const request = previousProviderRequest.then( + () => runWithProviderRuntimeSlot(operation), + () => runWithProviderRuntimeSlot(operation) + ); + const tail = request.then( () => undefined, () => undefined ); + + providerRuntimeRequestTails.set(providerId, tail); + void tail.finally(() => { + if (providerRuntimeRequestTails.get(providerId) === tail) { + providerRuntimeRequestTails.delete(providerId); + } + }); + return request; } @@ -125,7 +180,7 @@ function runProviderRuntimeRequest(operation: () => Promise): Promise { */ export function initializeCliInstallerHandlers(installerService: CliInstallerService): void { service = installerService; - providerRuntimeRequestTail = Promise.resolve(); + resetProviderRuntimeRequestLimiter(); } /** @@ -266,7 +321,10 @@ async function handleGetProviderStatus( } const generation = statusCacheGeneration; - const request = runProviderRuntimeRequest(() => service.getProviderStatus(providerId)) + const currentService = service; + const request = runProviderRuntimeRequest(providerId, () => + currentService.getProviderStatus(providerId) + ) .then((status) => { if (generation === statusCacheGeneration) { patchCachedProviderStatus(status); @@ -306,7 +364,10 @@ async function handleVerifyProviderModels( ): Promise> { try { const generation = statusCacheGeneration; - const status = await runProviderRuntimeRequest(() => service.verifyProviderModels(providerId)); + const currentService = service; + const status = await runProviderRuntimeRequest(providerId, () => + currentService.verifyProviderModels(providerId) + ); if (generation === statusCacheGeneration) { patchCachedProviderStatus(status); }