fix(team): coalesce lead stream fragments

This commit is contained in:
777genius 2026-05-22 21:00:20 +03:00
parent 077c749cb7
commit 511dd5d7a6
7 changed files with 607 additions and 119 deletions

View file

@ -105,7 +105,7 @@ const logger = createLogger('Service:TeamDataService');
const MIN_TEXT_LENGTH = 30;
const MAX_LEAD_TEXTS = 150;
const LEAD_SESSION_PARSE_CACHE_SCHEMA_VERSION = 'combined-v1';
const LEAD_SESSION_PARSE_CACHE_SCHEMA_VERSION = 'combined-v2';
const PROCESS_HEALTH_INTERVAL_MS = 2_000;
const TASK_MAP_YIELD_EVERY = 250;
const TASK_COMMENT_NOTIFICATION_SOURCE = 'system_notification';
@ -3221,7 +3221,8 @@ export class TeamDataService {
const MAX_SCAN_BYTES = 8 * 1024 * 1024;
const INITIAL_SCAN_BYTES = 256 * 1024;
const textsReversed: InboxMessage[] = [];
const rawLinesReversed: string[] = [];
const seenRawLines = new Set<string>();
const seenMessageIds = new Set<string>();
const handle = await fs.promises.open(jsonlPath, 'r');
try {
@ -3229,7 +3230,7 @@ export class TeamDataService {
const fileSize = stat.size;
let scanBytes = Math.min(INITIAL_SCAN_BYTES, fileSize);
while (textsReversed.length < maxTexts && scanBytes <= MAX_SCAN_BYTES) {
while (scanBytes <= MAX_SCAN_BYTES) {
const start = Math.max(0, fileSize - scanBytes);
const buffer = Buffer.alloc(scanBytes);
await handle.read(buffer, 0, scanBytes, start);
@ -3241,96 +3242,11 @@ export class TeamDataService {
for (let i = lines.length - 1; i >= fromIndex; i--) {
const trimmed = lines[i]?.trim();
if (!trimmed) continue;
let msg: Record<string, unknown>;
try {
msg = JSON.parse(trimmed) as Record<string, unknown>;
} catch {
continue;
}
if (msg.type !== 'assistant') continue;
const message = (msg.message ?? msg) as Record<string, unknown>;
const content = message.content;
if (!Array.isArray(content)) continue;
const timestamp =
typeof msg.timestamp === 'string' ? msg.timestamp : new Date().toISOString();
const textParts: string[] = [];
for (const block of content as Record<string, unknown>[]) {
if (block.type !== 'text' || typeof block.text !== 'string') continue;
textParts.push(block.text);
}
if (textParts.length === 0) continue;
const combined = stripAgentBlocks(textParts.join('\n')).trim();
if (combined.length < MIN_TEXT_LENGTH) continue;
const toolCallsList: ToolCallMeta[] = [];
const lookaheadLimit = Math.min(i + 200, lines.length);
for (let j = i + 1; j < lookaheadLimit; j++) {
const tLine = lines[j]?.trim();
if (!tLine) continue;
let tMsg: Record<string, unknown>;
try {
tMsg = JSON.parse(tLine) as Record<string, unknown>;
} catch {
continue;
}
if (tMsg.type !== 'assistant') continue;
const tMessage = (tMsg.message ?? tMsg) as Record<string, unknown>;
const tContent = tMessage.content;
if (!Array.isArray(tContent)) continue;
const tBlocks = tContent as Record<string, unknown>[];
if (tBlocks.some((b) => b.type === 'text')) break;
for (const b of tBlocks) {
if (b.type === 'tool_use' && typeof b.name === 'string' && b.name !== 'SendMessage') {
const input = (b.input ?? {}) as Record<string, unknown>;
toolCallsList.push({
name: b.name,
preview: extractToolPreview(b.name, input),
});
}
}
}
const toolCalls = toolCallsList.length > 0 ? toolCallsList : undefined;
const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined;
const entryUuid = typeof msg.uuid === 'string' ? msg.uuid.trim() : '';
const assistantMessageId = typeof message.id === 'string' ? message.id.trim() : '';
const stableMessageId = entryUuid
? `lead-thought-${entryUuid}`
: assistantMessageId
? `lead-thought-msg-${assistantMessageId}`
: null;
const textPrefix = combined
.slice(0, 50)
.replace(/[^\p{L}\p{N}]/gu, '')
.slice(0, 20);
const messageId =
stableMessageId ?? `lead-session-${leadSessionId}-${timestamp}-${textPrefix}`;
if (seenMessageIds.has(messageId)) continue;
seenMessageIds.add(messageId);
textsReversed.push({
from: leadName,
text: combined,
timestamp,
read: true,
source: 'lead_session',
leadSessionId,
messageId,
toolSummary,
toolCalls,
});
if (textsReversed.length >= maxTexts) break;
if (seenRawLines.has(trimmed)) continue;
seenRawLines.add(trimmed);
rawLinesReversed.push(trimmed);
}
if (textsReversed.length >= maxTexts) break;
if (scanBytes === fileSize) break;
scanBytes = Math.min(fileSize, scanBytes * 2);
}
@ -3338,8 +3254,163 @@ export class TeamDataService {
await handle.close();
}
textsReversed.reverse();
return textsReversed.length > maxTexts ? textsReversed.slice(-maxTexts) : textsReversed;
const rawLines = rawLinesReversed.reverse();
const texts: InboxMessage[] = [];
let syntheticBuffer: {
firstMsg: Record<string, unknown>;
firstMessage: Record<string, unknown>;
timestamp: string;
parts: string[];
} | null = null;
const collectToolCallsAfterIndex = (index: number): ToolCallMeta[] | undefined => {
const toolCallsList: ToolCallMeta[] = [];
const lookaheadLimit = Math.min(index + 200, rawLines.length);
for (let j = index + 1; j < lookaheadLimit; j++) {
const tLine = rawLines[j]?.trim();
if (!tLine) continue;
let tMsg: Record<string, unknown>;
try {
tMsg = JSON.parse(tLine) as Record<string, unknown>;
} catch {
continue;
}
if (tMsg.type !== 'assistant') break;
const tMessage = (tMsg.message ?? tMsg) as Record<string, unknown>;
const tContent = tMessage.content;
if (!Array.isArray(tContent)) continue;
const tBlocks = tContent as Record<string, unknown>[];
if (tBlocks.some((b) => b.type === 'text')) break;
for (const b of tBlocks) {
if (b.type === 'tool_use' && typeof b.name === 'string' && b.name !== 'SendMessage') {
const input = (b.input ?? {}) as Record<string, unknown>;
toolCallsList.push({
name: b.name,
preview: extractToolPreview(b.name, input),
});
}
}
}
return toolCallsList.length > 0 ? toolCallsList : undefined;
};
const pushLeadText = (
msg: Record<string, unknown>,
message: Record<string, unknown>,
combined: string,
timestamp: string,
toolCalls?: ToolCallMeta[],
streamGroup = false
): void => {
if (combined.length < MIN_TEXT_LENGTH) return;
const entryUuid = typeof msg.uuid === 'string' ? msg.uuid.trim() : '';
const assistantMessageId = typeof message.id === 'string' ? message.id.trim() : '';
const stableMessageId = entryUuid
? streamGroup
? `lead-thought-stream-${entryUuid}`
: `lead-thought-${entryUuid}`
: assistantMessageId
? `lead-thought-msg-${assistantMessageId}`
: null;
const textPrefix = combined
.slice(0, 50)
.replace(/[^\p{L}\p{N}]/gu, '')
.slice(0, 20);
const messageId =
stableMessageId ?? `lead-session-${leadSessionId}-${timestamp}-${textPrefix}`;
if (seenMessageIds.has(messageId)) return;
seenMessageIds.add(messageId);
const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined;
texts.push({
from: leadName,
text: combined,
timestamp,
read: true,
source: 'lead_session',
leadSessionId,
messageId,
toolSummary,
toolCalls,
});
};
const flushSyntheticBuffer = (): void => {
if (!syntheticBuffer) return;
const combined = stripAgentBlocks(syntheticBuffer.parts.join('')).trim();
pushLeadText(
syntheticBuffer.firstMsg,
syntheticBuffer.firstMessage,
combined,
syntheticBuffer.timestamp,
undefined,
true
);
syntheticBuffer = null;
};
for (let i = 0; i < rawLines.length; i++) {
const trimmed = rawLines[i]?.trim();
if (!trimmed) continue;
let msg: Record<string, unknown>;
try {
msg = JSON.parse(trimmed) as Record<string, unknown>;
} catch {
continue;
}
if (msg.type !== 'assistant') {
flushSyntheticBuffer();
continue;
}
const message = (msg.message ?? msg) as Record<string, unknown>;
const content = message.content;
if (!Array.isArray(content)) {
flushSyntheticBuffer();
continue;
}
const textParts: string[] = [];
for (const block of content as Record<string, unknown>[]) {
if (block.type !== 'text' || typeof block.text !== 'string') continue;
textParts.push(block.text);
}
if (textParts.length === 0) {
if ((content as Record<string, unknown>[]).some((block) => block.type === 'tool_use')) {
flushSyntheticBuffer();
}
continue;
}
const timestamp =
typeof msg.timestamp === 'string' ? msg.timestamp : new Date().toISOString();
const isSyntheticChunk = message.model === '<synthetic>' && message.type === 'message';
if (isSyntheticChunk) {
if (!syntheticBuffer) {
syntheticBuffer = {
firstMsg: msg,
firstMessage: message,
timestamp,
parts: [],
};
}
syntheticBuffer.parts.push(textParts.join(''));
continue;
}
flushSyntheticBuffer();
const combined = stripAgentBlocks(textParts.join('\n')).trim();
pushLeadText(msg, message, combined, timestamp, collectToolCallsAfterIndex(i));
}
flushSyntheticBuffer();
return texts.length > maxTexts ? texts.slice(-maxTexts) : texts;
}
private async extractLeadSessionTextsFromJsonl(

View file

@ -2281,6 +2281,7 @@ interface ProvisioningRun {
leadName: string;
startedAt: string;
textParts: string[];
textJoinMode?: 'block' | 'stream';
replyVisibility?: 'user' | 'internal_activity';
hasVisibleSendMessage?: boolean;
hasUserVisibleSendMessage?: boolean;
@ -2297,6 +2298,14 @@ interface ProvisioningRun {
}[];
/** Monotonic counter for individual lead assistant messages. */
leadMsgSeq: number;
/** Active text bubble for token-streamed lead assistant output. */
liveLeadTextBuffer: {
messageId: string;
text: string;
timestamp: string;
toolCalls?: ToolCallMeta[];
toolSummary?: string;
} | null;
/** Accumulated tool_use details between text messages. */
pendingToolCalls: ToolCallMeta[];
/** Active runtime tool calls keyed by tool_use_id. */
@ -20997,6 +21006,7 @@ export class TeamProvisioningService {
leadRelayCapture: null,
activeCrossTeamReplyHints: [],
leadMsgSeq: 0,
liveLeadTextBuffer: null,
pendingToolCalls: [],
activeToolCalls: new Map(),
pendingDirectCrossTeamSendRefresh: false,
@ -22311,6 +22321,7 @@ export class TeamProvisioningService {
leadRelayCapture: null,
activeCrossTeamReplyHints: [],
leadMsgSeq: 0,
liveLeadTextBuffer: null,
pendingToolCalls: [],
activeToolCalls: new Map(),
pendingDirectCrossTeamSendRefresh: false,
@ -24577,7 +24588,9 @@ export class TeamProvisioningService {
replyText = (await capturePromise).trim() || null;
} catch {
// Best-effort: if we captured some text but never got result.success, keep it.
const partial = run.leadRelayCapture?.textParts?.join('')?.trim();
const partial = run.leadRelayCapture
? this.joinLeadRelayCaptureText(run.leadRelayCapture)
: null;
replyText = partial && partial.length > 0 ? partial : null;
} finally {
if (run.leadRelayCapture) {
@ -31127,6 +31140,21 @@ export class TeamProvisioningService {
return null;
}
private isSyntheticLeadTextChunk(msg: Record<string, unknown>): boolean {
const message = (msg.message ?? msg) as Record<string, unknown>;
return message.model === '<synthetic>' && message.type === 'message';
}
private joinLeadRelayCaptureText(
capture: NonNullable<ProvisioningRun['leadRelayCapture']>
): string {
return capture.textParts.join(capture.textJoinMode === 'stream' ? '' : '\n').trim();
}
private resetLiveLeadTextBuffer(run: ProvisioningRun): void {
run.liveLeadTextBuffer = null;
}
private appendProvisioningAssistantText(
run: ProvisioningRun,
msg: Record<string, unknown>,
@ -31172,27 +31200,61 @@ export class TeamProvisioningService {
run: ProvisioningRun,
cleanText: string,
stableMessageId?: string,
messageTimestamp?: string
messageTimestamp?: string,
options?: { coalesceStreamChunk?: boolean }
): void {
run.leadMsgSeq += 1;
const leadName = this.getRunLeadName(run);
const messageId = stableMessageId || `lead-turn-${run.runId}-${run.leadMsgSeq}`;
const timestamp =
typeof messageTimestamp === 'string' &&
messageTimestamp.trim().length > 0 &&
Number.isFinite(Date.parse(messageTimestamp))
? messageTimestamp
: nowIso();
// Attach accumulated tool call details from preceding tool_use messages, then reset.
const toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined;
const toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined;
run.pendingToolCalls = [];
const coalesceStreamChunk = options?.coalesceStreamChunk === true;
let messageId = stableMessageId;
let text = cleanText;
let timestampForMessage = timestamp;
let toolCalls: ToolCallMeta[] | undefined;
let toolSummary: string | undefined;
if (coalesceStreamChunk) {
if (!run.liveLeadTextBuffer) {
run.leadMsgSeq += 1;
toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined;
toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined;
run.liveLeadTextBuffer = {
messageId: `lead-turn-${run.runId}-${run.leadMsgSeq}`,
text: cleanText,
timestamp,
toolCalls,
toolSummary,
};
run.pendingToolCalls = [];
} else {
run.liveLeadTextBuffer.text += cleanText;
}
messageId = run.liveLeadTextBuffer.messageId;
text = stripAgentBlocks(run.liveLeadTextBuffer.text).trim();
timestampForMessage = run.liveLeadTextBuffer.timestamp;
toolCalls = run.liveLeadTextBuffer.toolCalls;
toolSummary = run.liveLeadTextBuffer.toolSummary;
} else {
this.resetLiveLeadTextBuffer(run);
run.leadMsgSeq += 1;
messageId = messageId || `lead-turn-${run.runId}-${run.leadMsgSeq}`;
// Attach accumulated tool call details from preceding tool_use messages, then reset.
toolCalls = run.pendingToolCalls.length > 0 ? [...run.pendingToolCalls] : undefined;
toolSummary = toolCalls ? formatToolSummaryFromCalls(toolCalls) : undefined;
run.pendingToolCalls = [];
}
const leadMsg: InboxMessage = {
from: leadName,
text: cleanText,
timestamp,
text,
timestamp: timestampForMessage,
read: true,
summary: cleanText.length > 60 ? cleanText.slice(0, 57) + '...' : cleanText,
summary: text.length > 60 ? text.slice(0, 57) + '...' : text,
messageId,
source: 'lead_process',
toolSummary,
@ -31979,6 +32041,7 @@ export class TeamProvisioningService {
}
if (msg.type === 'user') {
this.resetLiveLeadTextBuffer(run);
// Check for permission_request in raw user message text BEFORE teammate-message parsing.
// The permission_request may arrive as plain JSON without <teammate-message> wrapper,
// and handleNativeTeammateUserMessage only processes <teammate-message> blocks.
@ -32056,12 +32119,17 @@ export class TeamProvisioningService {
// until relayLeadInboxMessages() finally clears run.leadRelayCapture.
if (run.leadRelayCapture && !run.leadRelayCapture.settled) {
const capture = run.leadRelayCapture;
if (this.isSyntheticLeadTextChunk(msg)) {
capture.textJoinMode = 'stream';
} else if (!capture.textJoinMode) {
capture.textJoinMode = 'block';
}
capture.textParts.push(text);
if (capture.idleHandle) {
clearTimeout(capture.idleHandle);
}
capture.idleHandle = setTimeout(() => {
const combined = capture.textParts.join('\n').trim();
const combined = this.joinLeadRelayCaptureText(capture);
capture.resolveOnce(combined);
}, capture.idleMs);
} else if (run.provisioningComplete) {
@ -32074,13 +32142,18 @@ export class TeamProvisioningService {
!run.suppressGeminiPostLaunchHydrationOutput &&
!hasCapturedVisibleSendMessage
) {
const cleanText = stripAgentBlocks(text).trim();
if (cleanText.length > 0 && !isTeamInternalControlMessageText(cleanText)) {
const isSyntheticChunk = this.isSyntheticLeadTextChunk(msg);
const displayText = isSyntheticChunk ? text : stripAgentBlocks(text).trim();
if (
(displayText.length > 0 || (isSyntheticChunk && run.liveLeadTextBuffer)) &&
!isTeamInternalControlMessageText(displayText)
) {
this.pushLiveLeadTextMessage(
run,
cleanText,
displayText,
this.getStableLeadThoughtMessageId(msg) ?? undefined,
messageTimestamp
messageTimestamp,
{ coalesceStreamChunk: isSyntheticChunk }
);
}
}
@ -32088,13 +32161,18 @@ export class TeamProvisioningService {
// Pre-ready: keep showing provisioning narration in the banner, but also mirror it
// into the live cache so Messages/Activity can show the earliest assistant output.
if (!run.silentUserDmForward && !hasCapturedVisibleSendMessage) {
const cleanText = stripAgentBlocks(text).trim();
if (cleanText.length > 0 && !isTeamInternalControlMessageText(cleanText)) {
const isSyntheticChunk = this.isSyntheticLeadTextChunk(msg);
const displayText = isSyntheticChunk ? text : stripAgentBlocks(text).trim();
if (
(displayText.length > 0 || (isSyntheticChunk && run.liveLeadTextBuffer)) &&
!isTeamInternalControlMessageText(displayText)
) {
this.pushLiveLeadTextMessage(
run,
cleanText,
displayText,
this.getStableLeadThoughtMessageId(msg) ?? undefined,
messageTimestamp
messageTimestamp,
{ coalesceStreamChunk: isSyntheticChunk }
);
}
}
@ -32116,6 +32194,7 @@ export class TeamProvisioningService {
preview: extractToolPreview(block.name, input),
toolUseId: typeof block.id === 'string' ? block.id : undefined,
});
this.resetLiveLeadTextBuffer(run);
this.startRuntimeToolActivity(run, this.getRunLeadName(run), block);
}
}
@ -32264,9 +32343,10 @@ export class TeamProvisioningService {
}
if (run.leadRelayCapture) {
const capture = run.leadRelayCapture;
const combined = capture.textParts.join('\n').trim();
const combined = this.joinLeadRelayCaptureText(capture);
capture.resolveOnce(combined);
}
this.resetLiveLeadTextBuffer(run);
// Clear silent relay flag after any successful turn.
run.activeCrossTeamReplyHints = [];
run.pendingInboxRelayCandidates = [];
@ -32302,6 +32382,7 @@ export class TeamProvisioningService {
if (run.leadRelayCapture) {
run.leadRelayCapture.rejectOnce(errorMsg);
}
this.resetLiveLeadTextBuffer(run);
// Clear silent relay flag after any errored turn.
run.pendingDirectCrossTeamSendRefresh = false;
run.activeCrossTeamReplyHints = [];

View file

@ -2,12 +2,118 @@ import { toMessageKey } from './teamMessageKey';
import type { InboxMessage } from '@shared/types';
const MAX_LEAD_FRAGMENT_GAP_MS = 2_000;
const MAX_LEAD_FRAGMENT_AVG_LENGTH = 14;
const MIN_LEAD_FRAGMENT_RUN_LENGTH = 3;
function compareMessages(a: InboxMessage, b: InboxMessage): number {
const diff = Date.parse(b.timestamp) - Date.parse(a.timestamp);
if (diff !== 0) return diff;
return toMessageKey(a).localeCompare(toMessageKey(b));
}
function isLeadThoughtFragmentCandidate(message: InboxMessage): boolean {
if (typeof message.to === 'string' && message.to.trim().length > 0) {
return false;
}
if (message.messageKind || message.toolCalls?.length || message.toolSummary) {
return false;
}
return message.source === 'lead_process' || message.source === 'lead_session';
}
function canJoinLeadThoughtFragments(older: InboxMessage, newer: InboxMessage): boolean {
if (!isLeadThoughtFragmentCandidate(older) || !isLeadThoughtFragmentCandidate(newer)) {
return false;
}
if (older.from !== newer.from) {
return false;
}
if ((older.leadSessionId ?? null) !== (newer.leadSessionId ?? null)) {
return false;
}
if (older.source !== newer.source) {
return false;
}
const olderMs = Date.parse(older.timestamp);
const newerMs = Date.parse(newer.timestamp);
if (!Number.isFinite(olderMs) || !Number.isFinite(newerMs)) {
return false;
}
return newerMs >= olderMs && newerMs - olderMs <= MAX_LEAD_FRAGMENT_GAP_MS;
}
function shouldCoalesceLeadThoughtRun(runNewestFirst: InboxMessage[]): boolean {
if (runNewestFirst.length < MIN_LEAD_FRAGMENT_RUN_LENGTH) {
return false;
}
const totalTrimmedLength = runNewestFirst.reduce(
(total, message) => total + message.text.trim().length,
0
);
return totalTrimmedLength / runNewestFirst.length <= MAX_LEAD_FRAGMENT_AVG_LENGTH;
}
function coalesceLeadThoughtRun(runNewestFirst: InboxMessage[]): InboxMessage[] {
if (!shouldCoalesceLeadThoughtRun(runNewestFirst)) {
return runNewestFirst;
}
const chronological = [...runNewestFirst].reverse();
const combinedText = chronological
.map((message) => message.text)
.join('')
.trim();
if (!combinedText) {
return runNewestFirst;
}
const newest = runNewestFirst[0];
const oldest = chronological[0];
return [
{
...newest,
text: combinedText,
summary: combinedText.length > 60 ? `${combinedText.slice(0, 57)}...` : combinedText,
messageId: `lead-thought-coalesced-${toMessageKey(oldest)}-${runNewestFirst.length}`,
},
];
}
function coalesceLeadThoughtFragments(messagesNewestFirst: InboxMessage[]): InboxMessage[] {
const result: InboxMessage[] = [];
let run: InboxMessage[] = [];
const flushRun = (): void => {
if (run.length === 0) return;
result.push(...coalesceLeadThoughtRun(run));
run = [];
};
for (const message of messagesNewestFirst) {
if (!isLeadThoughtFragmentCandidate(message)) {
flushRun();
result.push(message);
continue;
}
const currentOldest = run[run.length - 1];
if (!currentOldest || canJoinLeadThoughtFragments(message, currentOldest)) {
run.push(message);
continue;
}
flushRun();
run.push(message);
}
flushRun();
return result;
}
/**
* Merge multiple message arrays into one newest-first list with stable deduplication.
*
@ -23,5 +129,5 @@ export function mergeTeamMessages(...messageLists: readonly InboxMessage[][]): I
}
}
return Array.from(merged.values()).sort(compareMessages);
return coalesceLeadThoughtFragments(Array.from(merged.values()).sort(compareMessages));
}

View file

@ -58,6 +58,29 @@ function createLeadAssistantEntry(
};
}
function createSyntheticLeadAssistantChunk(
uuid: string,
timestamp: string,
text: string
): Record<string, unknown> {
return {
...createLeadAssistantEntry(uuid, timestamp, text),
message: {
role: 'assistant',
model: '<synthetic>',
id: `msg-${uuid}`,
type: 'message',
stop_reason: 'stop_sequence',
stop_sequence: '',
usage: {
input_tokens: 0,
output_tokens: 0,
},
content: [{ type: 'text', text }],
},
};
}
async function createTempJsonl(entries: Record<string, unknown>[]): Promise<string> {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-data-lead-session-'));
tempPaths.push(dir);
@ -4368,6 +4391,38 @@ describe('TeamDataService', () => {
expect(linked?.relayOfMessageId).toBeUndefined();
});
it('coalesces Codex synthetic lead stream chunks into one lead-session message', async () => {
const service = createLeadSessionCachingService();
const jsonlPath = await createTempJsonl([
createSyntheticLeadAssistantChunk('chunk-1', '2026-03-27T22:17:01.000Z', 'Соз'),
createSyntheticLeadAssistantChunk('chunk-2', '2026-03-27T22:17:01.010Z', 'дал'),
createSyntheticLeadAssistantChunk(
'chunk-3',
'2026-03-27T22:17:01.020Z',
' стартовую задачу для /212 и раздал работу.'
),
]);
const extract = (
service as unknown as {
extractLeadSessionTextsFromJsonl: (
jsonlPath: string,
leadName: string,
leadSessionId: string,
maxTexts: number
) => Promise<Array<{ messageId?: string; text: string }>>;
}
).extractLeadSessionTextsFromJsonl.bind(service);
const messages = await extract(jsonlPath, 'team-lead', 'lead-1', 150);
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
messageId: 'lead-thought-stream-chunk-1',
text: 'Создал стартовую задачу для /212 и раздал работу.',
});
});
it('caches unchanged lead-session extraction results and returns defensive clones', async () => {
const service = createLeadSessionCachingService();
const jsonlPath = await createTempJsonl([

View file

@ -809,6 +809,63 @@ describe('TeamProvisioningService', () => {
});
});
describe('live lead messages', () => {
it('updates one live message for Codex synthetic text chunks', () => {
const svc = new TeamProvisioningService();
const internals = svc as unknown as {
pushLiveLeadTextMessage: (
run: object,
cleanText: string,
stableMessageId?: string,
messageTimestamp?: string,
options?: { coalesceStreamChunk?: boolean }
) => void;
};
const run = {
teamName: 'my-team',
runId: 'run-1',
request: {
members: [{ name: 'team-lead', role: 'Team Lead' }],
},
leadMsgSeq: 0,
liveLeadTextBuffer: null,
pendingToolCalls: [],
lastLeadTextEmitMs: 0,
};
internals.pushLiveLeadTextMessage(
run,
'Соз',
undefined,
'2026-04-17T12:00:00.000Z',
{ coalesceStreamChunk: true }
);
internals.pushLiveLeadTextMessage(
run,
'дал',
undefined,
'2026-04-17T12:00:00.010Z',
{ coalesceStreamChunk: true }
);
internals.pushLiveLeadTextMessage(
run,
' стартовую задачу',
undefined,
'2026-04-17T12:00:00.020Z',
{ coalesceStreamChunk: true }
);
const messages = svc.getLiveLeadProcessMessages('my-team');
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
messageId: 'lead-turn-run-1-1',
text: 'Создал стартовую задачу',
timestamp: '2026-04-17T12:00:00.000Z',
source: 'lead_process',
});
});
});
describe('OpenCode runtime delivery user-visible impact', () => {
it('treats policy none as authoritative over raw failed delivery facts', () => {
const svc = new TeamProvisioningService();

View file

@ -115,7 +115,6 @@ vi.mock('agent-teams-controller', () => ({
},
}));
import type { TeamChangeEvent } from '@shared/types/team';
import { ConfigManager } from '../../../../src/main/services/infrastructure/ConfigManager';
import {
clearAutoResumeService,
@ -124,6 +123,8 @@ import {
} from '../../../../src/main/services/team/AutoResumeService';
import { TeamProvisioningService } from '../../../../src/main/services/team/TeamProvisioningService';
import type { TeamChangeEvent } from '@shared/types/team';
function seedConfig(teamName: string): void {
hoisted.files.set(
`/mock/teams/${teamName}/config.json`,
@ -166,9 +167,11 @@ interface RunLike {
processKilled: boolean;
cancelRequested: boolean;
provisioningOutputParts: string[];
provisioningOutputIndexByMessageId: Map<string, number>;
request: { members: { name: string; role?: string }[] };
activeCrossTeamReplyHints?: Array<{ toTeam: string; conversationId: string }>;
pendingInboxRelayCandidates?: unknown[];
liveLeadTextBuffer?: unknown;
memberSpawnStatuses: Map<string, unknown>;
pendingApprovals: Map<string, unknown>;
}
@ -210,8 +213,10 @@ function attachRun(
processKilled: false,
cancelRequested: false,
provisioningOutputParts: [],
provisioningOutputIndexByMessageId: new Map(),
request: { members: [{ name: 'team-lead', role: 'Team Lead' }] },
activeCrossTeamReplyHints: [],
liveLeadTextBuffer: null,
memberSpawnStatuses: new Map(),
pendingApprovals: new Map(),
};
@ -261,6 +266,38 @@ describe('TeamProvisioningService pre-ready live messages', () => {
expect(run.provisioningOutputParts).toHaveLength(1);
});
it('coalesces Codex synthetic chunks through stream handling without dropping spaces', () => {
const service = new TeamProvisioningService();
seedConfig('my-team');
const run = attachRun(service, 'my-team', { provisioningComplete: false });
const emitSyntheticText = (text: string, timestamp: string): void => {
callHandleStreamJsonMessage(service, run, {
type: 'assistant',
timestamp,
message: {
id: `msg-${timestamp}`,
model: '<synthetic>',
type: 'message',
content: [{ type: 'text', text }],
},
});
};
emitSyntheticText('Пр', '2026-04-17T12:00:00.000Z');
emitSyntheticText('ин', '2026-04-17T12:00:00.010Z');
emitSyntheticText('ял', '2026-04-17T12:00:00.020Z');
emitSyntheticText(':', '2026-04-17T12:00:00.030Z');
emitSyntheticText(' раз', '2026-04-17T12:00:00.040Z');
emitSyntheticText('лож', '2026-04-17T12:00:00.050Z');
emitSyntheticText('у', '2026-04-17T12:00:00.060Z');
const live = service.getLiveLeadProcessMessages('my-team');
expect(live).toHaveLength(1);
expect(live[0].text).toBe('Принял: разложу');
expect(live[0].messageId).toBe('lead-turn-run-1-1');
});
it('attaches leadSessionId to a live message when the same assistant payload carries session_id', () => {
const service = new TeamProvisioningService();
seedConfig('my-team');
@ -1008,9 +1045,14 @@ describe('TeamProvisioningService pre-ready live messages', () => {
]);
const run = attachRun(service, 'my-team', { provisioningComplete: true });
(service as any).rememberRecentCrossTeamLeadDeliveryMessageIds('my-team', [
'm-native-cross-team-dup',
]);
(
service as unknown as {
rememberRecentCrossTeamLeadDeliveryMessageIds: (
teamName: string,
messageIds: string[]
) => void;
}
).rememberRecentCrossTeamLeadDeliveryMessageIds('my-team', ['m-native-cross-team-dup']);
callHandleStreamJsonMessage(service, run, {
type: 'user',

View file

@ -59,4 +59,80 @@ describe('mergeTeamMessages', () => {
expect(merged[0].summary).toBe('live');
expect(merged[0].source).toBe('lead_process');
});
it('coalesces pathological lead thought fragments before display', () => {
const messages = [
makeMessage({
from: 'team-lead',
text: 'ложу',
timestamp: '2026-01-01T00:00:00.060Z',
messageId: 'chunk-4',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
makeMessage({
from: 'team-lead',
text: ' раз',
timestamp: '2026-01-01T00:00:00.040Z',
messageId: 'chunk-3',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
makeMessage({
from: 'team-lead',
text: ':',
timestamp: '2026-01-01T00:00:00.030Z',
messageId: 'chunk-2',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
makeMessage({
from: 'team-lead',
text: 'Принял',
timestamp: '2026-01-01T00:00:00.000Z',
messageId: 'chunk-1',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
];
const merged = mergeTeamMessages(messages);
expect(merged).toHaveLength(1);
expect(merged[0].text).toBe('Принял: разложу');
expect(merged[0].messageId).toMatch(/^lead-thought-coalesced-/);
});
it('does not coalesce separate short lead thoughts across a large gap', () => {
const messages = [
makeMessage({
from: 'team-lead',
text: 'Done',
timestamp: '2026-01-01T00:00:05.000Z',
messageId: 'm3',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
makeMessage({
from: 'team-lead',
text: 'OK',
timestamp: '2026-01-01T00:00:00.000Z',
messageId: 'm2',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
makeMessage({
from: 'team-lead',
text: 'Hi',
timestamp: '2025-12-31T23:59:55.000Z',
messageId: 'm1',
source: 'lead_session',
leadSessionId: 'sess-1',
}),
];
const merged = mergeTeamMessages(messages);
expect(merged.map((message) => message.text)).toEqual(['Done', 'OK', 'Hi']);
});
});