feat(member-work-sync): add guarded nudge dispatcher

This commit is contained in:
777genius 2026-04-29 15:26:06 +03:00
parent 3552a4ba61
commit 23714f5ca8
8 changed files with 386 additions and 5 deletions

View file

@ -36,6 +36,7 @@ Current implementation note:
- Phase 1.5 exposes a machine-readable `phase2Readiness` assessment from shadow metrics. It can say `collecting_shadow_data`, `blocked`, or `shadow_ready`; it still does not dispatch nudges.
- Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet.
- Reconciler can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; otherwise it records normal shadow status and does nothing. This preserves the anti-spam gate before any dispatcher exists.
- Dispatcher use case exists behind explicit facade invocation. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port.
- Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.
Patterns used:

View file

@ -0,0 +1,192 @@
import type { MemberWorkSyncOutboxItem } from '../../contracts';
import type { MemberWorkSyncUseCaseDeps } from './ports';
export interface MemberWorkSyncNudgeDispatchSummary {
claimed: number;
delivered: number;
superseded: number;
retryable: number;
terminal: number;
}
export interface MemberWorkSyncNudgeDispatchOptions {
claimedBy: string;
teamNames: string[];
limit?: number;
}
function emptySummary(): MemberWorkSyncNudgeDispatchSummary {
return { claimed: 0, delivered: 0, superseded: 0, retryable: 0, terminal: 0 };
}
function addMinutes(iso: string, minutes: number): string {
return new Date(Date.parse(iso) + minutes * 60_000).toISOString();
}
export class MemberWorkSyncNudgeDispatcher {
constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {}
async dispatchDue(options: MemberWorkSyncNudgeDispatchOptions): Promise<MemberWorkSyncNudgeDispatchSummary> {
const outbox = this.deps.outboxStore;
const inbox = this.deps.inboxNudge;
if (!outbox || !inbox) {
return emptySummary();
}
const nowIso = this.deps.clock.now().toISOString();
const summary = emptySummary();
for (const teamName of [...new Set(options.teamNames.map((name) => name.trim()).filter(Boolean))]) {
const claimed = await outbox.claimDue({
teamName,
claimedBy: options.claimedBy,
nowIso,
limit: options.limit ?? 10,
});
summary.claimed += claimed.length;
for (const item of claimed) {
const result = await this.dispatchItem(item, nowIso);
summary[result] += 1;
}
}
return summary;
}
private async dispatchItem(
item: MemberWorkSyncOutboxItem,
nowIso: string
): Promise<keyof Omit<MemberWorkSyncNudgeDispatchSummary, 'claimed'>> {
const outbox = this.deps.outboxStore;
const inbox = this.deps.inboxNudge;
if (!outbox || !inbox) {
return 'terminal';
}
const revalidation = await this.revalidate(item, nowIso);
if (!revalidation.ok) {
if (revalidation.retryable) {
await outbox.markFailed({
teamName: item.teamName,
id: item.id,
attemptGeneration: item.attemptGeneration,
error: revalidation.reason,
retryable: true,
nowIso,
nextAttemptAt: revalidation.nextAttemptAt ?? addMinutes(nowIso, 10),
});
return 'retryable';
}
await outbox.markSuperseded({
teamName: item.teamName,
id: item.id,
reason: revalidation.reason,
nowIso,
});
return 'superseded';
}
try {
const inserted = await inbox.insertIfAbsent({
teamName: item.teamName,
memberName: item.memberName,
messageId: item.id,
payloadHash: item.payloadHash,
payload: item.payload,
timestamp: nowIso,
});
if (inserted.conflict) {
await outbox.markFailed({
teamName: item.teamName,
id: item.id,
attemptGeneration: item.attemptGeneration,
error: 'inbox_payload_conflict',
retryable: false,
nowIso,
});
return 'terminal';
}
await outbox.markDelivered({
teamName: item.teamName,
id: item.id,
attemptGeneration: item.attemptGeneration,
deliveredMessageId: inserted.messageId,
nowIso,
});
return 'delivered';
} catch (error) {
await outbox.markFailed({
teamName: item.teamName,
id: item.id,
attemptGeneration: item.attemptGeneration,
error: String(error),
retryable: true,
nowIso,
nextAttemptAt: addMinutes(nowIso, 10),
});
return 'retryable';
}
}
private async revalidate(
item: MemberWorkSyncOutboxItem,
nowIso: string
): Promise<
| { ok: true }
| { ok: false; reason: string; retryable: boolean; nextAttemptAt?: string }
> {
if (this.deps.lifecycle && !(await this.deps.lifecycle.isTeamActive(item.teamName))) {
return { ok: false, reason: 'team_inactive', retryable: false };
}
const status = await this.deps.statusStore.read({
teamName: item.teamName,
memberName: item.memberName,
});
if (!status) {
return { ok: false, reason: 'status_missing', retryable: false };
}
if (
status.state !== 'needs_sync' ||
status.shadow?.wouldNudge !== true ||
status.agenda.fingerprint !== item.agendaFingerprint
) {
return { ok: false, reason: 'status_no_longer_matches_outbox', retryable: false };
}
if (!this.deps.statusStore.readTeamMetrics) {
return { ok: false, reason: 'metrics_unavailable', retryable: true };
}
const metrics = await this.deps.statusStore.readTeamMetrics(item.teamName);
if (metrics.phase2Readiness.state !== 'shadow_ready') {
return { ok: false, reason: 'phase2_not_ready', retryable: true };
}
const busy = await this.deps.busySignal?.isBusy({
teamName: item.teamName,
memberName: item.memberName,
nowIso,
});
if (busy?.busy) {
return {
ok: false,
reason: `member_busy:${busy.reason ?? 'unknown'}`,
retryable: true,
nextAttemptAt: busy.retryAfterIso,
};
}
const taskIds = item.payload.taskRefs.map((taskRef) => taskRef.taskId);
if (
this.deps.watchdogCooldown &&
(await this.deps.watchdogCooldown.hasRecentNudge({
teamName: item.teamName,
memberName: item.memberName,
taskIds,
nowIso,
}))
) {
return { ok: false, reason: 'watchdog_cooldown_active', retryable: true };
}
return { ok: true };
}
}

View file

@ -1,5 +1,6 @@
export * from './MemberWorkSyncDiagnosticsReader';
export * from './MemberWorkSyncMetricsReader';
export * from './MemberWorkSyncNudgeDispatcher';
export * from './MemberWorkSyncNudgeOutboxPlanner';
export * from './MemberWorkSyncPendingReportIntentReplayer';
export * from './MemberWorkSyncReconciler';

View file

@ -102,6 +102,34 @@ export interface MemberWorkSyncOutboxStorePort {
markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise<void>;
}
export interface MemberWorkSyncInboxNudgePort {
insertIfAbsent(input: {
teamName: string;
memberName: string;
messageId: string;
payloadHash: string;
payload: MemberWorkSyncOutboxItem['payload'];
timestamp: string;
}): Promise<{ inserted: boolean; messageId: string; conflict?: boolean }>;
}
export interface MemberWorkSyncWatchdogCooldownPort {
hasRecentNudge(input: {
teamName: string;
memberName: string;
taskIds: string[];
nowIso: string;
}): Promise<boolean>;
}
export interface MemberWorkSyncBusySignalPort {
isBusy(input: {
teamName: string;
memberName: string;
nowIso: string;
}): Promise<{ busy: boolean; reason?: string; retryAfterIso?: string }>;
}
export interface MemberWorkSyncUseCaseDeps {
clock: MemberWorkSyncClockPort;
hash: MemberWorkSyncHashPort;
@ -109,6 +137,9 @@ export interface MemberWorkSyncUseCaseDeps {
statusStore: MemberWorkSyncStatusStorePort;
reportStore?: MemberWorkSyncReportStorePort;
outboxStore?: MemberWorkSyncOutboxStorePort;
inboxNudge?: MemberWorkSyncInboxNudgePort;
watchdogCooldown?: MemberWorkSyncWatchdogCooldownPort;
busySignal?: MemberWorkSyncBusySignalPort;
reportToken?: MemberWorkSyncReportTokenPort;
lifecycle?: MemberWorkSyncLifecyclePort;
logger?: MemberWorkSyncLoggerPort;

View file

@ -0,0 +1,36 @@
import { TeamInboxReader } from '@main/services/team/TeamInboxReader';
import { TeamInboxWriter } from '@main/services/team/TeamInboxWriter';
import type { MemberWorkSyncInboxNudgePort } from '../../../core/application';
export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudgePort {
constructor(
private readonly inboxReader: Pick<TeamInboxReader, 'getMessagesFor'> = new TeamInboxReader(),
private readonly inboxWriter: Pick<TeamInboxWriter, 'sendMessage'> = new TeamInboxWriter()
) {}
async insertIfAbsent(input: Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]) {
const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName);
if (existing.some((message) => message.messageId === input.messageId)) {
return { inserted: false, messageId: input.messageId };
}
const result = await this.inboxWriter.sendMessage(input.teamName, {
member: input.memberName,
from: input.payload.from,
to: input.payload.to,
messageId: input.messageId,
timestamp: input.timestamp,
text: input.payload.text,
taskRefs: input.payload.taskRefs,
actionMode: input.payload.actionMode,
summary: 'Work sync check',
source: 'system_notification',
messageKind: input.payload.messageKind,
});
return {
inserted: true,
messageId: result.messageId,
};
}
}

View file

@ -9,6 +9,8 @@ import type {
import {
MemberWorkSyncDiagnosticsReader,
MemberWorkSyncMetricsReader,
MemberWorkSyncNudgeDispatcher,
type MemberWorkSyncNudgeDispatchSummary,
MemberWorkSyncPendingReportIntentReplayer,
type MemberWorkSyncPendingReportReplaySummary,
MemberWorkSyncReconciler,
@ -16,6 +18,7 @@ import {
type MemberWorkSyncReconcileContext,
} from '../../core/application';
import { MemberWorkSyncTeamChangeRouter } from '../adapters/input/MemberWorkSyncTeamChangeRouter';
import { TeamInboxMemberWorkSyncNudgeSink } from '../adapters/output/TeamInboxMemberWorkSyncNudgeSink';
import { TeamTaskAgendaSource } from '../adapters/output/TeamTaskAgendaSource';
import { HmacMemberWorkSyncReportTokenAdapter } from '../infrastructure/HmacMemberWorkSyncReportTokenAdapter';
import {
@ -41,6 +44,7 @@ export interface MemberWorkSyncFeatureFacade {
noteTeamChange(event: TeamChangeEvent): void;
enqueueStartupScan(teamNames: string[]): Promise<void>;
replayPendingReports(teamNames: string[]): Promise<MemberWorkSyncPendingReportReplaySummary>;
dispatchDueNudges(teamNames: string[]): Promise<MemberWorkSyncNudgeDispatchSummary>;
getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics;
dispose(): Promise<void>;
}
@ -67,6 +71,7 @@ export function createMemberWorkSyncFeature(deps: {
const storePaths = new MemberWorkSyncStorePaths(deps.teamsBasePath);
const store = new JsonMemberWorkSyncStore(storePaths);
const reportToken = new HmacMemberWorkSyncReportTokenAdapter(storePaths);
const inboxNudge = new TeamInboxMemberWorkSyncNudgeSink();
const useCaseDeps = {
clock,
hash,
@ -74,6 +79,7 @@ export function createMemberWorkSyncFeature(deps: {
statusStore: store,
reportStore: store,
outboxStore: store,
inboxNudge,
reportToken,
...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}),
logger: deps.logger,
@ -83,6 +89,7 @@ export function createMemberWorkSyncFeature(deps: {
const reporter = new MemberWorkSyncReporter(useCaseDeps);
const reconciler = new MemberWorkSyncReconciler(useCaseDeps);
const pendingReportReplayer = new MemberWorkSyncPendingReportIntentReplayer(useCaseDeps);
const nudgeDispatcher = new MemberWorkSyncNudgeDispatcher(useCaseDeps);
const queue = new MemberWorkSyncEventQueue({
reconcile: async (request, context: MemberWorkSyncReconcileContext) => {
await reconciler.execute(request, context);
@ -116,6 +123,8 @@ export function createMemberWorkSyncFeature(deps: {
{ processed: 0, accepted: 0, rejected: 0, superseded: 0 }
);
},
dispatchDueNudges: (teamNames) =>
nudgeDispatcher.dispatchDue({ teamNames, claimedBy: `member-work-sync:${process.pid}` }),
getQueueDiagnostics: () => queue.getDiagnostics(),
dispose: () => queue.stop(),
};

View file

@ -413,7 +413,8 @@ export type InboxMessageKind =
| 'default'
| 'slash_command'
| 'slash_command_result'
| 'task_comment_notification';
| 'task_comment_notification'
| 'member_work_sync_nudge';
export interface SlashCommandMeta {
name: string;

View file

@ -2,9 +2,11 @@ import { describe, expect, it } from 'vitest';
import {
MemberWorkSyncDiagnosticsReader,
MemberWorkSyncNudgeDispatcher,
MemberWorkSyncPendingReportIntentReplayer,
MemberWorkSyncReporter,
type MemberWorkSyncAgendaSourceResult,
type MemberWorkSyncInboxNudgePort,
type MemberWorkSyncOutboxStorePort,
type MemberWorkSyncStatusStorePort,
type MemberWorkSyncUseCaseDeps,
@ -131,9 +133,14 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort {
class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
readonly ensures: MemberWorkSyncOutboxEnsureInput[] = [];
readonly items = new Map<string, MemberWorkSyncOutboxItem>();
async ensurePending(input: MemberWorkSyncOutboxEnsureInput) {
this.ensures.push(input);
const current = this.items.get(input.id);
if (current) {
return { ok: true as const, outcome: 'existing' as const, item: current };
}
const item: MemberWorkSyncOutboxItem = {
...input,
status: 'pending',
@ -141,18 +148,59 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
createdAt: input.nowIso,
updatedAt: input.nowIso,
};
this.items.set(input.id, item);
return { ok: true as const, outcome: 'created' as const, item };
}
async claimDue(): Promise<MemberWorkSyncOutboxItem[]> {
return [];
const due = [...this.items.values()].filter((item) => item.status === 'pending');
for (const item of due) {
this.items.set(item.id, {
...item,
status: 'claimed',
attemptGeneration: item.attemptGeneration + 1,
});
}
return due.map((item) => this.items.get(item.id) as MemberWorkSyncOutboxItem);
}
async markDelivered(_input: MemberWorkSyncOutboxMarkDeliveredInput): Promise<void> {}
async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise<void> {
const current = this.items.get(input.id);
if (current?.attemptGeneration === input.attemptGeneration) {
this.items.set(input.id, {
...current,
status: 'delivered',
deliveredMessageId: input.deliveredMessageId,
});
}
}
async markSuperseded(_input: MemberWorkSyncOutboxMarkSupersededInput): Promise<void> {}
async markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise<void> {
const current = this.items.get(input.id);
if (current) {
this.items.set(input.id, { ...current, status: 'superseded', lastError: input.reason });
}
}
async markFailed(_input: MemberWorkSyncOutboxMarkFailedInput): Promise<void> {}
async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise<void> {
const current = this.items.get(input.id);
if (current?.attemptGeneration === input.attemptGeneration) {
this.items.set(input.id, {
...current,
status: input.retryable ? 'failed_retryable' : 'failed_terminal',
lastError: input.error,
});
}
}
}
class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort {
readonly inserted: Array<Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]> = [];
async insertIfAbsent(input: Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]) {
this.inserted.push(input);
return { inserted: true, messageId: input.messageId };
}
}
function createDeps(options?: {
@ -162,6 +210,7 @@ function createDeps(options?: {
teamActive?: boolean;
providerId?: 'opencode' | 'codex';
outboxStore?: MemberWorkSyncOutboxStorePort;
inboxNudge?: MemberWorkSyncInboxNudgePort;
}) {
const clock = new MutableClock();
const store = new InMemoryStatusStore();
@ -189,6 +238,7 @@ function createDeps(options?: {
statusStore: store,
reportStore: store,
...(options?.outboxStore ? { outboxStore: options.outboxStore } : {}),
...(options?.inboxNudge ? { inboxNudge: options.inboxNudge } : {}),
reportToken: {
create: async (input) => ({
token: `token:${input.teamName}:${input.memberName}:${input.agendaFingerprint}`,
@ -395,6 +445,66 @@ describe('MemberWorkSync use cases', () => {
});
});
it('dispatches due nudges only after revalidating current status and readiness', async () => {
const outbox = new InMemoryOutboxStore();
const inbox = new InMemoryInboxNudge();
const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
store.phase2ReadinessState = 'shadow_ready';
const status = await new MemberWorkSyncDiagnosticsReader(deps).execute({
teamName: 'team-a',
memberName: 'bob',
});
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(summary).toMatchObject({ claimed: 1, delivered: 1, superseded: 0 });
expect(inbox.inserted).toHaveLength(1);
expect(inbox.inserted[0]).toMatchObject({
teamName: 'team-a',
memberName: 'bob',
messageId: `member-work-sync:team-a:bob:${status.agenda.fingerprint}`,
});
expect(outbox.items.get(`member-work-sync:team-a:bob:${status.agenda.fingerprint}`)).toMatchObject({
status: 'delivered',
deliveredMessageId: `member-work-sync:team-a:bob:${status.agenda.fingerprint}`,
});
});
it('does not dispatch stale outbox items after the member reports still working', async () => {
const outbox = new InMemoryOutboxStore();
const inbox = new InMemoryInboxNudge();
const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
store.phase2ReadinessState = 'shadow_ready';
const reader = new MemberWorkSyncDiagnosticsReader(deps);
const reporter = new MemberWorkSyncReporter(deps);
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
await reporter.execute({
teamName: 'team-a',
memberName: 'bob',
state: 'still_working',
agendaFingerprint: current.agenda.fingerprint,
reportToken: current.reportToken,
leaseTtlMs: 120_000,
source: 'test',
});
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(summary).toMatchObject({ claimed: 1, delivered: 0, superseded: 1 });
expect(inbox.inserted).toEqual([]);
expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({
status: 'superseded',
lastError: 'status_no_longer_matches_outbox',
});
});
it('rejects invalid report tokens without recording replayable intents', async () => {
const { deps, store } = createDeps();
const reader = new MemberWorkSyncDiagnosticsReader(deps);