perf(main): share lead session tail reads
This commit is contained in:
parent
f4f42e2ca4
commit
4b5a49e6e8
3 changed files with 98 additions and 48 deletions
|
|
@ -3245,20 +3245,12 @@ export class TeamDataService {
|
|||
return sessionIds;
|
||||
}
|
||||
|
||||
private async extractLeadAssistantTextsFromJsonl(
|
||||
jsonlPath: string,
|
||||
leadName: string,
|
||||
leadSessionId: string,
|
||||
maxTexts: number
|
||||
): Promise<InboxMessage[]> {
|
||||
if (maxTexts <= 0) return [];
|
||||
|
||||
private async readLeadSessionJsonlTailLines(jsonlPath: string): Promise<string[]> {
|
||||
const MAX_SCAN_BYTES = 8 * 1024 * 1024;
|
||||
const INITIAL_SCAN_BYTES = 256 * 1024;
|
||||
|
||||
const rawLinesReversed: string[] = [];
|
||||
const seenRawLines = new Set<string>();
|
||||
const seenMessageIds = new Set<string>();
|
||||
const handle = await fs.promises.open(jsonlPath, 'r');
|
||||
try {
|
||||
const stat = await handle.stat();
|
||||
|
|
@ -3289,7 +3281,17 @@ export class TeamDataService {
|
|||
await handle.close();
|
||||
}
|
||||
|
||||
const rawLines = rawLinesReversed.reverse();
|
||||
return rawLinesReversed.reverse();
|
||||
}
|
||||
|
||||
private async extractLeadAssistantTextsFromJsonlLines(
|
||||
rawLines: readonly string[],
|
||||
leadName: string,
|
||||
leadSessionId: string,
|
||||
maxTexts: number
|
||||
): Promise<InboxMessage[]> {
|
||||
if (maxTexts <= 0) return [];
|
||||
const seenMessageIds = new Set<string>();
|
||||
const texts: InboxMessage[] = [];
|
||||
let syntheticBuffer: {
|
||||
firstMsg: Record<string, unknown>;
|
||||
|
|
@ -3475,13 +3477,15 @@ export class TeamDataService {
|
|||
}
|
||||
|
||||
const parse = async (): Promise<InboxMessage[]> => {
|
||||
const rawLines = await this.readLeadSessionJsonlTailLines(jsonlPath);
|
||||
const [assistantTexts, commandResults] = await Promise.all([
|
||||
this.extractLeadAssistantTextsFromJsonl(jsonlPath, leadName, leadSessionId, maxTexts),
|
||||
this.extractLeadAssistantTextsFromJsonlLines(rawLines, leadName, leadSessionId, maxTexts),
|
||||
extractLeadSessionMessagesFromJsonl({
|
||||
jsonlPath,
|
||||
leadName,
|
||||
leadSessionId,
|
||||
maxMessages: maxTexts,
|
||||
rawLines,
|
||||
}),
|
||||
]);
|
||||
const combined = [...assistantTexts, ...commandResults];
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ interface LeadSessionMessageExtractorOptions {
|
|||
leadName: string;
|
||||
leadSessionId: string;
|
||||
maxMessages: number;
|
||||
rawLines?: readonly string[];
|
||||
}
|
||||
|
||||
function getMessageText(message: ParsedMessage): string {
|
||||
|
|
@ -98,50 +99,61 @@ export async function extractLeadSessionMessagesFromJsonl({
|
|||
leadName,
|
||||
leadSessionId,
|
||||
maxMessages,
|
||||
rawLines,
|
||||
}: LeadSessionMessageExtractorOptions): Promise<InboxMessage[]> {
|
||||
if (maxMessages <= 0) return [];
|
||||
|
||||
const parsedMessagesReversed: ParsedMessage[] = [];
|
||||
const seenScanKeys = new Set<string>();
|
||||
const handle = await fs.promises.open(jsonlPath, 'r');
|
||||
const collectLine = (rawLine: string | undefined): void => {
|
||||
const trimmed = rawLine?.trim();
|
||||
if (!trimmed) return;
|
||||
|
||||
try {
|
||||
const stat = await handle.stat();
|
||||
const fileSize = stat.size;
|
||||
|
||||
let scanBytes = Math.min(INITIAL_SCAN_BYTES, fileSize);
|
||||
while (scanBytes <= MAX_SCAN_BYTES) {
|
||||
const start = Math.max(0, fileSize - scanBytes);
|
||||
const buffer = Buffer.alloc(scanBytes);
|
||||
await handle.read(buffer, 0, scanBytes, start);
|
||||
const chunk = buffer.toString('utf8');
|
||||
|
||||
const lines = chunk.split(/\r?\n/);
|
||||
const fromIndex = start > 0 ? 1 : 0;
|
||||
|
||||
for (let i = lines.length - 1; i >= fromIndex; i--) {
|
||||
const trimmed = lines[i]?.trim();
|
||||
if (!trimmed) continue;
|
||||
|
||||
let parsed: ParsedMessage | null = null;
|
||||
try {
|
||||
parsed = parseJsonlLine(trimmed);
|
||||
} catch {
|
||||
parsed = null;
|
||||
}
|
||||
if (!parsed || parsed.isSidechain) continue;
|
||||
|
||||
const scanKey = buildScanKey(parsed, trimmed);
|
||||
if (seenScanKeys.has(scanKey)) continue;
|
||||
seenScanKeys.add(scanKey);
|
||||
parsedMessagesReversed.push(parsed);
|
||||
}
|
||||
|
||||
if (scanBytes === fileSize) break;
|
||||
scanBytes = Math.min(fileSize, scanBytes * 2);
|
||||
let parsed: ParsedMessage | null = null;
|
||||
try {
|
||||
parsed = parseJsonlLine(trimmed);
|
||||
} catch {
|
||||
parsed = null;
|
||||
}
|
||||
if (!parsed || parsed.isSidechain) return;
|
||||
|
||||
const scanKey = buildScanKey(parsed, trimmed);
|
||||
if (seenScanKeys.has(scanKey)) return;
|
||||
seenScanKeys.add(scanKey);
|
||||
parsedMessagesReversed.push(parsed);
|
||||
};
|
||||
|
||||
if (rawLines) {
|
||||
for (let i = rawLines.length - 1; i >= 0; i--) {
|
||||
collectLine(rawLines[i]);
|
||||
}
|
||||
} else {
|
||||
const handle = await fs.promises.open(jsonlPath, 'r');
|
||||
|
||||
try {
|
||||
const stat = await handle.stat();
|
||||
const fileSize = stat.size;
|
||||
|
||||
let scanBytes = Math.min(INITIAL_SCAN_BYTES, fileSize);
|
||||
while (scanBytes <= MAX_SCAN_BYTES) {
|
||||
const start = Math.max(0, fileSize - scanBytes);
|
||||
const buffer = Buffer.alloc(scanBytes);
|
||||
await handle.read(buffer, 0, scanBytes, start);
|
||||
const chunk = buffer.toString('utf8');
|
||||
|
||||
const lines = chunk.split(/\r?\n/);
|
||||
const fromIndex = start > 0 ? 1 : 0;
|
||||
|
||||
for (let i = lines.length - 1; i >= fromIndex; i--) {
|
||||
collectLine(lines[i]);
|
||||
}
|
||||
|
||||
if (scanBytes === fileSize) break;
|
||||
scanBytes = Math.min(fileSize, scanBytes * 2);
|
||||
}
|
||||
} finally {
|
||||
await handle.close();
|
||||
}
|
||||
} finally {
|
||||
await handle.close();
|
||||
}
|
||||
|
||||
const parsedMessages = parsedMessagesReversed.reverse();
|
||||
|
|
|
|||
|
|
@ -145,4 +145,38 @@ describe('extractLeadSessionMessagesFromJsonl', () => {
|
|||
text: 'Detached output',
|
||||
});
|
||||
});
|
||||
|
||||
it('extracts command outputs from preloaded raw lines without reading the jsonl file', async () => {
|
||||
const rawLines = [
|
||||
createUserEntry(
|
||||
'user-slash-raw',
|
||||
'2026-03-27T12:00:00.000Z',
|
||||
'<command-name>/cost</command-name><command-message>cost</command-message><command-args></command-args>'
|
||||
),
|
||||
createUserEntry(
|
||||
'stdout-raw',
|
||||
'2026-03-27T12:00:01.000Z',
|
||||
'<local-command-stdout>Total cost: $1.23</local-command-stdout>'
|
||||
),
|
||||
].map((entry) => JSON.stringify(entry));
|
||||
|
||||
const messages = await extractLeadSessionMessagesFromJsonl({
|
||||
jsonlPath: path.join(os.tmpdir(), 'missing-lead-session.jsonl'),
|
||||
leadName: 'team-lead',
|
||||
leadSessionId: 'lead-1',
|
||||
maxMessages: 20,
|
||||
rawLines,
|
||||
});
|
||||
|
||||
expect(messages).toHaveLength(1);
|
||||
expect(messages[0]).toMatchObject({
|
||||
from: 'team-lead',
|
||||
messageKind: 'slash_command_result',
|
||||
commandOutput: {
|
||||
stream: 'stdout',
|
||||
commandLabel: '/cost',
|
||||
},
|
||||
text: 'Total cost: $1.23',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in a new issue