From d8672c32f68d5d75ffbca4976878db1f291e1a76 Mon Sep 17 00:00:00 2001 From: 777genius Date: Thu, 16 Apr 2026 22:15:49 +0300 Subject: [PATCH] fix(team): suppress replayed member spawn inbox churn --- .../services/team/TeamProvisioningService.ts | 193 ++++++++- .../team/TeamProvisioningService.test.ts | 394 ++++++++++++++++++ 2 files changed, 570 insertions(+), 17 deletions(-) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index 9ace3693..483b91e9 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -700,6 +700,13 @@ interface ProvisioningRun { >; /** Agent tool_use_id -> teammate name for persistent teammate spawns. */ memberSpawnToolUseIds: Map; + /** Per-member latest processed lead-inbox bootstrap signal cursor for the current live run. */ + memberSpawnLeadInboxCursorByMember: Map; + /** + * Per-member exact processed lead-inbox messageIds for the current live run. + * This owns live-path correctness and protects against out-of-order inserts. + */ + memberSpawnProcessedLeadInboxMessageIdsByMember: Map>; /** Highest accepted deterministic bootstrap event sequence for this run. */ lastDeterministicBootstrapSeq: number; /** Throttles config/inbox audit work triggered by frequent status polling. */ @@ -787,6 +794,88 @@ interface LiveTeamAgentRuntimeMetadata { model?: string; } +interface MemberSpawnInboxCursor { + timestamp: string; + messageId: string; +} + +type LeadInboxMemberSpawnMessage = InboxMessage & { messageId: string }; + +function compareMemberSpawnInboxCursor( + left: MemberSpawnInboxCursor, + right: MemberSpawnInboxCursor +): number { + const leftMs = Date.parse(left.timestamp); + const rightMs = Date.parse(right.timestamp); + const leftValid = Number.isFinite(leftMs); + const rightValid = Number.isFinite(rightMs); + + if (leftValid && rightValid && leftMs !== rightMs) { + return leftMs - rightMs; + } + if (leftValid !== rightValid) { + return leftValid ? -1 : 1; + } + return left.messageId.localeCompare(right.messageId); +} + +function toMemberSpawnInboxCursor( + message: Pick +): MemberSpawnInboxCursor | null { + const messageId = typeof message.messageId === 'string' ? message.messageId.trim() : ''; + if (!messageId) { + return null; + } + return { + timestamp: message.timestamp, + messageId, + }; +} + +function maxMemberSpawnInboxCursor( + left: MemberSpawnInboxCursor | undefined, + right: MemberSpawnInboxCursor +): MemberSpawnInboxCursor { + if (!left) { + return right; + } + return compareMemberSpawnInboxCursor(left, right) >= 0 ? left : right; +} + +function getOrCreateMemberSpawnProcessedMessageIds( + run: ProvisioningRun, + memberName: string +): Set { + const existing = run.memberSpawnProcessedLeadInboxMessageIdsByMember.get(memberName); + if (existing) { + return existing; + } + const created = new Set(); + run.memberSpawnProcessedLeadInboxMessageIdsByMember.set(memberName, created); + return created; +} + +function isMemberSpawnHeartbeatTimestampNewer( + previous: string | undefined, + incoming: string | undefined +): boolean { + const normalizedIncoming = incoming?.trim(); + if (!normalizedIncoming) { + return false; + } + const normalizedPrevious = previous?.trim(); + if (!normalizedPrevious) { + return true; + } + + const previousMs = Date.parse(normalizedPrevious); + const incomingMs = Date.parse(normalizedIncoming); + if (Number.isFinite(previousMs) && Number.isFinite(incomingMs)) { + return incomingMs > previousMs; + } + return normalizedIncoming > normalizedPrevious; +} + function stripWrappedCliFlagValue(raw: string | undefined): string | undefined { const trimmed = raw?.trim(); if (!trimmed) { @@ -2841,12 +2930,15 @@ export class TeamProvisioningService { } const runStartedAtMs = Date.parse(run.startedAt); - const expectedMembers = Array.isArray(run.expectedMembers) ? run.expectedMembers : []; + const expectedMembers = new Set(Array.isArray(run.expectedMembers) ? run.expectedMembers : []); const teammateMessages = leadInboxMessages - .filter((message) => { + .filter((message): message is LeadInboxMemberSpawnMessage => { const from = typeof message.from === 'string' ? message.from.trim() : ''; if (!from || from === leadName || from === 'user' || from === 'system') return false; - if (!expectedMembers.includes(from)) return false; + if (!expectedMembers.has(from)) return false; + if (typeof message.messageId !== 'string' || message.messageId.trim().length === 0) { + return false; + } const messageTs = Date.parse(message.timestamp); if ( Number.isFinite(messageTs) && @@ -2857,26 +2949,83 @@ export class TeamProvisioningService { } return typeof message.text === 'string' && message.text.trim().length > 0; }) - .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp)); + .sort((left, right) => + compareMemberSpawnInboxCursor( + { timestamp: left.timestamp, messageId: left.messageId }, + { timestamp: right.timestamp, messageId: right.messageId } + ) + ); + const messagesByMember = new Map(); for (const message of teammateMessages) { - const from = message.from.trim(); - const reason = extractBootstrapFailureReason(message.text); - if (reason) { - this.setMemberSpawnStatus(run, from, 'error', reason); + const memberName = message.from.trim(); + const bucket = messagesByMember.get(memberName) ?? []; + bucket.push(message); + messagesByMember.set(memberName, bucket); + } + + for (const [memberName, messages] of messagesByMember.entries()) { + const processedMessageIds = getOrCreateMemberSpawnProcessedMessageIds(run, memberName); + const currentCursor = run.memberSpawnLeadInboxCursorByMember.get(memberName); + const newlyProcessedMessageIds: string[] = []; + let nextCursor = currentCursor; + + for (const message of messages) { + if (processedMessageIds.has(message.messageId)) { + continue; + } + + const messageCursor = toMemberSpawnInboxCursor(message); + const shouldApplySignal = + messageCursor == null || + currentCursor == null || + compareMemberSpawnInboxCursor(messageCursor, currentCursor) > 0; + + if (shouldApplySignal) { + this.applyLeadInboxSpawnSignal(run, memberName, message); + if (messageCursor) { + nextCursor = maxMemberSpawnInboxCursor(nextCursor, messageCursor); + } + } + + // Mark late out-of-order signals as seen so they cannot replay forever, but only + // let strictly newer cursors mutate the already-advanced live member state. + newlyProcessedMessageIds.push(message.messageId); + } + + if (newlyProcessedMessageIds.length === 0) { continue; } - this.setMemberSpawnStatus( - run, - from, - 'online', - undefined, - 'heartbeat', - extractHeartbeatTimestamp(message.text, message.timestamp) - ); + + for (const messageId of newlyProcessedMessageIds) { + processedMessageIds.add(messageId); + } + if (nextCursor) { + run.memberSpawnLeadInboxCursorByMember.set(memberName, nextCursor); + } } } + private applyLeadInboxSpawnSignal( + run: ProvisioningRun, + memberName: string, + message: LeadInboxMemberSpawnMessage + ): void { + const reason = extractBootstrapFailureReason(message.text); + if (reason) { + this.setMemberSpawnStatus(run, memberName, 'error', reason); + return; + } + this.setMemberSpawnStatus( + run, + memberName, + 'online', + undefined, + 'heartbeat', + extractHeartbeatTimestamp(message.text, message.timestamp) + ); + } + private persistSentMessage(teamName: string, message: InboxMessage): void { try { createController({ @@ -3341,8 +3490,14 @@ export class TeamProvisioningService { next.livenessSource = livenessSource; next.firstSpawnAcceptedAt = prev.firstSpawnAcceptedAt ?? updatedAt; if (livenessSource === 'heartbeat') { + const incomingHeartbeatAt = heartbeatAt?.trim() || updatedAt; next.bootstrapConfirmed = true; - next.lastHeartbeatAt = heartbeatAt?.trim() || prev.lastHeartbeatAt || updatedAt; + next.lastHeartbeatAt = isMemberSpawnHeartbeatTimestampNewer( + prev.lastHeartbeatAt, + incomingHeartbeatAt + ) + ? incomingHeartbeatAt + : prev.lastHeartbeatAt; } next.hardFailure = false; next.error = undefined; @@ -4971,6 +5126,8 @@ export class TeamProvisioningService { request.members.map((m) => [m.name, createInitialMemberSpawnStatusEntry()]) ), memberSpawnToolUseIds: new Map(), + memberSpawnLeadInboxCursorByMember: new Map(), + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map(), lastDeterministicBootstrapSeq: 0, lastMemberSpawnAuditAt: 0, lastMemberSpawnAuditConfigReadWarningAt: 0, @@ -5550,6 +5707,8 @@ export class TeamProvisioningService { expectedMembers.map((name) => [name, createInitialMemberSpawnStatusEntry()]) ), memberSpawnToolUseIds: new Map(), + memberSpawnLeadInboxCursorByMember: new Map(), + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map(), lastDeterministicBootstrapSeq: 0, lastMemberSpawnAuditAt: 0, lastMemberSpawnAuditConfigReadWarningAt: 0, diff --git a/test/main/services/team/TeamProvisioningService.test.ts b/test/main/services/team/TeamProvisioningService.test.ts index 889c74c8..7049185b 100644 --- a/test/main/services/team/TeamProvisioningService.test.ts +++ b/test/main/services/team/TeamProvisioningService.test.ts @@ -143,6 +143,68 @@ function writeLaunchState( ); } +function createMemberSpawnStatusEntry( + overrides: Record = {} +): Record { + return { + status: 'waiting', + launchState: 'runtime_pending_bootstrap', + error: undefined, + updatedAt: new Date().toISOString(), + runtimeAlive: false, + livenessSource: undefined, + bootstrapConfirmed: false, + hardFailure: false, + agentToolAccepted: true, + firstSpawnAcceptedAt: new Date().toISOString(), + lastHeartbeatAt: undefined, + ...overrides, + }; +} + +function createMemberSpawnRun(params?: { + runId?: string; + teamName?: string; + startedAt?: string; + expectedMembers?: string[]; + memberSpawnStatuses?: Map>; + memberSpawnLeadInboxCursorByMember?: Map; + memberSpawnProcessedLeadInboxMessageIdsByMember?: Map>; +}) { + const teamName = params?.teamName ?? 'member-spawn-team'; + const expectedMembers = params?.expectedMembers ?? ['alice']; + const memberSpawnStatuses = + params?.memberSpawnStatuses ?? + new Map([ + [ + expectedMembers[0]!, + createMemberSpawnStatusEntry({ + firstSpawnAcceptedAt: new Date(Date.now() - 5_000).toISOString(), + }), + ], + ]); + + return { + runId: params?.runId ?? 'run-member-spawn-1', + teamName, + startedAt: params?.startedAt ?? new Date(Date.now() - 60_000).toISOString(), + request: { + members: [], + }, + expectedMembers, + memberSpawnStatuses, + memberSpawnToolUseIds: new Map(), + memberSpawnLeadInboxCursorByMember: + params?.memberSpawnLeadInboxCursorByMember ?? new Map(), + memberSpawnProcessedLeadInboxMessageIdsByMember: + params?.memberSpawnProcessedLeadInboxMessageIdsByMember ?? new Map(), + provisioningOutputParts: [], + activeToolCalls: new Map(), + isLaunch: false, + provisioningComplete: false, + } as any; +} + describe('TeamProvisioningService', () => { beforeEach(() => { vi.clearAllMocks(); @@ -919,4 +981,336 @@ describe('TeamProvisioningService', () => { expect(result.statuses.jack?.hardFailureReason).toContain('requested model is not available'); expect(result.teamLaunchState).toBe('partial_failure'); }); + + it('does not reprocess already-seen teammate lead inbox messages', async () => { + const svc = new TeamProvisioningService(); + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T09:00:00.000Z', + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([ + ['alice', new Set(['msg-1', 'msg-2'])], + ]), + memberSpawnLeadInboxCursorByMember: new Map([ + [ + 'alice', + { + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-2', + }, + ], + ]), + }); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: 'heartbeat', + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-1', + read: false, + }, + { + from: 'alice', + text: 'heartbeat', + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-2', + read: false, + }, + ]); + + const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal'); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(applySignalSpy).not.toHaveBeenCalled(); + }); + + it('processes an unseen teammate heartbeat on the first refresh', async () => { + const svc = new TeamProvisioningService(); + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T09:00:00.000Z', + }); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: '{"type":"heartbeat","timestamp":"2026-04-16T10:00:00.000Z"}', + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-1', + read: false, + }, + ]); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(run.memberSpawnStatuses.get('alice')).toMatchObject({ + status: 'online', + launchState: 'confirmed_alive', + bootstrapConfirmed: true, + hardFailure: false, + lastHeartbeatAt: '2026-04-16T10:00:00.000Z', + }); + expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({ + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-1', + }); + expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')).toEqual( + new Set(['msg-1']) + ); + }); + + it('ignores teammate lead inbox signals that predate the current run', async () => { + const svc = new TeamProvisioningService(); + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T10:00:00.000Z', + }); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: '{"type":"heartbeat","timestamp":"2026-04-16T09:59:59.000Z"}', + timestamp: '2026-04-16T09:59:59.000Z', + messageId: 'msg-early', + read: false, + }, + ]); + + const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal'); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(applySignalSpy).not.toHaveBeenCalled(); + expect(run.memberSpawnLeadInboxCursorByMember.size).toBe(0); + expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.size).toBe(0); + expect(run.memberSpawnStatuses.get('alice')).toMatchObject({ + status: 'waiting', + launchState: 'runtime_pending_bootstrap', + bootstrapConfirmed: false, + }); + }); + + it('marks an unseen older lead inbox signal as processed without replaying older state', async () => { + const latestHeartbeatAt = '2026-04-16T10:05:00.000Z'; + const existingEntry = createMemberSpawnStatusEntry({ + status: 'online', + launchState: 'confirmed_alive', + runtimeAlive: true, + livenessSource: 'heartbeat', + bootstrapConfirmed: true, + lastHeartbeatAt: latestHeartbeatAt, + }); + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T09:00:00.000Z', + memberSpawnStatuses: new Map([['alice', existingEntry]]), + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([ + ['alice', new Set(['msg-3'])], + ]), + memberSpawnLeadInboxCursorByMember: new Map([ + [ + 'alice', + { + timestamp: latestHeartbeatAt, + messageId: 'msg-3', + }, + ], + ]), + }); + const svc = new TeamProvisioningService(); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: 'Bootstrap failed: unsupported model', + timestamp: '2026-04-16T10:04:00.000Z', + messageId: 'msg-2b', + read: false, + }, + { + from: 'alice', + text: 'heartbeat', + timestamp: latestHeartbeatAt, + messageId: 'msg-3', + read: false, + }, + ]); + + const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal'); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(applySignalSpy).not.toHaveBeenCalled(); + expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry); + expect( + run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')?.has('msg-2b') + ).toBe(true); + expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({ + timestamp: latestHeartbeatAt, + messageId: 'msg-3', + }); + }); + + it('applies an unseen newer failure signal and transitions the member to failed_to_start', async () => { + const latestHeartbeatAt = '2026-04-16T10:00:00.000Z'; + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T09:00:00.000Z', + memberSpawnStatuses: new Map([ + [ + 'alice', + createMemberSpawnStatusEntry({ + status: 'online', + launchState: 'confirmed_alive', + runtimeAlive: true, + livenessSource: 'heartbeat', + bootstrapConfirmed: true, + lastHeartbeatAt: latestHeartbeatAt, + }), + ], + ]), + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([ + ['alice', new Set(['msg-1'])], + ]), + memberSpawnLeadInboxCursorByMember: new Map([ + [ + 'alice', + { + timestamp: latestHeartbeatAt, + messageId: 'msg-1', + }, + ], + ]), + }); + const svc = new TeamProvisioningService(); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: 'Bootstrap failed: unsupported model', + timestamp: '2026-04-16T10:01:00.000Z', + messageId: 'msg-2', + read: false, + }, + ]); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(run.memberSpawnStatuses.get('alice')).toMatchObject({ + status: 'error', + launchState: 'failed_to_start', + hardFailure: true, + hardFailureReason: 'Bootstrap failed: unsupported model', + }); + expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({ + timestamp: '2026-04-16T10:01:00.000Z', + messageId: 'msg-2', + }); + expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')).toEqual( + new Set(['msg-1', 'msg-2']) + ); + }); + + it('applies an unseen same-timestamp signal with a greater messageId and advances the cursor', async () => { + const run = createMemberSpawnRun({ + startedAt: '2026-04-16T09:00:00.000Z', + memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([ + ['alice', new Set(['msg-2'])], + ]), + memberSpawnLeadInboxCursorByMember: new Map([ + [ + 'alice', + { + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-2', + }, + ], + ]), + }); + const svc = new TeamProvisioningService(); + + vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([ + { + from: 'alice', + text: 'heartbeat', + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-2', + read: false, + }, + { + from: 'alice', + text: 'heartbeat', + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-3', + read: false, + }, + ]); + + const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal'); + + await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run); + + expect(applySignalSpy).toHaveBeenCalledTimes(1); + expect(applySignalSpy).toHaveBeenCalledWith( + run, + 'alice', + expect.objectContaining({ messageId: 'msg-3' }) + ); + expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({ + timestamp: '2026-04-16T10:00:00.000Z', + messageId: 'msg-3', + }); + expect( + run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice') + ).toEqual(new Set(['msg-2', 'msg-3'])); + }); + + it('does not bump lastHeartbeatAt for an equal heartbeat timestamp', () => { + const existingEntry = createMemberSpawnStatusEntry({ + status: 'online', + launchState: 'confirmed_alive', + runtimeAlive: true, + livenessSource: 'heartbeat', + bootstrapConfirmed: true, + lastHeartbeatAt: '2026-04-16T10:00:00.000Z', + }); + const run = createMemberSpawnRun({ + memberSpawnStatuses: new Map([['alice', existingEntry]]), + }); + const svc = new TeamProvisioningService(); + + (svc as any).setMemberSpawnStatus( + run, + 'alice', + 'online', + undefined, + 'heartbeat', + '2026-04-16T10:00:00.000Z' + ); + + expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry); + }); + + it('does not bump lastHeartbeatAt for an older heartbeat timestamp', () => { + const existingEntry = createMemberSpawnStatusEntry({ + status: 'online', + launchState: 'confirmed_alive', + runtimeAlive: true, + livenessSource: 'heartbeat', + bootstrapConfirmed: true, + lastHeartbeatAt: '2026-04-16T10:00:00.000Z', + }); + const run = createMemberSpawnRun({ + memberSpawnStatuses: new Map([['alice', existingEntry]]), + }); + const svc = new TeamProvisioningService(); + + (svc as any).setMemberSpawnStatus( + run, + 'alice', + 'online', + undefined, + 'heartbeat', + '2026-04-16T09:59:59.000Z' + ); + + expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry); + }); + });