fix(team): bound inbox relay waits

This commit is contained in:
777genius 2026-06-03 22:47:38 +03:00
parent 6d0b06e2b1
commit 5366dbb34a
2 changed files with 368 additions and 7 deletions

View file

@ -3286,6 +3286,17 @@ interface OpenCodeMemberInboxDelivery {
userVisibleImpact?: OpenCodeRuntimeDeliveryUserVisibleImpact;
}
class InboxRelayInFlightTimeoutError extends Error {
constructor(message: string) {
super(message);
this.name = 'InboxRelayInFlightTimeoutError';
}
}
function isInboxRelayInFlightTimeoutError(error: unknown): error is InboxRelayInFlightTimeoutError {
return error instanceof InboxRelayInFlightTimeoutError;
}
type OpenCodeVisibleReplyCorrelation = NonNullable<
OpenCodePromptDeliveryLedgerRecord['visibleReplyCorrelation']
>;
@ -3444,6 +3455,15 @@ function getOpenCodeInboxRelayPriority(
return 0;
}
function getMemberInboxRelayPriority(
message: Pick<InboxMessage, 'messageKind' | 'source'>
): number {
if (message.messageKind === 'member_work_sync_nudge') {
return 30;
}
return 0;
}
function getLeadInboxRelayPriority(message: Pick<InboxMessage, 'messageKind'>): number {
if (message.messageKind === 'member_work_sync_nudge') {
return 30;
@ -3478,6 +3498,13 @@ function compareOpenCodeInboxRelayMessagesByPriority(
return compareInboxRelayMessages(a, b, getOpenCodeInboxRelayPriority);
}
function compareMemberInboxRelayMessagesByPriority(
a: Pick<InboxMessage, 'messageKind' | 'source' | 'timestamp'> & { messageId: string },
b: Pick<InboxMessage, 'messageKind' | 'source' | 'timestamp'> & { messageId: string }
): number {
return compareInboxRelayMessages(a, b, getMemberInboxRelayPriority);
}
function compareLeadInboxRelayMessagesByPriority(
a: Pick<InboxMessage, 'messageKind' | 'source' | 'timestamp'> & { messageId: string },
b: Pick<InboxMessage, 'messageKind' | 'source' | 'timestamp'> & { messageId: string }
@ -3527,6 +3554,7 @@ export class TeamProvisioningService {
private static readonly RETAINED_PROVISIONING_PROGRESS_TTL_MS = 5 * 60_000;
private static readonly OPENCODE_RUNTIME_DELIVERY_ADVISORY_EVENT_TTL_MS = 24 * 60 * 60_000;
private static readonly OPENCODE_RUNTIME_DELIVERY_LEAD_NOTICE_TTL_MS = 24 * 60 * 60_000;
private static readonly INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS = 2 * 60_000;
private readonly runs = new Map<string, ProvisioningRun>();
private readonly provisioningRunByTeam = new Map<string, string>();
@ -11661,6 +11689,33 @@ export class TeamProvisioningService {
return `opencode:${this.getMemberRelayKey(teamName, memberName)}`;
}
private async waitForInboxRelayInFlight<T>(input: {
promise: Promise<T>;
relayName: string;
relayKey: string;
}): Promise<T> {
let timer: ReturnType<typeof setTimeout> | null = null;
try {
return await Promise.race([
input.promise,
new Promise<never>((_, reject) => {
timer = setTimeout(() => {
reject(
new InboxRelayInFlightTimeoutError(
`${input.relayName} timed out after ${TeamProvisioningService.INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS}ms: ${input.relayKey}`
)
);
}, TeamProvisioningService.INBOX_RELAY_IN_FLIGHT_TIMEOUT_MS);
timer.unref?.();
}),
]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
private normalizeRelayCandidateText(text: string): string {
return stripAgentBlocks(String(text)).trim().replace(/\r\n/g, '\n');
}
@ -23201,7 +23256,23 @@ export class TeamProvisioningService {
const relayKey = this.getMemberRelayKey(teamName, memberName);
const existing = this.memberInboxRelayInFlight.get(relayKey);
if (existing) {
return existing;
try {
return await this.waitForInboxRelayInFlight({
promise: existing,
relayName: 'member_inbox_relay',
relayKey,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
logger.warn(`[${teamName}] member_inbox_relay_timed_out: ${getErrorMessage(error)}`);
return 0;
} finally {
if (this.memberInboxRelayInFlight.get(relayKey) === existing) {
this.memberInboxRelayInFlight.delete(relayKey);
}
}
}
const work = (async (): Promise<number> => {
@ -23274,7 +23345,9 @@ export class TeamProvisioningService {
if (actionableUnread.length === 0) return 0;
const MAX_RELAY = 10;
const batch = actionableUnread.slice(0, MAX_RELAY);
const batch = [...actionableUnread]
.sort(compareMemberInboxRelayMessagesByPriority)
.slice(0, MAX_RELAY);
this.armSilentTeammateForward(run, memberName, 'member_inbox_relay');
const rememberedRelayIds = this.rememberPendingInboxRelayCandidates(run, memberName, batch);
@ -23355,7 +23428,17 @@ export class TeamProvisioningService {
this.memberInboxRelayInFlight.set(relayKey, work);
try {
return await work;
return await this.waitForInboxRelayInFlight({
promise: work,
relayName: 'member_inbox_relay',
relayKey,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
logger.warn(`[${teamName}] member_inbox_relay_timed_out: ${getErrorMessage(error)}`);
return 0;
} finally {
if (this.memberInboxRelayInFlight.get(relayKey) === work) {
this.memberInboxRelayInFlight.delete(relayKey);
@ -23521,7 +23604,37 @@ export class TeamProvisioningService {
if (existing) {
const onlyMessageId = options.onlyMessageId?.trim();
if (!onlyMessageId) {
return existing;
try {
return await this.waitForInboxRelayInFlight({
promise: existing,
relayName: 'opencode_member_inbox_relay',
relayKey,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
const diagnostic = `opencode_member_inbox_relay_timed_out: ${getErrorMessage(error)}`;
logger.warn(`[${teamName}] ${diagnostic}`);
return {
relayed: 0,
attempted: 0,
delivered: 0,
failed: 1,
lastDelivery: {
delivered: false,
accepted: false,
responsePending: false,
reason: 'opencode_member_inbox_relay_timed_out',
diagnostics: [diagnostic],
},
diagnostics: [diagnostic],
};
} finally {
if (this.openCodeMemberInboxRelayInFlight.get(relayKey) === existing) {
this.openCodeMemberInboxRelayInFlight.delete(relayKey);
}
}
}
const inboxMessages = await this.inboxReader
.getMessagesFor(teamName, memberName)
@ -24012,7 +24125,31 @@ export class TeamProvisioningService {
this.openCodeMemberInboxRelayInFlight.set(relayKey, work);
try {
return await work;
return await this.waitForInboxRelayInFlight({
promise: work,
relayName: 'opencode_member_inbox_relay',
relayKey,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
const diagnostic = `opencode_member_inbox_relay_timed_out: ${getErrorMessage(error)}`;
logger.warn(`[${teamName}] ${diagnostic}`);
return {
relayed: 0,
attempted: options.onlyMessageId ? 1 : 0,
delivered: 0,
failed: 1,
lastDelivery: {
delivered: false,
accepted: false,
responsePending: false,
reason: 'opencode_member_inbox_relay_timed_out',
diagnostics: [diagnostic],
},
diagnostics: [diagnostic],
};
} finally {
if (this.openCodeMemberInboxRelayInFlight.get(relayKey) === work) {
this.openCodeMemberInboxRelayInFlight.delete(relayKey);
@ -24321,7 +24458,23 @@ export class TeamProvisioningService {
async relayLeadInboxMessages(teamName: string): Promise<number> {
const existing = this.leadInboxRelayInFlight.get(teamName);
if (existing) {
return existing;
try {
return await this.waitForInboxRelayInFlight({
promise: existing,
relayName: 'lead_inbox_relay',
relayKey: teamName,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
logger.warn(`[${teamName}] lead_inbox_relay_timed_out: ${getErrorMessage(error)}`);
return 0;
} finally {
if (this.leadInboxRelayInFlight.get(teamName) === existing) {
this.leadInboxRelayInFlight.delete(teamName);
}
}
}
const work = (async (): Promise<number> => {
@ -24877,7 +25030,17 @@ export class TeamProvisioningService {
this.leadInboxRelayInFlight.set(teamName, work);
try {
return await work;
return await this.waitForInboxRelayInFlight({
promise: work,
relayName: 'lead_inbox_relay',
relayKey: teamName,
});
} catch (error) {
if (!isInboxRelayInFlightTimeoutError(error)) {
throw error;
}
logger.warn(`[${teamName}] lead_inbox_relay_timed_out: ${getErrorMessage(error)}`);
return 0;
} finally {
if (this.leadInboxRelayInFlight.get(teamName) === work) {
this.leadInboxRelayInFlight.delete(teamName);

View file

@ -1630,6 +1630,81 @@ Messages:
expect(payload).toContain('Please retry with logging enabled.');
});
it('prioritizes member work-sync nudges over older ordinary member relay rows', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
seedConfig(teamName);
seedMemberInbox(teamName, 'alice', [
...Array.from({ length: 11 }, (_, index) => ({
from: 'team-lead',
text: `Routine relay row ${index + 1}.`,
timestamp: `2026-02-23T10:${String(index).padStart(2, '0')}:00.000Z`,
read: false,
messageId: `m-ordinary-${index + 1}`,
})),
{
from: 'system',
text: 'Call member_work_sync_status, then member_work_sync_report.',
timestamp: '2026-02-23T10:30:00.000Z',
read: false,
messageId: 'm-work-sync-late',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
},
]);
const { writeSpy } = attachAliveRun(service, teamName);
const relayed = await service.relayMemberInboxMessages(teamName, 'alice');
expect(relayed).toBe(10);
const payload = String(writeSpy.mock.calls[0]?.[0] ?? '');
expect(payload).toContain('1) From: system');
expect(payload).toContain('MessageId: m-work-sync-late');
expect(payload).toContain('Message kind: member_work_sync_nudge');
expect(payload).not.toContain('MessageId: m-ordinary-11');
});
it('retries a work-sync nudge after member relay times out before stdin write completes', async () => {
vi.useFakeTimers();
const service = new TeamProvisioningService();
const teamName = 'my-team';
try {
seedConfig(teamName);
seedMemberInbox(teamName, 'alice', [
{
from: 'system',
text: 'Call member_work_sync_status, then member_work_sync_report.',
timestamp: '2026-02-23T10:00:00.000Z',
read: false,
messageId: 'm-work-sync-retry',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
},
]);
const { writeSpy } = attachAliveRun(service, teamName);
writeSpy.mockImplementationOnce(() => true);
const firstRelay = service.relayMemberInboxMessages(teamName, 'alice');
await vi.advanceTimersByTimeAsync(0);
expect(writeSpy).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(120_000);
await expect(firstRelay).resolves.toBe(0);
vi.mocked(console.warn).mockClear();
const secondRelay = await service.relayMemberInboxMessages(teamName, 'alice');
expect(secondRelay).toBe(1);
expect(writeSpy).toHaveBeenCalledTimes(2);
const secondPayload = String(writeSpy.mock.calls[1]?.[0] ?? '');
expect(secondPayload).toContain('MessageId: m-work-sync-retry');
expect(secondPayload).toContain('Message kind: member_work_sync_nudge');
} finally {
vi.useRealTimers();
}
});
it('marks exact teammate relay copies with relayOfMessageId', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
@ -3831,6 +3906,129 @@ Messages:
}
});
it('times out a hung existing OpenCode member relay in-flight lock', async () => {
vi.useFakeTimers();
const service = new TeamProvisioningService();
const teamName = 'my-team';
const relayKey = `opencode:${teamName}:jack`;
try {
(
service as unknown as {
openCodeMemberInboxRelayInFlight: Map<string, Promise<unknown>>;
}
).openCodeMemberInboxRelayInFlight.set(relayKey, new Promise(() => undefined));
const relay = service.relayOpenCodeMemberInboxMessages(teamName, 'jack');
await vi.advanceTimersByTimeAsync(120_000);
await expect(relay).resolves.toMatchObject({
attempted: 0,
delivered: 0,
failed: 1,
lastDelivery: {
delivered: false,
accepted: false,
responsePending: false,
reason: 'opencode_member_inbox_relay_timed_out',
},
});
expect(
(
service as unknown as {
openCodeMemberInboxRelayInFlight: Map<string, Promise<unknown>>;
}
).openCodeMemberInboxRelayInFlight.has(relayKey)
).toBe(false);
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
'opencode_member_inbox_relay_timed_out'
);
vi.mocked(console.warn).mockClear();
} finally {
vi.useRealTimers();
}
});
it('times out a hung existing lead relay in-flight lock', async () => {
vi.useFakeTimers();
const service = new TeamProvisioningService();
const teamName = 'my-team';
try {
(
service as unknown as {
leadInboxRelayInFlight: Map<string, Promise<number>>;
}
).leadInboxRelayInFlight.set(teamName, new Promise(() => undefined));
const relay = service.relayLeadInboxMessages(teamName);
await vi.advanceTimersByTimeAsync(120_000);
await expect(relay).resolves.toBe(0);
expect(
(
service as unknown as {
leadInboxRelayInFlight: Map<string, Promise<number>>;
}
).leadInboxRelayInFlight.has(teamName)
).toBe(false);
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
'lead_inbox_relay_timed_out'
);
vi.mocked(console.warn).mockClear();
} finally {
vi.useRealTimers();
}
});
it('times out a hung existing member relay in-flight lock', async () => {
vi.useFakeTimers();
const service = new TeamProvisioningService();
const teamName = 'my-team';
const relayKey = `${teamName}:alice`;
try {
(
service as unknown as {
memberInboxRelayInFlight: Map<string, Promise<number>>;
}
).memberInboxRelayInFlight.set(relayKey, new Promise(() => undefined));
const relay = service.relayMemberInboxMessages(teamName, 'alice');
await vi.advanceTimersByTimeAsync(120_000);
await expect(relay).resolves.toBe(0);
expect(
(
service as unknown as {
memberInboxRelayInFlight: Map<string, Promise<number>>;
}
).memberInboxRelayInFlight.has(relayKey)
).toBe(false);
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
'member_inbox_relay_timed_out'
);
vi.mocked(console.warn).mockClear();
} finally {
vi.useRealTimers();
}
});
it('does not convert non-timeout member relay failures into timeout results', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
const relayKey = `${teamName}:alice`;
const rejected = Promise.reject(new Error('relay failed'));
rejected.catch(() => undefined);
(
service as unknown as {
memberInboxRelayInFlight: Map<string, Promise<number>>;
}
).memberInboxRelayInFlight.set(relayKey, rejected);
await expect(service.relayMemberInboxMessages(teamName, 'alice')).rejects.toThrow(
'relay failed'
);
expect(vi.mocked(console.warn)).not.toHaveBeenCalled();
});
it('treats an already-read specific OpenCode inbox row as delivered for UI-send relay', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';