diff --git a/src/features/member-work-sync/main/infrastructure/FileRuntimeTurnSettledEventStore.ts b/src/features/member-work-sync/main/infrastructure/FileRuntimeTurnSettledEventStore.ts index fdf0bd87..bd85fc94 100644 --- a/src/features/member-work-sync/main/infrastructure/FileRuntimeTurnSettledEventStore.ts +++ b/src/features/member-work-sync/main/infrastructure/FileRuntimeTurnSettledEventStore.ts @@ -13,12 +13,14 @@ import type { RuntimeTurnSettledSpoolPaths } from './RuntimeTurnSettledSpoolPath const DEFAULT_MAX_PAYLOAD_BYTES = 256 * 1024; const DEFAULT_PROCESSED_RETENTION_MS = 24 * 60 * 60 * 1000; const DEFAULT_PROCESSED_RETENTION_COUNT = 1000; +const DEFAULT_PROCESSING_STALE_MS = 5 * 60 * 1000; export interface FileRuntimeTurnSettledEventStoreDeps { paths: RuntimeTurnSettledSpoolPaths; maxPayloadBytes?: number; processedRetentionMs?: number; processedRetentionCount?: number; + processingStaleMs?: number; now?: () => Date; } @@ -47,6 +49,7 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent private readonly maxPayloadBytes: number; private readonly processedRetentionMs: number; private readonly processedRetentionCount: number; + private readonly processingStaleMs: number; private readonly now: () => Date; constructor(private readonly deps: FileRuntimeTurnSettledEventStoreDeps) { @@ -54,11 +57,13 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent this.processedRetentionMs = deps.processedRetentionMs ?? DEFAULT_PROCESSED_RETENTION_MS; this.processedRetentionCount = deps.processedRetentionCount ?? DEFAULT_PROCESSED_RETENTION_COUNT; + this.processingStaleMs = deps.processingStaleMs ?? DEFAULT_PROCESSING_STALE_MS; this.now = deps.now ?? (() => new Date()); } async claimPending(limit: number): Promise { await this.ensureDirectories(); + await this.recoverStaleProcessingPayloads(); const entries = await readdir(this.deps.paths.getIncomingDir(), { withFileTypes: true }).catch( () => [] @@ -149,6 +154,35 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent ]); } + private async recoverStaleProcessingPayloads(): Promise { + const cutoff = this.now().getTime() - this.processingStaleMs; + const entries = await readdir(this.deps.paths.getProcessingDir(), { + withFileTypes: true, + }).catch(() => []); + + await Promise.allSettled( + entries + .filter( + (entry) => + entry.isFile() && + !entry.name.startsWith('.') && + !entry.name.endsWith('.meta.json') + ) + .map(async (entry) => { + const processingPath = path.join(this.deps.paths.getProcessingDir(), entry.name); + const fileStat = await stat(processingPath).catch(() => null); + if (!fileStat?.isFile() || fileStat.mtimeMs > cutoff) { + return; + } + + await moveFileBestEffort( + processingPath, + path.join(this.deps.paths.getIncomingDir(), entry.name) + ); + }) + ); + } + private async quarantineIncoming( incomingPath: string, fileName: string, @@ -185,7 +219,10 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent ); await Promise.allSettled( - toRemove.flatMap((file) => [rm(file.filePath, { force: true }), rm(buildMetaFilePath(file.filePath), { force: true })]) + toRemove.flatMap((file) => [ + rm(file.filePath, { force: true }), + rm(buildMetaFilePath(file.filePath), { force: true }), + ]) ); } } diff --git a/test/features/member-work-sync/main/FileRuntimeTurnSettledEventStore.test.ts b/test/features/member-work-sync/main/FileRuntimeTurnSettledEventStore.test.ts index 052982d9..b349d399 100644 --- a/test/features/member-work-sync/main/FileRuntimeTurnSettledEventStore.test.ts +++ b/test/features/member-work-sync/main/FileRuntimeTurnSettledEventStore.test.ts @@ -86,4 +86,52 @@ describe('FileRuntimeTurnSettledEventStore', () => { }; expect(meta).toMatchObject({ outcome: 'enqueued', teamName: 'team-a' }); }); + + it('reclaims stale processing payloads before claiming pending events', async () => { + const paths = await makePaths(); + await fs.mkdir(paths.getProcessingDir(), { recursive: true }); + const filePath = path.join(paths.getProcessingDir(), '20260429-1.codex.json'); + await fs.writeFile(filePath, '{"eventName":"runtime_turn_settled"}', 'utf8'); + await fs.utimes( + filePath, + new Date('2026-04-29T11:00:00.000Z'), + new Date('2026-04-29T11:00:00.000Z') + ); + const store = new FileRuntimeTurnSettledEventStore({ + paths, + now: () => new Date('2026-04-29T12:00:00.000Z'), + processingStaleMs: 60_000, + }); + + const claimed = await store.claimPending(10); + + expect(claimed).toHaveLength(1); + expect(claimed[0]).toMatchObject({ + fileName: '20260429-1.codex.json', + provider: 'codex', + raw: '{"eventName":"runtime_turn_settled"}', + }); + }); + + it('does not reclaim fresh processing payloads from an active drain', async () => { + const paths = await makePaths(); + await fs.mkdir(paths.getProcessingDir(), { recursive: true }); + const filePath = path.join(paths.getProcessingDir(), '20260429-1.codex.json'); + await fs.writeFile(filePath, '{"eventName":"runtime_turn_settled"}', 'utf8'); + await fs.utimes( + filePath, + new Date('2026-04-29T11:59:45.000Z'), + new Date('2026-04-29T11:59:45.000Z') + ); + const store = new FileRuntimeTurnSettledEventStore({ + paths, + now: () => new Date('2026-04-29T12:00:00.000Z'), + processingStaleMs: 60_000, + }); + + const claimed = await store.claimPending(10); + + expect(claimed).toHaveLength(0); + await expect(fs.stat(filePath)).resolves.toMatchObject({ isFile: expect.any(Function) }); + }); });