feat(member-work-sync): suppress nudges while members are busy
This commit is contained in:
parent
325f1ffba2
commit
53c72e56ae
5 changed files with 400 additions and 1 deletions
|
|
@ -37,6 +37,7 @@ Current implementation note:
|
|||
- Phase 2 storage foundation is implemented as a durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states.
|
||||
- Queue reconciles can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; read-only diagnostics never create outbox intents. This preserves the anti-spam gate and keeps UI/status reads passive.
|
||||
- Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port.
|
||||
- Production busy revalidation is wired through a tool-activity busy signal adapter. Active or recently finished tool calls defer Phase 2 nudges instead of interrupting work.
|
||||
- Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts.
|
||||
- Phase 2 dispatch stays blocked until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import {
|
|||
} from '../infrastructure/MemberWorkSyncEventQueue';
|
||||
import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore';
|
||||
import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths';
|
||||
import { MemberWorkSyncToolActivityBusySignal } from '../infrastructure/MemberWorkSyncToolActivityBusySignal';
|
||||
import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter';
|
||||
import { SystemClockAdapter } from '../infrastructure/SystemClockAdapter';
|
||||
|
||||
|
|
@ -74,6 +75,7 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
const reportToken = new HmacMemberWorkSyncReportTokenAdapter(storePaths);
|
||||
const inboxNudge = new TeamInboxMemberWorkSyncNudgeSink();
|
||||
const watchdogCooldown = new TeamTaskStallJournalWorkSyncCooldown(deps.teamsBasePath);
|
||||
const busySignal = new MemberWorkSyncToolActivityBusySignal();
|
||||
const useCaseDeps = {
|
||||
clock,
|
||||
hash,
|
||||
|
|
@ -83,6 +85,7 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
outboxStore: store,
|
||||
inboxNudge,
|
||||
watchdogCooldown,
|
||||
busySignal,
|
||||
reportToken,
|
||||
...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}),
|
||||
logger: deps.logger,
|
||||
|
|
@ -110,7 +113,10 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
getStatus: (request) => diagnosticsReader.execute(request),
|
||||
getMetrics: (request) => metricsReader.execute(request),
|
||||
report: (request) => reporter.execute(request),
|
||||
noteTeamChange: (event) => router.noteTeamChange(event),
|
||||
noteTeamChange: (event) => {
|
||||
busySignal.noteTeamChange(event);
|
||||
router.noteTeamChange(event);
|
||||
},
|
||||
enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames),
|
||||
replayPendingReports: async (teamNames) => {
|
||||
const summaries = await Promise.allSettled(
|
||||
|
|
|
|||
|
|
@ -0,0 +1,208 @@
|
|||
import type { MemberWorkSyncBusySignalPort } from '../../core/application';
|
||||
|
||||
import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types';
|
||||
|
||||
const DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS = 90_000;
|
||||
|
||||
interface MemberActivityState {
|
||||
activeToolIds: Set<string>;
|
||||
recentBusyUntilByToolId: Map<string, string>;
|
||||
}
|
||||
|
||||
function memberKey(teamName: string, memberName: string): string {
|
||||
return `${teamName}\0${memberName.trim().toLowerCase()}`;
|
||||
}
|
||||
|
||||
function parseToolActivity(detail: string | undefined): ToolActivityEventPayload | null {
|
||||
if (!detail) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(detail) as ToolActivityEventPayload;
|
||||
return parsed && typeof parsed === 'object' ? parsed : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function parseIsoMs(value: string | undefined, fallbackMs: number): number {
|
||||
const parsed = value ? Date.parse(value) : NaN;
|
||||
return Number.isFinite(parsed) ? parsed : fallbackMs;
|
||||
}
|
||||
|
||||
function addMsIso(baseIso: string, ms: number): string {
|
||||
return new Date(Date.parse(baseIso) + ms).toISOString();
|
||||
}
|
||||
|
||||
function maxIso(values: Iterable<string>): string | null {
|
||||
let max: string | null = null;
|
||||
for (const value of values) {
|
||||
if (!max || Date.parse(value) > Date.parse(max)) {
|
||||
max = value;
|
||||
}
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusySignalPort {
|
||||
private readonly activityByMember = new Map<string, MemberActivityState>();
|
||||
private readonly busyGraceMs: number;
|
||||
|
||||
constructor(options: { busyGraceMs?: number } = {}) {
|
||||
this.busyGraceMs = Math.max(0, options.busyGraceMs ?? DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS);
|
||||
}
|
||||
|
||||
noteTeamChange(event: TeamChangeEvent): void {
|
||||
if (event.type === 'lead-activity' && event.detail === 'offline') {
|
||||
this.dropTeam(event.teamName);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type !== 'tool-activity') {
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = parseToolActivity(event.detail);
|
||||
if (!payload) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload.action === 'start' && payload.activity) {
|
||||
this.noteStart(event.teamName, payload.activity.memberName, payload.activity.toolUseId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload.action === 'finish' && payload.memberName && payload.toolUseId) {
|
||||
this.noteFinish(event.teamName, payload.memberName, payload.toolUseId, payload.finishedAt);
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload.action === 'reset') {
|
||||
this.noteReset(event.teamName, payload.memberName, payload.toolUseIds);
|
||||
}
|
||||
}
|
||||
|
||||
async isBusy(input: {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
nowIso: string;
|
||||
}): Promise<{ busy: boolean; reason?: string; retryAfterIso?: string }> {
|
||||
const key = memberKey(input.teamName, input.memberName);
|
||||
const state = this.activityByMember.get(key);
|
||||
if (!state) {
|
||||
return { busy: false };
|
||||
}
|
||||
|
||||
this.pruneState(key, state, input.nowIso);
|
||||
|
||||
if (state.activeToolIds.size > 0) {
|
||||
return {
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
retryAfterIso: addMsIso(input.nowIso, this.busyGraceMs),
|
||||
};
|
||||
}
|
||||
|
||||
const retryAfterIso = maxIso(state.recentBusyUntilByToolId.values());
|
||||
if (retryAfterIso) {
|
||||
return {
|
||||
busy: true,
|
||||
reason: 'recent_tool_activity',
|
||||
retryAfterIso,
|
||||
};
|
||||
}
|
||||
|
||||
return { busy: false };
|
||||
}
|
||||
|
||||
private noteStart(teamName: string, memberName: string, toolUseId: string): void {
|
||||
const normalizedToolUseId = toolUseId.trim();
|
||||
if (!memberName.trim() || !normalizedToolUseId) {
|
||||
return;
|
||||
}
|
||||
const state = this.getOrCreateState(teamName, memberName);
|
||||
state.activeToolIds.add(normalizedToolUseId);
|
||||
state.recentBusyUntilByToolId.delete(normalizedToolUseId);
|
||||
}
|
||||
|
||||
private noteFinish(
|
||||
teamName: string,
|
||||
memberName: string,
|
||||
toolUseId: string,
|
||||
finishedAt: string | undefined
|
||||
): void {
|
||||
const normalizedToolUseId = toolUseId.trim();
|
||||
if (!memberName.trim() || !normalizedToolUseId) {
|
||||
return;
|
||||
}
|
||||
const finishedAtMs = parseIsoMs(finishedAt, Date.now());
|
||||
const busyUntilIso = new Date(finishedAtMs + this.busyGraceMs).toISOString();
|
||||
const state = this.getOrCreateState(teamName, memberName);
|
||||
state.activeToolIds.delete(normalizedToolUseId);
|
||||
state.recentBusyUntilByToolId.set(normalizedToolUseId, busyUntilIso);
|
||||
}
|
||||
|
||||
private noteReset(teamName: string, memberName?: string, toolUseIds?: string[]): void {
|
||||
const normalizedMemberName = memberName?.trim();
|
||||
if (!normalizedMemberName) {
|
||||
this.dropTeam(teamName);
|
||||
return;
|
||||
}
|
||||
|
||||
const key = memberKey(teamName, normalizedMemberName);
|
||||
const state = this.activityByMember.get(key);
|
||||
if (!state) {
|
||||
return;
|
||||
}
|
||||
|
||||
const normalizedToolUseIds = new Set(
|
||||
(toolUseIds ?? []).map((toolUseId) => toolUseId.trim()).filter(Boolean)
|
||||
);
|
||||
if (normalizedToolUseIds.size === 0) {
|
||||
this.activityByMember.delete(key);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const toolUseId of normalizedToolUseIds) {
|
||||
state.activeToolIds.delete(toolUseId);
|
||||
state.recentBusyUntilByToolId.delete(toolUseId);
|
||||
}
|
||||
if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
this.activityByMember.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
private getOrCreateState(teamName: string, memberName: string): MemberActivityState {
|
||||
const key = memberKey(teamName, memberName);
|
||||
const existing = this.activityByMember.get(key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const created: MemberActivityState = {
|
||||
activeToolIds: new Set(),
|
||||
recentBusyUntilByToolId: new Map(),
|
||||
};
|
||||
this.activityByMember.set(key, created);
|
||||
return created;
|
||||
}
|
||||
|
||||
private pruneState(key: string, state: MemberActivityState, nowIso: string): void {
|
||||
const nowMs = Date.parse(nowIso);
|
||||
for (const [toolUseId, busyUntilIso] of state.recentBusyUntilByToolId) {
|
||||
if (Date.parse(busyUntilIso) <= nowMs) {
|
||||
state.recentBusyUntilByToolId.delete(toolUseId);
|
||||
}
|
||||
}
|
||||
if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
this.activityByMember.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
private dropTeam(teamName: string): void {
|
||||
for (const key of this.activityByMember.keys()) {
|
||||
if (key.startsWith(`${teamName}\0`)) {
|
||||
this.activityByMember.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -231,6 +231,7 @@ function createDeps(options?: {
|
|||
providerId?: 'opencode' | 'codex';
|
||||
outboxStore?: MemberWorkSyncOutboxStorePort;
|
||||
inboxNudge?: MemberWorkSyncInboxNudgePort;
|
||||
busySignal?: MemberWorkSyncUseCaseDeps['busySignal'];
|
||||
}) {
|
||||
const clock = new MutableClock();
|
||||
const store = new InMemoryStatusStore();
|
||||
|
|
@ -259,6 +260,7 @@ function createDeps(options?: {
|
|||
reportStore: store,
|
||||
...(options?.outboxStore ? { outboxStore: options.outboxStore } : {}),
|
||||
...(options?.inboxNudge ? { inboxNudge: options.inboxNudge } : {}),
|
||||
...(options?.busySignal ? { busySignal: options.busySignal } : {}),
|
||||
reportToken: {
|
||||
create: async (input) => ({
|
||||
token: `token:${input.teamName}:${input.memberName}:${input.agendaFingerprint}`,
|
||||
|
|
@ -591,6 +593,43 @@ describe('MemberWorkSync use cases', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('defers nudge dispatch while the member has active or recent tool activity', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
const { deps, store } = createDeps({
|
||||
outboxStore: outbox,
|
||||
inboxNudge: inbox,
|
||||
busySignal: {
|
||||
isBusy: async () => ({
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
retryAfterIso: '2026-04-29T00:02:00.000Z',
|
||||
}),
|
||||
},
|
||||
});
|
||||
store.phase2ReadinessState = 'shadow_ready';
|
||||
|
||||
const current = await new MemberWorkSyncReconciler(deps).execute(
|
||||
{
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
},
|
||||
{ reconciledBy: 'queue', triggerReasons: ['tool_finished'] }
|
||||
);
|
||||
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
|
||||
teamNames: ['team-a'],
|
||||
claimedBy: 'test-dispatcher',
|
||||
});
|
||||
|
||||
expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 });
|
||||
expect(inbox.inserted).toEqual([]);
|
||||
expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({
|
||||
status: 'failed_retryable',
|
||||
lastError: 'member_busy:active_tool_activity',
|
||||
nextAttemptAt: '2026-04-29T00:02:00.000Z',
|
||||
});
|
||||
});
|
||||
|
||||
it('uses bounded retry backoff when inbox delivery fails', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,145 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { MemberWorkSyncToolActivityBusySignal } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal';
|
||||
|
||||
import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types';
|
||||
|
||||
function toolEvent(teamName: string, payload: ToolActivityEventPayload): TeamChangeEvent {
|
||||
return {
|
||||
type: 'tool-activity',
|
||||
teamName,
|
||||
detail: JSON.stringify(payload),
|
||||
};
|
||||
}
|
||||
|
||||
describe('MemberWorkSyncToolActivityBusySignal', () => {
|
||||
it('treats active tools as busy and recent finishes as a bounded quiet window', async () => {
|
||||
const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 });
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'start',
|
||||
activity: {
|
||||
memberName: 'bob',
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'bash',
|
||||
startedAt: '2026-04-29T00:00:00.000Z',
|
||||
source: 'runtime',
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:00:15.000Z',
|
||||
})
|
||||
).resolves.toMatchObject({
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
retryAfterIso: '2026-04-29T00:01:45.000Z',
|
||||
});
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'finish',
|
||||
memberName: 'bob',
|
||||
toolUseId: 'tool-1',
|
||||
finishedAt: '2026-04-29T00:01:00.000Z',
|
||||
})
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:01:30.000Z',
|
||||
})
|
||||
).resolves.toMatchObject({
|
||||
busy: true,
|
||||
reason: 'recent_tool_activity',
|
||||
retryAfterIso: '2026-04-29T00:02:30.000Z',
|
||||
});
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:02:31.000Z',
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
});
|
||||
|
||||
it('does not leak activity across members and clears targeted reset events', async () => {
|
||||
const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 });
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'start',
|
||||
activity: {
|
||||
memberName: 'bob',
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'read',
|
||||
startedAt: '2026-04-29T00:00:00.000Z',
|
||||
source: 'member_log',
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'alice',
|
||||
nowIso: '2026-04-29T00:00:15.000Z',
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'reset',
|
||||
memberName: 'bob',
|
||||
toolUseIds: ['tool-1'],
|
||||
})
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:00:15.000Z',
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
});
|
||||
|
||||
it('drops all tracked activity for a team when it goes offline', async () => {
|
||||
const signal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 });
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'start',
|
||||
activity: {
|
||||
memberName: 'bob',
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'write',
|
||||
startedAt: '2026-04-29T00:00:00.000Z',
|
||||
source: 'runtime',
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
signal.noteTeamChange({
|
||||
type: 'lead-activity',
|
||||
teamName: 'team-a',
|
||||
detail: 'offline',
|
||||
});
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:00:15.000Z',
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
});
|
||||
});
|
||||
Loading…
Reference in a new issue