From 23714f5ca8f52848d82c9d36833019260ec0328b Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 29 Apr 2026 15:26:06 +0300 Subject: [PATCH] feat(member-work-sync): add guarded nudge dispatcher --- .../member-work-sync-control-plane-plan.md | 1 + .../MemberWorkSyncNudgeDispatcher.ts | 192 ++++++++++++++++++ .../core/application/index.ts | 1 + .../core/application/ports.ts | 31 +++ .../TeamInboxMemberWorkSyncNudgeSink.ts | 36 ++++ .../createMemberWorkSyncFeature.ts | 9 + src/shared/types/team.ts | 3 +- .../core/MemberWorkSyncUseCases.test.ts | 118 ++++++++++- 8 files changed, 386 insertions(+), 5 deletions(-) create mode 100644 src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts create mode 100644 src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts diff --git a/docs/team-management/member-work-sync-control-plane-plan.md b/docs/team-management/member-work-sync-control-plane-plan.md index 9d51cfd9..aa5d7e3d 100644 --- a/docs/team-management/member-work-sync-control-plane-plan.md +++ b/docs/team-management/member-work-sync-control-plane-plan.md @@ -36,6 +36,7 @@ Current implementation note: - Phase 1.5 exposes a machine-readable `phase2Readiness` assessment from shadow metrics. It can say `collecting_shadow_data`, `blocked`, or `shadow_ready`; it still does not dispatch nudges. - Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet. - Reconciler can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; otherwise it records normal shadow status and does nothing. This preserves the anti-spam gate before any dispatcher exists. +- Dispatcher use case exists behind explicit facade invocation. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port. - Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low. Patterns used: diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts new file mode 100644 index 00000000..47d8ce87 --- /dev/null +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts @@ -0,0 +1,192 @@ +import type { MemberWorkSyncOutboxItem } from '../../contracts'; +import type { MemberWorkSyncUseCaseDeps } from './ports'; + +export interface MemberWorkSyncNudgeDispatchSummary { + claimed: number; + delivered: number; + superseded: number; + retryable: number; + terminal: number; +} + +export interface MemberWorkSyncNudgeDispatchOptions { + claimedBy: string; + teamNames: string[]; + limit?: number; +} + +function emptySummary(): MemberWorkSyncNudgeDispatchSummary { + return { claimed: 0, delivered: 0, superseded: 0, retryable: 0, terminal: 0 }; +} + +function addMinutes(iso: string, minutes: number): string { + return new Date(Date.parse(iso) + minutes * 60_000).toISOString(); +} + +export class MemberWorkSyncNudgeDispatcher { + constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {} + + async dispatchDue(options: MemberWorkSyncNudgeDispatchOptions): Promise { + const outbox = this.deps.outboxStore; + const inbox = this.deps.inboxNudge; + if (!outbox || !inbox) { + return emptySummary(); + } + + const nowIso = this.deps.clock.now().toISOString(); + const summary = emptySummary(); + for (const teamName of [...new Set(options.teamNames.map((name) => name.trim()).filter(Boolean))]) { + const claimed = await outbox.claimDue({ + teamName, + claimedBy: options.claimedBy, + nowIso, + limit: options.limit ?? 10, + }); + summary.claimed += claimed.length; + for (const item of claimed) { + const result = await this.dispatchItem(item, nowIso); + summary[result] += 1; + } + } + return summary; + } + + private async dispatchItem( + item: MemberWorkSyncOutboxItem, + nowIso: string + ): Promise> { + const outbox = this.deps.outboxStore; + const inbox = this.deps.inboxNudge; + if (!outbox || !inbox) { + return 'terminal'; + } + + const revalidation = await this.revalidate(item, nowIso); + if (!revalidation.ok) { + if (revalidation.retryable) { + await outbox.markFailed({ + teamName: item.teamName, + id: item.id, + attemptGeneration: item.attemptGeneration, + error: revalidation.reason, + retryable: true, + nowIso, + nextAttemptAt: revalidation.nextAttemptAt ?? addMinutes(nowIso, 10), + }); + return 'retryable'; + } + await outbox.markSuperseded({ + teamName: item.teamName, + id: item.id, + reason: revalidation.reason, + nowIso, + }); + return 'superseded'; + } + + try { + const inserted = await inbox.insertIfAbsent({ + teamName: item.teamName, + memberName: item.memberName, + messageId: item.id, + payloadHash: item.payloadHash, + payload: item.payload, + timestamp: nowIso, + }); + if (inserted.conflict) { + await outbox.markFailed({ + teamName: item.teamName, + id: item.id, + attemptGeneration: item.attemptGeneration, + error: 'inbox_payload_conflict', + retryable: false, + nowIso, + }); + return 'terminal'; + } + await outbox.markDelivered({ + teamName: item.teamName, + id: item.id, + attemptGeneration: item.attemptGeneration, + deliveredMessageId: inserted.messageId, + nowIso, + }); + return 'delivered'; + } catch (error) { + await outbox.markFailed({ + teamName: item.teamName, + id: item.id, + attemptGeneration: item.attemptGeneration, + error: String(error), + retryable: true, + nowIso, + nextAttemptAt: addMinutes(nowIso, 10), + }); + return 'retryable'; + } + } + + private async revalidate( + item: MemberWorkSyncOutboxItem, + nowIso: string + ): Promise< + | { ok: true } + | { ok: false; reason: string; retryable: boolean; nextAttemptAt?: string } + > { + if (this.deps.lifecycle && !(await this.deps.lifecycle.isTeamActive(item.teamName))) { + return { ok: false, reason: 'team_inactive', retryable: false }; + } + + const status = await this.deps.statusStore.read({ + teamName: item.teamName, + memberName: item.memberName, + }); + if (!status) { + return { ok: false, reason: 'status_missing', retryable: false }; + } + if ( + status.state !== 'needs_sync' || + status.shadow?.wouldNudge !== true || + status.agenda.fingerprint !== item.agendaFingerprint + ) { + return { ok: false, reason: 'status_no_longer_matches_outbox', retryable: false }; + } + + if (!this.deps.statusStore.readTeamMetrics) { + return { ok: false, reason: 'metrics_unavailable', retryable: true }; + } + const metrics = await this.deps.statusStore.readTeamMetrics(item.teamName); + if (metrics.phase2Readiness.state !== 'shadow_ready') { + return { ok: false, reason: 'phase2_not_ready', retryable: true }; + } + + const busy = await this.deps.busySignal?.isBusy({ + teamName: item.teamName, + memberName: item.memberName, + nowIso, + }); + if (busy?.busy) { + return { + ok: false, + reason: `member_busy:${busy.reason ?? 'unknown'}`, + retryable: true, + nextAttemptAt: busy.retryAfterIso, + }; + } + + const taskIds = item.payload.taskRefs.map((taskRef) => taskRef.taskId); + if ( + this.deps.watchdogCooldown && + (await this.deps.watchdogCooldown.hasRecentNudge({ + teamName: item.teamName, + memberName: item.memberName, + taskIds, + nowIso, + })) + ) { + return { ok: false, reason: 'watchdog_cooldown_active', retryable: true }; + } + + return { ok: true }; + } +} diff --git a/src/features/member-work-sync/core/application/index.ts b/src/features/member-work-sync/core/application/index.ts index 9391fad3..be3d297f 100644 --- a/src/features/member-work-sync/core/application/index.ts +++ b/src/features/member-work-sync/core/application/index.ts @@ -1,5 +1,6 @@ export * from './MemberWorkSyncDiagnosticsReader'; export * from './MemberWorkSyncMetricsReader'; +export * from './MemberWorkSyncNudgeDispatcher'; export * from './MemberWorkSyncNudgeOutboxPlanner'; export * from './MemberWorkSyncPendingReportIntentReplayer'; export * from './MemberWorkSyncReconciler'; diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 47487cd8..362e3da5 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -102,6 +102,34 @@ export interface MemberWorkSyncOutboxStorePort { markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise; } +export interface MemberWorkSyncInboxNudgePort { + insertIfAbsent(input: { + teamName: string; + memberName: string; + messageId: string; + payloadHash: string; + payload: MemberWorkSyncOutboxItem['payload']; + timestamp: string; + }): Promise<{ inserted: boolean; messageId: string; conflict?: boolean }>; +} + +export interface MemberWorkSyncWatchdogCooldownPort { + hasRecentNudge(input: { + teamName: string; + memberName: string; + taskIds: string[]; + nowIso: string; + }): Promise; +} + +export interface MemberWorkSyncBusySignalPort { + isBusy(input: { + teamName: string; + memberName: string; + nowIso: string; + }): Promise<{ busy: boolean; reason?: string; retryAfterIso?: string }>; +} + export interface MemberWorkSyncUseCaseDeps { clock: MemberWorkSyncClockPort; hash: MemberWorkSyncHashPort; @@ -109,6 +137,9 @@ export interface MemberWorkSyncUseCaseDeps { statusStore: MemberWorkSyncStatusStorePort; reportStore?: MemberWorkSyncReportStorePort; outboxStore?: MemberWorkSyncOutboxStorePort; + inboxNudge?: MemberWorkSyncInboxNudgePort; + watchdogCooldown?: MemberWorkSyncWatchdogCooldownPort; + busySignal?: MemberWorkSyncBusySignalPort; reportToken?: MemberWorkSyncReportTokenPort; lifecycle?: MemberWorkSyncLifecyclePort; logger?: MemberWorkSyncLoggerPort; diff --git a/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts b/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts new file mode 100644 index 00000000..fc2e9778 --- /dev/null +++ b/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts @@ -0,0 +1,36 @@ +import { TeamInboxReader } from '@main/services/team/TeamInboxReader'; +import { TeamInboxWriter } from '@main/services/team/TeamInboxWriter'; +import type { MemberWorkSyncInboxNudgePort } from '../../../core/application'; + +export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudgePort { + constructor( + private readonly inboxReader: Pick = new TeamInboxReader(), + private readonly inboxWriter: Pick = new TeamInboxWriter() + ) {} + + async insertIfAbsent(input: Parameters[0]) { + const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName); + if (existing.some((message) => message.messageId === input.messageId)) { + return { inserted: false, messageId: input.messageId }; + } + + const result = await this.inboxWriter.sendMessage(input.teamName, { + member: input.memberName, + from: input.payload.from, + to: input.payload.to, + messageId: input.messageId, + timestamp: input.timestamp, + text: input.payload.text, + taskRefs: input.payload.taskRefs, + actionMode: input.payload.actionMode, + summary: 'Work sync check', + source: 'system_notification', + messageKind: input.payload.messageKind, + }); + + return { + inserted: true, + messageId: result.messageId, + }; + } +} diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index ce76d27f..4f0ea231 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -9,6 +9,8 @@ import type { import { MemberWorkSyncDiagnosticsReader, MemberWorkSyncMetricsReader, + MemberWorkSyncNudgeDispatcher, + type MemberWorkSyncNudgeDispatchSummary, MemberWorkSyncPendingReportIntentReplayer, type MemberWorkSyncPendingReportReplaySummary, MemberWorkSyncReconciler, @@ -16,6 +18,7 @@ import { type MemberWorkSyncReconcileContext, } from '../../core/application'; import { MemberWorkSyncTeamChangeRouter } from '../adapters/input/MemberWorkSyncTeamChangeRouter'; +import { TeamInboxMemberWorkSyncNudgeSink } from '../adapters/output/TeamInboxMemberWorkSyncNudgeSink'; import { TeamTaskAgendaSource } from '../adapters/output/TeamTaskAgendaSource'; import { HmacMemberWorkSyncReportTokenAdapter } from '../infrastructure/HmacMemberWorkSyncReportTokenAdapter'; import { @@ -41,6 +44,7 @@ export interface MemberWorkSyncFeatureFacade { noteTeamChange(event: TeamChangeEvent): void; enqueueStartupScan(teamNames: string[]): Promise; replayPendingReports(teamNames: string[]): Promise; + dispatchDueNudges(teamNames: string[]): Promise; getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics; dispose(): Promise; } @@ -67,6 +71,7 @@ export function createMemberWorkSyncFeature(deps: { const storePaths = new MemberWorkSyncStorePaths(deps.teamsBasePath); const store = new JsonMemberWorkSyncStore(storePaths); const reportToken = new HmacMemberWorkSyncReportTokenAdapter(storePaths); + const inboxNudge = new TeamInboxMemberWorkSyncNudgeSink(); const useCaseDeps = { clock, hash, @@ -74,6 +79,7 @@ export function createMemberWorkSyncFeature(deps: { statusStore: store, reportStore: store, outboxStore: store, + inboxNudge, reportToken, ...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}), logger: deps.logger, @@ -83,6 +89,7 @@ export function createMemberWorkSyncFeature(deps: { const reporter = new MemberWorkSyncReporter(useCaseDeps); const reconciler = new MemberWorkSyncReconciler(useCaseDeps); const pendingReportReplayer = new MemberWorkSyncPendingReportIntentReplayer(useCaseDeps); + const nudgeDispatcher = new MemberWorkSyncNudgeDispatcher(useCaseDeps); const queue = new MemberWorkSyncEventQueue({ reconcile: async (request, context: MemberWorkSyncReconcileContext) => { await reconciler.execute(request, context); @@ -116,6 +123,8 @@ export function createMemberWorkSyncFeature(deps: { { processed: 0, accepted: 0, rejected: 0, superseded: 0 } ); }, + dispatchDueNudges: (teamNames) => + nudgeDispatcher.dispatchDue({ teamNames, claimedBy: `member-work-sync:${process.pid}` }), getQueueDiagnostics: () => queue.getDiagnostics(), dispose: () => queue.stop(), }; diff --git a/src/shared/types/team.ts b/src/shared/types/team.ts index beef1d6b..d35a58d0 100644 --- a/src/shared/types/team.ts +++ b/src/shared/types/team.ts @@ -413,7 +413,8 @@ export type InboxMessageKind = | 'default' | 'slash_command' | 'slash_command_result' - | 'task_comment_notification'; + | 'task_comment_notification' + | 'member_work_sync_nudge'; export interface SlashCommandMeta { name: string; diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index 23060c3d..e08b077d 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -2,9 +2,11 @@ import { describe, expect, it } from 'vitest'; import { MemberWorkSyncDiagnosticsReader, + MemberWorkSyncNudgeDispatcher, MemberWorkSyncPendingReportIntentReplayer, MemberWorkSyncReporter, type MemberWorkSyncAgendaSourceResult, + type MemberWorkSyncInboxNudgePort, type MemberWorkSyncOutboxStorePort, type MemberWorkSyncStatusStorePort, type MemberWorkSyncUseCaseDeps, @@ -131,9 +133,14 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort { class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { readonly ensures: MemberWorkSyncOutboxEnsureInput[] = []; + readonly items = new Map(); async ensurePending(input: MemberWorkSyncOutboxEnsureInput) { this.ensures.push(input); + const current = this.items.get(input.id); + if (current) { + return { ok: true as const, outcome: 'existing' as const, item: current }; + } const item: MemberWorkSyncOutboxItem = { ...input, status: 'pending', @@ -141,18 +148,59 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { createdAt: input.nowIso, updatedAt: input.nowIso, }; + this.items.set(input.id, item); return { ok: true as const, outcome: 'created' as const, item }; } async claimDue(): Promise { - return []; + const due = [...this.items.values()].filter((item) => item.status === 'pending'); + for (const item of due) { + this.items.set(item.id, { + ...item, + status: 'claimed', + attemptGeneration: item.attemptGeneration + 1, + }); + } + return due.map((item) => this.items.get(item.id) as MemberWorkSyncOutboxItem); } - async markDelivered(_input: MemberWorkSyncOutboxMarkDeliveredInput): Promise {} + async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise { + const current = this.items.get(input.id); + if (current?.attemptGeneration === input.attemptGeneration) { + this.items.set(input.id, { + ...current, + status: 'delivered', + deliveredMessageId: input.deliveredMessageId, + }); + } + } - async markSuperseded(_input: MemberWorkSyncOutboxMarkSupersededInput): Promise {} + async markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise { + const current = this.items.get(input.id); + if (current) { + this.items.set(input.id, { ...current, status: 'superseded', lastError: input.reason }); + } + } - async markFailed(_input: MemberWorkSyncOutboxMarkFailedInput): Promise {} + async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise { + const current = this.items.get(input.id); + if (current?.attemptGeneration === input.attemptGeneration) { + this.items.set(input.id, { + ...current, + status: input.retryable ? 'failed_retryable' : 'failed_terminal', + lastError: input.error, + }); + } + } +} + +class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort { + readonly inserted: Array[0]> = []; + + async insertIfAbsent(input: Parameters[0]) { + this.inserted.push(input); + return { inserted: true, messageId: input.messageId }; + } } function createDeps(options?: { @@ -162,6 +210,7 @@ function createDeps(options?: { teamActive?: boolean; providerId?: 'opencode' | 'codex'; outboxStore?: MemberWorkSyncOutboxStorePort; + inboxNudge?: MemberWorkSyncInboxNudgePort; }) { const clock = new MutableClock(); const store = new InMemoryStatusStore(); @@ -189,6 +238,7 @@ function createDeps(options?: { statusStore: store, reportStore: store, ...(options?.outboxStore ? { outboxStore: options.outboxStore } : {}), + ...(options?.inboxNudge ? { inboxNudge: options.inboxNudge } : {}), reportToken: { create: async (input) => ({ token: `token:${input.teamName}:${input.memberName}:${input.agendaFingerprint}`, @@ -395,6 +445,66 @@ describe('MemberWorkSync use cases', () => { }); }); + it('dispatches due nudges only after revalidating current status and readiness', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncDiagnosticsReader(deps).execute({ + teamName: 'team-a', + memberName: 'bob', + }); + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 1, superseded: 0 }); + expect(inbox.inserted).toHaveLength(1); + expect(inbox.inserted[0]).toMatchObject({ + teamName: 'team-a', + memberName: 'bob', + messageId: `member-work-sync:team-a:bob:${status.agenda.fingerprint}`, + }); + expect(outbox.items.get(`member-work-sync:team-a:bob:${status.agenda.fingerprint}`)).toMatchObject({ + status: 'delivered', + deliveredMessageId: `member-work-sync:team-a:bob:${status.agenda.fingerprint}`, + }); + }); + + it('does not dispatch stale outbox items after the member reports still working', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const reader = new MemberWorkSyncDiagnosticsReader(deps); + const reporter = new MemberWorkSyncReporter(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + await reporter.execute({ + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: current.reportToken, + leaseTtlMs: 120_000, + source: 'test', + }); + + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 0, superseded: 1 }); + expect(inbox.inserted).toEqual([]); + expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({ + status: 'superseded', + lastError: 'status_no_longer_matches_outbox', + }); + }); + it('rejects invalid report tokens without recording replayable intents', async () => { const { deps, store } = createDeps(); const reader = new MemberWorkSyncDiagnosticsReader(deps);