fix(team): suppress replayed member spawn inbox churn

This commit is contained in:
777genius 2026-04-16 22:15:49 +03:00
parent 821e23e633
commit d8672c32f6
2 changed files with 570 additions and 17 deletions

View file

@ -700,6 +700,13 @@ interface ProvisioningRun {
>;
/** Agent tool_use_id -> teammate name for persistent teammate spawns. */
memberSpawnToolUseIds: Map<string, string>;
/** Per-member latest processed lead-inbox bootstrap signal cursor for the current live run. */
memberSpawnLeadInboxCursorByMember: Map<string, MemberSpawnInboxCursor>;
/**
* Per-member exact processed lead-inbox messageIds for the current live run.
* This owns live-path correctness and protects against out-of-order inserts.
*/
memberSpawnProcessedLeadInboxMessageIdsByMember: Map<string, Set<string>>;
/** Highest accepted deterministic bootstrap event sequence for this run. */
lastDeterministicBootstrapSeq: number;
/** Throttles config/inbox audit work triggered by frequent status polling. */
@ -787,6 +794,88 @@ interface LiveTeamAgentRuntimeMetadata {
model?: string;
}
interface MemberSpawnInboxCursor {
timestamp: string;
messageId: string;
}
type LeadInboxMemberSpawnMessage = InboxMessage & { messageId: string };
function compareMemberSpawnInboxCursor(
left: MemberSpawnInboxCursor,
right: MemberSpawnInboxCursor
): number {
const leftMs = Date.parse(left.timestamp);
const rightMs = Date.parse(right.timestamp);
const leftValid = Number.isFinite(leftMs);
const rightValid = Number.isFinite(rightMs);
if (leftValid && rightValid && leftMs !== rightMs) {
return leftMs - rightMs;
}
if (leftValid !== rightValid) {
return leftValid ? -1 : 1;
}
return left.messageId.localeCompare(right.messageId);
}
function toMemberSpawnInboxCursor(
message: Pick<InboxMessage, 'timestamp' | 'messageId'>
): MemberSpawnInboxCursor | null {
const messageId = typeof message.messageId === 'string' ? message.messageId.trim() : '';
if (!messageId) {
return null;
}
return {
timestamp: message.timestamp,
messageId,
};
}
function maxMemberSpawnInboxCursor(
left: MemberSpawnInboxCursor | undefined,
right: MemberSpawnInboxCursor
): MemberSpawnInboxCursor {
if (!left) {
return right;
}
return compareMemberSpawnInboxCursor(left, right) >= 0 ? left : right;
}
function getOrCreateMemberSpawnProcessedMessageIds(
run: ProvisioningRun,
memberName: string
): Set<string> {
const existing = run.memberSpawnProcessedLeadInboxMessageIdsByMember.get(memberName);
if (existing) {
return existing;
}
const created = new Set<string>();
run.memberSpawnProcessedLeadInboxMessageIdsByMember.set(memberName, created);
return created;
}
function isMemberSpawnHeartbeatTimestampNewer(
previous: string | undefined,
incoming: string | undefined
): boolean {
const normalizedIncoming = incoming?.trim();
if (!normalizedIncoming) {
return false;
}
const normalizedPrevious = previous?.trim();
if (!normalizedPrevious) {
return true;
}
const previousMs = Date.parse(normalizedPrevious);
const incomingMs = Date.parse(normalizedIncoming);
if (Number.isFinite(previousMs) && Number.isFinite(incomingMs)) {
return incomingMs > previousMs;
}
return normalizedIncoming > normalizedPrevious;
}
function stripWrappedCliFlagValue(raw: string | undefined): string | undefined {
const trimmed = raw?.trim();
if (!trimmed) {
@ -2841,12 +2930,15 @@ export class TeamProvisioningService {
}
const runStartedAtMs = Date.parse(run.startedAt);
const expectedMembers = Array.isArray(run.expectedMembers) ? run.expectedMembers : [];
const expectedMembers = new Set(Array.isArray(run.expectedMembers) ? run.expectedMembers : []);
const teammateMessages = leadInboxMessages
.filter((message) => {
.filter((message): message is LeadInboxMemberSpawnMessage => {
const from = typeof message.from === 'string' ? message.from.trim() : '';
if (!from || from === leadName || from === 'user' || from === 'system') return false;
if (!expectedMembers.includes(from)) return false;
if (!expectedMembers.has(from)) return false;
if (typeof message.messageId !== 'string' || message.messageId.trim().length === 0) {
return false;
}
const messageTs = Date.parse(message.timestamp);
if (
Number.isFinite(messageTs) &&
@ -2857,26 +2949,83 @@ export class TeamProvisioningService {
}
return typeof message.text === 'string' && message.text.trim().length > 0;
})
.sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp));
.sort((left, right) =>
compareMemberSpawnInboxCursor(
{ timestamp: left.timestamp, messageId: left.messageId },
{ timestamp: right.timestamp, messageId: right.messageId }
)
);
const messagesByMember = new Map<string, LeadInboxMemberSpawnMessage[]>();
for (const message of teammateMessages) {
const from = message.from.trim();
const reason = extractBootstrapFailureReason(message.text);
if (reason) {
this.setMemberSpawnStatus(run, from, 'error', reason);
const memberName = message.from.trim();
const bucket = messagesByMember.get(memberName) ?? [];
bucket.push(message);
messagesByMember.set(memberName, bucket);
}
for (const [memberName, messages] of messagesByMember.entries()) {
const processedMessageIds = getOrCreateMemberSpawnProcessedMessageIds(run, memberName);
const currentCursor = run.memberSpawnLeadInboxCursorByMember.get(memberName);
const newlyProcessedMessageIds: string[] = [];
let nextCursor = currentCursor;
for (const message of messages) {
if (processedMessageIds.has(message.messageId)) {
continue;
}
const messageCursor = toMemberSpawnInboxCursor(message);
const shouldApplySignal =
messageCursor == null ||
currentCursor == null ||
compareMemberSpawnInboxCursor(messageCursor, currentCursor) > 0;
if (shouldApplySignal) {
this.applyLeadInboxSpawnSignal(run, memberName, message);
if (messageCursor) {
nextCursor = maxMemberSpawnInboxCursor(nextCursor, messageCursor);
}
}
// Mark late out-of-order signals as seen so they cannot replay forever, but only
// let strictly newer cursors mutate the already-advanced live member state.
newlyProcessedMessageIds.push(message.messageId);
}
if (newlyProcessedMessageIds.length === 0) {
continue;
}
this.setMemberSpawnStatus(
run,
from,
'online',
undefined,
'heartbeat',
extractHeartbeatTimestamp(message.text, message.timestamp)
);
for (const messageId of newlyProcessedMessageIds) {
processedMessageIds.add(messageId);
}
if (nextCursor) {
run.memberSpawnLeadInboxCursorByMember.set(memberName, nextCursor);
}
}
}
private applyLeadInboxSpawnSignal(
run: ProvisioningRun,
memberName: string,
message: LeadInboxMemberSpawnMessage
): void {
const reason = extractBootstrapFailureReason(message.text);
if (reason) {
this.setMemberSpawnStatus(run, memberName, 'error', reason);
return;
}
this.setMemberSpawnStatus(
run,
memberName,
'online',
undefined,
'heartbeat',
extractHeartbeatTimestamp(message.text, message.timestamp)
);
}
private persistSentMessage(teamName: string, message: InboxMessage): void {
try {
createController({
@ -3341,8 +3490,14 @@ export class TeamProvisioningService {
next.livenessSource = livenessSource;
next.firstSpawnAcceptedAt = prev.firstSpawnAcceptedAt ?? updatedAt;
if (livenessSource === 'heartbeat') {
const incomingHeartbeatAt = heartbeatAt?.trim() || updatedAt;
next.bootstrapConfirmed = true;
next.lastHeartbeatAt = heartbeatAt?.trim() || prev.lastHeartbeatAt || updatedAt;
next.lastHeartbeatAt = isMemberSpawnHeartbeatTimestampNewer(
prev.lastHeartbeatAt,
incomingHeartbeatAt
)
? incomingHeartbeatAt
: prev.lastHeartbeatAt;
}
next.hardFailure = false;
next.error = undefined;
@ -4971,6 +5126,8 @@ export class TeamProvisioningService {
request.members.map((m) => [m.name, createInitialMemberSpawnStatusEntry()])
),
memberSpawnToolUseIds: new Map(),
memberSpawnLeadInboxCursorByMember: new Map(),
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map(),
lastDeterministicBootstrapSeq: 0,
lastMemberSpawnAuditAt: 0,
lastMemberSpawnAuditConfigReadWarningAt: 0,
@ -5550,6 +5707,8 @@ export class TeamProvisioningService {
expectedMembers.map((name) => [name, createInitialMemberSpawnStatusEntry()])
),
memberSpawnToolUseIds: new Map(),
memberSpawnLeadInboxCursorByMember: new Map(),
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map(),
lastDeterministicBootstrapSeq: 0,
lastMemberSpawnAuditAt: 0,
lastMemberSpawnAuditConfigReadWarningAt: 0,

View file

@ -143,6 +143,68 @@ function writeLaunchState(
);
}
function createMemberSpawnStatusEntry(
overrides: Record<string, unknown> = {}
): Record<string, unknown> {
return {
status: 'waiting',
launchState: 'runtime_pending_bootstrap',
error: undefined,
updatedAt: new Date().toISOString(),
runtimeAlive: false,
livenessSource: undefined,
bootstrapConfirmed: false,
hardFailure: false,
agentToolAccepted: true,
firstSpawnAcceptedAt: new Date().toISOString(),
lastHeartbeatAt: undefined,
...overrides,
};
}
function createMemberSpawnRun(params?: {
runId?: string;
teamName?: string;
startedAt?: string;
expectedMembers?: string[];
memberSpawnStatuses?: Map<string, Record<string, unknown>>;
memberSpawnLeadInboxCursorByMember?: Map<string, { timestamp: string; messageId: string }>;
memberSpawnProcessedLeadInboxMessageIdsByMember?: Map<string, Set<string>>;
}) {
const teamName = params?.teamName ?? 'member-spawn-team';
const expectedMembers = params?.expectedMembers ?? ['alice'];
const memberSpawnStatuses =
params?.memberSpawnStatuses ??
new Map([
[
expectedMembers[0]!,
createMemberSpawnStatusEntry({
firstSpawnAcceptedAt: new Date(Date.now() - 5_000).toISOString(),
}),
],
]);
return {
runId: params?.runId ?? 'run-member-spawn-1',
teamName,
startedAt: params?.startedAt ?? new Date(Date.now() - 60_000).toISOString(),
request: {
members: [],
},
expectedMembers,
memberSpawnStatuses,
memberSpawnToolUseIds: new Map(),
memberSpawnLeadInboxCursorByMember:
params?.memberSpawnLeadInboxCursorByMember ?? new Map(),
memberSpawnProcessedLeadInboxMessageIdsByMember:
params?.memberSpawnProcessedLeadInboxMessageIdsByMember ?? new Map(),
provisioningOutputParts: [],
activeToolCalls: new Map(),
isLaunch: false,
provisioningComplete: false,
} as any;
}
describe('TeamProvisioningService', () => {
beforeEach(() => {
vi.clearAllMocks();
@ -919,4 +981,336 @@ describe('TeamProvisioningService', () => {
expect(result.statuses.jack?.hardFailureReason).toContain('requested model is not available');
expect(result.teamLaunchState).toBe('partial_failure');
});
it('does not reprocess already-seen teammate lead inbox messages', async () => {
const svc = new TeamProvisioningService();
const run = createMemberSpawnRun({
startedAt: '2026-04-16T09:00:00.000Z',
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([
['alice', new Set(['msg-1', 'msg-2'])],
]),
memberSpawnLeadInboxCursorByMember: new Map([
[
'alice',
{
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-2',
},
],
]),
});
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: 'heartbeat',
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-1',
read: false,
},
{
from: 'alice',
text: 'heartbeat',
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-2',
read: false,
},
]);
const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal');
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(applySignalSpy).not.toHaveBeenCalled();
});
it('processes an unseen teammate heartbeat on the first refresh', async () => {
const svc = new TeamProvisioningService();
const run = createMemberSpawnRun({
startedAt: '2026-04-16T09:00:00.000Z',
});
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: '{"type":"heartbeat","timestamp":"2026-04-16T10:00:00.000Z"}',
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-1',
read: false,
},
]);
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(run.memberSpawnStatuses.get('alice')).toMatchObject({
status: 'online',
launchState: 'confirmed_alive',
bootstrapConfirmed: true,
hardFailure: false,
lastHeartbeatAt: '2026-04-16T10:00:00.000Z',
});
expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-1',
});
expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')).toEqual(
new Set(['msg-1'])
);
});
it('ignores teammate lead inbox signals that predate the current run', async () => {
const svc = new TeamProvisioningService();
const run = createMemberSpawnRun({
startedAt: '2026-04-16T10:00:00.000Z',
});
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: '{"type":"heartbeat","timestamp":"2026-04-16T09:59:59.000Z"}',
timestamp: '2026-04-16T09:59:59.000Z',
messageId: 'msg-early',
read: false,
},
]);
const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal');
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(applySignalSpy).not.toHaveBeenCalled();
expect(run.memberSpawnLeadInboxCursorByMember.size).toBe(0);
expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.size).toBe(0);
expect(run.memberSpawnStatuses.get('alice')).toMatchObject({
status: 'waiting',
launchState: 'runtime_pending_bootstrap',
bootstrapConfirmed: false,
});
});
it('marks an unseen older lead inbox signal as processed without replaying older state', async () => {
const latestHeartbeatAt = '2026-04-16T10:05:00.000Z';
const existingEntry = createMemberSpawnStatusEntry({
status: 'online',
launchState: 'confirmed_alive',
runtimeAlive: true,
livenessSource: 'heartbeat',
bootstrapConfirmed: true,
lastHeartbeatAt: latestHeartbeatAt,
});
const run = createMemberSpawnRun({
startedAt: '2026-04-16T09:00:00.000Z',
memberSpawnStatuses: new Map([['alice', existingEntry]]),
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([
['alice', new Set(['msg-3'])],
]),
memberSpawnLeadInboxCursorByMember: new Map([
[
'alice',
{
timestamp: latestHeartbeatAt,
messageId: 'msg-3',
},
],
]),
});
const svc = new TeamProvisioningService();
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: 'Bootstrap failed: unsupported model',
timestamp: '2026-04-16T10:04:00.000Z',
messageId: 'msg-2b',
read: false,
},
{
from: 'alice',
text: 'heartbeat',
timestamp: latestHeartbeatAt,
messageId: 'msg-3',
read: false,
},
]);
const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal');
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(applySignalSpy).not.toHaveBeenCalled();
expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry);
expect(
run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')?.has('msg-2b')
).toBe(true);
expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({
timestamp: latestHeartbeatAt,
messageId: 'msg-3',
});
});
it('applies an unseen newer failure signal and transitions the member to failed_to_start', async () => {
const latestHeartbeatAt = '2026-04-16T10:00:00.000Z';
const run = createMemberSpawnRun({
startedAt: '2026-04-16T09:00:00.000Z',
memberSpawnStatuses: new Map([
[
'alice',
createMemberSpawnStatusEntry({
status: 'online',
launchState: 'confirmed_alive',
runtimeAlive: true,
livenessSource: 'heartbeat',
bootstrapConfirmed: true,
lastHeartbeatAt: latestHeartbeatAt,
}),
],
]),
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([
['alice', new Set(['msg-1'])],
]),
memberSpawnLeadInboxCursorByMember: new Map([
[
'alice',
{
timestamp: latestHeartbeatAt,
messageId: 'msg-1',
},
],
]),
});
const svc = new TeamProvisioningService();
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: 'Bootstrap failed: unsupported model',
timestamp: '2026-04-16T10:01:00.000Z',
messageId: 'msg-2',
read: false,
},
]);
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(run.memberSpawnStatuses.get('alice')).toMatchObject({
status: 'error',
launchState: 'failed_to_start',
hardFailure: true,
hardFailureReason: 'Bootstrap failed: unsupported model',
});
expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({
timestamp: '2026-04-16T10:01:00.000Z',
messageId: 'msg-2',
});
expect(run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')).toEqual(
new Set(['msg-1', 'msg-2'])
);
});
it('applies an unseen same-timestamp signal with a greater messageId and advances the cursor', async () => {
const run = createMemberSpawnRun({
startedAt: '2026-04-16T09:00:00.000Z',
memberSpawnProcessedLeadInboxMessageIdsByMember: new Map([
['alice', new Set(['msg-2'])],
]),
memberSpawnLeadInboxCursorByMember: new Map([
[
'alice',
{
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-2',
},
],
]),
});
const svc = new TeamProvisioningService();
vi.spyOn((svc as any).inboxReader, 'getMessagesFor').mockResolvedValue([
{
from: 'alice',
text: 'heartbeat',
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-2',
read: false,
},
{
from: 'alice',
text: 'heartbeat',
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-3',
read: false,
},
]);
const applySignalSpy = vi.spyOn(svc as any, 'applyLeadInboxSpawnSignal');
await (svc as any).refreshMemberSpawnStatusesFromLeadInbox(run);
expect(applySignalSpy).toHaveBeenCalledTimes(1);
expect(applySignalSpy).toHaveBeenCalledWith(
run,
'alice',
expect.objectContaining({ messageId: 'msg-3' })
);
expect(run.memberSpawnLeadInboxCursorByMember.get('alice')).toEqual({
timestamp: '2026-04-16T10:00:00.000Z',
messageId: 'msg-3',
});
expect(
run.memberSpawnProcessedLeadInboxMessageIdsByMember.get('alice')
).toEqual(new Set(['msg-2', 'msg-3']));
});
it('does not bump lastHeartbeatAt for an equal heartbeat timestamp', () => {
const existingEntry = createMemberSpawnStatusEntry({
status: 'online',
launchState: 'confirmed_alive',
runtimeAlive: true,
livenessSource: 'heartbeat',
bootstrapConfirmed: true,
lastHeartbeatAt: '2026-04-16T10:00:00.000Z',
});
const run = createMemberSpawnRun({
memberSpawnStatuses: new Map([['alice', existingEntry]]),
});
const svc = new TeamProvisioningService();
(svc as any).setMemberSpawnStatus(
run,
'alice',
'online',
undefined,
'heartbeat',
'2026-04-16T10:00:00.000Z'
);
expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry);
});
it('does not bump lastHeartbeatAt for an older heartbeat timestamp', () => {
const existingEntry = createMemberSpawnStatusEntry({
status: 'online',
launchState: 'confirmed_alive',
runtimeAlive: true,
livenessSource: 'heartbeat',
bootstrapConfirmed: true,
lastHeartbeatAt: '2026-04-16T10:00:00.000Z',
});
const run = createMemberSpawnRun({
memberSpawnStatuses: new Map([['alice', existingEntry]]),
});
const svc = new TeamProvisioningService();
(svc as any).setMemberSpawnStatus(
run,
'alice',
'online',
undefined,
'heartbeat',
'2026-04-16T09:59:59.000Z'
);
expect(run.memberSpawnStatuses.get('alice')).toBe(existingEntry);
});
});