diff --git a/src/main/services/team/TeamReconcileDrainScheduler.ts b/src/main/services/team/TeamReconcileDrainScheduler.ts index 0861ae10..4564b873 100644 --- a/src/main/services/team/TeamReconcileDrainScheduler.ts +++ b/src/main/services/team/TeamReconcileDrainScheduler.ts @@ -11,15 +11,61 @@ interface TeamReconcileDrainState { lastTrigger: TeamReconcileTrigger | null; } +const DEFAULT_TEAM_RECONCILE_DRAIN_RUN_TIMEOUT_MS = 2 * 60_000; + export interface TeamReconcileDrainScheduler { schedule(teamName: string, trigger: TeamReconcileTrigger): void; dispose(): void; } +class TeamReconcileDrainTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = 'TeamReconcileDrainTimeoutError'; + } +} + +function unrefTimer(timer: ReturnType): void { + timer.unref?.(); +} + +async function runWithTimeout(options: { + run: () => Promise; + timeoutMs: number; + teamName: string; + trigger: TeamReconcileTrigger; +}): Promise { + let timeout: ReturnType | null = null; + try { + await Promise.race([ + options.run(), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject( + new TeamReconcileDrainTimeoutError( + `team reconcile drain timed out for ${options.teamName} source=${options.trigger.source} detail=${options.trigger.detail} after ${options.timeoutMs}ms` + ) + ); + }, options.timeoutMs); + unrefTimer(timeout); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} + export function createTeamReconcileDrainScheduler(options: { run: (teamName: string, trigger: TeamReconcileTrigger) => Promise; + runTimeoutMs?: number; }): TeamReconcileDrainScheduler { const states = new Map(); + const runTimeoutMs = Math.max( + 1, + options.runTimeoutMs ?? DEFAULT_TEAM_RECONCILE_DRAIN_RUN_TIMEOUT_MS + ); let disposed = false; const drainTeam = async (teamName: string): Promise => { @@ -40,9 +86,18 @@ export function createTeamReconcileDrainScheduler(options: { } try { - await options.run(teamName, trigger); + await runWithTimeout({ + run: () => options.run(teamName, trigger), + timeoutMs: runTimeoutMs, + teamName, + trigger, + }); } catch (error) { failed = true; + if (error instanceof TeamReconcileDrainTimeoutError && !state.pending) { + state.pending = true; + state.lastTrigger = trigger; + } throw error; } finally { if (!disposed) { @@ -54,10 +109,7 @@ export function createTeamReconcileDrainScheduler(options: { state.running = false; if (disposed || !state.pending) { states.delete(teamName); - return; - } - - if (failed) { + } else if (failed) { void drainTeam(teamName).catch(() => undefined); } } diff --git a/test/main/services/team/TeamReconcileDrainScheduler.test.ts b/test/main/services/team/TeamReconcileDrainScheduler.test.ts index 98efd778..c8976ce8 100644 --- a/test/main/services/team/TeamReconcileDrainScheduler.test.ts +++ b/test/main/services/team/TeamReconcileDrainScheduler.test.ts @@ -30,13 +30,14 @@ function createDeferred(): Deferred { } async function flushAsyncWork(): Promise { - await Promise.resolve(); - await Promise.resolve(); - await Promise.resolve(); + for (let i = 0; i < 8; i += 1) { + await Promise.resolve(); + } } describe('TeamReconcileDrainScheduler', () => { afterEach(() => { + vi.useRealTimers(); vi.restoreAllMocks(); mockYieldToEventLoop.mockReset(); }); @@ -176,6 +177,72 @@ describe('TeamReconcileDrainScheduler', () => { scheduler.dispose(); }); + it('times out a hung run so pending team reconciles can continue', async () => { + vi.useFakeTimers(); + mockYieldToEventLoop.mockResolvedValue(undefined); + const hungRun = createDeferred(); + const run = vi + .fn<(teamName: string, trigger: TeamReconcileTrigger) => Promise>() + .mockImplementationOnce(async () => { + await hungRun.promise; + }) + .mockResolvedValueOnce(undefined); + const scheduler = createTeamReconcileDrainScheduler({ + run, + runTimeoutMs: 10, + }); + + scheduler.schedule('team-a', { source: 'inbox', detail: 'inboxes/alice.json' }); + await flushAsyncWork(); + expect(run).toHaveBeenCalledTimes(1); + + scheduler.schedule('team-a', { source: 'task', detail: 'task-2.json' }); + await flushAsyncWork(); + expect(run).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(10); + await flushAsyncWork(); + + expect(run).toHaveBeenCalledTimes(2); + expect(run).toHaveBeenNthCalledWith(2, 'team-a', { + source: 'task', + detail: 'task-2.json', + }); + + scheduler.dispose(); + }); + + it('retries the timed out trigger when no newer event arrived', async () => { + vi.useFakeTimers(); + mockYieldToEventLoop.mockResolvedValue(undefined); + const hungRun = createDeferred(); + const run = vi + .fn<(teamName: string, trigger: TeamReconcileTrigger) => Promise>() + .mockImplementationOnce(async () => { + await hungRun.promise; + }) + .mockResolvedValueOnce(undefined); + const scheduler = createTeamReconcileDrainScheduler({ + run, + runTimeoutMs: 10, + }); + + scheduler.schedule('team-a', { source: 'inbox', detail: 'inboxes/alice.json' }); + await flushAsyncWork(); + expect(run).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(10); + await flushAsyncWork(); + + expect(run).toHaveBeenCalledTimes(2); + expect(run).toHaveBeenNthCalledWith(2, 'team-a', { + source: 'inbox', + detail: 'inboxes/alice.json', + }); + + scheduler.dispose(); + }); + it('does not lose a new event that arrives while a failed pass is yielding', async () => { const yieldGate = createDeferred(); mockYieldToEventLoop.mockImplementationOnce(() => yieldGate.promise).mockResolvedValue(undefined);