From f705cd25cb12322686a9e51ad9e9ca56cf082d4d Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 29 Apr 2026 15:54:29 +0300 Subject: [PATCH] feat(member-work-sync): schedule due nudge dispatch --- .../member-work-sync-control-plane-plan.md | 1 + .../createMemberWorkSyncFeature.ts | 18 ++- .../MemberWorkSyncNudgeDispatchScheduler.ts | 104 ++++++++++++++++++ src/main/index.ts | 4 + ...mberWorkSyncNudgeDispatchScheduler.test.ts | 64 +++++++++++ 5 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts create mode 100644 test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts diff --git a/docs/team-management/member-work-sync-control-plane-plan.md b/docs/team-management/member-work-sync-control-plane-plan.md index ebe8b0ef..7cb98ad3 100644 --- a/docs/team-management/member-work-sync-control-plane-plan.md +++ b/docs/team-management/member-work-sync-control-plane-plan.md @@ -38,6 +38,7 @@ Current implementation note: - Queue reconciles can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; read-only diagnostics never create outbox intents. This preserves the anti-spam gate and keeps UI/status reads passive. - Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port. - Production busy revalidation is wired through a tool-activity busy signal adapter. Active or recently finished tool calls defer Phase 2 nudges instead of interrupting work. +- A feature-owned dispatch scheduler wakes due retryable outbox items for active teams. It is bounded, unref'ed, and still relies on dispatcher revalidation before any inbox write. - Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts. - Phase 2 dispatch stays blocked until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low. diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index aa6a8ef1..cbd5acbb 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -28,6 +28,7 @@ import { } from '../infrastructure/MemberWorkSyncEventQueue'; import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore'; import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths'; +import { MemberWorkSyncNudgeDispatchScheduler } from '../infrastructure/MemberWorkSyncNudgeDispatchScheduler'; import { MemberWorkSyncToolActivityBusySignal } from '../infrastructure/MemberWorkSyncToolActivityBusySignal'; import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter'; import { SystemClockAdapter } from '../infrastructure/SystemClockAdapter'; @@ -58,6 +59,7 @@ export function createMemberWorkSyncFeature(deps: { kanbanManager: TeamKanbanManager; membersMetaStore: TeamMembersMetaStore; isTeamActive?: (teamName: string) => Promise | boolean; + listActiveTeamNames?: () => Promise; logger?: MemberWorkSyncLoggerPort; }): MemberWorkSyncFeatureFacade { const clock = new SystemClockAdapter(); @@ -108,6 +110,18 @@ export function createMemberWorkSyncFeature(deps: { logger: deps.logger, }); const router = new MemberWorkSyncTeamChangeRouter(agendaSource, queue); + const nudgeDispatchScheduler = deps.listActiveTeamNames + ? new MemberWorkSyncNudgeDispatchScheduler({ + listActiveTeamNames: deps.listActiveTeamNames, + dispatchDue: (teamNames) => + nudgeDispatcher.dispatchDue({ + teamNames, + claimedBy: `member-work-sync:${process.pid}:scheduled`, + }), + logger: deps.logger, + }) + : null; + nudgeDispatchScheduler?.start(); return { getStatus: (request) => diagnosticsReader.execute(request), @@ -139,6 +153,8 @@ export function createMemberWorkSyncFeature(deps: { dispatchDueNudges: (teamNames) => nudgeDispatcher.dispatchDue({ teamNames, claimedBy: `member-work-sync:${process.pid}` }), getQueueDiagnostics: () => queue.getDiagnostics(), - dispose: () => queue.stop(), + dispose: async () => { + await Promise.allSettled([queue.stop(), nudgeDispatchScheduler?.dispose()]); + }, }; } diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts new file mode 100644 index 00000000..f2d5a5c0 --- /dev/null +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts @@ -0,0 +1,104 @@ +import type { + MemberWorkSyncLoggerPort, + MemberWorkSyncNudgeDispatchSummary, +} from '../../core/application'; + +const DEFAULT_NUDGE_DISPATCH_INTERVAL_MS = 60_000; + +function uniqueNonEmpty(values: string[]): string[] { + return [...new Set(values.map((value) => value.trim()).filter(Boolean))]; +} + +function unrefTimer(timer: ReturnType): void { + timer.unref?.(); +} + +export interface MemberWorkSyncNudgeDispatchSchedulerDeps { + listActiveTeamNames(): Promise; + dispatchDue(teamNames: string[]): Promise; + intervalMs?: number; + logger?: MemberWorkSyncLoggerPort; +} + +export class MemberWorkSyncNudgeDispatchScheduler { + private readonly intervalMs: number; + private timer: ReturnType | null = null; + private running: Promise | null = null; + private stopped = false; + + constructor(private readonly deps: MemberWorkSyncNudgeDispatchSchedulerDeps) { + this.intervalMs = Math.max(10_000, deps.intervalMs ?? DEFAULT_NUDGE_DISPATCH_INTERVAL_MS); + } + + start(): void { + if (this.stopped || this.timer) { + return; + } + this.schedule(this.intervalMs); + } + + async runOnce(): Promise { + if (this.stopped) { + return; + } + if (this.running) { + await this.running; + return; + } + + const work = this.dispatchOnce(); + this.running = work; + try { + await work; + } finally { + if (this.running === work) { + this.running = null; + } + } + } + + async dispose(): Promise { + this.stopped = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + if (this.running) { + await this.running.catch(() => undefined); + } + } + + private schedule(delayMs: number): void { + if (this.stopped) { + return; + } + if (this.timer) { + clearTimeout(this.timer); + } + this.timer = setTimeout(() => { + this.timer = null; + void this.runOnce().finally(() => this.schedule(this.intervalMs)); + }, delayMs); + unrefTimer(this.timer); + } + + private async dispatchOnce(): Promise { + try { + const teamNames = uniqueNonEmpty(await this.deps.listActiveTeamNames()); + if (teamNames.length === 0) { + return; + } + const summary = await this.deps.dispatchDue(teamNames); + if (summary.claimed > 0 || summary.delivered > 0 || summary.retryable > 0) { + this.deps.logger?.debug('member work sync scheduled nudge dispatch completed', { + teamCount: teamNames.length, + ...summary, + }); + } + } catch (error) { + this.deps.logger?.warn('member work sync scheduled nudge dispatch failed', { + error: String(error), + }); + } + } +} diff --git a/src/main/index.ts b/src/main/index.ts index eb8f057f..f3df407b 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -1236,6 +1236,10 @@ async function initializeServices(): Promise { isTeamActive: (teamName) => teamProvisioningService.isTeamAlive(teamName) || teamProvisioningService.hasProvisioningRun(teamName), + listActiveTeamNames: async () => + (await teamDataService.listTeams()) + .filter((team) => !team.deletedAt) + .map((team) => team.teamName), logger: createLogger('Feature:MemberWorkSync'), }); void teamDataService diff --git a/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts new file mode 100644 index 00000000..ceb65ab5 --- /dev/null +++ b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { MemberWorkSyncNudgeDispatchScheduler } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler'; + +describe('MemberWorkSyncNudgeDispatchScheduler', () => { + it('dispatches due nudges for unique active teams without overlapping runs', async () => { + let release!: () => void; + const firstDispatch = new Promise((resolve) => { + release = resolve; + }); + const dispatchDue = vi.fn(async () => { + await firstDispatch; + return { claimed: 1, delivered: 1, superseded: 0, retryable: 0, terminal: 0 }; + }); + const scheduler = new MemberWorkSyncNudgeDispatchScheduler({ + listActiveTeamNames: async () => ['team-a', 'team-a', ' ', 'team-b'], + dispatchDue, + }); + + const first = scheduler.runOnce(); + const second = scheduler.runOnce(); + await Promise.resolve(); + expect(dispatchDue).toHaveBeenCalledTimes(1); + + release(); + await Promise.all([first, second]); + + expect(dispatchDue).toHaveBeenCalledWith(['team-a', 'team-b']); + }); + + it('skips dispatch when there are no active teams', async () => { + const dispatchDue = vi.fn(); + const scheduler = new MemberWorkSyncNudgeDispatchScheduler({ + listActiveTeamNames: async () => [], + dispatchDue, + }); + + await scheduler.runOnce(); + + expect(dispatchDue).not.toHaveBeenCalled(); + }); + + it('logs and survives list failures without throwing', async () => { + const warn = vi.fn(); + const scheduler = new MemberWorkSyncNudgeDispatchScheduler({ + listActiveTeamNames: async () => { + throw new Error('list failed'); + }, + dispatchDue: vi.fn(), + logger: { + debug: vi.fn(), + warn, + error: vi.fn(), + }, + }); + + await expect(scheduler.runOnce()).resolves.toBeUndefined(); + + expect(warn).toHaveBeenCalledWith( + 'member work sync scheduled nudge dispatch failed', + expect.objectContaining({ error: 'Error: list failed' }) + ); + }); +});