fix(member-work-sync): serialize audit journal appends
This commit is contained in:
parent
5224fe4cda
commit
c536557991
2 changed files with 37 additions and 1 deletions
|
|
@ -125,6 +125,7 @@ export class NoopMemberWorkSyncAuditJournal implements MemberWorkSyncAuditJourna
|
|||
export class FileMemberWorkSyncAuditJournal implements MemberWorkSyncAuditJournalPort {
|
||||
private readonly maxBytes: number;
|
||||
private readonly rotatedFileCount: number;
|
||||
private readonly appendChains = new Map<string, Promise<void>>();
|
||||
|
||||
constructor(
|
||||
private readonly paths: MemberWorkSyncStorePaths,
|
||||
|
|
@ -136,9 +137,24 @@ export class FileMemberWorkSyncAuditJournal implements MemberWorkSyncAuditJourna
|
|||
}
|
||||
|
||||
async append(event: MemberWorkSyncAuditEvent): Promise<void> {
|
||||
const filePath = this.paths.getMemberJournalPath(event.teamName, event.memberName);
|
||||
const previous = this.appendChains.get(filePath) ?? Promise.resolve();
|
||||
const next = previous.catch(() => undefined).then(() => this.appendToFile(filePath, event));
|
||||
|
||||
this.appendChains.set(filePath, next);
|
||||
|
||||
try {
|
||||
await next;
|
||||
} finally {
|
||||
if (this.appendChains.get(filePath) === next) {
|
||||
this.appendChains.delete(filePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async appendToFile(filePath: string, event: MemberWorkSyncAuditEvent): Promise<void> {
|
||||
try {
|
||||
await this.paths.ensureMemberWorkSyncDir(event.teamName, event.memberName);
|
||||
const filePath = this.paths.getMemberJournalPath(event.teamName, event.memberName);
|
||||
await mkdir(dirname(filePath), { recursive: true });
|
||||
await withFileLock(filePath, async () => {
|
||||
await rotateIfNeeded(filePath, this.maxBytes, this.rotatedFileCount);
|
||||
|
|
|
|||
|
|
@ -90,6 +90,26 @@ describe('FileMemberWorkSyncAuditJournal', () => {
|
|||
expect(latest.taskRefs[0].taskId).toHaveLength(243);
|
||||
});
|
||||
|
||||
it('serializes concurrent appends for the same member journal', async () => {
|
||||
const journal = new FileMemberWorkSyncAuditJournal(new MemberWorkSyncStorePaths(root));
|
||||
const events = Array.from({ length: 80 }, (_, index) => ({
|
||||
timestamp: `2026-04-30T00:01:${String(index).padStart(2, '0')}.000Z`,
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
event: 'queue_coalesced' as const,
|
||||
source: 'test',
|
||||
reason: `event-${index}`,
|
||||
}));
|
||||
|
||||
await Promise.all(events.map((event) => journal.append(event)));
|
||||
|
||||
const lines = (await readFile(journalPath(root), 'utf8')).trim().split('\n');
|
||||
expect(lines).toHaveLength(events.length);
|
||||
expect(lines.map((line) => JSON.parse(line).reason)).toEqual(
|
||||
events.map((event) => event.reason)
|
||||
);
|
||||
});
|
||||
|
||||
it('logs and swallows append failures', async () => {
|
||||
const logger = { debug: vi.fn(), warn: vi.fn(), error: vi.fn() };
|
||||
const paths = new MemberWorkSyncStorePaths(root);
|
||||
|
|
|
|||
Loading…
Reference in a new issue