fix(team): retry timed out reconcile drains

This commit is contained in:
777genius 2026-06-03 22:47:53 +03:00
parent 5366dbb34a
commit 3f188f9367
2 changed files with 127 additions and 8 deletions

View file

@ -11,15 +11,61 @@ interface TeamReconcileDrainState {
lastTrigger: TeamReconcileTrigger | null;
}
const DEFAULT_TEAM_RECONCILE_DRAIN_RUN_TIMEOUT_MS = 2 * 60_000;
export interface TeamReconcileDrainScheduler {
schedule(teamName: string, trigger: TeamReconcileTrigger): void;
dispose(): void;
}
class TeamReconcileDrainTimeoutError extends Error {
constructor(message: string) {
super(message);
this.name = 'TeamReconcileDrainTimeoutError';
}
}
function unrefTimer(timer: ReturnType<typeof setTimeout>): void {
timer.unref?.();
}
async function runWithTimeout(options: {
run: () => Promise<void>;
timeoutMs: number;
teamName: string;
trigger: TeamReconcileTrigger;
}): Promise<void> {
let timeout: ReturnType<typeof setTimeout> | null = null;
try {
await Promise.race([
options.run(),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(
new TeamReconcileDrainTimeoutError(
`team reconcile drain timed out for ${options.teamName} source=${options.trigger.source} detail=${options.trigger.detail} after ${options.timeoutMs}ms`
)
);
}, options.timeoutMs);
unrefTimer(timeout);
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
export function createTeamReconcileDrainScheduler(options: {
run: (teamName: string, trigger: TeamReconcileTrigger) => Promise<void>;
runTimeoutMs?: number;
}): TeamReconcileDrainScheduler {
const states = new Map<string, TeamReconcileDrainState>();
const runTimeoutMs = Math.max(
1,
options.runTimeoutMs ?? DEFAULT_TEAM_RECONCILE_DRAIN_RUN_TIMEOUT_MS
);
let disposed = false;
const drainTeam = async (teamName: string): Promise<void> => {
@ -40,9 +86,18 @@ export function createTeamReconcileDrainScheduler(options: {
}
try {
await options.run(teamName, trigger);
await runWithTimeout({
run: () => options.run(teamName, trigger),
timeoutMs: runTimeoutMs,
teamName,
trigger,
});
} catch (error) {
failed = true;
if (error instanceof TeamReconcileDrainTimeoutError && !state.pending) {
state.pending = true;
state.lastTrigger = trigger;
}
throw error;
} finally {
if (!disposed) {
@ -54,10 +109,7 @@ export function createTeamReconcileDrainScheduler(options: {
state.running = false;
if (disposed || !state.pending) {
states.delete(teamName);
return;
}
if (failed) {
} else if (failed) {
void drainTeam(teamName).catch(() => undefined);
}
}

View file

@ -30,13 +30,14 @@ function createDeferred<T>(): Deferred<T> {
}
async function flushAsyncWork(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
for (let i = 0; i < 8; i += 1) {
await Promise.resolve();
}
}
describe('TeamReconcileDrainScheduler', () => {
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
mockYieldToEventLoop.mockReset();
});
@ -176,6 +177,72 @@ describe('TeamReconcileDrainScheduler', () => {
scheduler.dispose();
});
it('times out a hung run so pending team reconciles can continue', async () => {
vi.useFakeTimers();
mockYieldToEventLoop.mockResolvedValue(undefined);
const hungRun = createDeferred<void>();
const run = vi
.fn<(teamName: string, trigger: TeamReconcileTrigger) => Promise<void>>()
.mockImplementationOnce(async () => {
await hungRun.promise;
})
.mockResolvedValueOnce(undefined);
const scheduler = createTeamReconcileDrainScheduler({
run,
runTimeoutMs: 10,
});
scheduler.schedule('team-a', { source: 'inbox', detail: 'inboxes/alice.json' });
await flushAsyncWork();
expect(run).toHaveBeenCalledTimes(1);
scheduler.schedule('team-a', { source: 'task', detail: 'task-2.json' });
await flushAsyncWork();
expect(run).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(10);
await flushAsyncWork();
expect(run).toHaveBeenCalledTimes(2);
expect(run).toHaveBeenNthCalledWith(2, 'team-a', {
source: 'task',
detail: 'task-2.json',
});
scheduler.dispose();
});
it('retries the timed out trigger when no newer event arrived', async () => {
vi.useFakeTimers();
mockYieldToEventLoop.mockResolvedValue(undefined);
const hungRun = createDeferred<void>();
const run = vi
.fn<(teamName: string, trigger: TeamReconcileTrigger) => Promise<void>>()
.mockImplementationOnce(async () => {
await hungRun.promise;
})
.mockResolvedValueOnce(undefined);
const scheduler = createTeamReconcileDrainScheduler({
run,
runTimeoutMs: 10,
});
scheduler.schedule('team-a', { source: 'inbox', detail: 'inboxes/alice.json' });
await flushAsyncWork();
expect(run).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(10);
await flushAsyncWork();
expect(run).toHaveBeenCalledTimes(2);
expect(run).toHaveBeenNthCalledWith(2, 'team-a', {
source: 'inbox',
detail: 'inboxes/alice.json',
});
scheduler.dispose();
});
it('does not lose a new event that arrives while a failed pass is yielding', async () => {
const yieldGate = createDeferred<void>();
mockYieldToEventLoop.mockImplementationOnce(() => yieldGate.promise).mockResolvedValue(undefined);