From 53c72e56aed6bcfa72402600c3d9ee66767c705e Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 29 Apr 2026 15:48:02 +0300 Subject: [PATCH] feat(member-work-sync): suppress nudges while members are busy --- .../member-work-sync-control-plane-plan.md | 1 + .../createMemberWorkSyncFeature.ts | 8 +- .../MemberWorkSyncToolActivityBusySignal.ts | 208 ++++++++++++++++++ .../core/MemberWorkSyncUseCases.test.ts | 39 ++++ ...mberWorkSyncToolActivityBusySignal.test.ts | 145 ++++++++++++ 5 files changed, 400 insertions(+), 1 deletion(-) create mode 100644 src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts create mode 100644 test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts diff --git a/docs/team-management/member-work-sync-control-plane-plan.md b/docs/team-management/member-work-sync-control-plane-plan.md index 8aaaef47..ebe8b0ef 100644 --- a/docs/team-management/member-work-sync-control-plane-plan.md +++ b/docs/team-management/member-work-sync-control-plane-plan.md @@ -37,6 +37,7 @@ Current implementation note: - Phase 2 storage foundation is implemented as a durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. - Queue reconciles can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; read-only diagnostics never create outbox intents. This preserves the anti-spam gate and keeps UI/status reads passive. - Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port. +- Production busy revalidation is wired through a tool-activity busy signal adapter. Active or recently finished tool calls defer Phase 2 nudges instead of interrupting work. - Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts. - Phase 2 dispatch stays blocked until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low. diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index 1245a5c5..aa6a8ef1 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -28,6 +28,7 @@ import { } from '../infrastructure/MemberWorkSyncEventQueue'; import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore'; import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths'; +import { MemberWorkSyncToolActivityBusySignal } from '../infrastructure/MemberWorkSyncToolActivityBusySignal'; import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter'; import { SystemClockAdapter } from '../infrastructure/SystemClockAdapter'; @@ -74,6 +75,7 @@ export function createMemberWorkSyncFeature(deps: { const reportToken = new HmacMemberWorkSyncReportTokenAdapter(storePaths); const inboxNudge = new TeamInboxMemberWorkSyncNudgeSink(); const watchdogCooldown = new TeamTaskStallJournalWorkSyncCooldown(deps.teamsBasePath); + const busySignal = new MemberWorkSyncToolActivityBusySignal(); const useCaseDeps = { clock, hash, @@ -83,6 +85,7 @@ export function createMemberWorkSyncFeature(deps: { outboxStore: store, inboxNudge, watchdogCooldown, + busySignal, reportToken, ...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}), logger: deps.logger, @@ -110,7 +113,10 @@ export function createMemberWorkSyncFeature(deps: { getStatus: (request) => diagnosticsReader.execute(request), getMetrics: (request) => metricsReader.execute(request), report: (request) => reporter.execute(request), - noteTeamChange: (event) => router.noteTeamChange(event), + noteTeamChange: (event) => { + busySignal.noteTeamChange(event); + router.noteTeamChange(event); + }, enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames), replayPendingReports: async (teamNames) => { const summaries = await Promise.allSettled( diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts new file mode 100644 index 00000000..37acd736 --- /dev/null +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts @@ -0,0 +1,208 @@ +import type { MemberWorkSyncBusySignalPort } from '../../core/application'; + +import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types'; + +const DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS = 90_000; + +interface MemberActivityState { + activeToolIds: Set; + recentBusyUntilByToolId: Map; +} + +function memberKey(teamName: string, memberName: string): string { + return `${teamName}\0${memberName.trim().toLowerCase()}`; +} + +function parseToolActivity(detail: string | undefined): ToolActivityEventPayload | null { + if (!detail) { + return null; + } + try { + const parsed = JSON.parse(detail) as ToolActivityEventPayload; + return parsed && typeof parsed === 'object' ? parsed : null; + } catch { + return null; + } +} + +function parseIsoMs(value: string | undefined, fallbackMs: number): number { + const parsed = value ? Date.parse(value) : NaN; + return Number.isFinite(parsed) ? parsed : fallbackMs; +} + +function addMsIso(baseIso: string, ms: number): string { + return new Date(Date.parse(baseIso) + ms).toISOString(); +} + +function maxIso(values: Iterable): string | null { + let max: string | null = null; + for (const value of values) { + if (!max || Date.parse(value) > Date.parse(max)) { + max = value; + } + } + return max; +} + +export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusySignalPort { + private readonly activityByMember = new Map(); + private readonly busyGraceMs: number; + + constructor(options: { busyGraceMs?: number } = {}) { + this.busyGraceMs = Math.max(0, options.busyGraceMs ?? DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS); + } + + noteTeamChange(event: TeamChangeEvent): void { + if (event.type === 'lead-activity' && event.detail === 'offline') { + this.dropTeam(event.teamName); + return; + } + + if (event.type !== 'tool-activity') { + return; + } + + const payload = parseToolActivity(event.detail); + if (!payload) { + return; + } + + if (payload.action === 'start' && payload.activity) { + this.noteStart(event.teamName, payload.activity.memberName, payload.activity.toolUseId); + return; + } + + if (payload.action === 'finish' && payload.memberName && payload.toolUseId) { + this.noteFinish(event.teamName, payload.memberName, payload.toolUseId, payload.finishedAt); + return; + } + + if (payload.action === 'reset') { + this.noteReset(event.teamName, payload.memberName, payload.toolUseIds); + } + } + + async isBusy(input: { + teamName: string; + memberName: string; + nowIso: string; + }): Promise<{ busy: boolean; reason?: string; retryAfterIso?: string }> { + const key = memberKey(input.teamName, input.memberName); + const state = this.activityByMember.get(key); + if (!state) { + return { busy: false }; + } + + this.pruneState(key, state, input.nowIso); + + if (state.activeToolIds.size > 0) { + return { + busy: true, + reason: 'active_tool_activity', + retryAfterIso: addMsIso(input.nowIso, this.busyGraceMs), + }; + } + + const retryAfterIso = maxIso(state.recentBusyUntilByToolId.values()); + if (retryAfterIso) { + return { + busy: true, + reason: 'recent_tool_activity', + retryAfterIso, + }; + } + + return { busy: false }; + } + + private noteStart(teamName: string, memberName: string, toolUseId: string): void { + const normalizedToolUseId = toolUseId.trim(); + if (!memberName.trim() || !normalizedToolUseId) { + return; + } + const state = this.getOrCreateState(teamName, memberName); + state.activeToolIds.add(normalizedToolUseId); + state.recentBusyUntilByToolId.delete(normalizedToolUseId); + } + + private noteFinish( + teamName: string, + memberName: string, + toolUseId: string, + finishedAt: string | undefined + ): void { + const normalizedToolUseId = toolUseId.trim(); + if (!memberName.trim() || !normalizedToolUseId) { + return; + } + const finishedAtMs = parseIsoMs(finishedAt, Date.now()); + const busyUntilIso = new Date(finishedAtMs + this.busyGraceMs).toISOString(); + const state = this.getOrCreateState(teamName, memberName); + state.activeToolIds.delete(normalizedToolUseId); + state.recentBusyUntilByToolId.set(normalizedToolUseId, busyUntilIso); + } + + private noteReset(teamName: string, memberName?: string, toolUseIds?: string[]): void { + const normalizedMemberName = memberName?.trim(); + if (!normalizedMemberName) { + this.dropTeam(teamName); + return; + } + + const key = memberKey(teamName, normalizedMemberName); + const state = this.activityByMember.get(key); + if (!state) { + return; + } + + const normalizedToolUseIds = new Set( + (toolUseIds ?? []).map((toolUseId) => toolUseId.trim()).filter(Boolean) + ); + if (normalizedToolUseIds.size === 0) { + this.activityByMember.delete(key); + return; + } + + for (const toolUseId of normalizedToolUseIds) { + state.activeToolIds.delete(toolUseId); + state.recentBusyUntilByToolId.delete(toolUseId); + } + if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) { + this.activityByMember.delete(key); + } + } + + private getOrCreateState(teamName: string, memberName: string): MemberActivityState { + const key = memberKey(teamName, memberName); + const existing = this.activityByMember.get(key); + if (existing) { + return existing; + } + const created: MemberActivityState = { + activeToolIds: new Set(), + recentBusyUntilByToolId: new Map(), + }; + this.activityByMember.set(key, created); + return created; + } + + private pruneState(key: string, state: MemberActivityState, nowIso: string): void { + const nowMs = Date.parse(nowIso); + for (const [toolUseId, busyUntilIso] of state.recentBusyUntilByToolId) { + if (Date.parse(busyUntilIso) <= nowMs) { + state.recentBusyUntilByToolId.delete(toolUseId); + } + } + if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) { + this.activityByMember.delete(key); + } + } + + private dropTeam(teamName: string): void { + for (const key of this.activityByMember.keys()) { + if (key.startsWith(`${teamName}\0`)) { + this.activityByMember.delete(key); + } + } + } +} diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index f6cb0789..b1925f28 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -231,6 +231,7 @@ function createDeps(options?: { providerId?: 'opencode' | 'codex'; outboxStore?: MemberWorkSyncOutboxStorePort; inboxNudge?: MemberWorkSyncInboxNudgePort; + busySignal?: MemberWorkSyncUseCaseDeps['busySignal']; }) { const clock = new MutableClock(); const store = new InMemoryStatusStore(); @@ -259,6 +260,7 @@ function createDeps(options?: { reportStore: store, ...(options?.outboxStore ? { outboxStore: options.outboxStore } : {}), ...(options?.inboxNudge ? { inboxNudge: options.inboxNudge } : {}), + ...(options?.busySignal ? { busySignal: options.busySignal } : {}), reportToken: { create: async (input) => ({ token: `token:${input.teamName}:${input.memberName}:${input.agendaFingerprint}`, @@ -591,6 +593,43 @@ describe('MemberWorkSync use cases', () => { }); }); + it('defers nudge dispatch while the member has active or recent tool activity', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ + outboxStore: outbox, + inboxNudge: inbox, + busySignal: { + isBusy: async () => ({ + busy: true, + reason: 'active_tool_activity', + retryAfterIso: '2026-04-29T00:02:00.000Z', + }), + }, + }); + store.phase2ReadinessState = 'shadow_ready'; + + const current = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['tool_finished'] } + ); + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 }); + expect(inbox.inserted).toEqual([]); + expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({ + status: 'failed_retryable', + lastError: 'member_busy:active_tool_activity', + nextAttemptAt: '2026-04-29T00:02:00.000Z', + }); + }); + it('uses bounded retry backoff when inbox delivery fails', async () => { const outbox = new InMemoryOutboxStore(); const inbox = new InMemoryInboxNudge(); diff --git a/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts b/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts new file mode 100644 index 00000000..bb6bbb65 --- /dev/null +++ b/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts @@ -0,0 +1,145 @@ +import { describe, expect, it } from 'vitest'; + +import { MemberWorkSyncToolActivityBusySignal } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal'; + +import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types'; + +function toolEvent(teamName: string, payload: ToolActivityEventPayload): TeamChangeEvent { + return { + type: 'tool-activity', + teamName, + detail: JSON.stringify(payload), + }; +} + +describe('MemberWorkSyncToolActivityBusySignal', () => { + it('treats active tools as busy and recent finishes as a bounded quiet window', async () => { + const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 }); + + signal.noteTeamChange( + toolEvent('team-a', { + action: 'start', + activity: { + memberName: 'bob', + toolUseId: 'tool-1', + toolName: 'bash', + startedAt: '2026-04-29T00:00:00.000Z', + source: 'runtime', + }, + }) + ); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:00:15.000Z', + }) + ).resolves.toMatchObject({ + busy: true, + reason: 'active_tool_activity', + retryAfterIso: '2026-04-29T00:01:45.000Z', + }); + + signal.noteTeamChange( + toolEvent('team-a', { + action: 'finish', + memberName: 'bob', + toolUseId: 'tool-1', + finishedAt: '2026-04-29T00:01:00.000Z', + }) + ); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:01:30.000Z', + }) + ).resolves.toMatchObject({ + busy: true, + reason: 'recent_tool_activity', + retryAfterIso: '2026-04-29T00:02:30.000Z', + }); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:02:31.000Z', + }) + ).resolves.toEqual({ busy: false }); + }); + + it('does not leak activity across members and clears targeted reset events', async () => { + const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 }); + + signal.noteTeamChange( + toolEvent('team-a', { + action: 'start', + activity: { + memberName: 'bob', + toolUseId: 'tool-1', + toolName: 'read', + startedAt: '2026-04-29T00:00:00.000Z', + source: 'member_log', + }, + }) + ); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'alice', + nowIso: '2026-04-29T00:00:15.000Z', + }) + ).resolves.toEqual({ busy: false }); + + signal.noteTeamChange( + toolEvent('team-a', { + action: 'reset', + memberName: 'bob', + toolUseIds: ['tool-1'], + }) + ); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:00:15.000Z', + }) + ).resolves.toEqual({ busy: false }); + }); + + it('drops all tracked activity for a team when it goes offline', async () => { + const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 }); + + signal.noteTeamChange( + toolEvent('team-a', { + action: 'start', + activity: { + memberName: 'bob', + toolUseId: 'tool-1', + toolName: 'write', + startedAt: '2026-04-29T00:00:00.000Z', + source: 'runtime', + }, + }) + ); + + signal.noteTeamChange({ + type: 'lead-activity', + teamName: 'team-a', + detail: 'offline', + }); + + await expect( + signal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:00:15.000Z', + }) + ).resolves.toEqual({ busy: false }); + }); +});