feat: add same-team native delivery dedup for lead inbox relay

3-layer dedup to prevent duplicate message delivery when CLI delivers
teammate messages both natively (<teammate-message>) and via inbox file:

Layer 1: Grace window — defer recent source-less messages (15s)
Layer 2: Fingerprint cache + one-to-one pairing — marks specific messageIds as read
Layer 3: Deduped retry timers — ensures delivery if native path fails
This commit is contained in:
iliya 2026-03-16 18:31:28 +02:00
parent 5686d60d99
commit 7c6837eacd

View file

@ -34,7 +34,10 @@ import { isInboxNoiseMessage } from '@shared/utils/inboxNoise';
import { isLeadAgentType, isLeadMember } from '@shared/utils/leadDetection';
import { createLogger } from '@shared/utils/logger';
import { formatTaskDisplayLabel } from '@shared/utils/taskIdentity';
import { parseAllTeammateMessages } from '@shared/utils/teammateMessageParser';
import {
parseAllTeammateMessages,
type ParsedTeammateContent,
} from '@shared/utils/teammateMessageParser';
import { createCliAutoSuffixNameGuard, parseNumericSuffixName } from '@shared/utils/teamMemberName';
import { extractToolPreview, formatToolSummaryFromCalls } from '@shared/utils/toolSummary';
import * as agentTeamsControllerModule from 'agent-teams-controller';
@ -1154,10 +1157,27 @@ interface PendingInboxRelayCandidate {
queuedAtMs: number;
}
interface NativeSameTeamFingerprint {
id: string;
from: string;
text: string;
summary: string;
seenAt: number;
}
function normalizeSameTeamText(text: string): string {
return text.trim().replace(/\r\n/g, '\n');
}
export class TeamProvisioningService {
private static readonly CLAUDE_LOG_LINES_LIMIT = 50_000;
private static readonly RECENT_CROSS_TEAM_DELIVERY_TTL_MS = 10 * 60 * 1000;
private static readonly PENDING_INBOX_RELAY_TTL_MS = 2 * 60 * 1000;
private static readonly SAME_TEAM_NATIVE_DELIVERY_GRACE_MS = 15_000;
private static readonly SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS = 60_000;
private static readonly SAME_TEAM_MATCH_WINDOW_MS = 30_000;
private static readonly SAME_TEAM_RUN_START_SKEW_MS = 1_000;
private static readonly SAME_TEAM_PERSIST_RETRY_MS = 2_000;
private readonly runs = new Map<string, ProvisioningRun>();
private readonly provisioningRunByTeam = new Map<string, string>();
@ -1170,6 +1190,10 @@ export class TeamProvisioningService {
private readonly pendingCrossTeamFirstReplies = new Map<string, Map<string, number>>();
private readonly recentCrossTeamLeadDeliveryMessageIds = new Map<string, Map<string, number>>();
private readonly liveLeadProcessMessages = new Map<string, InboxMessage[]>();
private readonly recentSameTeamNativeFingerprints = new Map<
string,
NativeSameTeamFingerprint[]
>();
private teamChangeEmitter: ((event: TeamChangeEvent) => void) | null = null;
private helpOutputCache: string | null = null;
private helpOutputCacheTime = 0;
@ -1673,35 +1697,44 @@ export class TeamProvisioningService {
},
];
});
if (crossTeamBlocks.length === 0) return;
const leadName = this.getRunLeadName(run);
void (async () => {
const matches = await this.matchCrossTeamLeadInboxMessages(
run.teamName,
leadName,
crossTeamBlocks
);
const unreadMatches = matches.filter((match) => !match.wasRead);
if (unreadMatches.length > 0) {
try {
await this.markInboxMessagesRead(run.teamName, leadName, unreadMatches);
} catch {
// best-effort
// Cross-team reconciliation (existing logic)
if (crossTeamBlocks.length > 0) {
const leadName = this.getRunLeadName(run);
void (async () => {
const matches = await this.matchCrossTeamLeadInboxMessages(
run.teamName,
leadName,
crossTeamBlocks
);
const unreadMatches = matches.filter((match) => !match.wasRead);
if (unreadMatches.length > 0) {
try {
await this.markInboxMessagesRead(run.teamName, leadName, unreadMatches);
} catch {
// best-effort
}
}
}
const freshMatches = matches.filter(
(match) => !this.wasRecentlyDeliveredToLead(run.teamName, match.messageId)
);
this.rememberRecentCrossTeamLeadDeliveryMessageIds(
run.teamName,
freshMatches.map((match) => match.messageId)
);
run.activeCrossTeamReplyHints = freshMatches.map((match) => ({
toTeam: match.toTeam,
conversationId: match.conversationId,
}));
})();
const freshMatches = matches.filter(
(match) => !this.wasRecentlyDeliveredToLead(run.teamName, match.messageId)
);
this.rememberRecentCrossTeamLeadDeliveryMessageIds(
run.teamName,
freshMatches.map((match) => match.messageId)
);
run.activeCrossTeamReplyHints = freshMatches.map((match) => ({
toTeam: match.toTeam,
conversationId: match.conversationId,
}));
})();
}
// Same-team reconciliation: record fingerprints for native delivery dedup
const sameTeamBlocks = blocks.filter((block) => !parseCrossTeamPrefix(block.content));
if (sameTeamBlocks.length > 0) {
this.rememberSameTeamNativeFingerprints(run.teamName, sameTeamBlocks);
const leadName = this.getRunLeadName(run);
void this.reconcileSameTeamNativeDeliveries(run.teamName, leadName);
}
}
private persistSentMessage(teamName: string, message: InboxMessage): void {
@ -3577,21 +3610,18 @@ export class TeamProvisioningService {
return false;
};
// Ignore (and auto-mark read) internal coordination noise like idle/shutdown messages.
// Also ignore local sender-copy rows for cross-team traffic: those exist only so the UI
// can show outbound activity and must not be re-injected into the live lead as new work.
// If the same cross-team delivery already arrived via a raw <teammate-message> turn,
// suppress the duplicate relay here and simply mark the inbox row as read.
const ignoredUnread = unread.filter(
// Category 1: permanently ignored → mark as read.
// Includes noise (idle/shutdown), cross-team sender copies, cross-team reply dedup.
const permanentlyIgnored = unread.filter(
(m) =>
isInboxNoiseMessage(m.text) ||
m.source === CROSS_TEAM_SENT_SOURCE ||
isCrossTeamReplyToOwnOutbound(m) ||
wasRecentlyDeliveredCrossTeam(m)
);
if (ignoredUnread.length > 0) {
if (permanentlyIgnored.length > 0) {
try {
await this.markInboxMessagesRead(teamName, leadName, ignoredUnread);
await this.markInboxMessagesRead(teamName, leadName, permanentlyIgnored);
} catch {
// best-effort
}
@ -3603,13 +3633,38 @@ export class TeamProvisioningService {
}
}
// Category 2: same-team native delivery confirmation (one-to-one pairing).
const { nativeMatchedMessageIds, persisted: sameTeamPersisted } =
await this.confirmSameTeamNativeMatches(teamName, leadName, unread);
// Category 3: deferred by age — source-less messages within grace window of CURRENT run.
// NOT marked read (crash safety: if native delivery fails, retry will relay).
const runStartedAtMs = Date.parse(run.startedAt);
const permanentlyIgnoredIds = new Set(permanentlyIgnored.map((m) => m.messageId));
const deferredByAge = unread.filter(
(m) =>
!permanentlyIgnoredIds.has(m.messageId) &&
!nativeMatchedMessageIds.has(m.messageId) &&
this.shouldDeferSameTeamMessage(m, leadName, runStartedAtMs)
);
const deferredIds = new Set(deferredByAge.map((m) => m.messageId));
// Actionable: everything not in any category.
const actionableUnread = unread.filter(
(m) =>
!isInboxNoiseMessage(m.text) &&
m.source !== CROSS_TEAM_SENT_SOURCE &&
!isCrossTeamReplyToOwnOutbound(m) &&
!wasRecentlyDeliveredCrossTeam(m)
!permanentlyIgnoredIds.has(m.messageId) &&
!nativeMatchedMessageIds.has(m.messageId) &&
!deferredIds.has(m.messageId)
);
// Layer 3: schedule retry timers.
if (nativeMatchedMessageIds.size > 0 && !sameTeamPersisted) {
this.scheduleSameTeamPersistRetry(teamName);
}
if (deferredByAge.length > 0) {
this.scheduleSameTeamDeferredRetry(teamName);
}
if (actionableUnread.length === 0) return 0;
const MAX_RELAY = 10;
@ -5426,6 +5481,263 @@ export class TeamProvisioningService {
);
}
// ---------------------------------------------------------------------------
// Same-team native delivery dedup (Layer 2)
// ---------------------------------------------------------------------------
private collectConfirmedSameTeamPairs(
messages: InboxMessage[],
fingerprints: NativeSameTeamFingerprint[],
leadName: string
): { confirmedMessageIds: Set<string>; matchedFingerprintIds: Set<string> } {
const confirmedMessageIds = new Set<string>();
const matchedFingerprintIds = new Set<string>();
if (fingerprints.length === 0) {
return { confirmedMessageIds, matchedFingerprintIds };
}
// Build group key: from + normalizedText (summary checked during pairing, not grouping)
const groupKey = (from: string, text: string) => `${from}\0${text}`;
// Group fingerprints by (from, text), sorted FIFO by seenAt within each group
const fpByGroup = new Map<string, NativeSameTeamFingerprint[]>();
for (const fp of fingerprints) {
const key = groupKey(fp.from, fp.text);
let group = fpByGroup.get(key);
if (!group) {
group = [];
fpByGroup.set(key, group);
}
group.push(fp);
}
for (const group of fpByGroup.values()) {
group.sort((a, b) => a.seenAt - b.seenAt);
}
// Collect eligible inbox messages, grouped by (from, text), sorted FIFO by timestamp
type EligibleMsg = InboxMessage & { messageId: string; parsedTs: number };
const msgByGroup = new Map<string, EligibleMsg[]>();
for (const m of messages) {
if (m.read) continue;
if (m.source) continue;
if (!this.hasStableMessageId(m)) continue;
const fromName = m.from?.trim() ?? '';
if (!fromName || fromName === leadName || fromName === 'user') continue;
const parsedTs = Date.parse(m.timestamp);
if (!Number.isFinite(parsedTs)) continue;
const key = groupKey(fromName, normalizeSameTeamText(m.text));
let group = msgByGroup.get(key);
if (!group) {
group = [];
msgByGroup.set(key, group);
}
group.push({ ...m, parsedTs } as EligibleMsg);
}
for (const group of msgByGroup.values()) {
group.sort((a, b) => a.parsedTs - b.parsedTs);
}
// FIFO pair within each group: first fingerprint → first message, second → second, etc.
// This prevents delayed native delivery from pairing with the wrong inbox row
// when identical messages (e.g. "Done") are sent close together.
for (const [key, fps] of fpByGroup) {
const msgs = msgByGroup.get(key);
if (!msgs || msgs.length === 0) continue;
const limit = Math.min(fps.length, msgs.length);
for (let i = 0; i < limit; i++) {
const fp = fps[i];
const m = msgs[i];
// Summary validation: if both sides have summary, they must match
if (fp.summary && m.summary?.trim() && fp.summary !== m.summary.trim()) continue;
// Time window validation
if (Math.abs(m.parsedTs - fp.seenAt) > TeamProvisioningService.SAME_TEAM_MATCH_WINDOW_MS) {
continue;
}
confirmedMessageIds.add(m.messageId);
matchedFingerprintIds.add(fp.id);
}
}
return { confirmedMessageIds, matchedFingerprintIds };
}
private rememberSameTeamNativeFingerprints(
teamName: string,
blocks: ParsedTeammateContent[]
): void {
const teamKey = teamName.trim();
const existing = this.recentSameTeamNativeFingerprints.get(teamKey) ?? [];
const now = Date.now();
const cutoff = now - TeamProvisioningService.SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS;
const fresh = existing.filter((fp) => fp.seenAt > cutoff);
for (const block of blocks) {
fresh.push({
id: randomUUID(),
from: block.teammateId.trim(),
text: normalizeSameTeamText(block.content),
summary: (block.summary ?? '').trim(),
seenAt: now,
});
}
this.recentSameTeamNativeFingerprints.set(teamKey, fresh);
}
private consumeMatchedSameTeamFingerprints(teamName: string, matchedIds: Set<string>): void {
if (matchedIds.size === 0) return;
const current = this.recentSameTeamNativeFingerprints.get(teamName.trim()) ?? [];
if (current.length === 0) return;
const remaining = current.filter((fp) => !matchedIds.has(fp.id));
if (remaining.length > 0) {
this.recentSameTeamNativeFingerprints.set(teamName.trim(), remaining);
} else {
this.recentSameTeamNativeFingerprints.delete(teamName.trim());
}
}
private getFreshSameTeamNativeFingerprints(teamName: string): NativeSameTeamFingerprint[] {
const all = this.recentSameTeamNativeFingerprints.get(teamName) ?? [];
if (all.length === 0) return [];
const cutoff = Date.now() - TeamProvisioningService.SAME_TEAM_NATIVE_FINGERPRINT_TTL_MS;
const fresh = all.filter((fp) => fp.seenAt > cutoff);
if (fresh.length !== all.length) {
if (fresh.length > 0) {
this.recentSameTeamNativeFingerprints.set(teamName, fresh);
} else {
this.recentSameTeamNativeFingerprints.delete(teamName);
}
}
return fresh;
}
private isPotentialSameTeamCliMessage(m: InboxMessage, leadName: string): boolean {
if (m.source) return false;
const fromName = m.from?.trim() ?? '';
if (!fromName || fromName === leadName || fromName === 'user') return false;
const toName = m.to?.trim();
if (toName && toName !== leadName) return false;
return true;
}
private shouldDeferSameTeamMessage(
m: InboxMessage,
leadName: string,
runStartedAtMs: number
): boolean {
if (!this.isPotentialSameTeamCliMessage(m, leadName)) return false;
const messageTs = Date.parse(m.timestamp);
if (!Number.isFinite(messageTs) || messageTs < 0) return false;
if (
Number.isFinite(runStartedAtMs) &&
messageTs < runStartedAtMs - TeamProvisioningService.SAME_TEAM_RUN_START_SKEW_MS
) {
return false;
}
const ageMs = Date.now() - messageTs;
if (ageMs < 0) return false;
return ageMs < TeamProvisioningService.SAME_TEAM_NATIVE_DELIVERY_GRACE_MS;
}
private async confirmSameTeamNativeMatches(
teamName: string,
leadName: string,
messages: InboxMessage[]
): Promise<{ nativeMatchedMessageIds: Set<string>; persisted: boolean }> {
const fingerprints = this.getFreshSameTeamNativeFingerprints(teamName);
const { confirmedMessageIds, matchedFingerprintIds } = this.collectConfirmedSameTeamPairs(
messages,
fingerprints,
leadName
);
if (confirmedMessageIds.size === 0) {
return { nativeMatchedMessageIds: confirmedMessageIds, persisted: true };
}
const toMarkRead = Array.from(confirmedMessageIds, (messageId) => ({ messageId }));
let persisted = false;
try {
await this.markInboxMessagesRead(teamName, leadName, toMarkRead);
persisted = true;
} catch {
// keep fingerprints alive for next attempt
}
if (persisted) {
// Durable: inbox says read=true. Safe to add in-memory dedup and consume fingerprints.
const relayedIds = this.relayedLeadInboxMessageIds.get(teamName) ?? new Set<string>();
for (const messageId of confirmedMessageIds) {
relayedIds.add(messageId);
}
this.relayedLeadInboxMessageIds.set(teamName, this.trimRelayedSet(relayedIds));
this.consumeMatchedSameTeamFingerprints(teamName, matchedFingerprintIds);
}
// If NOT persisted: don't add to relayedIds, don't consume fingerprints.
// Next relay cycle will see the message in unread, re-match, and retry persist.
return { nativeMatchedMessageIds: confirmedMessageIds, persisted };
}
private async reconcileSameTeamNativeDeliveries(
teamName: string,
leadName: string
): Promise<void> {
let leadInboxMessages: Awaited<ReturnType<TeamInboxReader['getMessagesFor']>> = [];
try {
leadInboxMessages = await this.inboxReader.getMessagesFor(teamName, leadName);
} catch {
return;
}
const { nativeMatchedMessageIds, persisted } = await this.confirmSameTeamNativeMatches(
teamName,
leadName,
leadInboxMessages
);
// If native was matched but persist failed, schedule a quick retry
// so we don't wait for the 16s deferred timer to retry the disk write.
if (nativeMatchedMessageIds.size > 0 && !persisted) {
this.scheduleSameTeamPersistRetry(teamName);
}
}
private scheduleSameTeamDeferredRetry(teamName: string): void {
const key = `same-team-deferred:${teamName}`;
if (this.pendingTimeouts.has(key)) return;
const timer = setTimeout(() => {
this.pendingTimeouts.delete(key);
void this.relayLeadInboxMessages(teamName).catch((e: unknown) =>
logger.warn(`[${teamName}] same-team deferred retry failed: ${String(e)}`)
);
}, TeamProvisioningService.SAME_TEAM_NATIVE_DELIVERY_GRACE_MS + 1_000);
this.pendingTimeouts.set(key, timer);
}
/**
* Best-effort durable follow-up after native delivery was matched but inbox read-state
* could not be persisted. If the run dies before this retry succeeds, a later reconnect
* may still relay the row once because in-memory dedupe is not durable.
*/
private scheduleSameTeamPersistRetry(teamName: string): void {
const key = `same-team-persist:${teamName}`;
if (this.pendingTimeouts.has(key)) return;
const timer = setTimeout(() => {
this.pendingTimeouts.delete(key);
void this.relayLeadInboxMessages(teamName).catch((e: unknown) =>
logger.warn(`[${teamName}] same-team persist retry failed: ${String(e)}`)
);
}, TeamProvisioningService.SAME_TEAM_PERSIST_RETRY_MS);
this.pendingTimeouts.set(key, timer);
}
/**
* Remove a run from tracking maps.
*/
@ -5457,6 +5769,16 @@ export class TeamProvisioningService {
this.relayedLeadInboxMessageIds.delete(run.teamName);
this.pendingCrossTeamFirstReplies.delete(run.teamName);
this.recentCrossTeamLeadDeliveryMessageIds.delete(run.teamName);
this.recentSameTeamNativeFingerprints.delete(run.teamName);
// Clear same-team retry timers
for (const suffix of ['deferred', 'persist']) {
const key = `same-team-${suffix}:${run.teamName}`;
const timer = this.pendingTimeouts.get(key);
if (timer) {
clearTimeout(timer);
this.pendingTimeouts.delete(key);
}
}
run.activeCrossTeamReplyHints = [];
run.pendingInboxRelayCandidates = [];
for (const key of Array.from(this.memberInboxRelayInFlight.keys())) {