feat: improve cross-team message handling and delivery tracking

- Added functionality to mark delivered cross-team messages as read and restore reply hints in TeamProvisioningService.
- Introduced a new method to extract user text from incoming messages for better processing of native teammate messages.
- Removed outdated methods related to cross-team target team parsing to streamline the service.
- Enhanced tests to validate the new message handling and ensure proper functionality in live scenarios.
This commit is contained in:
iliya 2026-03-10 16:06:28 +02:00
parent c93f3a4181
commit bfcdff5955
3 changed files with 235 additions and 138 deletions

View file

@ -33,6 +33,7 @@ import { parseCliArgs } from '@shared/utils/cliArgsParser';
import { isInboxNoiseMessage } from '@shared/utils/inboxNoise';
import { createLogger } from '@shared/utils/logger';
import { formatTaskDisplayLabel } from '@shared/utils/taskIdentity';
import { parseAllTeammateMessages } from '@shared/utils/teammateMessageParser';
import { createCliAutoSuffixNameGuard } from '@shared/utils/teamMemberName';
import { extractToolPreview, formatToolSummaryFromCalls } from '@shared/utils/toolSummary';
import * as agentTeamsControllerModule from 'agent-teams-controller';
@ -1082,7 +1083,6 @@ function isTransientProbeWarning(warning: string): boolean {
export class TeamProvisioningService {
private static readonly CLAUDE_LOG_LINES_LIMIT = 50_000;
private static readonly PENDING_CROSS_TEAM_REPLY_TTL_MS = 10 * 60 * 1000;
private readonly runs = new Map<string, ProvisioningRun>();
private readonly activeByTeam = new Map<string, string>();
@ -1329,29 +1329,6 @@ export class TeamProvisioningService {
return `${otherTeam.trim()}\0${conversationId.trim()}`;
}
private parseCrossTeamTargetTeam(value: string | undefined): string | null {
if (typeof value !== 'string') return null;
const trimmed = value.trim();
if (!trimmed) return null;
if (trimmed.startsWith('cross-team:')) {
const teamName = trimmed.slice('cross-team:'.length).trim();
return TEAM_NAME_PATTERN.test(teamName) ? teamName : null;
}
const dot = trimmed.indexOf('.');
if (dot <= 0) return null;
const teamName = trimmed.slice(0, dot).trim();
return TEAM_NAME_PATTERN.test(teamName) ? teamName : null;
}
private getCrossTeamSourceTeam(value: string | undefined): string | null {
if (typeof value !== 'string') return null;
const trimmed = value.trim();
const dot = trimmed.indexOf('.');
if (dot <= 0) return null;
const teamName = trimmed.slice(0, dot).trim();
return TEAM_NAME_PATTERN.test(teamName) ? teamName : null;
}
registerPendingCrossTeamReplyExpectation(
teamName: string,
otherTeam: string,
@ -1383,20 +1360,136 @@ export class TeamProvisioningService {
}
}
private getPendingCrossTeamReplyExpectationKeys(teamName: string): Set<string> {
const teamMap = this.pendingCrossTeamFirstReplies.get(teamName.trim());
if (!teamMap) return new Set<string>();
const cutoff = Date.now() - TeamProvisioningService.PENDING_CROSS_TEAM_REPLY_TTL_MS;
for (const [key, createdAt] of teamMap.entries()) {
if (createdAt < cutoff) {
teamMap.delete(key);
}
private getRunLeadName(run: ProvisioningRun): string {
return (
run.request.members.find((m) => m.role?.toLowerCase().includes('lead'))?.name || 'team-lead'
);
}
private extractStreamUserText(msg: Record<string, unknown>): string | null {
const topLevelContent = msg.content;
if (typeof topLevelContent === 'string') {
return topLevelContent;
}
if (teamMap.size === 0) {
this.pendingCrossTeamFirstReplies.delete(teamName.trim());
return new Set<string>();
if (Array.isArray(topLevelContent)) {
const text = topLevelContent
.filter(
(part): part is Record<string, unknown> =>
!!part &&
typeof part === 'object' &&
part.type === 'text' &&
typeof part.text === 'string'
)
.map((part) => part.text as string)
.join('\n')
.trim();
if (text.length > 0) return text;
}
return new Set(teamMap.keys());
const message = msg.message;
if (!message || typeof message !== 'object') return null;
const innerContent = (message as Record<string, unknown>).content;
if (typeof innerContent === 'string') {
const trimmed = innerContent.trim();
return trimmed.length > 0 ? trimmed : null;
}
if (!Array.isArray(innerContent)) return null;
const text = innerContent
.filter(
(part): part is Record<string, unknown> =>
!!part &&
typeof part === 'object' &&
part.type === 'text' &&
typeof part.text === 'string'
)
.map((part) => part.text as string)
.join('\n')
.trim();
return text.length > 0 ? text : null;
}
private async markDeliveredCrossTeamLeadMessagesRead(
teamName: string,
leadName: string,
deliveredBlocks: Array<{
teammateId: string;
content: string;
conversationId: string;
}>
): Promise<void> {
if (deliveredBlocks.length === 0) return;
let leadInboxMessages: Awaited<ReturnType<TeamInboxReader['getMessagesFor']>> = [];
try {
leadInboxMessages = await this.inboxReader.getMessagesFor(teamName, leadName);
} catch {
return;
}
const toMark: InboxMessage[] = [];
for (const block of deliveredBlocks) {
const matchesBlock = (message: InboxMessage, requireExactText: boolean): boolean => {
if (message.read || message.source !== CROSS_TEAM_SOURCE) return false;
if (!this.hasStableMessageId(message)) return false;
if (message.from.trim() !== block.teammateId.trim()) return false;
const messageConversationId =
message.replyToConversationId?.trim() ??
message.conversationId?.trim() ??
parseCrossTeamPrefix(message.text)?.conversationId;
if (messageConversationId !== block.conversationId) return false;
return !requireExactText || message.text.trim() === block.content.trim();
};
const matched =
leadInboxMessages.find((message) => matchesBlock(message, true)) ??
leadInboxMessages.find((message) => matchesBlock(message, false));
if (!matched) continue;
matched.read = true;
toMark.push(matched);
}
if (toMark.length === 0) return;
try {
await this.markInboxMessagesRead(teamName, leadName, toMark);
} catch {
// best-effort
}
}
private handleNativeTeammateUserMessage(
run: ProvisioningRun,
msg: Record<string, unknown>
): void {
const rawText = this.extractStreamUserText(msg);
if (!rawText) return;
const blocks = parseAllTeammateMessages(rawText);
if (blocks.length === 0) return;
const crossTeamBlocks = blocks.flatMap((block) => {
const origin = parseCrossTeamPrefix(block.content);
const sourceTeam = origin?.from.includes('.') ? origin.from.split('.', 1)[0] : null;
const conversationId =
origin?.conversationId?.trim() || origin?.replyToConversationId?.trim();
if (!sourceTeam || !conversationId) return [];
return [
{
teammateId: block.teammateId,
content: block.content,
toTeam: sourceTeam,
conversationId,
},
];
});
if (crossTeamBlocks.length === 0) return;
run.activeCrossTeamReplyHints = crossTeamBlocks.map((block) => ({
toTeam: block.toTeam,
conversationId: block.conversationId,
}));
const leadName = this.getRunLeadName(run);
void this.markDeliveredCrossTeamLeadMessagesRead(run.teamName, leadName, crossTeamBlocks);
}
private persistSentMessage(teamName: string, message: InboxMessage): void {
@ -2953,74 +3046,14 @@ export class TeamProvisioningService {
if (unread.length === 0) return 0;
const latestOutboundByConversation = new Map<string, number>();
const latestReadInboundByConversation = new Map<string, number>();
for (const message of leadInboxMessages) {
const timestampMs = Date.parse(message.timestamp);
if (!Number.isFinite(timestampMs)) continue;
if (message.source === CROSS_TEAM_SENT_SOURCE) {
const conversationId = message.conversationId?.trim();
const targetTeam = this.parseCrossTeamTargetTeam(message.to);
if (!conversationId || !targetTeam) continue;
const key = this.buildCrossTeamConversationKey(targetTeam, conversationId);
latestOutboundByConversation.set(
key,
Math.max(latestOutboundByConversation.get(key) ?? 0, timestampMs)
);
continue;
}
if (message.source === CROSS_TEAM_SOURCE && message.read) {
const conversationId =
message.replyToConversationId?.trim() ??
message.conversationId?.trim() ??
parseCrossTeamPrefix(message.text)?.conversationId;
const sourceTeam = this.getCrossTeamSourceTeam(message.from);
if (!conversationId || !sourceTeam) continue;
const key = this.buildCrossTeamConversationKey(sourceTeam, conversationId);
latestReadInboundByConversation.set(
key,
Math.max(latestReadInboundByConversation.get(key) ?? 0, timestampMs)
);
}
}
const pendingHistoricalReplies = new Set(
Array.from(latestOutboundByConversation.entries())
.filter(([key, sentAtMs]) => sentAtMs > (latestReadInboundByConversation.get(key) ?? 0))
.map(([key]) => key)
);
const pendingTransientReplies = this.getPendingCrossTeamReplyExpectationKeys(teamName);
const matchedTransientReplyKeys = new Set<string>();
const isCrossTeamReplyToOwnOutbound = (message: InboxMessage): boolean => {
if (message.source !== CROSS_TEAM_SOURCE) return false;
const conversationId =
message.replyToConversationId?.trim() ??
message.conversationId?.trim() ??
parseCrossTeamPrefix(message.text)?.conversationId;
if (!conversationId) return false;
const sourceTeam = this.getCrossTeamSourceTeam(message.from);
if (!sourceTeam) return false;
const key = this.buildCrossTeamConversationKey(sourceTeam, conversationId);
if (pendingHistoricalReplies.has(key)) {
return true;
}
if (pendingTransientReplies.has(key)) {
matchedTransientReplyKeys.add(key);
return true;
}
return false;
};
// Ignore (and auto-mark read) internal coordination noise like idle/shutdown messages.
// Also ignore local sender-copy rows for cross-team traffic: those exist only so the UI
// can show outbound activity and must not be re-injected into the live lead as new work.
// Incoming replies to our own outbound cross-team conversations should also remain visible
// in team history without waking the local lead into a reply loop.
// Incoming cross-team deliveries are handled through Claude's native <teammate-message>
// path and are marked read when that raw user turn is observed, so we intentionally do not
// custom-relay them here.
const ignoredUnread = unread.filter(
(m) =>
isInboxNoiseMessage(m.text) ||
m.source === CROSS_TEAM_SENT_SOURCE ||
isCrossTeamReplyToOwnOutbound(m)
(m) => isInboxNoiseMessage(m.text) || m.source === CROSS_TEAM_SENT_SOURCE
);
if (ignoredUnread.length > 0) {
try {
@ -3028,19 +3061,13 @@ export class TeamProvisioningService {
} catch {
// best-effort
}
for (const key of matchedTransientReplyKeys) {
const [otherTeam, conversationId] = key.split('\0');
if (otherTeam && conversationId) {
this.clearPendingCrossTeamReplyExpectation(teamName, otherTeam, conversationId);
}
}
}
const actionableUnread = unread.filter(
(m) =>
!isInboxNoiseMessage(m.text) &&
m.source !== CROSS_TEAM_SENT_SOURCE &&
!isCrossTeamReplyToOwnOutbound(m)
m.source !== CROSS_TEAM_SOURCE
);
if (actionableUnread.length === 0) return 0;
@ -3621,8 +3648,7 @@ export class TeamProvisioningService {
*/
private pushLiveLeadTextMessage(run: ProvisioningRun, cleanText: string): void {
run.leadMsgSeq += 1;
const leadName =
run.request.members.find((m) => m.role?.toLowerCase().includes('lead'))?.name || 'team-lead';
const leadName = this.getRunLeadName(run);
const messageId = `lead-turn-${run.runId}-${run.leadMsgSeq}`;
// Attach accumulated tool call details from preceding tool_use messages, then reset.
const toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined;
@ -3687,6 +3713,10 @@ export class TeamProvisioningService {
// stream-json output has various message types:
// {"type":"assistant","content":[{"type":"text","text":"..."},...]}
// {"type":"result","subtype":"success",...}
if (msg.type === 'user') {
this.handleNativeTeammateUserMessage(run, msg);
return;
}
if (msg.type === 'assistant') {
const content = Array.isArray(msg.content)
? (msg.content as Record<string, unknown>[])

View file

@ -73,11 +73,27 @@ vi.mock('../../../../src/main/services/team/atomicWrite', () => ({
atomicWriteAsync: hoisted.atomicWrite,
}));
vi.mock('../../../../src/main/services/team/fileLock', () => ({
withFileLock: async (_filePath: string, fn: () => Promise<unknown>) => await fn(),
}));
vi.mock('../../../../src/main/services/team/inboxLock', () => ({
withInboxLock: async (_filePath: string, fn: () => Promise<unknown>) => await fn(),
}));
vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../../../src/main/utils/pathDecoder')>();
return { ...actual, getTeamsBasePath: () => '/mock/teams' };
});
vi.mock('../../../../src/main/utils/fsRead', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../../../src/main/utils/fsRead')>();
return {
...actual,
readFileUtf8WithTimeout: hoisted.readFile,
};
});
vi.mock('agent-teams-controller', () => ({
createController: ({ teamName }: { teamName: string }) => ({
messages: {
@ -102,6 +118,10 @@ function seedConfig(teamName: string): void {
);
}
function seedLeadInbox(teamName: string, messages: unknown[]): void {
hoisted.files.set(`/mock/teams/${teamName}/inboxes/team-lead.json`, JSON.stringify(messages));
}
interface RunLike {
runId: string;
teamName: string;
@ -568,6 +588,45 @@ describe('TeamProvisioningService pre-ready live messages', () => {
expect(hoisted.sendInboxMessage).not.toHaveBeenCalled();
});
it('marks native cross-team teammate-message deliveries as read and restores reply hints', async () => {
const service = new TeamProvisioningService();
seedConfig('my-team');
seedLeadInbox('my-team', [
{
from: 'other-team.team-lead',
to: 'team-lead',
text: '<cross-team from="other-team.team-lead" depth="0" conversationId="conv-native-1" replyToConversationId="conv-native-1" />\nНативная доставка.',
timestamp: '2026-02-23T10:01:00.000Z',
read: false,
source: 'cross_team',
messageId: 'm-native-cross-team-1',
conversationId: 'conv-native-1',
replyToConversationId: 'conv-native-1',
},
]);
const run = attachRun(service, 'my-team', { provisioningComplete: true });
callHandleStreamJsonMessage(service, run, {
type: 'user',
message: {
role: 'user',
content:
'<teammate-message teammate_id="other-team.team-lead" color="purple" summary="Cross-team reply"><cross-team from="other-team.team-lead" depth="0" conversationId="conv-native-1" replyToConversationId="conv-native-1" />\nНативная доставка.</teammate-message>',
},
});
await vi.waitFor(() => {
const updatedInbox = JSON.parse(
hoisted.files.get('/mock/teams/my-team/inboxes/team-lead.json') ?? '[]'
) as Array<{ read?: boolean }>;
expect(updatedInbox[0]?.read).toBe(true);
});
expect(run.activeCrossTeamReplyHints).toEqual([
{ toTeam: 'other-team', conversationId: 'conv-native-1' },
]);
});
it('rescues mistaken cross_team_send recipients into actual cross-team replies', async () => {
const service = new TeamProvisioningService();
seedConfig('my-team');

View file

@ -88,6 +88,14 @@ vi.mock('../../../../src/main/services/team/atomicWrite', () => ({
atomicWriteAsync: hoisted.atomicWrite,
}));
vi.mock('../../../../src/main/services/team/fileLock', () => ({
withFileLock: async (_filePath: string, fn: () => Promise<unknown>) => await fn(),
}));
vi.mock('../../../../src/main/services/team/inboxLock', () => ({
withInboxLock: async (_filePath: string, fn: () => Promise<unknown>) => await fn(),
}));
vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../../../src/main/utils/pathDecoder')>();
return {
@ -96,6 +104,14 @@ vi.mock('../../../../src/main/utils/pathDecoder', async (importOriginal) => {
};
});
vi.mock('../../../../src/main/utils/fsRead', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../../../src/main/utils/fsRead')>();
return {
...actual,
readFileUtf8WithTimeout: hoisted.readFile,
};
});
vi.mock('agent-teams-controller', () => ({
createController: ({ teamName }: { teamName: string }) => ({
messages: {
@ -339,7 +355,7 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => {
expect(service.resolveCrossTeamReplyMetadata(teamName, 'other-team')).toBeNull();
});
it('includes explicit cross-team reply instructions in lead relay prompts', async () => {
it('does not custom-relay incoming cross-team lead inbox messages', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
seedConfig(teamName);
@ -357,24 +373,17 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => {
]);
const { writeSpy } = attachAliveRun(service, teamName);
const relayPromise = service.relayLeadInboxMessages(teamName);
const run = await waitForCapture(service);
expect(run?.leadRelayCapture).toBeTruthy();
const relayed = await service.relayLeadInboxMessages(teamName);
const payload = String(writeSpy.mock.calls[0]?.[0] ?? '');
expect(payload).toContain('Source: cross_team');
expect(payload).toContain('Cross-team conversationId: conv-explicit');
expect(payload).toContain('Call the MCP tool named cross_team_send with toTeam=\\"other-team\\"');
expect(payload).toContain('replyToConversationId=\\"conv-explicit\\"');
expect(payload).toContain('NEVER set recipient/to to \\"cross_team_send\\"');
expect(relayed).toBe(0);
expect(writeSpy).toHaveBeenCalledTimes(0);
(service as any).handleStreamJsonMessage(run, {
type: 'assistant',
content: [{ type: 'text', text: 'Replying properly.' }],
});
(service as any).handleStreamJsonMessage(run, { type: 'result', subtype: 'success' });
await relayPromise;
const updatedInbox = JSON.parse(
hoisted.files.get(`/mock/teams/${teamName}/inboxes/team-lead.json`) ?? '[]'
) as Array<{ messageId?: string; read?: boolean }>;
expect(updatedInbox).toHaveLength(1);
expect(updatedInbox[0]?.messageId).toBe('m-cross-team-explicit');
expect(updatedInbox[0]?.read).toBe(false);
});
it('does not relay cross-team sender copies back into the live lead', async () => {
@ -473,7 +482,7 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => {
expect(writeSpy).toHaveBeenCalledTimes(0);
});
it('relays later follow-up messages after the first reply in a conversation was already received', async () => {
it('leaves later cross-team follow-up messages for the native teammate-message path', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
seedConfig(teamName);
@ -513,18 +522,17 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => {
]);
const { writeSpy } = attachAliveRun(service, teamName);
const relayPromise = service.relayLeadInboxMessages(teamName);
const run = await waitForCapture(service);
expect(run?.leadRelayCapture).toBeTruthy();
(service as any).handleStreamJsonMessage(run, {
type: 'assistant',
content: [{ type: 'text', text: 'I will answer the follow-up.' }],
});
(service as any).handleStreamJsonMessage(run, { type: 'result', subtype: 'success' });
const relayed = await service.relayLeadInboxMessages(teamName);
const relayed = await relayPromise;
expect(relayed).toBe(1);
expect(writeSpy).toHaveBeenCalledTimes(1);
expect(relayed).toBe(0);
expect(writeSpy).toHaveBeenCalledTimes(0);
const updatedInbox = JSON.parse(
hoisted.files.get(`/mock/teams/${teamName}/inboxes/team-lead.json`) ?? '[]'
) as Array<{ messageId?: string; read?: boolean }>;
expect(updatedInbox).toHaveLength(3);
expect(updatedInbox[2]?.messageId).toBe('m-cross-team-followup');
expect(updatedInbox[2]?.read).toBe(false);
});
it('relays unread teammate inbox messages through the live team process', async () => {