feat(member-work-sync): schedule due nudge dispatch

This commit is contained in:
777genius 2026-04-29 15:54:29 +03:00
parent 53c72e56ae
commit f705cd25cb
5 changed files with 190 additions and 1 deletions

View file

@ -38,6 +38,7 @@ Current implementation note:
- Queue reconciles can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; read-only diagnostics never create outbox intents. This preserves the anti-spam gate and keeps UI/status reads passive.
- Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port.
- Production busy revalidation is wired through a tool-activity busy signal adapter. Active or recently finished tool calls defer Phase 2 nudges instead of interrupting work.
- A feature-owned dispatch scheduler wakes due retryable outbox items for active teams. It is bounded, unref'ed, and still relies on dispatcher revalidation before any inbox write.
- Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts.
- Phase 2 dispatch stays blocked until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.

View file

@ -28,6 +28,7 @@ import {
} from '../infrastructure/MemberWorkSyncEventQueue';
import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore';
import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths';
import { MemberWorkSyncNudgeDispatchScheduler } from '../infrastructure/MemberWorkSyncNudgeDispatchScheduler';
import { MemberWorkSyncToolActivityBusySignal } from '../infrastructure/MemberWorkSyncToolActivityBusySignal';
import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter';
import { SystemClockAdapter } from '../infrastructure/SystemClockAdapter';
@ -58,6 +59,7 @@ export function createMemberWorkSyncFeature(deps: {
kanbanManager: TeamKanbanManager;
membersMetaStore: TeamMembersMetaStore;
isTeamActive?: (teamName: string) => Promise<boolean> | boolean;
listActiveTeamNames?: () => Promise<string[]>;
logger?: MemberWorkSyncLoggerPort;
}): MemberWorkSyncFeatureFacade {
const clock = new SystemClockAdapter();
@ -108,6 +110,18 @@ export function createMemberWorkSyncFeature(deps: {
logger: deps.logger,
});
const router = new MemberWorkSyncTeamChangeRouter(agendaSource, queue);
const nudgeDispatchScheduler = deps.listActiveTeamNames
? new MemberWorkSyncNudgeDispatchScheduler({
listActiveTeamNames: deps.listActiveTeamNames,
dispatchDue: (teamNames) =>
nudgeDispatcher.dispatchDue({
teamNames,
claimedBy: `member-work-sync:${process.pid}:scheduled`,
}),
logger: deps.logger,
})
: null;
nudgeDispatchScheduler?.start();
return {
getStatus: (request) => diagnosticsReader.execute(request),
@ -139,6 +153,8 @@ export function createMemberWorkSyncFeature(deps: {
dispatchDueNudges: (teamNames) =>
nudgeDispatcher.dispatchDue({ teamNames, claimedBy: `member-work-sync:${process.pid}` }),
getQueueDiagnostics: () => queue.getDiagnostics(),
dispose: () => queue.stop(),
dispose: async () => {
await Promise.allSettled([queue.stop(), nudgeDispatchScheduler?.dispose()]);
},
};
}

View file

@ -0,0 +1,104 @@
import type {
MemberWorkSyncLoggerPort,
MemberWorkSyncNudgeDispatchSummary,
} from '../../core/application';
const DEFAULT_NUDGE_DISPATCH_INTERVAL_MS = 60_000;
function uniqueNonEmpty(values: string[]): string[] {
return [...new Set(values.map((value) => value.trim()).filter(Boolean))];
}
function unrefTimer(timer: ReturnType<typeof setTimeout>): void {
timer.unref?.();
}
export interface MemberWorkSyncNudgeDispatchSchedulerDeps {
listActiveTeamNames(): Promise<string[]>;
dispatchDue(teamNames: string[]): Promise<MemberWorkSyncNudgeDispatchSummary>;
intervalMs?: number;
logger?: MemberWorkSyncLoggerPort;
}
export class MemberWorkSyncNudgeDispatchScheduler {
private readonly intervalMs: number;
private timer: ReturnType<typeof setTimeout> | null = null;
private running: Promise<void> | null = null;
private stopped = false;
constructor(private readonly deps: MemberWorkSyncNudgeDispatchSchedulerDeps) {
this.intervalMs = Math.max(10_000, deps.intervalMs ?? DEFAULT_NUDGE_DISPATCH_INTERVAL_MS);
}
start(): void {
if (this.stopped || this.timer) {
return;
}
this.schedule(this.intervalMs);
}
async runOnce(): Promise<void> {
if (this.stopped) {
return;
}
if (this.running) {
await this.running;
return;
}
const work = this.dispatchOnce();
this.running = work;
try {
await work;
} finally {
if (this.running === work) {
this.running = null;
}
}
}
async dispose(): Promise<void> {
this.stopped = true;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.running) {
await this.running.catch(() => undefined);
}
}
private schedule(delayMs: number): void {
if (this.stopped) {
return;
}
if (this.timer) {
clearTimeout(this.timer);
}
this.timer = setTimeout(() => {
this.timer = null;
void this.runOnce().finally(() => this.schedule(this.intervalMs));
}, delayMs);
unrefTimer(this.timer);
}
private async dispatchOnce(): Promise<void> {
try {
const teamNames = uniqueNonEmpty(await this.deps.listActiveTeamNames());
if (teamNames.length === 0) {
return;
}
const summary = await this.deps.dispatchDue(teamNames);
if (summary.claimed > 0 || summary.delivered > 0 || summary.retryable > 0) {
this.deps.logger?.debug('member work sync scheduled nudge dispatch completed', {
teamCount: teamNames.length,
...summary,
});
}
} catch (error) {
this.deps.logger?.warn('member work sync scheduled nudge dispatch failed', {
error: String(error),
});
}
}
}

View file

@ -1236,6 +1236,10 @@ async function initializeServices(): Promise<void> {
isTeamActive: (teamName) =>
teamProvisioningService.isTeamAlive(teamName) ||
teamProvisioningService.hasProvisioningRun(teamName),
listActiveTeamNames: async () =>
(await teamDataService.listTeams())
.filter((team) => !team.deletedAt)
.map((team) => team.teamName),
logger: createLogger('Feature:MemberWorkSync'),
});
void teamDataService

View file

@ -0,0 +1,64 @@
import { describe, expect, it, vi } from 'vitest';
import { MemberWorkSyncNudgeDispatchScheduler } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler';
describe('MemberWorkSyncNudgeDispatchScheduler', () => {
it('dispatches due nudges for unique active teams without overlapping runs', async () => {
let release!: () => void;
const firstDispatch = new Promise<void>((resolve) => {
release = resolve;
});
const dispatchDue = vi.fn(async () => {
await firstDispatch;
return { claimed: 1, delivered: 1, superseded: 0, retryable: 0, terminal: 0 };
});
const scheduler = new MemberWorkSyncNudgeDispatchScheduler({
listActiveTeamNames: async () => ['team-a', 'team-a', ' ', 'team-b'],
dispatchDue,
});
const first = scheduler.runOnce();
const second = scheduler.runOnce();
await Promise.resolve();
expect(dispatchDue).toHaveBeenCalledTimes(1);
release();
await Promise.all([first, second]);
expect(dispatchDue).toHaveBeenCalledWith(['team-a', 'team-b']);
});
it('skips dispatch when there are no active teams', async () => {
const dispatchDue = vi.fn();
const scheduler = new MemberWorkSyncNudgeDispatchScheduler({
listActiveTeamNames: async () => [],
dispatchDue,
});
await scheduler.runOnce();
expect(dispatchDue).not.toHaveBeenCalled();
});
it('logs and survives list failures without throwing', async () => {
const warn = vi.fn();
const scheduler = new MemberWorkSyncNudgeDispatchScheduler({
listActiveTeamNames: async () => {
throw new Error('list failed');
},
dispatchDue: vi.fn(),
logger: {
debug: vi.fn(),
warn,
error: vi.fn(),
},
});
await expect(scheduler.runOnce()).resolves.toBeUndefined();
expect(warn).toHaveBeenCalledWith(
'member work sync scheduled nudge dispatch failed',
expect.objectContaining({ error: 'Error: list failed' })
);
});
});