From bfcdff5955eabde6d42f258612a4108ac03e2ca5 Mon Sep 17 00:00:00 2001 From: iliya Date: Tue, 10 Mar 2026 16:06:28 +0200 Subject: [PATCH] feat: improve cross-team message handling and delivery tracking - Added functionality to mark delivered cross-team messages as read and restore reply hints in TeamProvisioningService. - Introduced a new method to extract user text from incoming messages for better processing of native teammate messages. - Removed outdated methods related to cross-team target team parsing to streamline the service. - Enhanced tests to validate the new message handling and ensure proper functionality in live scenarios. --- .../services/team/TeamProvisioningService.ts | 248 ++++++++++-------- ...eamProvisioningServiceLiveMessages.test.ts | 59 +++++ .../team/TeamProvisioningServiceRelay.test.ts | 66 +++-- 3 files changed, 235 insertions(+), 138 deletions(-) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index 8be2ca02..5430a775 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -33,6 +33,7 @@ import { parseCliArgs } from '@shared/utils/cliArgsParser'; import { isInboxNoiseMessage } from '@shared/utils/inboxNoise'; import { createLogger } from '@shared/utils/logger'; import { formatTaskDisplayLabel } from '@shared/utils/taskIdentity'; +import { parseAllTeammateMessages } from '@shared/utils/teammateMessageParser'; import { createCliAutoSuffixNameGuard } from '@shared/utils/teamMemberName'; import { extractToolPreview, formatToolSummaryFromCalls } from '@shared/utils/toolSummary'; import * as agentTeamsControllerModule from 'agent-teams-controller'; @@ -1082,7 +1083,6 @@ function isTransientProbeWarning(warning: string): boolean { export class TeamProvisioningService { private static readonly CLAUDE_LOG_LINES_LIMIT = 50_000; - private static readonly PENDING_CROSS_TEAM_REPLY_TTL_MS = 10 * 60 * 1000; private readonly runs = new Map(); private readonly activeByTeam = new Map(); @@ -1329,29 +1329,6 @@ export class TeamProvisioningService { return `${otherTeam.trim()}\0${conversationId.trim()}`; } - private parseCrossTeamTargetTeam(value: string | undefined): string | null { - if (typeof value !== 'string') return null; - const trimmed = value.trim(); - if (!trimmed) return null; - if (trimmed.startsWith('cross-team:')) { - const teamName = trimmed.slice('cross-team:'.length).trim(); - return TEAM_NAME_PATTERN.test(teamName) ? teamName : null; - } - const dot = trimmed.indexOf('.'); - if (dot <= 0) return null; - const teamName = trimmed.slice(0, dot).trim(); - return TEAM_NAME_PATTERN.test(teamName) ? teamName : null; - } - - private getCrossTeamSourceTeam(value: string | undefined): string | null { - if (typeof value !== 'string') return null; - const trimmed = value.trim(); - const dot = trimmed.indexOf('.'); - if (dot <= 0) return null; - const teamName = trimmed.slice(0, dot).trim(); - return TEAM_NAME_PATTERN.test(teamName) ? teamName : null; - } - registerPendingCrossTeamReplyExpectation( teamName: string, otherTeam: string, @@ -1383,20 +1360,136 @@ export class TeamProvisioningService { } } - private getPendingCrossTeamReplyExpectationKeys(teamName: string): Set { - const teamMap = this.pendingCrossTeamFirstReplies.get(teamName.trim()); - if (!teamMap) return new Set(); - const cutoff = Date.now() - TeamProvisioningService.PENDING_CROSS_TEAM_REPLY_TTL_MS; - for (const [key, createdAt] of teamMap.entries()) { - if (createdAt < cutoff) { - teamMap.delete(key); - } + private getRunLeadName(run: ProvisioningRun): string { + return ( + run.request.members.find((m) => m.role?.toLowerCase().includes('lead'))?.name || 'team-lead' + ); + } + + private extractStreamUserText(msg: Record): string | null { + const topLevelContent = msg.content; + if (typeof topLevelContent === 'string') { + return topLevelContent; } - if (teamMap.size === 0) { - this.pendingCrossTeamFirstReplies.delete(teamName.trim()); - return new Set(); + if (Array.isArray(topLevelContent)) { + const text = topLevelContent + .filter( + (part): part is Record => + !!part && + typeof part === 'object' && + part.type === 'text' && + typeof part.text === 'string' + ) + .map((part) => part.text as string) + .join('\n') + .trim(); + if (text.length > 0) return text; } - return new Set(teamMap.keys()); + + const message = msg.message; + if (!message || typeof message !== 'object') return null; + const innerContent = (message as Record).content; + if (typeof innerContent === 'string') { + const trimmed = innerContent.trim(); + return trimmed.length > 0 ? trimmed : null; + } + if (!Array.isArray(innerContent)) return null; + const text = innerContent + .filter( + (part): part is Record => + !!part && + typeof part === 'object' && + part.type === 'text' && + typeof part.text === 'string' + ) + .map((part) => part.text as string) + .join('\n') + .trim(); + return text.length > 0 ? text : null; + } + + private async markDeliveredCrossTeamLeadMessagesRead( + teamName: string, + leadName: string, + deliveredBlocks: Array<{ + teammateId: string; + content: string; + conversationId: string; + }> + ): Promise { + if (deliveredBlocks.length === 0) return; + + let leadInboxMessages: Awaited> = []; + try { + leadInboxMessages = await this.inboxReader.getMessagesFor(teamName, leadName); + } catch { + return; + } + + const toMark: InboxMessage[] = []; + for (const block of deliveredBlocks) { + const matchesBlock = (message: InboxMessage, requireExactText: boolean): boolean => { + if (message.read || message.source !== CROSS_TEAM_SOURCE) return false; + if (!this.hasStableMessageId(message)) return false; + if (message.from.trim() !== block.teammateId.trim()) return false; + const messageConversationId = + message.replyToConversationId?.trim() ?? + message.conversationId?.trim() ?? + parseCrossTeamPrefix(message.text)?.conversationId; + if (messageConversationId !== block.conversationId) return false; + return !requireExactText || message.text.trim() === block.content.trim(); + }; + const matched = + leadInboxMessages.find((message) => matchesBlock(message, true)) ?? + leadInboxMessages.find((message) => matchesBlock(message, false)); + if (!matched) continue; + matched.read = true; + toMark.push(matched); + } + + if (toMark.length === 0) return; + + try { + await this.markInboxMessagesRead(teamName, leadName, toMark); + } catch { + // best-effort + } + } + + private handleNativeTeammateUserMessage( + run: ProvisioningRun, + msg: Record + ): void { + const rawText = this.extractStreamUserText(msg); + if (!rawText) return; + + const blocks = parseAllTeammateMessages(rawText); + if (blocks.length === 0) return; + + const crossTeamBlocks = blocks.flatMap((block) => { + const origin = parseCrossTeamPrefix(block.content); + const sourceTeam = origin?.from.includes('.') ? origin.from.split('.', 1)[0] : null; + const conversationId = + origin?.conversationId?.trim() || origin?.replyToConversationId?.trim(); + if (!sourceTeam || !conversationId) return []; + return [ + { + teammateId: block.teammateId, + content: block.content, + toTeam: sourceTeam, + conversationId, + }, + ]; + }); + if (crossTeamBlocks.length === 0) return; + + run.activeCrossTeamReplyHints = crossTeamBlocks.map((block) => ({ + toTeam: block.toTeam, + conversationId: block.conversationId, + })); + + const leadName = this.getRunLeadName(run); + void this.markDeliveredCrossTeamLeadMessagesRead(run.teamName, leadName, crossTeamBlocks); } private persistSentMessage(teamName: string, message: InboxMessage): void { @@ -2953,74 +3046,14 @@ export class TeamProvisioningService { if (unread.length === 0) return 0; - const latestOutboundByConversation = new Map(); - const latestReadInboundByConversation = new Map(); - for (const message of leadInboxMessages) { - const timestampMs = Date.parse(message.timestamp); - if (!Number.isFinite(timestampMs)) continue; - if (message.source === CROSS_TEAM_SENT_SOURCE) { - const conversationId = message.conversationId?.trim(); - const targetTeam = this.parseCrossTeamTargetTeam(message.to); - if (!conversationId || !targetTeam) continue; - const key = this.buildCrossTeamConversationKey(targetTeam, conversationId); - latestOutboundByConversation.set( - key, - Math.max(latestOutboundByConversation.get(key) ?? 0, timestampMs) - ); - continue; - } - if (message.source === CROSS_TEAM_SOURCE && message.read) { - const conversationId = - message.replyToConversationId?.trim() ?? - message.conversationId?.trim() ?? - parseCrossTeamPrefix(message.text)?.conversationId; - const sourceTeam = this.getCrossTeamSourceTeam(message.from); - if (!conversationId || !sourceTeam) continue; - const key = this.buildCrossTeamConversationKey(sourceTeam, conversationId); - latestReadInboundByConversation.set( - key, - Math.max(latestReadInboundByConversation.get(key) ?? 0, timestampMs) - ); - } - } - const pendingHistoricalReplies = new Set( - Array.from(latestOutboundByConversation.entries()) - .filter(([key, sentAtMs]) => sentAtMs > (latestReadInboundByConversation.get(key) ?? 0)) - .map(([key]) => key) - ); - const pendingTransientReplies = this.getPendingCrossTeamReplyExpectationKeys(teamName); - const matchedTransientReplyKeys = new Set(); - - const isCrossTeamReplyToOwnOutbound = (message: InboxMessage): boolean => { - if (message.source !== CROSS_TEAM_SOURCE) return false; - const conversationId = - message.replyToConversationId?.trim() ?? - message.conversationId?.trim() ?? - parseCrossTeamPrefix(message.text)?.conversationId; - if (!conversationId) return false; - const sourceTeam = this.getCrossTeamSourceTeam(message.from); - if (!sourceTeam) return false; - const key = this.buildCrossTeamConversationKey(sourceTeam, conversationId); - if (pendingHistoricalReplies.has(key)) { - return true; - } - if (pendingTransientReplies.has(key)) { - matchedTransientReplyKeys.add(key); - return true; - } - return false; - }; - // Ignore (and auto-mark read) internal coordination noise like idle/shutdown messages. // Also ignore local sender-copy rows for cross-team traffic: those exist only so the UI // can show outbound activity and must not be re-injected into the live lead as new work. - // Incoming replies to our own outbound cross-team conversations should also remain visible - // in team history without waking the local lead into a reply loop. + // Incoming cross-team deliveries are handled through Claude's native + // path and are marked read when that raw user turn is observed, so we intentionally do not + // custom-relay them here. const ignoredUnread = unread.filter( - (m) => - isInboxNoiseMessage(m.text) || - m.source === CROSS_TEAM_SENT_SOURCE || - isCrossTeamReplyToOwnOutbound(m) + (m) => isInboxNoiseMessage(m.text) || m.source === CROSS_TEAM_SENT_SOURCE ); if (ignoredUnread.length > 0) { try { @@ -3028,19 +3061,13 @@ export class TeamProvisioningService { } catch { // best-effort } - for (const key of matchedTransientReplyKeys) { - const [otherTeam, conversationId] = key.split('\0'); - if (otherTeam && conversationId) { - this.clearPendingCrossTeamReplyExpectation(teamName, otherTeam, conversationId); - } - } } const actionableUnread = unread.filter( (m) => !isInboxNoiseMessage(m.text) && m.source !== CROSS_TEAM_SENT_SOURCE && - !isCrossTeamReplyToOwnOutbound(m) + m.source !== CROSS_TEAM_SOURCE ); if (actionableUnread.length === 0) return 0; @@ -3621,8 +3648,7 @@ export class TeamProvisioningService { */ private pushLiveLeadTextMessage(run: ProvisioningRun, cleanText: string): void { run.leadMsgSeq += 1; - const leadName = - run.request.members.find((m) => m.role?.toLowerCase().includes('lead'))?.name || 'team-lead'; + const leadName = this.getRunLeadName(run); const messageId = `lead-turn-${run.runId}-${run.leadMsgSeq}`; // Attach accumulated tool call details from preceding tool_use messages, then reset. const toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined; @@ -3687,6 +3713,10 @@ export class TeamProvisioningService { // stream-json output has various message types: // {"type":"assistant","content":[{"type":"text","text":"..."},...]} // {"type":"result","subtype":"success",...} + if (msg.type === 'user') { + this.handleNativeTeammateUserMessage(run, msg); + return; + } if (msg.type === 'assistant') { const content = Array.isArray(msg.content) ? (msg.content as Record[]) diff --git a/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts b/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts index f94aaef2..591b7c05 100644 --- a/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts +++ b/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts @@ -73,11 +73,27 @@ vi.mock('../../../../src/main/services/team/atomicWrite', () => ({ atomicWriteAsync: hoisted.atomicWrite, })); +vi.mock('../../../../src/main/services/team/fileLock', () => ({ + withFileLock: async (_filePath: string, fn: () => Promise) => await fn(), +})); + +vi.mock('../../../../src/main/services/team/inboxLock', () => ({ + withInboxLock: async (_filePath: string, fn: () => Promise) => await fn(), +})); + vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => { const actual = await importOriginal(); return { ...actual, getTeamsBasePath: () => '/mock/teams' }; }); +vi.mock('../../../../src/main/utils/fsRead', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + readFileUtf8WithTimeout: hoisted.readFile, + }; +}); + vi.mock('agent-teams-controller', () => ({ createController: ({ teamName }: { teamName: string }) => ({ messages: { @@ -102,6 +118,10 @@ function seedConfig(teamName: string): void { ); } +function seedLeadInbox(teamName: string, messages: unknown[]): void { + hoisted.files.set(`/mock/teams/${teamName}/inboxes/team-lead.json`, JSON.stringify(messages)); +} + interface RunLike { runId: string; teamName: string; @@ -568,6 +588,45 @@ describe('TeamProvisioningService pre-ready live messages', () => { expect(hoisted.sendInboxMessage).not.toHaveBeenCalled(); }); + it('marks native cross-team teammate-message deliveries as read and restores reply hints', async () => { + const service = new TeamProvisioningService(); + seedConfig('my-team'); + seedLeadInbox('my-team', [ + { + from: 'other-team.team-lead', + to: 'team-lead', + text: '\nНативная доставка.', + timestamp: '2026-02-23T10:01:00.000Z', + read: false, + source: 'cross_team', + messageId: 'm-native-cross-team-1', + conversationId: 'conv-native-1', + replyToConversationId: 'conv-native-1', + }, + ]); + const run = attachRun(service, 'my-team', { provisioningComplete: true }); + + callHandleStreamJsonMessage(service, run, { + type: 'user', + message: { + role: 'user', + content: + '\nНативная доставка.', + }, + }); + + await vi.waitFor(() => { + const updatedInbox = JSON.parse( + hoisted.files.get('/mock/teams/my-team/inboxes/team-lead.json') ?? '[]' + ) as Array<{ read?: boolean }>; + expect(updatedInbox[0]?.read).toBe(true); + }); + + expect(run.activeCrossTeamReplyHints).toEqual([ + { toTeam: 'other-team', conversationId: 'conv-native-1' }, + ]); + }); + it('rescues mistaken cross_team_send recipients into actual cross-team replies', async () => { const service = new TeamProvisioningService(); seedConfig('my-team'); diff --git a/test/main/services/team/TeamProvisioningServiceRelay.test.ts b/test/main/services/team/TeamProvisioningServiceRelay.test.ts index b9a79f8c..5f60969b 100644 --- a/test/main/services/team/TeamProvisioningServiceRelay.test.ts +++ b/test/main/services/team/TeamProvisioningServiceRelay.test.ts @@ -88,6 +88,14 @@ vi.mock('../../../../src/main/services/team/atomicWrite', () => ({ atomicWriteAsync: hoisted.atomicWrite, })); +vi.mock('../../../../src/main/services/team/fileLock', () => ({ + withFileLock: async (_filePath: string, fn: () => Promise) => await fn(), +})); + +vi.mock('../../../../src/main/services/team/inboxLock', () => ({ + withInboxLock: async (_filePath: string, fn: () => Promise) => await fn(), +})); + vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => { const actual = await importOriginal(); return { @@ -96,6 +104,14 @@ vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => { }; }); +vi.mock('../../../../src/main/utils/fsRead', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + readFileUtf8WithTimeout: hoisted.readFile, + }; +}); + vi.mock('agent-teams-controller', () => ({ createController: ({ teamName }: { teamName: string }) => ({ messages: { @@ -339,7 +355,7 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => { expect(service.resolveCrossTeamReplyMetadata(teamName, 'other-team')).toBeNull(); }); - it('includes explicit cross-team reply instructions in lead relay prompts', async () => { + it('does not custom-relay incoming cross-team lead inbox messages', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team'; seedConfig(teamName); @@ -357,24 +373,17 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => { ]); const { writeSpy } = attachAliveRun(service, teamName); - const relayPromise = service.relayLeadInboxMessages(teamName); - const run = await waitForCapture(service); - expect(run?.leadRelayCapture).toBeTruthy(); + const relayed = await service.relayLeadInboxMessages(teamName); - const payload = String(writeSpy.mock.calls[0]?.[0] ?? ''); - expect(payload).toContain('Source: cross_team'); - expect(payload).toContain('Cross-team conversationId: conv-explicit'); - expect(payload).toContain('Call the MCP tool named cross_team_send with toTeam=\\"other-team\\"'); - expect(payload).toContain('replyToConversationId=\\"conv-explicit\\"'); - expect(payload).toContain('NEVER set recipient/to to \\"cross_team_send\\"'); + expect(relayed).toBe(0); + expect(writeSpy).toHaveBeenCalledTimes(0); - (service as any).handleStreamJsonMessage(run, { - type: 'assistant', - content: [{ type: 'text', text: 'Replying properly.' }], - }); - (service as any).handleStreamJsonMessage(run, { type: 'result', subtype: 'success' }); - - await relayPromise; + const updatedInbox = JSON.parse( + hoisted.files.get(`/mock/teams/${teamName}/inboxes/team-lead.json`) ?? '[]' + ) as Array<{ messageId?: string; read?: boolean }>; + expect(updatedInbox).toHaveLength(1); + expect(updatedInbox[0]?.messageId).toBe('m-cross-team-explicit'); + expect(updatedInbox[0]?.read).toBe(false); }); it('does not relay cross-team sender copies back into the live lead', async () => { @@ -473,7 +482,7 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => { expect(writeSpy).toHaveBeenCalledTimes(0); }); - it('relays later follow-up messages after the first reply in a conversation was already received', async () => { + it('leaves later cross-team follow-up messages for the native teammate-message path', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team'; seedConfig(teamName); @@ -513,18 +522,17 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => { ]); const { writeSpy } = attachAliveRun(service, teamName); - const relayPromise = service.relayLeadInboxMessages(teamName); - const run = await waitForCapture(service); - expect(run?.leadRelayCapture).toBeTruthy(); - (service as any).handleStreamJsonMessage(run, { - type: 'assistant', - content: [{ type: 'text', text: 'I will answer the follow-up.' }], - }); - (service as any).handleStreamJsonMessage(run, { type: 'result', subtype: 'success' }); + const relayed = await service.relayLeadInboxMessages(teamName); - const relayed = await relayPromise; - expect(relayed).toBe(1); - expect(writeSpy).toHaveBeenCalledTimes(1); + expect(relayed).toBe(0); + expect(writeSpy).toHaveBeenCalledTimes(0); + + const updatedInbox = JSON.parse( + hoisted.files.get(`/mock/teams/${teamName}/inboxes/team-lead.json`) ?? '[]' + ) as Array<{ messageId?: string; read?: boolean }>; + expect(updatedInbox).toHaveLength(3); + expect(updatedInbox[2]?.messageId).toBe('m-cross-team-followup'); + expect(updatedInbox[2]?.read).toBe(false); }); it('relays unread teammate inbox messages through the live team process', async () => {