diff --git a/docs/team-management/member-work-sync-control-plane-plan.md b/docs/team-management/member-work-sync-control-plane-plan.md index 43ec1bc7..e71aee38 100644 --- a/docs/team-management/member-work-sync-control-plane-plan.md +++ b/docs/team-management/member-work-sync-control-plane-plan.md @@ -37,6 +37,7 @@ Current implementation note: - Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet. - Reconciler can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; otherwise it records normal shadow status and does nothing. This preserves the anti-spam gate before any dispatcher exists. - Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port. +- Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts. - Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low. Patterns used: diff --git a/src/features/member-work-sync/contracts/types.ts b/src/features/member-work-sync/contracts/types.ts index 9f74309d..c6daa966 100644 --- a/src/features/member-work-sync/contracts/types.ts +++ b/src/features/member-work-sync/contracts/types.ts @@ -298,3 +298,9 @@ export interface MemberWorkSyncOutboxMarkFailedInput { nowIso: string; nextAttemptAt?: string; } + +export interface MemberWorkSyncOutboxCountRecentDeliveredInput { + teamName: string; + memberName: string; + sinceIso: string; +} diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts index 47d8ce87..ebcbe89f 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts @@ -1,6 +1,10 @@ import type { MemberWorkSyncOutboxItem } from '../../contracts'; import type { MemberWorkSyncUseCaseDeps } from './ports'; +const MEMBER_WORK_SYNC_MAX_NUDGES_PER_MEMBER_PER_HOUR = 2; +const MEMBER_WORK_SYNC_RETRY_BASE_MINUTES = 10; +const MEMBER_WORK_SYNC_RETRY_MAX_MINUTES = 60; + export interface MemberWorkSyncNudgeDispatchSummary { claimed: number; delivered: number; @@ -23,6 +27,26 @@ function addMinutes(iso: string, minutes: number): string { return new Date(Date.parse(iso) + minutes * 60_000).toISOString(); } +function subtractMinutes(iso: string, minutes: number): string { + return new Date(Date.parse(iso) - minutes * 60_000).toISOString(); +} + +function stableJitterMinutes(id: string, attemptGeneration: number): number { + const seed = `${id}:${attemptGeneration}`; + let value = 0; + for (const char of seed) { + value = (value * 31 + char.charCodeAt(0)) % 997; + } + return value % 5; +} + +function nextRetryAt(item: MemberWorkSyncOutboxItem, nowIso: string): string { + const exponentialMinutes = + MEMBER_WORK_SYNC_RETRY_BASE_MINUTES * 2 ** Math.max(0, item.attemptGeneration - 1); + const cappedMinutes = Math.min(MEMBER_WORK_SYNC_RETRY_MAX_MINUTES, exponentialMinutes); + return addMinutes(nowIso, cappedMinutes + stableJitterMinutes(item.id, item.attemptGeneration)); +} + export class MemberWorkSyncNudgeDispatcher { constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {} @@ -71,7 +95,7 @@ export class MemberWorkSyncNudgeDispatcher { error: revalidation.reason, retryable: true, nowIso, - nextAttemptAt: revalidation.nextAttemptAt ?? addMinutes(nowIso, 10), + nextAttemptAt: revalidation.nextAttemptAt ?? nextRetryAt(item, nowIso), }); return 'retryable'; } @@ -120,7 +144,7 @@ export class MemberWorkSyncNudgeDispatcher { error: String(error), retryable: true, nowIso, - nextAttemptAt: addMinutes(nowIso, 10), + nextAttemptAt: nextRetryAt(item, nowIso), }); return 'retryable'; } @@ -160,6 +184,23 @@ export class MemberWorkSyncNudgeDispatcher { return { ok: false, reason: 'phase2_not_ready', retryable: true }; } + const recentDelivered = await this.deps.outboxStore?.countRecentDelivered({ + teamName: item.teamName, + memberName: item.memberName, + sinceIso: subtractMinutes(nowIso, 60), + }); + if ( + recentDelivered != null && + recentDelivered >= MEMBER_WORK_SYNC_MAX_NUDGES_PER_MEMBER_PER_HOUR + ) { + return { + ok: false, + reason: 'member_nudge_rate_limited', + retryable: true, + nextAttemptAt: addMinutes(nowIso, 60), + }; + } + const busy = await this.deps.busySignal?.isBusy({ teamName: item.teamName, memberName: item.memberName, diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 362e3da5..8cb0eeeb 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -3,6 +3,7 @@ import type { MemberWorkSyncTeamMetrics, MemberWorkSyncProviderId, MemberWorkSyncOutboxClaimInput, + MemberWorkSyncOutboxCountRecentDeliveredInput, MemberWorkSyncOutboxEnsureInput, MemberWorkSyncOutboxEnsureResult, MemberWorkSyncOutboxItem, @@ -100,6 +101,7 @@ export interface MemberWorkSyncOutboxStorePort { markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise; markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise; markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise; + countRecentDelivered(input: MemberWorkSyncOutboxCountRecentDeliveredInput): Promise; } export interface MemberWorkSyncInboxNudgePort { diff --git a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts index 586c1c34..00aac225 100644 --- a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts +++ b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts @@ -6,6 +6,7 @@ import { withFileLock } from '@main/services/team/fileLock'; import type { MemberWorkSyncMetricEvent, MemberWorkSyncOutboxClaimInput, + MemberWorkSyncOutboxCountRecentDeliveredInput, MemberWorkSyncOutboxEnsureInput, MemberWorkSyncOutboxEnsureResult, MemberWorkSyncOutboxItem, @@ -513,6 +514,18 @@ export class JsonMemberWorkSyncStore }); } + async countRecentDelivered( + input: MemberWorkSyncOutboxCountRecentDeliveredInput + ): Promise { + const file = await this.readOutboxFile(input.teamName); + return Object.values(file.items).filter( + (item) => + item.memberName.trim().toLowerCase() === input.memberName.trim().toLowerCase() && + item.status === 'delivered' && + item.updatedAt >= input.sinceIso + ).length; + } + private async readFile(teamName: string): Promise { const filePath = this.paths.getStatusPath(teamName); try { diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index e08b077d..6d49b96f 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -171,6 +171,7 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { ...current, status: 'delivered', deliveredMessageId: input.deliveredMessageId, + updatedAt: input.nowIso, }); } } @@ -189,15 +190,33 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { ...current, status: input.retryable ? 'failed_retryable' : 'failed_terminal', lastError: input.error, + ...(input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}), + updatedAt: input.nowIso, }); } } + + async countRecentDelivered(input: { + memberName: string; + sinceIso: string; + }): Promise { + return [...this.items.values()].filter( + (item) => + item.status === 'delivered' && + item.memberName === input.memberName && + item.updatedAt >= input.sinceIso + ).length; + } } class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort { readonly inserted: Array[0]> = []; + fail = false; async insertIfAbsent(input: Parameters[0]) { + if (this.fail) { + throw new Error('inbox unavailable'); + } this.inserted.push(input); return { inserted: true, messageId: input.messageId }; } @@ -505,6 +524,74 @@ describe('MemberWorkSync use cases', () => { }); }); + it('rate-limits delivered nudges per member per hour', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const current = await new MemberWorkSyncDiagnosticsReader(deps).execute({ + teamName: 'team-a', + memberName: 'bob', + }); + const firstId = `member-work-sync:team-a:bob:${current.agenda.fingerprint}:old-1`; + const secondId = `member-work-sync:team-a:bob:${current.agenda.fingerprint}:old-2`; + const baseItem = outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`); + expect(baseItem).toBeDefined(); + for (const id of [firstId, secondId]) { + outbox.items.set(id, { + ...(baseItem as NonNullable), + id, + status: 'delivered', + deliveredMessageId: id, + updatedAt: '2026-04-29T00:00:00.000Z', + }); + } + + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 }); + expect(inbox.inserted).toEqual([]); + expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({ + status: 'failed_retryable', + lastError: 'member_nudge_rate_limited', + nextAttemptAt: '2026-04-29T01:00:00.000Z', + }); + }); + + it('uses bounded retry backoff when inbox delivery fails', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + inbox.fail = true; + const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const current = await new MemberWorkSyncDiagnosticsReader(deps).execute({ + teamName: 'team-a', + memberName: 'bob', + }); + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + const item = outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`); + expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 }); + expect(item).toMatchObject({ + status: 'failed_retryable', + lastError: 'Error: inbox unavailable', + }); + expect(Date.parse(item?.nextAttemptAt ?? '')).toBeGreaterThan( + Date.parse('2026-04-29T00:09:59.000Z') + ); + expect(Date.parse(item?.nextAttemptAt ?? '')).toBeLessThanOrEqual( + Date.parse('2026-04-29T00:14:00.000Z') + ); + }); + it('rejects invalid report tokens without recording replayable intents', async () => { const { deps, store } = createDeps(); const reader = new MemberWorkSyncDiagnosticsReader(deps);