fix(member-work-sync): reclaim stale turn settled payloads

This commit is contained in:
777genius 2026-04-29 21:12:17 +03:00
parent 3ce183a229
commit 55ffb185c3
2 changed files with 86 additions and 1 deletions

View file

@ -13,12 +13,14 @@ import type { RuntimeTurnSettledSpoolPaths } from './RuntimeTurnSettledSpoolPath
const DEFAULT_MAX_PAYLOAD_BYTES = 256 * 1024;
const DEFAULT_PROCESSED_RETENTION_MS = 24 * 60 * 60 * 1000;
const DEFAULT_PROCESSED_RETENTION_COUNT = 1000;
const DEFAULT_PROCESSING_STALE_MS = 5 * 60 * 1000;
export interface FileRuntimeTurnSettledEventStoreDeps {
paths: RuntimeTurnSettledSpoolPaths;
maxPayloadBytes?: number;
processedRetentionMs?: number;
processedRetentionCount?: number;
processingStaleMs?: number;
now?: () => Date;
}
@ -47,6 +49,7 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent
private readonly maxPayloadBytes: number;
private readonly processedRetentionMs: number;
private readonly processedRetentionCount: number;
private readonly processingStaleMs: number;
private readonly now: () => Date;
constructor(private readonly deps: FileRuntimeTurnSettledEventStoreDeps) {
@ -54,11 +57,13 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent
this.processedRetentionMs = deps.processedRetentionMs ?? DEFAULT_PROCESSED_RETENTION_MS;
this.processedRetentionCount =
deps.processedRetentionCount ?? DEFAULT_PROCESSED_RETENTION_COUNT;
this.processingStaleMs = deps.processingStaleMs ?? DEFAULT_PROCESSING_STALE_MS;
this.now = deps.now ?? (() => new Date());
}
async claimPending(limit: number): Promise<RuntimeTurnSettledClaimedPayload[]> {
await this.ensureDirectories();
await this.recoverStaleProcessingPayloads();
const entries = await readdir(this.deps.paths.getIncomingDir(), { withFileTypes: true }).catch(
() => []
@ -149,6 +154,35 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent
]);
}
private async recoverStaleProcessingPayloads(): Promise<void> {
const cutoff = this.now().getTime() - this.processingStaleMs;
const entries = await readdir(this.deps.paths.getProcessingDir(), {
withFileTypes: true,
}).catch(() => []);
await Promise.allSettled(
entries
.filter(
(entry) =>
entry.isFile() &&
!entry.name.startsWith('.') &&
!entry.name.endsWith('.meta.json')
)
.map(async (entry) => {
const processingPath = path.join(this.deps.paths.getProcessingDir(), entry.name);
const fileStat = await stat(processingPath).catch(() => null);
if (!fileStat?.isFile() || fileStat.mtimeMs > cutoff) {
return;
}
await moveFileBestEffort(
processingPath,
path.join(this.deps.paths.getIncomingDir(), entry.name)
);
})
);
}
private async quarantineIncoming(
incomingPath: string,
fileName: string,
@ -185,7 +219,10 @@ export class FileRuntimeTurnSettledEventStore implements RuntimeTurnSettledEvent
);
await Promise.allSettled(
toRemove.flatMap((file) => [rm(file.filePath, { force: true }), rm(buildMetaFilePath(file.filePath), { force: true })])
toRemove.flatMap((file) => [
rm(file.filePath, { force: true }),
rm(buildMetaFilePath(file.filePath), { force: true }),
])
);
}
}

View file

@ -86,4 +86,52 @@ describe('FileRuntimeTurnSettledEventStore', () => {
};
expect(meta).toMatchObject({ outcome: 'enqueued', teamName: 'team-a' });
});
it('reclaims stale processing payloads before claiming pending events', async () => {
const paths = await makePaths();
await fs.mkdir(paths.getProcessingDir(), { recursive: true });
const filePath = path.join(paths.getProcessingDir(), '20260429-1.codex.json');
await fs.writeFile(filePath, '{"eventName":"runtime_turn_settled"}', 'utf8');
await fs.utimes(
filePath,
new Date('2026-04-29T11:00:00.000Z'),
new Date('2026-04-29T11:00:00.000Z')
);
const store = new FileRuntimeTurnSettledEventStore({
paths,
now: () => new Date('2026-04-29T12:00:00.000Z'),
processingStaleMs: 60_000,
});
const claimed = await store.claimPending(10);
expect(claimed).toHaveLength(1);
expect(claimed[0]).toMatchObject({
fileName: '20260429-1.codex.json',
provider: 'codex',
raw: '{"eventName":"runtime_turn_settled"}',
});
});
it('does not reclaim fresh processing payloads from an active drain', async () => {
const paths = await makePaths();
await fs.mkdir(paths.getProcessingDir(), { recursive: true });
const filePath = path.join(paths.getProcessingDir(), '20260429-1.codex.json');
await fs.writeFile(filePath, '{"eventName":"runtime_turn_settled"}', 'utf8');
await fs.utimes(
filePath,
new Date('2026-04-29T11:59:45.000Z'),
new Date('2026-04-29T11:59:45.000Z')
);
const store = new FileRuntimeTurnSettledEventStore({
paths,
now: () => new Date('2026-04-29T12:00:00.000Z'),
processingStaleMs: 60_000,
});
const claimed = await store.claimPending(10);
expect(claimed).toHaveLength(0);
await expect(fs.stat(filePath)).resolves.toMatchObject({ isFile: expect.any(Function) });
});
});