From 5366dbb34ae801b52590f5b0fa0bc4bbb95d8e3f Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 3 Jun 2026 22:47:38 +0300 Subject: [PATCH] fix(team): bound inbox relay waits --- .../services/team/TeamProvisioningService.ts | 177 +++++++++++++++- .../team/TeamProvisioningServiceRelay.test.ts | 198 ++++++++++++++++++ 2 files changed, 368 insertions(+), 7 deletions(-) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index 5466b0ed..90e9cf04 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -3286,6 +3286,17 @@ interface OpenCodeMemberInboxDelivery { userVisibleImpact?: OpenCodeRuntimeDeliveryUserVisibleImpact; } +class InboxRelayInFlightTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = 'InboxRelayInFlightTimeoutError'; + } +} + +function isInboxRelayInFlightTimeoutError(error: unknown): error is InboxRelayInFlightTimeoutError { + return error instanceof InboxRelayInFlightTimeoutError; +} + type OpenCodeVisibleReplyCorrelation = NonNullable< OpenCodePromptDeliveryLedgerRecord['visibleReplyCorrelation'] >; @@ -3444,6 +3455,15 @@ function getOpenCodeInboxRelayPriority( return 0; } +function getMemberInboxRelayPriority( + message: Pick +): number { + if (message.messageKind === 'member_work_sync_nudge') { + return 30; + } + return 0; +} + function getLeadInboxRelayPriority(message: Pick): number { if (message.messageKind === 'member_work_sync_nudge') { return 30; @@ -3478,6 +3498,13 @@ function compareOpenCodeInboxRelayMessagesByPriority( return compareInboxRelayMessages(a, b, getOpenCodeInboxRelayPriority); } +function compareMemberInboxRelayMessagesByPriority( + a: Pick & { messageId: string }, + b: Pick & { messageId: string } +): number { + return compareInboxRelayMessages(a, b, getMemberInboxRelayPriority); +} + function compareLeadInboxRelayMessagesByPriority( a: Pick & { messageId: string }, b: Pick & { messageId: string } @@ -3527,6 +3554,7 @@ export class TeamProvisioningService { private static readonly RETAINED_PROVISIONING_PROGRESS_TTL_MS = 5 * 60_000; private static readonly OPENCODE_RUNTIME_DELIVERY_ADVISORY_EVENT_TTL_MS = 24 * 60 * 60_000; private static readonly OPENCODE_RUNTIME_DELIVERY_LEAD_NOTICE_TTL_MS = 24 * 60 * 60_000; + private static readonly INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS = 2 * 60_000; private readonly runs = new Map(); private readonly provisioningRunByTeam = new Map(); @@ -11661,6 +11689,33 @@ export class TeamProvisioningService { return `opencode:${this.getMemberRelayKey(teamName, memberName)}`; } + private async waitForInboxRelayInFlight(input: { + promise: Promise; + relayName: string; + relayKey: string; + }): Promise { + let timer: ReturnType | null = null; + try { + return await Promise.race([ + input.promise, + new Promise((_, reject) => { + timer = setTimeout(() => { + reject( + new InboxRelayInFlightTimeoutError( + `${input.relayName} timed out after ${TeamProvisioningService.INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS}ms: ${input.relayKey}` + ) + ); + }, TeamProvisioningService.INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS); + timer.unref?.(); + }), + ]); + } finally { + if (timer) { + clearTimeout(timer); + } + } + } + private normalizeRelayCandidateText(text: string): string { return stripAgentBlocks(String(text)).trim().replace(/\r\n/g, '\n'); } @@ -23201,7 +23256,23 @@ export class TeamProvisioningService { const relayKey = this.getMemberRelayKey(teamName, memberName); const existing = this.memberInboxRelayInFlight.get(relayKey); if (existing) { - return existing; + try { + return await this.waitForInboxRelayInFlight({ + promise: existing, + relayName: 'member_inbox_relay', + relayKey, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + logger.warn(`[${teamName}] member_inbox_relay_timed_out: ${getErrorMessage(error)}`); + return 0; + } finally { + if (this.memberInboxRelayInFlight.get(relayKey) === existing) { + this.memberInboxRelayInFlight.delete(relayKey); + } + } } const work = (async (): Promise => { @@ -23274,7 +23345,9 @@ export class TeamProvisioningService { if (actionableUnread.length === 0) return 0; const MAX_RELAY = 10; - const batch = actionableUnread.slice(0, MAX_RELAY); + const batch = [...actionableUnread] + .sort(compareMemberInboxRelayMessagesByPriority) + .slice(0, MAX_RELAY); this.armSilentTeammateForward(run, memberName, 'member_inbox_relay'); const rememberedRelayIds = this.rememberPendingInboxRelayCandidates(run, memberName, batch); @@ -23355,7 +23428,17 @@ export class TeamProvisioningService { this.memberInboxRelayInFlight.set(relayKey, work); try { - return await work; + return await this.waitForInboxRelayInFlight({ + promise: work, + relayName: 'member_inbox_relay', + relayKey, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + logger.warn(`[${teamName}] member_inbox_relay_timed_out: ${getErrorMessage(error)}`); + return 0; } finally { if (this.memberInboxRelayInFlight.get(relayKey) === work) { this.memberInboxRelayInFlight.delete(relayKey); @@ -23521,7 +23604,37 @@ export class TeamProvisioningService { if (existing) { const onlyMessageId = options.onlyMessageId?.trim(); if (!onlyMessageId) { - return existing; + try { + return await this.waitForInboxRelayInFlight({ + promise: existing, + relayName: 'opencode_member_inbox_relay', + relayKey, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + const diagnostic = `opencode_member_inbox_relay_timed_out: ${getErrorMessage(error)}`; + logger.warn(`[${teamName}] ${diagnostic}`); + return { + relayed: 0, + attempted: 0, + delivered: 0, + failed: 1, + lastDelivery: { + delivered: false, + accepted: false, + responsePending: false, + reason: 'opencode_member_inbox_relay_timed_out', + diagnostics: [diagnostic], + }, + diagnostics: [diagnostic], + }; + } finally { + if (this.openCodeMemberInboxRelayInFlight.get(relayKey) === existing) { + this.openCodeMemberInboxRelayInFlight.delete(relayKey); + } + } } const inboxMessages = await this.inboxReader .getMessagesFor(teamName, memberName) @@ -24012,7 +24125,31 @@ export class TeamProvisioningService { this.openCodeMemberInboxRelayInFlight.set(relayKey, work); try { - return await work; + return await this.waitForInboxRelayInFlight({ + promise: work, + relayName: 'opencode_member_inbox_relay', + relayKey, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + const diagnostic = `opencode_member_inbox_relay_timed_out: ${getErrorMessage(error)}`; + logger.warn(`[${teamName}] ${diagnostic}`); + return { + relayed: 0, + attempted: options.onlyMessageId ? 1 : 0, + delivered: 0, + failed: 1, + lastDelivery: { + delivered: false, + accepted: false, + responsePending: false, + reason: 'opencode_member_inbox_relay_timed_out', + diagnostics: [diagnostic], + }, + diagnostics: [diagnostic], + }; } finally { if (this.openCodeMemberInboxRelayInFlight.get(relayKey) === work) { this.openCodeMemberInboxRelayInFlight.delete(relayKey); @@ -24321,7 +24458,23 @@ export class TeamProvisioningService { async relayLeadInboxMessages(teamName: string): Promise { const existing = this.leadInboxRelayInFlight.get(teamName); if (existing) { - return existing; + try { + return await this.waitForInboxRelayInFlight({ + promise: existing, + relayName: 'lead_inbox_relay', + relayKey: teamName, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + logger.warn(`[${teamName}] lead_inbox_relay_timed_out: ${getErrorMessage(error)}`); + return 0; + } finally { + if (this.leadInboxRelayInFlight.get(teamName) === existing) { + this.leadInboxRelayInFlight.delete(teamName); + } + } } const work = (async (): Promise => { @@ -24877,7 +25030,17 @@ export class TeamProvisioningService { this.leadInboxRelayInFlight.set(teamName, work); try { - return await work; + return await this.waitForInboxRelayInFlight({ + promise: work, + relayName: 'lead_inbox_relay', + relayKey: teamName, + }); + } catch (error) { + if (!isInboxRelayInFlightTimeoutError(error)) { + throw error; + } + logger.warn(`[${teamName}] lead_inbox_relay_timed_out: ${getErrorMessage(error)}`); + return 0; } finally { if (this.leadInboxRelayInFlight.get(teamName) === work) { this.leadInboxRelayInFlight.delete(teamName); diff --git a/test/main/services/team/TeamProvisioningServiceRelay.test.ts b/test/main/services/team/TeamProvisioningServiceRelay.test.ts index 5c16db1c..bd68f65f 100644 --- a/test/main/services/team/TeamProvisioningServiceRelay.test.ts +++ b/test/main/services/team/TeamProvisioningServiceRelay.test.ts @@ -1630,6 +1630,81 @@ Messages: expect(payload).toContain('Please retry with logging enabled.'); }); + it('prioritizes member work-sync nudges over older ordinary member relay rows', async () => { + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + seedConfig(teamName); + seedMemberInbox(teamName, 'alice', [ + ...Array.from({ length: 11 }, (_, index) => ({ + from: 'team-lead', + text: `Routine relay row ${index + 1}.`, + timestamp: `2026-02-23T10:${String(index).padStart(2, '0')}:00.000Z`, + read: false, + messageId: `m-ordinary-${index + 1}`, + })), + { + from: 'system', + text: 'Call member_work_sync_status, then member_work_sync_report.', + timestamp: '2026-02-23T10:30:00.000Z', + read: false, + messageId: 'm-work-sync-late', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + }, + ]); + + const { writeSpy } = attachAliveRun(service, teamName); + const relayed = await service.relayMemberInboxMessages(teamName, 'alice'); + + expect(relayed).toBe(10); + const payload = String(writeSpy.mock.calls[0]?.[0] ?? ''); + expect(payload).toContain('1) From: system'); + expect(payload).toContain('MessageId: m-work-sync-late'); + expect(payload).toContain('Message kind: member_work_sync_nudge'); + expect(payload).not.toContain('MessageId: m-ordinary-11'); + }); + + it('retries a work-sync nudge after member relay times out before stdin write completes', async () => { + vi.useFakeTimers(); + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + try { + seedConfig(teamName); + seedMemberInbox(teamName, 'alice', [ + { + from: 'system', + text: 'Call member_work_sync_status, then member_work_sync_report.', + timestamp: '2026-02-23T10:00:00.000Z', + read: false, + messageId: 'm-work-sync-retry', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + }, + ]); + + const { writeSpy } = attachAliveRun(service, teamName); + writeSpy.mockImplementationOnce(() => true); + + const firstRelay = service.relayMemberInboxMessages(teamName, 'alice'); + await vi.advanceTimersByTimeAsync(0); + expect(writeSpy).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(120_000); + await expect(firstRelay).resolves.toBe(0); + vi.mocked(console.warn).mockClear(); + + const secondRelay = await service.relayMemberInboxMessages(teamName, 'alice'); + + expect(secondRelay).toBe(1); + expect(writeSpy).toHaveBeenCalledTimes(2); + const secondPayload = String(writeSpy.mock.calls[1]?.[0] ?? ''); + expect(secondPayload).toContain('MessageId: m-work-sync-retry'); + expect(secondPayload).toContain('Message kind: member_work_sync_nudge'); + } finally { + vi.useRealTimers(); + } + }); + it('marks exact teammate relay copies with relayOfMessageId', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team'; @@ -3831,6 +3906,129 @@ Messages: } }); + it('times out a hung existing OpenCode member relay in-flight lock', async () => { + vi.useFakeTimers(); + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + const relayKey = `opencode:${teamName}:jack`; + try { + ( + service as unknown as { + openCodeMemberInboxRelayInFlight: Map>; + } + ).openCodeMemberInboxRelayInFlight.set(relayKey, new Promise(() => undefined)); + + const relay = service.relayOpenCodeMemberInboxMessages(teamName, 'jack'); + await vi.advanceTimersByTimeAsync(120_000); + + await expect(relay).resolves.toMatchObject({ + attempted: 0, + delivered: 0, + failed: 1, + lastDelivery: { + delivered: false, + accepted: false, + responsePending: false, + reason: 'opencode_member_inbox_relay_timed_out', + }, + }); + expect( + ( + service as unknown as { + openCodeMemberInboxRelayInFlight: Map>; + } + ).openCodeMemberInboxRelayInFlight.has(relayKey) + ).toBe(false); + expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain( + 'opencode_member_inbox_relay_timed_out' + ); + vi.mocked(console.warn).mockClear(); + } finally { + vi.useRealTimers(); + } + }); + + it('times out a hung existing lead relay in-flight lock', async () => { + vi.useFakeTimers(); + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + try { + ( + service as unknown as { + leadInboxRelayInFlight: Map>; + } + ).leadInboxRelayInFlight.set(teamName, new Promise(() => undefined)); + + const relay = service.relayLeadInboxMessages(teamName); + await vi.advanceTimersByTimeAsync(120_000); + + await expect(relay).resolves.toBe(0); + expect( + ( + service as unknown as { + leadInboxRelayInFlight: Map>; + } + ).leadInboxRelayInFlight.has(teamName) + ).toBe(false); + expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain( + 'lead_inbox_relay_timed_out' + ); + vi.mocked(console.warn).mockClear(); + } finally { + vi.useRealTimers(); + } + }); + + it('times out a hung existing member relay in-flight lock', async () => { + vi.useFakeTimers(); + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + const relayKey = `${teamName}:alice`; + try { + ( + service as unknown as { + memberInboxRelayInFlight: Map>; + } + ).memberInboxRelayInFlight.set(relayKey, new Promise(() => undefined)); + + const relay = service.relayMemberInboxMessages(teamName, 'alice'); + await vi.advanceTimersByTimeAsync(120_000); + + await expect(relay).resolves.toBe(0); + expect( + ( + service as unknown as { + memberInboxRelayInFlight: Map>; + } + ).memberInboxRelayInFlight.has(relayKey) + ).toBe(false); + expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain( + 'member_inbox_relay_timed_out' + ); + vi.mocked(console.warn).mockClear(); + } finally { + vi.useRealTimers(); + } + }); + + it('does not convert non-timeout member relay failures into timeout results', async () => { + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + const relayKey = `${teamName}:alice`; + const rejected = Promise.reject(new Error('relay failed')); + rejected.catch(() => undefined); + ( + service as unknown as { + memberInboxRelayInFlight: Map>; + } + ).memberInboxRelayInFlight.set(relayKey, rejected); + + await expect(service.relayMemberInboxMessages(teamName, 'alice')).rejects.toThrow( + 'relay failed' + ); + expect(vi.mocked(console.warn)).not.toHaveBeenCalled(); + }); + it('treats an already-read specific OpenCode inbox row as delivered for UI-send relay', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team';