From 7c6837eacdfa59b1ece8227e22a3be94c637547e Mon Sep 17 00:00:00 2001 From: iliya Date: Mon, 16 Mar 2026 18:31:28 +0200 Subject: [PATCH] feat: add same-team native delivery dedup for lead inbox relay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 3-layer dedup to prevent duplicate message delivery when CLI delivers teammate messages both natively () and via inbox file: Layer 1: Grace window — defer recent source-less messages (15s) Layer 2: Fingerprint cache + one-to-one pairing — marks specific messageIds as read Layer 3: Deduped retry timers — ensures delivery if native path fails --- .../services/team/TeamProvisioningService.ts | 404 ++++++++++++++++-- 1 file changed, 363 insertions(+), 41 deletions(-) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index e5ac9aed..5bd0466e 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -34,7 +34,10 @@ import { isInboxNoiseMessage } from '@shared/utils/inboxNoise'; import { isLeadAgentType, isLeadMember } from '@shared/utils/leadDetection'; import { createLogger } from '@shared/utils/logger'; import { formatTaskDisplayLabel } from '@shared/utils/taskIdentity'; -import { parseAllTeammateMessages } from '@shared/utils/teammateMessageParser'; +import { + parseAllTeammateMessages, + type ParsedTeammateContent, +} from '@shared/utils/teammateMessageParser'; import { createCliAutoSuffixNameGuard, parseNumericSuffixName } from '@shared/utils/teamMemberName'; import { extractToolPreview, formatToolSummaryFromCalls } from '@shared/utils/toolSummary'; import * as agentTeamsControllerModule from 'agent-teams-controller'; @@ -1154,10 +1157,27 @@ interface PendingInboxRelayCandidate { queuedAtMs: number; } +interface NativeSameTeamFingerprint { + id: string; + from: string; + text: string; + summary: string; + seenAt: number; +} + +function normalizeSameTeamText(text: string): string { + return text.trim().replace(/\r\n/g, '\n'); +} + export class TeamProvisioningService { private static readonly CLAUDE_LOG_LINES_LIMIT = 50_000; private static readonly RECENT_CROSS_TEAM_DELIVERY_TTL_MS = 10 * 60 * 1000; private static readonly PENDING_INBOX_RELAY_TTL_MS = 2 * 60 * 1000; + private static readonly SAME_TEAM_NATIVE_DELIVERY_GRACE_MS = 15_000; + private static readonly SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS = 60_000; + private static readonly SAME_TEAM_MATCH_WINDOW_MS = 30_000; + private static readonly SAME_TEAM_RUN_START_SKEW_MS = 1_000; + private static readonly SAME_TEAM_PERSIST_RETRY_MS = 2_000; private readonly runs = new Map(); private readonly provisioningRunByTeam = new Map(); @@ -1170,6 +1190,10 @@ export class TeamProvisioningService { private readonly pendingCrossTeamFirstReplies = new Map>(); private readonly recentCrossTeamLeadDeliveryMessageIds = new Map>(); private readonly liveLeadProcessMessages = new Map(); + private readonly recentSameTeamNativeFingerprints = new Map< + string, + NativeSameTeamFingerprint[] + >(); private teamChangeEmitter: ((event: TeamChangeEvent) => void) | null = null; private helpOutputCache: string | null = null; private helpOutputCacheTime = 0; @@ -1673,35 +1697,44 @@ export class TeamProvisioningService { }, ]; }); - if (crossTeamBlocks.length === 0) return; - - const leadName = this.getRunLeadName(run); - void (async () => { - const matches = await this.matchCrossTeamLeadInboxMessages( - run.teamName, - leadName, - crossTeamBlocks - ); - const unreadMatches = matches.filter((match) => !match.wasRead); - if (unreadMatches.length > 0) { - try { - await this.markInboxMessagesRead(run.teamName, leadName, unreadMatches); - } catch { - // best-effort + // Cross-team reconciliation (existing logic) + if (crossTeamBlocks.length > 0) { + const leadName = this.getRunLeadName(run); + void (async () => { + const matches = await this.matchCrossTeamLeadInboxMessages( + run.teamName, + leadName, + crossTeamBlocks + ); + const unreadMatches = matches.filter((match) => !match.wasRead); + if (unreadMatches.length > 0) { + try { + await this.markInboxMessagesRead(run.teamName, leadName, unreadMatches); + } catch { + // best-effort + } } - } - const freshMatches = matches.filter( - (match) => !this.wasRecentlyDeliveredToLead(run.teamName, match.messageId) - ); - this.rememberRecentCrossTeamLeadDeliveryMessageIds( - run.teamName, - freshMatches.map((match) => match.messageId) - ); - run.activeCrossTeamReplyHints = freshMatches.map((match) => ({ - toTeam: match.toTeam, - conversationId: match.conversationId, - })); - })(); + const freshMatches = matches.filter( + (match) => !this.wasRecentlyDeliveredToLead(run.teamName, match.messageId) + ); + this.rememberRecentCrossTeamLeadDeliveryMessageIds( + run.teamName, + freshMatches.map((match) => match.messageId) + ); + run.activeCrossTeamReplyHints = freshMatches.map((match) => ({ + toTeam: match.toTeam, + conversationId: match.conversationId, + })); + })(); + } + + // Same-team reconciliation: record fingerprints for native delivery dedup + const sameTeamBlocks = blocks.filter((block) => !parseCrossTeamPrefix(block.content)); + if (sameTeamBlocks.length > 0) { + this.rememberSameTeamNativeFingerprints(run.teamName, sameTeamBlocks); + const leadName = this.getRunLeadName(run); + void this.reconcileSameTeamNativeDeliveries(run.teamName, leadName); + } } private persistSentMessage(teamName: string, message: InboxMessage): void { @@ -3577,21 +3610,18 @@ export class TeamProvisioningService { return false; }; - // Ignore (and auto-mark read) internal coordination noise like idle/shutdown messages. - // Also ignore local sender-copy rows for cross-team traffic: those exist only so the UI - // can show outbound activity and must not be re-injected into the live lead as new work. - // If the same cross-team delivery already arrived via a raw turn, - // suppress the duplicate relay here and simply mark the inbox row as read. - const ignoredUnread = unread.filter( + // Category 1: permanently ignored → mark as read. + // Includes noise (idle/shutdown), cross-team sender copies, cross-team reply dedup. + const permanentlyIgnored = unread.filter( (m) => isInboxNoiseMessage(m.text) || m.source === CROSS_TEAM_SENT_SOURCE || isCrossTeamReplyToOwnOutbound(m) || wasRecentlyDeliveredCrossTeam(m) ); - if (ignoredUnread.length > 0) { + if (permanentlyIgnored.length > 0) { try { - await this.markInboxMessagesRead(teamName, leadName, ignoredUnread); + await this.markInboxMessagesRead(teamName, leadName, permanentlyIgnored); } catch { // best-effort } @@ -3603,13 +3633,38 @@ export class TeamProvisioningService { } } + // Category 2: same-team native delivery confirmation (one-to-one pairing). + const { nativeMatchedMessageIds, persisted: sameTeamPersisted } = + await this.confirmSameTeamNativeMatches(teamName, leadName, unread); + + // Category 3: deferred by age — source-less messages within grace window of CURRENT run. + // NOT marked read (crash safety: if native delivery fails, retry will relay). + const runStartedAtMs = Date.parse(run.startedAt); + const permanentlyIgnoredIds = new Set(permanentlyIgnored.map((m) => m.messageId)); + const deferredByAge = unread.filter( + (m) => + !permanentlyIgnoredIds.has(m.messageId) && + !nativeMatchedMessageIds.has(m.messageId) && + this.shouldDeferSameTeamMessage(m, leadName, runStartedAtMs) + ); + const deferredIds = new Set(deferredByAge.map((m) => m.messageId)); + + // Actionable: everything not in any category. const actionableUnread = unread.filter( (m) => - !isInboxNoiseMessage(m.text) && - m.source !== CROSS_TEAM_SENT_SOURCE && - !isCrossTeamReplyToOwnOutbound(m) && - !wasRecentlyDeliveredCrossTeam(m) + !permanentlyIgnoredIds.has(m.messageId) && + !nativeMatchedMessageIds.has(m.messageId) && + !deferredIds.has(m.messageId) ); + + // Layer 3: schedule retry timers. + if (nativeMatchedMessageIds.size > 0 && !sameTeamPersisted) { + this.scheduleSameTeamPersistRetry(teamName); + } + if (deferredByAge.length > 0) { + this.scheduleSameTeamDeferredRetry(teamName); + } + if (actionableUnread.length === 0) return 0; const MAX_RELAY = 10; @@ -5426,6 +5481,263 @@ export class TeamProvisioningService { ); } + // --------------------------------------------------------------------------- + // Same-team native delivery dedup (Layer 2) + // --------------------------------------------------------------------------- + + private collectConfirmedSameTeamPairs( + messages: InboxMessage[], + fingerprints: NativeSameTeamFingerprint[], + leadName: string + ): { confirmedMessageIds: Set; matchedFingerprintIds: Set } { + const confirmedMessageIds = new Set(); + const matchedFingerprintIds = new Set(); + + if (fingerprints.length === 0) { + return { confirmedMessageIds, matchedFingerprintIds }; + } + + // Build group key: from + normalizedText (summary checked during pairing, not grouping) + const groupKey = (from: string, text: string) => `${from}\0${text}`; + + // Group fingerprints by (from, text), sorted FIFO by seenAt within each group + const fpByGroup = new Map(); + for (const fp of fingerprints) { + const key = groupKey(fp.from, fp.text); + let group = fpByGroup.get(key); + if (!group) { + group = []; + fpByGroup.set(key, group); + } + group.push(fp); + } + for (const group of fpByGroup.values()) { + group.sort((a, b) => a.seenAt - b.seenAt); + } + + // Collect eligible inbox messages, grouped by (from, text), sorted FIFO by timestamp + type EligibleMsg = InboxMessage & { messageId: string; parsedTs: number }; + const msgByGroup = new Map(); + for (const m of messages) { + if (m.read) continue; + if (m.source) continue; + if (!this.hasStableMessageId(m)) continue; + const fromName = m.from?.trim() ?? ''; + if (!fromName || fromName === leadName || fromName === 'user') continue; + const parsedTs = Date.parse(m.timestamp); + if (!Number.isFinite(parsedTs)) continue; + + const key = groupKey(fromName, normalizeSameTeamText(m.text)); + let group = msgByGroup.get(key); + if (!group) { + group = []; + msgByGroup.set(key, group); + } + group.push({ ...m, parsedTs } as EligibleMsg); + } + for (const group of msgByGroup.values()) { + group.sort((a, b) => a.parsedTs - b.parsedTs); + } + + // FIFO pair within each group: first fingerprint → first message, second → second, etc. + // This prevents delayed native delivery from pairing with the wrong inbox row + // when identical messages (e.g. "Done") are sent close together. + for (const [key, fps] of fpByGroup) { + const msgs = msgByGroup.get(key); + if (!msgs || msgs.length === 0) continue; + + const limit = Math.min(fps.length, msgs.length); + for (let i = 0; i < limit; i++) { + const fp = fps[i]; + const m = msgs[i]; + // Summary validation: if both sides have summary, they must match + if (fp.summary && m.summary?.trim() && fp.summary !== m.summary.trim()) continue; + // Time window validation + if (Math.abs(m.parsedTs - fp.seenAt) > TeamProvisioningService.SAME_TEAM_MATCH_WINDOW_MS) { + continue; + } + confirmedMessageIds.add(m.messageId); + matchedFingerprintIds.add(fp.id); + } + } + + return { confirmedMessageIds, matchedFingerprintIds }; + } + + private rememberSameTeamNativeFingerprints( + teamName: string, + blocks: ParsedTeammateContent[] + ): void { + const teamKey = teamName.trim(); + const existing = this.recentSameTeamNativeFingerprints.get(teamKey) ?? []; + const now = Date.now(); + const cutoff = now - TeamProvisioningService.SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS; + const fresh = existing.filter((fp) => fp.seenAt > cutoff); + + for (const block of blocks) { + fresh.push({ + id: randomUUID(), + from: block.teammateId.trim(), + text: normalizeSameTeamText(block.content), + summary: (block.summary ?? '').trim(), + seenAt: now, + }); + } + + this.recentSameTeamNativeFingerprints.set(teamKey, fresh); + } + + private consumeMatchedSameTeamFingerprints(teamName: string, matchedIds: Set): void { + if (matchedIds.size === 0) return; + const current = this.recentSameTeamNativeFingerprints.get(teamName.trim()) ?? []; + if (current.length === 0) return; + const remaining = current.filter((fp) => !matchedIds.has(fp.id)); + if (remaining.length > 0) { + this.recentSameTeamNativeFingerprints.set(teamName.trim(), remaining); + } else { + this.recentSameTeamNativeFingerprints.delete(teamName.trim()); + } + } + + private getFreshSameTeamNativeFingerprints(teamName: string): NativeSameTeamFingerprint[] { + const all = this.recentSameTeamNativeFingerprints.get(teamName) ?? []; + if (all.length === 0) return []; + const cutoff = Date.now() - TeamProvisioningService.SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS; + const fresh = all.filter((fp) => fp.seenAt > cutoff); + if (fresh.length !== all.length) { + if (fresh.length > 0) { + this.recentSameTeamNativeFingerprints.set(teamName, fresh); + } else { + this.recentSameTeamNativeFingerprints.delete(teamName); + } + } + return fresh; + } + + private isPotentialSameTeamCliMessage(m: InboxMessage, leadName: string): boolean { + if (m.source) return false; + const fromName = m.from?.trim() ?? ''; + if (!fromName || fromName === leadName || fromName === 'user') return false; + const toName = m.to?.trim(); + if (toName && toName !== leadName) return false; + return true; + } + + private shouldDeferSameTeamMessage( + m: InboxMessage, + leadName: string, + runStartedAtMs: number + ): boolean { + if (!this.isPotentialSameTeamCliMessage(m, leadName)) return false; + const messageTs = Date.parse(m.timestamp); + if (!Number.isFinite(messageTs) || messageTs < 0) return false; + if ( + Number.isFinite(runStartedAtMs) && + messageTs < runStartedAtMs - TeamProvisioningService.SAME_TEAM_RUN_START_SKEW_MS + ) { + return false; + } + const ageMs = Date.now() - messageTs; + if (ageMs < 0) return false; + return ageMs < TeamProvisioningService.SAME_TEAM_NATIVE_DELIVERY_GRACE_MS; + } + + private async confirmSameTeamNativeMatches( + teamName: string, + leadName: string, + messages: InboxMessage[] + ): Promise<{ nativeMatchedMessageIds: Set; persisted: boolean }> { + const fingerprints = this.getFreshSameTeamNativeFingerprints(teamName); + const { confirmedMessageIds, matchedFingerprintIds } = this.collectConfirmedSameTeamPairs( + messages, + fingerprints, + leadName + ); + + if (confirmedMessageIds.size === 0) { + return { nativeMatchedMessageIds: confirmedMessageIds, persisted: true }; + } + + const toMarkRead = Array.from(confirmedMessageIds, (messageId) => ({ messageId })); + let persisted = false; + try { + await this.markInboxMessagesRead(teamName, leadName, toMarkRead); + persisted = true; + } catch { + // keep fingerprints alive for next attempt + } + + if (persisted) { + // Durable: inbox says read=true. Safe to add in-memory dedup and consume fingerprints. + const relayedIds = this.relayedLeadInboxMessageIds.get(teamName) ?? new Set(); + for (const messageId of confirmedMessageIds) { + relayedIds.add(messageId); + } + this.relayedLeadInboxMessageIds.set(teamName, this.trimRelayedSet(relayedIds)); + this.consumeMatchedSameTeamFingerprints(teamName, matchedFingerprintIds); + } + // If NOT persisted: don't add to relayedIds, don't consume fingerprints. + // Next relay cycle will see the message in unread, re-match, and retry persist. + + return { nativeMatchedMessageIds: confirmedMessageIds, persisted }; + } + + private async reconcileSameTeamNativeDeliveries( + teamName: string, + leadName: string + ): Promise { + let leadInboxMessages: Awaited> = []; + try { + leadInboxMessages = await this.inboxReader.getMessagesFor(teamName, leadName); + } catch { + return; + } + + const { nativeMatchedMessageIds, persisted } = await this.confirmSameTeamNativeMatches( + teamName, + leadName, + leadInboxMessages + ); + // If native was matched but persist failed, schedule a quick retry + // so we don't wait for the 16s deferred timer to retry the disk write. + if (nativeMatchedMessageIds.size > 0 && !persisted) { + this.scheduleSameTeamPersistRetry(teamName); + } + } + + private scheduleSameTeamDeferredRetry(teamName: string): void { + const key = `same-team-deferred:${teamName}`; + if (this.pendingTimeouts.has(key)) return; + + const timer = setTimeout(() => { + this.pendingTimeouts.delete(key); + void this.relayLeadInboxMessages(teamName).catch((e: unknown) => + logger.warn(`[${teamName}] same-team deferred retry failed: ${String(e)}`) + ); + }, TeamProvisioningService.SAME_TEAM_NATIVE_DELIVERY_GRACE_MS + 1_000); + + this.pendingTimeouts.set(key, timer); + } + + /** + * Best-effort durable follow-up after native delivery was matched but inbox read-state + * could not be persisted. If the run dies before this retry succeeds, a later reconnect + * may still relay the row once because in-memory dedupe is not durable. + */ + private scheduleSameTeamPersistRetry(teamName: string): void { + const key = `same-team-persist:${teamName}`; + if (this.pendingTimeouts.has(key)) return; + + const timer = setTimeout(() => { + this.pendingTimeouts.delete(key); + void this.relayLeadInboxMessages(teamName).catch((e: unknown) => + logger.warn(`[${teamName}] same-team persist retry failed: ${String(e)}`) + ); + }, TeamProvisioningService.SAME_TEAM_PERSIST_RETRY_MS); + + this.pendingTimeouts.set(key, timer); + } + /** * Remove a run from tracking maps. */ @@ -5457,6 +5769,16 @@ export class TeamProvisioningService { this.relayedLeadInboxMessageIds.delete(run.teamName); this.pendingCrossTeamFirstReplies.delete(run.teamName); this.recentCrossTeamLeadDeliveryMessageIds.delete(run.teamName); + this.recentSameTeamNativeFingerprints.delete(run.teamName); + // Clear same-team retry timers + for (const suffix of ['deferred', 'persist']) { + const key = `same-team-${suffix}:${run.teamName}`; + const timer = this.pendingTimeouts.get(key); + if (timer) { + clearTimeout(timer); + this.pendingTimeouts.delete(key); + } + } run.activeCrossTeamReplyHints = []; run.pendingInboxRelayCandidates = []; for (const key of Array.from(this.memberInboxRelayInFlight.keys())) {