From 511dd5d7a6d36c4a370abf59df9dfab9b4dae504 Mon Sep 17 00:00:00 2001 From: 777genius Date: Fri, 22 May 2026 21:00:20 +0300 Subject: [PATCH] fix(team): coalesce lead stream fragments --- src/main/services/team/TeamDataService.ts | 257 +++++++++++------- .../services/team/TeamProvisioningService.ts | 123 +++++++-- src/renderer/utils/mergeTeamMessages.ts | 108 +++++++- .../services/team/TeamDataService.test.ts | 55 ++++ .../team/TeamProvisioningService.test.ts | 57 ++++ ...eamProvisioningServiceLiveMessages.test.ts | 50 +++- test/renderer/utils/mergeTeamMessages.test.ts | 76 ++++++ 7 files changed, 607 insertions(+), 119 deletions(-) diff --git a/src/main/services/team/TeamDataService.ts b/src/main/services/team/TeamDataService.ts index 27e3954c..fbac3aee 100644 --- a/src/main/services/team/TeamDataService.ts +++ b/src/main/services/team/TeamDataService.ts @@ -105,7 +105,7 @@ const logger = createLogger('Service:TeamDataService'); const MIN_TEXT_LENGTH = 30; const MAX_LEAD_TEXTS = 150; -const LEAD_SESSION_PARSE_CACHE_SCHEMA_VERSION = 'combined-v1'; +const LEAD_SESSION_PARSE_CACHE_SCHEMA_VERSION = 'combined-v2'; const PROCESS_HEALTH_INTERVAL_MS = 2_000; const TASK_MAP_YIELD_EVERY = 250; const TASK_COMMENT_NOTIFICATION_SOURCE = 'system_notification'; @@ -3221,7 +3221,8 @@ export class TeamDataService { const MAX_SCAN_BYTES = 8 * 1024 * 1024; const INITIAL_SCAN_BYTES = 256 * 1024; - const textsReversed: InboxMessage[] = []; + const rawLinesReversed: string[] = []; + const seenRawLines = new Set(); const seenMessageIds = new Set(); const handle = await fs.promises.open(jsonlPath, 'r'); try { @@ -3229,7 +3230,7 @@ export class TeamDataService { const fileSize = stat.size; let scanBytes = Math.min(INITIAL_SCAN_BYTES, fileSize); - while (textsReversed.length < maxTexts && scanBytes <= MAX_SCAN_BYTES) { + while (scanBytes <= MAX_SCAN_BYTES) { const start = Math.max(0, fileSize - scanBytes); const buffer = Buffer.alloc(scanBytes); await handle.read(buffer, 0, scanBytes, start); @@ -3241,96 +3242,11 @@ export class TeamDataService { for (let i = lines.length - 1; i >= fromIndex; i--) { const trimmed = lines[i]?.trim(); if (!trimmed) continue; - - let msg: Record; - try { - msg = JSON.parse(trimmed) as Record; - } catch { - continue; - } - - if (msg.type !== 'assistant') continue; - - const message = (msg.message ?? msg) as Record; - const content = message.content; - if (!Array.isArray(content)) continue; - - const timestamp = - typeof msg.timestamp === 'string' ? msg.timestamp : new Date().toISOString(); - - const textParts: string[] = []; - for (const block of content as Record[]) { - if (block.type !== 'text' || typeof block.text !== 'string') continue; - textParts.push(block.text); - } - if (textParts.length === 0) continue; - - const combined = stripAgentBlocks(textParts.join('\n')).trim(); - if (combined.length < MIN_TEXT_LENGTH) continue; - - const toolCallsList: ToolCallMeta[] = []; - const lookaheadLimit = Math.min(i + 200, lines.length); - for (let j = i + 1; j < lookaheadLimit; j++) { - const tLine = lines[j]?.trim(); - if (!tLine) continue; - let tMsg: Record; - try { - tMsg = JSON.parse(tLine) as Record; - } catch { - continue; - } - if (tMsg.type !== 'assistant') continue; - const tMessage = (tMsg.message ?? tMsg) as Record; - const tContent = tMessage.content; - if (!Array.isArray(tContent)) continue; - const tBlocks = tContent as Record[]; - if (tBlocks.some((b) => b.type === 'text')) break; - for (const b of tBlocks) { - if (b.type === 'tool_use' && typeof b.name === 'string' && b.name !== 'SendMessage') { - const input = (b.input ?? {}) as Record; - toolCallsList.push({ - name: b.name, - preview: extractToolPreview(b.name, input), - }); - } - } - } - const toolCalls = toolCallsList.length > 0 ? toolCallsList : undefined; - const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined; - - const entryUuid = typeof msg.uuid === 'string' ? msg.uuid.trim() : ''; - const assistantMessageId = typeof message.id === 'string' ? message.id.trim() : ''; - const stableMessageId = entryUuid - ? `lead-thought-${entryUuid}` - : assistantMessageId - ? `lead-thought-msg-${assistantMessageId}` - : null; - - const textPrefix = combined - .slice(0, 50) - .replace(/[^\p{L}\p{N}]/gu, '') - .slice(0, 20); - - const messageId = - stableMessageId ?? `lead-session-${leadSessionId}-${timestamp}-${textPrefix}`; - if (seenMessageIds.has(messageId)) continue; - seenMessageIds.add(messageId); - - textsReversed.push({ - from: leadName, - text: combined, - timestamp, - read: true, - source: 'lead_session', - leadSessionId, - messageId, - toolSummary, - toolCalls, - }); - if (textsReversed.length >= maxTexts) break; + if (seenRawLines.has(trimmed)) continue; + seenRawLines.add(trimmed); + rawLinesReversed.push(trimmed); } - if (textsReversed.length >= maxTexts) break; if (scanBytes === fileSize) break; scanBytes = Math.min(fileSize, scanBytes * 2); } @@ -3338,8 +3254,163 @@ export class TeamDataService { await handle.close(); } - textsReversed.reverse(); - return textsReversed.length > maxTexts ? textsReversed.slice(-maxTexts) : textsReversed; + const rawLines = rawLinesReversed.reverse(); + const texts: InboxMessage[] = []; + let syntheticBuffer: { + firstMsg: Record; + firstMessage: Record; + timestamp: string; + parts: string[]; + } | null = null; + + const collectToolCallsAfterIndex = (index: number): ToolCallMeta[] | undefined => { + const toolCallsList: ToolCallMeta[] = []; + const lookaheadLimit = Math.min(index + 200, rawLines.length); + for (let j = index + 1; j < lookaheadLimit; j++) { + const tLine = rawLines[j]?.trim(); + if (!tLine) continue; + let tMsg: Record; + try { + tMsg = JSON.parse(tLine) as Record; + } catch { + continue; + } + if (tMsg.type !== 'assistant') break; + const tMessage = (tMsg.message ?? tMsg) as Record; + const tContent = tMessage.content; + if (!Array.isArray(tContent)) continue; + const tBlocks = tContent as Record[]; + if (tBlocks.some((b) => b.type === 'text')) break; + for (const b of tBlocks) { + if (b.type === 'tool_use' && typeof b.name === 'string' && b.name !== 'SendMessage') { + const input = (b.input ?? {}) as Record; + toolCallsList.push({ + name: b.name, + preview: extractToolPreview(b.name, input), + }); + } + } + } + return toolCallsList.length > 0 ? toolCallsList : undefined; + }; + + const pushLeadText = ( + msg: Record, + message: Record, + combined: string, + timestamp: string, + toolCalls?: ToolCallMeta[], + streamGroup = false + ): void => { + if (combined.length < MIN_TEXT_LENGTH) return; + + const entryUuid = typeof msg.uuid === 'string' ? msg.uuid.trim() : ''; + const assistantMessageId = typeof message.id === 'string' ? message.id.trim() : ''; + const stableMessageId = entryUuid + ? streamGroup + ? `lead-thought-stream-${entryUuid}` + : `lead-thought-${entryUuid}` + : assistantMessageId + ? `lead-thought-msg-${assistantMessageId}` + : null; + + const textPrefix = combined + .slice(0, 50) + .replace(/[^\p{L}\p{N}]/gu, '') + .slice(0, 20); + + const messageId = + stableMessageId ?? `lead-session-${leadSessionId}-${timestamp}-${textPrefix}`; + if (seenMessageIds.has(messageId)) return; + seenMessageIds.add(messageId); + + const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined; + texts.push({ + from: leadName, + text: combined, + timestamp, + read: true, + source: 'lead_session', + leadSessionId, + messageId, + toolSummary, + toolCalls, + }); + }; + + const flushSyntheticBuffer = (): void => { + if (!syntheticBuffer) return; + const combined = stripAgentBlocks(syntheticBuffer.parts.join('')).trim(); + pushLeadText( + syntheticBuffer.firstMsg, + syntheticBuffer.firstMessage, + combined, + syntheticBuffer.timestamp, + undefined, + true + ); + syntheticBuffer = null; + }; + + for (let i = 0; i < rawLines.length; i++) { + const trimmed = rawLines[i]?.trim(); + if (!trimmed) continue; + + let msg: Record; + try { + msg = JSON.parse(trimmed) as Record; + } catch { + continue; + } + + if (msg.type !== 'assistant') { + flushSyntheticBuffer(); + continue; + } + + const message = (msg.message ?? msg) as Record; + const content = message.content; + if (!Array.isArray(content)) { + flushSyntheticBuffer(); + continue; + } + + const textParts: string[] = []; + for (const block of content as Record[]) { + if (block.type !== 'text' || typeof block.text !== 'string') continue; + textParts.push(block.text); + } + + if (textParts.length === 0) { + if ((content as Record[]).some((block) => block.type === 'tool_use')) { + flushSyntheticBuffer(); + } + continue; + } + + const timestamp = + typeof msg.timestamp === 'string' ? msg.timestamp : new Date().toISOString(); + const isSyntheticChunk = message.model === '' && message.type === 'message'; + if (isSyntheticChunk) { + if (!syntheticBuffer) { + syntheticBuffer = { + firstMsg: msg, + firstMessage: message, + timestamp, + parts: [], + }; + } + syntheticBuffer.parts.push(textParts.join('')); + continue; + } + + flushSyntheticBuffer(); + const combined = stripAgentBlocks(textParts.join('\n')).trim(); + pushLeadText(msg, message, combined, timestamp, collectToolCallsAfterIndex(i)); + } + + flushSyntheticBuffer(); + return texts.length > maxTexts ? texts.slice(-maxTexts) : texts; } private async extractLeadSessionTextsFromJsonl( diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index e45985c8..74322bc5 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -2281,6 +2281,7 @@ interface ProvisioningRun { leadName: string; startedAt: string; textParts: string[]; + textJoinMode?: 'block' | 'stream'; replyVisibility?: 'user' | 'internal_activity'; hasVisibleSendMessage?: boolean; hasUserVisibleSendMessage?: boolean; @@ -2297,6 +2298,14 @@ interface ProvisioningRun { }[]; /** Monotonic counter for individual lead assistant messages. */ leadMsgSeq: number; + /** Active text bubble for token-streamed lead assistant output. */ + liveLeadTextBuffer: { + messageId: string; + text: string; + timestamp: string; + toolCalls?: ToolCallMeta[]; + toolSummary?: string; + } | null; /** Accumulated tool_use details between text messages. */ pendingToolCalls: ToolCallMeta[]; /** Active runtime tool calls keyed by tool_use_id. */ @@ -20997,6 +21006,7 @@ export class TeamProvisioningService { leadRelayCapture: null, activeCrossTeamReplyHints: [], leadMsgSeq: 0, + liveLeadTextBuffer: null, pendingToolCalls: [], activeToolCalls: new Map(), pendingDirectCrossTeamSendRefresh: false, @@ -22311,6 +22321,7 @@ export class TeamProvisioningService { leadRelayCapture: null, activeCrossTeamReplyHints: [], leadMsgSeq: 0, + liveLeadTextBuffer: null, pendingToolCalls: [], activeToolCalls: new Map(), pendingDirectCrossTeamSendRefresh: false, @@ -24577,7 +24588,9 @@ export class TeamProvisioningService { replyText = (await capturePromise).trim() || null; } catch { // Best-effort: if we captured some text but never got result.success, keep it. - const partial = run.leadRelayCapture?.textParts?.join('')?.trim(); + const partial = run.leadRelayCapture + ? this.joinLeadRelayCaptureText(run.leadRelayCapture) + : null; replyText = partial && partial.length > 0 ? partial : null; } finally { if (run.leadRelayCapture) { @@ -31127,6 +31140,21 @@ export class TeamProvisioningService { return null; } + private isSyntheticLeadTextChunk(msg: Record): boolean { + const message = (msg.message ?? msg) as Record; + return message.model === '' && message.type === 'message'; + } + + private joinLeadRelayCaptureText( + capture: NonNullable + ): string { + return capture.textParts.join(capture.textJoinMode === 'stream' ? '' : '\n').trim(); + } + + private resetLiveLeadTextBuffer(run: ProvisioningRun): void { + run.liveLeadTextBuffer = null; + } + private appendProvisioningAssistantText( run: ProvisioningRun, msg: Record, @@ -31172,27 +31200,61 @@ export class TeamProvisioningService { run: ProvisioningRun, cleanText: string, stableMessageId?: string, - messageTimestamp?: string + messageTimestamp?: string, + options?: { coalesceStreamChunk?: boolean } ): void { - run.leadMsgSeq += 1; const leadName = this.getRunLeadName(run); - const messageId = stableMessageId || `lead-turn-${run.runId}-${run.leadMsgSeq}`; const timestamp = typeof messageTimestamp === 'string' && messageTimestamp.trim().length > 0 && Number.isFinite(Date.parse(messageTimestamp)) ? messageTimestamp : nowIso(); - // Attach accumulated tool call details from preceding tool_use messages, then reset. - const toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined; - const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined; - run.pendingToolCalls = []; + const coalesceStreamChunk = options?.coalesceStreamChunk === true; + let messageId = stableMessageId; + let text = cleanText; + let timestampForMessage = timestamp; + let toolCalls: ToolCallMeta[] | undefined; + let toolSummary: string | undefined; + + if (coalesceStreamChunk) { + if (!run.liveLeadTextBuffer) { + run.leadMsgSeq += 1; + toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined; + toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined; + run.liveLeadTextBuffer = { + messageId: `lead-turn-${run.runId}-${run.leadMsgSeq}`, + text: cleanText, + timestamp, + toolCalls, + toolSummary, + }; + run.pendingToolCalls = []; + } else { + run.liveLeadTextBuffer.text += cleanText; + } + + messageId = run.liveLeadTextBuffer.messageId; + text = stripAgentBlocks(run.liveLeadTextBuffer.text).trim(); + timestampForMessage = run.liveLeadTextBuffer.timestamp; + toolCalls = run.liveLeadTextBuffer.toolCalls; + toolSummary = run.liveLeadTextBuffer.toolSummary; + } else { + this.resetLiveLeadTextBuffer(run); + run.leadMsgSeq += 1; + messageId = messageId || `lead-turn-${run.runId}-${run.leadMsgSeq}`; + // Attach accumulated tool call details from preceding tool_use messages, then reset. + toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined; + toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined; + run.pendingToolCalls = []; + } + const leadMsg: InboxMessage = { from: leadName, - text: cleanText, - timestamp, + text, + timestamp: timestampForMessage, read: true, - summary: cleanText.length > 60 ? cleanText.slice(0, 57) + '...' : cleanText, + summary: text.length > 60 ? text.slice(0, 57) + '...' : text, messageId, source: 'lead_process', toolSummary, @@ -31979,6 +32041,7 @@ export class TeamProvisioningService { } if (msg.type === 'user') { + this.resetLiveLeadTextBuffer(run); // Check for permission_request in raw user message text BEFORE teammate-message parsing. // The permission_request may arrive as plain JSON without wrapper, // and handleNativeTeammateUserMessage only processes blocks. @@ -32056,12 +32119,17 @@ export class TeamProvisioningService { // until relayLeadInboxMessages() finally clears run.leadRelayCapture. if (run.leadRelayCapture && !run.leadRelayCapture.settled) { const capture = run.leadRelayCapture; + if (this.isSyntheticLeadTextChunk(msg)) { + capture.textJoinMode = 'stream'; + } else if (!capture.textJoinMode) { + capture.textJoinMode = 'block'; + } capture.textParts.push(text); if (capture.idleHandle) { clearTimeout(capture.idleHandle); } capture.idleHandle = setTimeout(() => { - const combined = capture.textParts.join('\n').trim(); + const combined = this.joinLeadRelayCaptureText(capture); capture.resolveOnce(combined); }, capture.idleMs); } else if (run.provisioningComplete) { @@ -32074,13 +32142,18 @@ export class TeamProvisioningService { !run.suppressGeminiPostLaunchHydrationOutput && !hasCapturedVisibleSendMessage ) { - const cleanText = stripAgentBlocks(text).trim(); - if (cleanText.length > 0 && !isTeamInternalControlMessageText(cleanText)) { + const isSyntheticChunk = this.isSyntheticLeadTextChunk(msg); + const displayText = isSyntheticChunk ? text : stripAgentBlocks(text).trim(); + if ( + (displayText.length > 0 || (isSyntheticChunk && run.liveLeadTextBuffer)) && + !isTeamInternalControlMessageText(displayText) + ) { this.pushLiveLeadTextMessage( run, - cleanText, + displayText, this.getStableLeadThoughtMessageId(msg) ?? undefined, - messageTimestamp + messageTimestamp, + { coalesceStreamChunk: isSyntheticChunk } ); } } @@ -32088,13 +32161,18 @@ export class TeamProvisioningService { // Pre-ready: keep showing provisioning narration in the banner, but also mirror it // into the live cache so Messages/Activity can show the earliest assistant output. if (!run.silentUserDmForward && !hasCapturedVisibleSendMessage) { - const cleanText = stripAgentBlocks(text).trim(); - if (cleanText.length > 0 && !isTeamInternalControlMessageText(cleanText)) { + const isSyntheticChunk = this.isSyntheticLeadTextChunk(msg); + const displayText = isSyntheticChunk ? text : stripAgentBlocks(text).trim(); + if ( + (displayText.length > 0 || (isSyntheticChunk && run.liveLeadTextBuffer)) && + !isTeamInternalControlMessageText(displayText) + ) { this.pushLiveLeadTextMessage( run, - cleanText, + displayText, this.getStableLeadThoughtMessageId(msg) ?? undefined, - messageTimestamp + messageTimestamp, + { coalesceStreamChunk: isSyntheticChunk } ); } } @@ -32116,6 +32194,7 @@ export class TeamProvisioningService { preview: extractToolPreview(block.name, input), toolUseId: typeof block.id === 'string' ? block.id : undefined, }); + this.resetLiveLeadTextBuffer(run); this.startRuntimeToolActivity(run, this.getRunLeadName(run), block); } } @@ -32264,9 +32343,10 @@ export class TeamProvisioningService { } if (run.leadRelayCapture) { const capture = run.leadRelayCapture; - const combined = capture.textParts.join('\n').trim(); + const combined = this.joinLeadRelayCaptureText(capture); capture.resolveOnce(combined); } + this.resetLiveLeadTextBuffer(run); // Clear silent relay flag after any successful turn. run.activeCrossTeamReplyHints = []; run.pendingInboxRelayCandidates = []; @@ -32302,6 +32382,7 @@ export class TeamProvisioningService { if (run.leadRelayCapture) { run.leadRelayCapture.rejectOnce(errorMsg); } + this.resetLiveLeadTextBuffer(run); // Clear silent relay flag after any errored turn. run.pendingDirectCrossTeamSendRefresh = false; run.activeCrossTeamReplyHints = []; diff --git a/src/renderer/utils/mergeTeamMessages.ts b/src/renderer/utils/mergeTeamMessages.ts index 7a0b3fcc..7adddf2a 100644 --- a/src/renderer/utils/mergeTeamMessages.ts +++ b/src/renderer/utils/mergeTeamMessages.ts @@ -2,12 +2,118 @@ import { toMessageKey } from './teamMessageKey'; import type { InboxMessage } from '@shared/types'; +const MAX_LEAD_FRAGMENT_GAP_MS = 2_000; +const MAX_LEAD_FRAGMENT_AVG_LENGTH = 14; +const MIN_LEAD_FRAGMENT_RUN_LENGTH = 3; + function compareMessages(a: InboxMessage, b: InboxMessage): number { const diff = Date.parse(b.timestamp) - Date.parse(a.timestamp); if (diff !== 0) return diff; return toMessageKey(a).localeCompare(toMessageKey(b)); } +function isLeadThoughtFragmentCandidate(message: InboxMessage): boolean { + if (typeof message.to === 'string' && message.to.trim().length > 0) { + return false; + } + if (message.messageKind || message.toolCalls?.length || message.toolSummary) { + return false; + } + return message.source === 'lead_process' || message.source === 'lead_session'; +} + +function canJoinLeadThoughtFragments(older: InboxMessage, newer: InboxMessage): boolean { + if (!isLeadThoughtFragmentCandidate(older) || !isLeadThoughtFragmentCandidate(newer)) { + return false; + } + if (older.from !== newer.from) { + return false; + } + if ((older.leadSessionId ?? null) !== (newer.leadSessionId ?? null)) { + return false; + } + if (older.source !== newer.source) { + return false; + } + + const olderMs = Date.parse(older.timestamp); + const newerMs = Date.parse(newer.timestamp); + if (!Number.isFinite(olderMs) || !Number.isFinite(newerMs)) { + return false; + } + + return newerMs >= olderMs && newerMs - olderMs <= MAX_LEAD_FRAGMENT_GAP_MS; +} + +function shouldCoalesceLeadThoughtRun(runNewestFirst: InboxMessage[]): boolean { + if (runNewestFirst.length < MIN_LEAD_FRAGMENT_RUN_LENGTH) { + return false; + } + + const totalTrimmedLength = runNewestFirst.reduce( + (total, message) => total + message.text.trim().length, + 0 + ); + return totalTrimmedLength / runNewestFirst.length <= MAX_LEAD_FRAGMENT_AVG_LENGTH; +} + +function coalesceLeadThoughtRun(runNewestFirst: InboxMessage[]): InboxMessage[] { + if (!shouldCoalesceLeadThoughtRun(runNewestFirst)) { + return runNewestFirst; + } + + const chronological = [...runNewestFirst].reverse(); + const combinedText = chronological + .map((message) => message.text) + .join('') + .trim(); + if (!combinedText) { + return runNewestFirst; + } + + const newest = runNewestFirst[0]; + const oldest = chronological[0]; + return [ + { + ...newest, + text: combinedText, + summary: combinedText.length > 60 ? `${combinedText.slice(0, 57)}...` : combinedText, + messageId: `lead-thought-coalesced-${toMessageKey(oldest)}-${runNewestFirst.length}`, + }, + ]; +} + +function coalesceLeadThoughtFragments(messagesNewestFirst: InboxMessage[]): InboxMessage[] { + const result: InboxMessage[] = []; + let run: InboxMessage[] = []; + + const flushRun = (): void => { + if (run.length === 0) return; + result.push(...coalesceLeadThoughtRun(run)); + run = []; + }; + + for (const message of messagesNewestFirst) { + if (!isLeadThoughtFragmentCandidate(message)) { + flushRun(); + result.push(message); + continue; + } + + const currentOldest = run[run.length - 1]; + if (!currentOldest || canJoinLeadThoughtFragments(message, currentOldest)) { + run.push(message); + continue; + } + + flushRun(); + run.push(message); + } + + flushRun(); + return result; +} + /** * Merge multiple message arrays into one newest-first list with stable deduplication. * @@ -23,5 +129,5 @@ export function mergeTeamMessages(...messageLists: readonly InboxMessage[][]): I } } - return Array.from(merged.values()).sort(compareMessages); + return coalesceLeadThoughtFragments(Array.from(merged.values()).sort(compareMessages)); } diff --git a/test/main/services/team/TeamDataService.test.ts b/test/main/services/team/TeamDataService.test.ts index 428c05a7..edf6c0fc 100644 --- a/test/main/services/team/TeamDataService.test.ts +++ b/test/main/services/team/TeamDataService.test.ts @@ -58,6 +58,29 @@ function createLeadAssistantEntry( }; } +function createSyntheticLeadAssistantChunk( + uuid: string, + timestamp: string, + text: string +): Record { + return { + ...createLeadAssistantEntry(uuid, timestamp, text), + message: { + role: 'assistant', + model: '', + id: `msg-${uuid}`, + type: 'message', + stop_reason: 'stop_sequence', + stop_sequence: '', + usage: { + input_tokens: 0, + output_tokens: 0, + }, + content: [{ type: 'text', text }], + }, + }; +} + async function createTempJsonl(entries: Record[]): Promise { const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-data-lead-session-')); tempPaths.push(dir); @@ -4368,6 +4391,38 @@ describe('TeamDataService', () => { expect(linked?.relayOfMessageId).toBeUndefined(); }); + it('coalesces Codex synthetic lead stream chunks into one lead-session message', async () => { + const service = createLeadSessionCachingService(); + const jsonlPath = await createTempJsonl([ + createSyntheticLeadAssistantChunk('chunk-1', '2026-03-27T22:17:01.000Z', 'Соз'), + createSyntheticLeadAssistantChunk('chunk-2', '2026-03-27T22:17:01.010Z', 'дал'), + createSyntheticLeadAssistantChunk( + 'chunk-3', + '2026-03-27T22:17:01.020Z', + ' стартовую задачу для /212 и раздал работу.' + ), + ]); + + const extract = ( + service as unknown as { + extractLeadSessionTextsFromJsonl: ( + jsonlPath: string, + leadName: string, + leadSessionId: string, + maxTexts: number + ) => Promise>; + } + ).extractLeadSessionTextsFromJsonl.bind(service); + + const messages = await extract(jsonlPath, 'team-lead', 'lead-1', 150); + + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + messageId: 'lead-thought-stream-chunk-1', + text: 'Создал стартовую задачу для /212 и раздал работу.', + }); + }); + it('caches unchanged lead-session extraction results and returns defensive clones', async () => { const service = createLeadSessionCachingService(); const jsonlPath = await createTempJsonl([ diff --git a/test/main/services/team/TeamProvisioningService.test.ts b/test/main/services/team/TeamProvisioningService.test.ts index 54528feb..dc7c990e 100644 --- a/test/main/services/team/TeamProvisioningService.test.ts +++ b/test/main/services/team/TeamProvisioningService.test.ts @@ -809,6 +809,63 @@ describe('TeamProvisioningService', () => { }); }); + describe('live lead messages', () => { + it('updates one live message for Codex synthetic text chunks', () => { + const svc = new TeamProvisioningService(); + const internals = svc as unknown as { + pushLiveLeadTextMessage: ( + run: object, + cleanText: string, + stableMessageId?: string, + messageTimestamp?: string, + options?: { coalesceStreamChunk?: boolean } + ) => void; + }; + const run = { + teamName: 'my-team', + runId: 'run-1', + request: { + members: [{ name: 'team-lead', role: 'Team Lead' }], + }, + leadMsgSeq: 0, + liveLeadTextBuffer: null, + pendingToolCalls: [], + lastLeadTextEmitMs: 0, + }; + + internals.pushLiveLeadTextMessage( + run, + 'Соз', + undefined, + '2026-04-17T12:00:00.000Z', + { coalesceStreamChunk: true } + ); + internals.pushLiveLeadTextMessage( + run, + 'дал', + undefined, + '2026-04-17T12:00:00.010Z', + { coalesceStreamChunk: true } + ); + internals.pushLiveLeadTextMessage( + run, + ' стартовую задачу', + undefined, + '2026-04-17T12:00:00.020Z', + { coalesceStreamChunk: true } + ); + + const messages = svc.getLiveLeadProcessMessages('my-team'); + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + messageId: 'lead-turn-run-1-1', + text: 'Создал стартовую задачу', + timestamp: '2026-04-17T12:00:00.000Z', + source: 'lead_process', + }); + }); + }); + describe('OpenCode runtime delivery user-visible impact', () => { it('treats policy none as authoritative over raw failed delivery facts', () => { const svc = new TeamProvisioningService(); diff --git a/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts b/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts index 25a990c2..964cd585 100644 --- a/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts +++ b/test/main/services/team/TeamProvisioningServiceLiveMessages.test.ts @@ -115,7 +115,6 @@ vi.mock('agent-teams-controller', () => ({ }, })); -import type { TeamChangeEvent } from '@shared/types/team'; import { ConfigManager } from '../../../../src/main/services/infrastructure/ConfigManager'; import { clearAutoResumeService, @@ -124,6 +123,8 @@ import { } from '../../../../src/main/services/team/AutoResumeService'; import { TeamProvisioningService } from '../../../../src/main/services/team/TeamProvisioningService'; +import type { TeamChangeEvent } from '@shared/types/team'; + function seedConfig(teamName: string): void { hoisted.files.set( `/mock/teams/${teamName}/config.json`, @@ -166,9 +167,11 @@ interface RunLike { processKilled: boolean; cancelRequested: boolean; provisioningOutputParts: string[]; + provisioningOutputIndexByMessageId: Map; request: { members: { name: string; role?: string }[] }; activeCrossTeamReplyHints?: Array<{ toTeam: string; conversationId: string }>; pendingInboxRelayCandidates?: unknown[]; + liveLeadTextBuffer?: unknown; memberSpawnStatuses: Map; pendingApprovals: Map; } @@ -210,8 +213,10 @@ function attachRun( processKilled: false, cancelRequested: false, provisioningOutputParts: [], + provisioningOutputIndexByMessageId: new Map(), request: { members: [{ name: 'team-lead', role: 'Team Lead' }] }, activeCrossTeamReplyHints: [], + liveLeadTextBuffer: null, memberSpawnStatuses: new Map(), pendingApprovals: new Map(), }; @@ -261,6 +266,38 @@ describe('TeamProvisioningService pre-ready live messages', () => { expect(run.provisioningOutputParts).toHaveLength(1); }); + it('coalesces Codex synthetic chunks through stream handling without dropping spaces', () => { + const service = new TeamProvisioningService(); + seedConfig('my-team'); + const run = attachRun(service, 'my-team', { provisioningComplete: false }); + + const emitSyntheticText = (text: string, timestamp: string): void => { + callHandleStreamJsonMessage(service, run, { + type: 'assistant', + timestamp, + message: { + id: `msg-${timestamp}`, + model: '', + type: 'message', + content: [{ type: 'text', text }], + }, + }); + }; + + emitSyntheticText('Пр', '2026-04-17T12:00:00.000Z'); + emitSyntheticText('ин', '2026-04-17T12:00:00.010Z'); + emitSyntheticText('ял', '2026-04-17T12:00:00.020Z'); + emitSyntheticText(':', '2026-04-17T12:00:00.030Z'); + emitSyntheticText(' раз', '2026-04-17T12:00:00.040Z'); + emitSyntheticText('лож', '2026-04-17T12:00:00.050Z'); + emitSyntheticText('у', '2026-04-17T12:00:00.060Z'); + + const live = service.getLiveLeadProcessMessages('my-team'); + expect(live).toHaveLength(1); + expect(live[0].text).toBe('Принял: разложу'); + expect(live[0].messageId).toBe('lead-turn-run-1-1'); + }); + it('attaches leadSessionId to a live message when the same assistant payload carries session_id', () => { const service = new TeamProvisioningService(); seedConfig('my-team'); @@ -1008,9 +1045,14 @@ describe('TeamProvisioningService pre-ready live messages', () => { ]); const run = attachRun(service, 'my-team', { provisioningComplete: true }); - (service as any).rememberRecentCrossTeamLeadDeliveryMessageIds('my-team', [ - 'm-native-cross-team-dup', - ]); + ( + service as unknown as { + rememberRecentCrossTeamLeadDeliveryMessageIds: ( + teamName: string, + messageIds: string[] + ) => void; + } + ).rememberRecentCrossTeamLeadDeliveryMessageIds('my-team', ['m-native-cross-team-dup']); callHandleStreamJsonMessage(service, run, { type: 'user', diff --git a/test/renderer/utils/mergeTeamMessages.test.ts b/test/renderer/utils/mergeTeamMessages.test.ts index d8da3645..5df165a8 100644 --- a/test/renderer/utils/mergeTeamMessages.test.ts +++ b/test/renderer/utils/mergeTeamMessages.test.ts @@ -59,4 +59,80 @@ describe('mergeTeamMessages', () => { expect(merged[0].summary).toBe('live'); expect(merged[0].source).toBe('lead_process'); }); + + it('coalesces pathological lead thought fragments before display', () => { + const messages = [ + makeMessage({ + from: 'team-lead', + text: 'ложу', + timestamp: '2026-01-01T00:00:00.060Z', + messageId: 'chunk-4', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + makeMessage({ + from: 'team-lead', + text: ' раз', + timestamp: '2026-01-01T00:00:00.040Z', + messageId: 'chunk-3', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + makeMessage({ + from: 'team-lead', + text: ':', + timestamp: '2026-01-01T00:00:00.030Z', + messageId: 'chunk-2', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + makeMessage({ + from: 'team-lead', + text: 'Принял', + timestamp: '2026-01-01T00:00:00.000Z', + messageId: 'chunk-1', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + ]; + + const merged = mergeTeamMessages(messages); + + expect(merged).toHaveLength(1); + expect(merged[0].text).toBe('Принял: разложу'); + expect(merged[0].messageId).toMatch(/^lead-thought-coalesced-/); + }); + + it('does not coalesce separate short lead thoughts across a large gap', () => { + const messages = [ + makeMessage({ + from: 'team-lead', + text: 'Done', + timestamp: '2026-01-01T00:00:05.000Z', + messageId: 'm3', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + makeMessage({ + from: 'team-lead', + text: 'OK', + timestamp: '2026-01-01T00:00:00.000Z', + messageId: 'm2', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + makeMessage({ + from: 'team-lead', + text: 'Hi', + timestamp: '2025-12-31T23:59:55.000Z', + messageId: 'm1', + source: 'lead_session', + leadSessionId: 'sess-1', + }), + ]; + + const merged = mergeTeamMessages(messages); + + expect(merged.map((message) => message.text)).toEqual(['Done', 'OK', 'Hi']); + }); });