From ec7935e593b18efeb46f888da70670b6d0c5d23a Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 29 Apr 2026 15:15:42 +0300 Subject: [PATCH] feat(member-work-sync): add durable nudge outbox store --- .../member-work-sync-control-plane-plan.md | 1 + .../member-work-sync/contracts/types.ts | 93 +++++++ .../core/application/ports.ts | 16 ++ .../infrastructure/JsonMemberWorkSyncStore.ts | 240 +++++++++++++++++- .../MemberWorkSyncStorePaths.ts | 4 + .../main/JsonMemberWorkSyncStore.test.ts | 113 ++++++++- 6 files changed, 465 insertions(+), 2 deletions(-) diff --git a/docs/team-management/member-work-sync-control-plane-plan.md b/docs/team-management/member-work-sync-control-plane-plan.md index 5a17c0b3..14f64d56 100644 --- a/docs/team-management/member-work-sync-control-plane-plan.md +++ b/docs/team-management/member-work-sync-control-plane-plan.md @@ -34,6 +34,7 @@ Current implementation note: - Phase 1 is intentionally shadow-only: it computes agendas, fingerprints, report tokens, reports, persisted status, passive queue reconciliation, startup replay, diagnostics, metrics, and a neutral read-only member details surface. - Phase 1 does not insert inbox messages, send nudges, mark tasks/messages read, or change `TeamTaskStallMonitor` semantics. - Phase 1.5 exposes a machine-readable `phase2Readiness` assessment from shadow metrics. It can say `collecting_shadow_data`, `blocked`, or `shadow_ready`; it still does not dispatch nudges. +- Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet. - Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low. Patterns used: diff --git a/src/features/member-work-sync/contracts/types.ts b/src/features/member-work-sync/contracts/types.ts index d7a8566b..9f74309d 100644 --- a/src/features/member-work-sync/contracts/types.ts +++ b/src/features/member-work-sync/contracts/types.ts @@ -205,3 +205,96 @@ export interface MemberWorkSyncStatusRequest { export interface MemberWorkSyncMetricsRequest { teamName: string; } + +export type MemberWorkSyncOutboxStatus = + | 'pending' + | 'claimed' + | 'delivered' + | 'superseded' + | 'failed_retryable' + | 'failed_terminal'; + +export interface MemberWorkSyncNudgePayload { + from: 'system'; + to: string; + messageKind: 'member_work_sync_nudge'; + source: 'member-work-sync'; + actionMode: 'do'; + text: string; + taskRefs: Array<{ + taskId: string; + displayId: string; + teamName: string; + }>; +} + +export interface MemberWorkSyncOutboxItem { + id: string; + teamName: string; + memberName: string; + agendaFingerprint: string; + payloadHash: string; + payload: MemberWorkSyncNudgePayload; + status: MemberWorkSyncOutboxStatus; + attemptGeneration: number; + claimedBy?: string; + claimedAt?: string; + deliveredMessageId?: string; + lastError?: string; + nextAttemptAt?: string; + createdAt: string; + updatedAt: string; +} + +export type MemberWorkSyncOutboxEnsureResult = + | { ok: true; outcome: 'created' | 'existing'; item: MemberWorkSyncOutboxItem } + | { + ok: false; + outcome: 'payload_conflict'; + item: MemberWorkSyncOutboxItem; + existingPayloadHash: string; + requestedPayloadHash: string; + }; + +export interface MemberWorkSyncOutboxEnsureInput { + id: string; + teamName: string; + memberName: string; + agendaFingerprint: string; + payloadHash: string; + payload: MemberWorkSyncNudgePayload; + nowIso: string; + nextAttemptAt?: string; +} + +export interface MemberWorkSyncOutboxClaimInput { + teamName: string; + claimedBy: string; + nowIso: string; + limit: number; +} + +export interface MemberWorkSyncOutboxMarkDeliveredInput { + teamName: string; + id: string; + attemptGeneration: number; + deliveredMessageId: string; + nowIso: string; +} + +export interface MemberWorkSyncOutboxMarkSupersededInput { + teamName: string; + id: string; + reason: string; + nowIso: string; +} + +export interface MemberWorkSyncOutboxMarkFailedInput { + teamName: string; + id: string; + attemptGeneration: number; + error: string; + retryable: boolean; + nowIso: string; + nextAttemptAt?: string; +} diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index b3865eb8..47487cd8 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -2,6 +2,13 @@ import type { MemberWorkSyncAgenda, MemberWorkSyncTeamMetrics, MemberWorkSyncProviderId, + MemberWorkSyncOutboxClaimInput, + MemberWorkSyncOutboxEnsureInput, + MemberWorkSyncOutboxEnsureResult, + MemberWorkSyncOutboxItem, + MemberWorkSyncOutboxMarkDeliveredInput, + MemberWorkSyncOutboxMarkFailedInput, + MemberWorkSyncOutboxMarkSupersededInput, MemberWorkSyncReport, MemberWorkSyncReportIntent, MemberWorkSyncReportIntentStatus, @@ -87,12 +94,21 @@ export interface MemberWorkSyncReportStorePort { ): Promise; } +export interface MemberWorkSyncOutboxStorePort { + ensurePending(input: MemberWorkSyncOutboxEnsureInput): Promise; + claimDue(input: MemberWorkSyncOutboxClaimInput): Promise; + markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise; + markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise; + markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise; +} + export interface MemberWorkSyncUseCaseDeps { clock: MemberWorkSyncClockPort; hash: MemberWorkSyncHashPort; agendaSource: MemberWorkSyncAgendaSourcePort; statusStore: MemberWorkSyncStatusStorePort; reportStore?: MemberWorkSyncReportStorePort; + outboxStore?: MemberWorkSyncOutboxStorePort; reportToken?: MemberWorkSyncReportTokenPort; lifecycle?: MemberWorkSyncLifecyclePort; logger?: MemberWorkSyncLoggerPort; diff --git a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts index d7b61e8a..586c1c34 100644 --- a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts +++ b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts @@ -5,6 +5,13 @@ import { mkdir, readFile, rename } from 'fs/promises'; import { withFileLock } from '@main/services/team/fileLock'; import type { MemberWorkSyncMetricEvent, + MemberWorkSyncOutboxClaimInput, + MemberWorkSyncOutboxEnsureInput, + MemberWorkSyncOutboxEnsureResult, + MemberWorkSyncOutboxItem, + MemberWorkSyncOutboxMarkDeliveredInput, + MemberWorkSyncOutboxMarkFailedInput, + MemberWorkSyncOutboxMarkSupersededInput, MemberWorkSyncReportIntent, MemberWorkSyncReportRequest, MemberWorkSyncStatus, @@ -13,6 +20,7 @@ import type { } from '../../contracts'; import { assessMemberWorkSyncPhase2Readiness } from '../../core/domain'; import type { + MemberWorkSyncOutboxStorePort, MemberWorkSyncReportStorePort, MemberWorkSyncStatusStorePort, } from '../../core/application'; @@ -31,6 +39,11 @@ interface PendingReportFile { intents: Record; } +interface OutboxFile { + schemaVersion: 1; + items: Record; +} + function normalizeMemberKey(memberName: string): string { return memberName.trim().toLowerCase(); } @@ -68,6 +81,31 @@ function isPendingReportFile(value: unknown): value is PendingReportFile { ); } +function isOutboxFile(value: unknown): value is OutboxFile { + return ( + value != null && + typeof value === 'object' && + (value as OutboxFile).schemaVersion === 1 && + (value as OutboxFile).items != null && + typeof (value as OutboxFile).items === 'object' && + !Array.isArray((value as OutboxFile).items) + ); +} + +function isOutboxTerminal(status: MemberWorkSyncOutboxItem['status']): boolean { + return status === 'delivered' || status === 'superseded' || status === 'failed_terminal'; +} + +function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean { + if (item.status !== 'pending' && item.status !== 'failed_retryable') { + return false; + } + if (!item.nextAttemptAt) { + return true; + } + return item.nextAttemptAt <= nowIso; +} + function stableStringify(value: unknown): string { if (value == null || typeof value !== 'object') { return JSON.stringify(value); @@ -194,7 +232,10 @@ async function quarantineFile(filePath: string): Promise { } export class JsonMemberWorkSyncStore - implements MemberWorkSyncStatusStorePort, MemberWorkSyncReportStorePort + implements + MemberWorkSyncStatusStorePort, + MemberWorkSyncReportStorePort, + MemberWorkSyncOutboxStorePort { private readonly writeQueues = new Map>(); @@ -317,6 +358,161 @@ export class JsonMemberWorkSyncStore }); } + async ensurePending( + input: MemberWorkSyncOutboxEnsureInput + ): Promise { + let result: MemberWorkSyncOutboxEnsureResult | null = null; + await this.enqueue(input.teamName, async () => { + await withFileLock(this.paths.getOutboxPath(input.teamName), async () => { + const existing = await this.readOutboxFile(input.teamName); + const current = existing.items[input.id]; + if (current) { + if (current.payloadHash !== input.payloadHash) { + result = { + ok: false, + outcome: 'payload_conflict', + item: current, + existingPayloadHash: current.payloadHash, + requestedPayloadHash: input.payloadHash, + }; + return; + } + + if (!isOutboxTerminal(current.status) && current.status !== 'pending') { + const next: MemberWorkSyncOutboxItem = { + ...current, + status: 'pending', + updatedAt: input.nowIso, + }; + const nextAttemptAt = input.nextAttemptAt ?? current.nextAttemptAt; + if (nextAttemptAt) { + next.nextAttemptAt = nextAttemptAt; + } else { + delete next.nextAttemptAt; + } + delete next.claimedBy; + delete next.claimedAt; + delete next.lastError; + existing.items[input.id] = next; + await this.writeOutboxFile(input.teamName, existing); + result = { ok: true, outcome: 'existing', item: existing.items[input.id] }; + return; + } + + result = { ok: true, outcome: 'existing', item: current }; + return; + } + + const item: MemberWorkSyncOutboxItem = { + id: input.id, + teamName: input.teamName, + memberName: input.memberName, + agendaFingerprint: input.agendaFingerprint, + payloadHash: input.payloadHash, + payload: input.payload, + status: 'pending', + attemptGeneration: 0, + ...(input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}), + createdAt: input.nowIso, + updatedAt: input.nowIso, + }; + existing.items[input.id] = item; + await this.writeOutboxFile(input.teamName, existing); + result = { ok: true, outcome: 'created', item }; + }); + }); + + if (!result) { + throw new Error('Member work sync outbox write did not produce a result'); + } + return result; + } + + async claimDue(input: MemberWorkSyncOutboxClaimInput): Promise { + const claimed: MemberWorkSyncOutboxItem[] = []; + await this.enqueue(input.teamName, async () => { + await withFileLock(this.paths.getOutboxPath(input.teamName), async () => { + const existing = await this.readOutboxFile(input.teamName); + const due = Object.values(existing.items) + .filter((item) => canClaimOutboxItem(item, input.nowIso)) + .sort((left, right) => { + const leftTime = left.nextAttemptAt ?? left.updatedAt; + const rightTime = right.nextAttemptAt ?? right.updatedAt; + return leftTime.localeCompare(rightTime); + }) + .slice(0, Math.max(0, input.limit)); + + for (const item of due) { + const next: MemberWorkSyncOutboxItem = { + ...item, + status: 'claimed', + attemptGeneration: item.attemptGeneration + 1, + claimedBy: input.claimedBy, + claimedAt: input.nowIso, + updatedAt: input.nowIso, + }; + delete next.lastError; + existing.items[item.id] = next; + claimed.push(next); + } + + if (due.length > 0) { + await this.writeOutboxFile(input.teamName, existing); + } + }); + }); + return claimed; + } + + async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise { + await this.updateOutboxItem(input.teamName, input.id, (current) => { + if (!current || current.attemptGeneration !== input.attemptGeneration) { + return current; + } + const next: MemberWorkSyncOutboxItem = { + ...current, + status: 'delivered', + deliveredMessageId: input.deliveredMessageId, + updatedAt: input.nowIso, + }; + delete next.lastError; + return next; + }); + } + + async markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise { + await this.updateOutboxItem(input.teamName, input.id, (current) => { + if (!current || isOutboxTerminal(current.status)) { + return current; + } + return { + ...current, + status: 'superseded', + lastError: input.reason, + updatedAt: input.nowIso, + }; + }); + } + + async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise { + await this.updateOutboxItem(input.teamName, input.id, (current) => { + if (!current || current.attemptGeneration !== input.attemptGeneration) { + return current; + } + const next: MemberWorkSyncOutboxItem = { + ...current, + status: input.retryable ? 'failed_retryable' : 'failed_terminal', + lastError: input.error, + ...(input.retryable && input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}), + updatedAt: input.nowIso, + }; + if (!input.retryable) { + delete next.nextAttemptAt; + } + return next; + }); + } + private async readFile(teamName: string): Promise { const filePath = this.paths.getStatusPath(teamName); try { @@ -351,6 +547,48 @@ export class JsonMemberWorkSyncStore return { schemaVersion: 1, intents: {} }; } + private async readOutboxFile(teamName: string): Promise { + const filePath = this.paths.getOutboxPath(teamName); + try { + const raw = await readFile(filePath, 'utf8'); + const parsed = JSON.parse(raw); + if (isOutboxFile(parsed)) { + return parsed; + } + await quarantineFile(filePath); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + await quarantineFile(filePath); + } + } + return { schemaVersion: 1, items: {} }; + } + + private async writeOutboxFile(teamName: string, file: OutboxFile): Promise { + await mkdir(this.paths.getTeamDir(teamName), { recursive: true }); + await atomicWriteAsync(this.paths.getOutboxPath(teamName), JSON.stringify(file, null, 2)); + } + + private async updateOutboxItem( + teamName: string, + id: string, + updater: ( + current: MemberWorkSyncOutboxItem | undefined + ) => MemberWorkSyncOutboxItem | undefined + ): Promise { + await this.enqueue(teamName, async () => { + await withFileLock(this.paths.getOutboxPath(teamName), async () => { + const existing = await this.readOutboxFile(teamName); + const next = updater(existing.items[id]); + if (!next) { + return; + } + existing.items[id] = next; + await this.writeOutboxFile(teamName, existing); + }); + }); + } + private async writePendingFile(teamName: string, file: PendingReportFile): Promise { await mkdir(this.paths.getTeamDir(teamName), { recursive: true }); await atomicWriteAsync( diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts index 6d72b670..55d1facc 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts @@ -15,6 +15,10 @@ export class MemberWorkSyncStorePaths { return join(this.getTeamDir(teamName), 'pending-reports.json'); } + getOutboxPath(teamName: string): string { + return join(this.getTeamDir(teamName), 'outbox.json'); + } + getReportTokenSecretPath(teamName: string): string { return join(this.getTeamDir(teamName), 'report-token-secret.json'); } diff --git a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts index f5acfff2..6d7685a3 100644 --- a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts +++ b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts @@ -5,7 +5,10 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore'; import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths'; -import type { MemberWorkSyncStatus } from '@features/member-work-sync/contracts'; +import type { + MemberWorkSyncNudgePayload, + MemberWorkSyncStatus, +} from '@features/member-work-sync/contracts'; function makeStatus(overrides: Partial): MemberWorkSyncStatus { return { @@ -42,6 +45,19 @@ function makeStatus(overrides: Partial): MemberWorkSyncSta }; } +function makeNudgePayload(overrides: Partial = {}): MemberWorkSyncNudgePayload { + return { + from: 'system', + to: 'bob', + messageKind: 'member_work_sync_nudge', + source: 'member-work-sync', + actionMode: 'do', + text: 'Work sync check: continue the current task or report a blocker.', + taskRefs: [{ teamName: 'team-a', taskId: 'task-1', displayId: '11111111' }], + ...overrides, + }; +} + describe('JsonMemberWorkSyncStore', () => { let root: string; let store: JsonMemberWorkSyncStore; @@ -152,4 +168,99 @@ describe('JsonMemberWorkSyncStore', () => { ]), }); }); + + it('deduplicates outbox items by id and rejects payload hash conflicts', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + + await expect(store.ensurePending(input)).resolves.toMatchObject({ + ok: true, + outcome: 'created', + item: { status: 'pending', attemptGeneration: 0 }, + }); + await expect(store.ensurePending(input)).resolves.toMatchObject({ + ok: true, + outcome: 'existing', + }); + await expect(store.ensurePending({ ...input, payloadHash: 'hash-b' })).resolves.toMatchObject({ + ok: false, + outcome: 'payload_conflict', + existingPayloadHash: 'hash-a', + requestedPayloadHash: 'hash-b', + }); + }); + + it('claims due outbox items and fences terminal updates by attempt generation', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + + const claimed = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 5, + }); + expect(claimed).toHaveLength(1); + expect(claimed[0]).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 1, + claimedBy: 'dispatcher-a', + }); + + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: 0, + deliveredMessageId: 'wrong-generation', + nowIso: '2026-04-29T00:02:00.000Z', + }); + await expect( + store.ensurePending({ + ...input, + nowIso: '2026-04-29T00:03:00.000Z', + }) + ).resolves.toMatchObject({ + ok: true, + item: { status: 'pending', attemptGeneration: 1 }, + }); + + const claimedAgain = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:04:00.000Z', + limit: 1, + }); + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimedAgain[0].attemptGeneration, + deliveredMessageId: 'message-1', + nowIso: '2026-04-29T00:05:00.000Z', + }); + + const file = JSON.parse( + await readFile(join(root, 'team-a', '.member-work-sync', 'outbox.json'), 'utf8') + ); + expect(file.items[input.id]).toMatchObject({ + status: 'delivered', + deliveredMessageId: 'message-1', + attemptGeneration: 2, + }); + }); });