perf: replace readline with a chunked line generator in team JSONL readers

readline.createInterface runs an expensive Unicode line-break regex + extra
stream/string-decoder machinery per chunk. The main transcript parser (parseJsonlStream)
already uses a buffer + manual newline split; these per-team readers still used readline.

Add readJsonlLines(): an async generator that yields a JSONL file's lines via a chunked
utf8 stream read + a plain '\n' split (drop-in for 'for await (const line of rl)'), so the
consumers' loop bodies are unchanged. Stream is utf8-decoded before splitting, so multi-byte
chars across chunk boundaries are safe; trailing CR (CRLF) is stripped; empty lines and a
final newline-less line are yielded, matching readline; breaking out of the loop destroys
the stream via the generator's finally.

Adopt it in MemberStatsComputer, TaskBoundaryParser, and FileContentResolver (file-history
scan). Behavior-identical (their existing tests pass: 18 + 6 + 12) plus 6 new tests for the
generator (empty lines, CRLF, no-trailing-newline, empty file, early break, multi-byte chunk boundary).

Note: session-browser readline paths (jsonl metadata extractor, metadataExtraction,
SessionContentFilter) are off the launch path and left as-is for now.
This commit is contained in:
777genius 2026-05-30 15:58:09 +03:00
parent 92f1000a4f
commit 8cb44cd793
5 changed files with 126 additions and 29 deletions

View file

@ -1,12 +1,11 @@
import { readJsonlLines } from '@main/utils/jsonlLineReader';
import { getHomeDir } from '@main/utils/pathDecoder';
import { createLogger } from '@shared/utils/logger';
import { normalizePathForComparison } from '@shared/utils/platformPath';
import { createHash } from 'crypto';
import { diffLines } from 'diff';
import { createReadStream } from 'fs';
import { access, readFile } from 'fs/promises';
import * as path from 'path';
import * as readline from 'readline';
import type { GitDiffFallback } from './GitDiffFallback';
import type { TeamMemberLogsFinder } from './TeamMemberLogsFinder';
@ -407,10 +406,7 @@ export class FileContentResolver {
targetFilePath: string
): Promise<string | null> {
try {
const stream = createReadStream(logPath, { encoding: 'utf8' });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
for await (const line of readJsonlLines(logPath)) {
const trimmed = line.trim();
if (!trimmed) continue;
@ -431,17 +427,12 @@ export class FileContentResolver {
const backupFileName = trackedFileBackups[targetFilePath];
if (backupFileName) {
rl.close();
stream.destroy();
return backupFileName;
}
} catch {
// Skip malformed JSON
}
}
rl.close();
stream.destroy();
} catch {
logger.debug(`Не удалось прочитать JSONL для file-history: ${logPath}`);
}

View file

@ -1,6 +1,5 @@
import { readJsonlLines } from '@main/utils/jsonlLineReader';
import { createLogger } from '@shared/utils/logger';
import { createReadStream } from 'fs';
import * as readline from 'readline';
import { type TeamMemberLogsFinder } from './TeamMemberLogsFinder';
import { countLineChanges } from './UnifiedLineCounter';
@ -179,10 +178,7 @@ export class MemberStatsComputer {
};
try {
const stream = createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
for await (const line of readJsonlLines(filePath)) {
const trimmed = line.trim();
if (!trimmed) continue;
@ -332,9 +328,6 @@ export class MemberStatsComputer {
// Skip malformed lines
}
}
rl.close();
stream.destroy();
} catch (err) {
logger.debug(`Failed to parse file ${filePath}: ${String(err)}`);
}

View file

@ -1,7 +1,6 @@
import { readJsonlLines } from '@main/utils/jsonlLineReader';
import { createLogger } from '@shared/utils/logger';
import { createReadStream } from 'fs';
import { stat } from 'fs/promises';
import * as readline from 'readline';
import {
canonicalizeAgentTeamsToolName,
@ -102,10 +101,7 @@ export class TaskBoundaryParser {
let detectedMechanism: DetectedMechanism = 'none';
try {
const stream = createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
for await (const line of rl) {
for await (const line of readJsonlLines(filePath)) {
lineNumber++;
const trimmed = line.trim();
if (!trimmed) continue;
@ -149,9 +145,6 @@ export class TaskBoundaryParser {
// Пропускаем невалидные строки
}
}
rl.close();
stream.destroy();
} catch (err) {
logger.debug(`Error reading file ${filePath}: ${String(err)}`);
}

View file

@ -0,0 +1,47 @@
import { createReadStream } from 'fs';
/**
 * Async generator that yields the lines of a JSONL file using a chunked stream read
 * plus a plain `\n` split, as a drop-in replacement for
 * `for await (const line of readline.createInterface({ input, crlfDelay: Infinity }))`.
 *
 * readline runs an expensive Unicode line-break regex (`\r?\n | \r | U+2028 | U+2029`)
 * and extra stream/string-decoder machinery on every chunk. JSONL is strictly
 * newline-delimited, so a plain `\n` split is cheaper and more correct here: it will
 * not split on a bare `\r` or a Unicode line/paragraph separator that appears *inside*
 * a JSON string value, which readline would.
 *
 * The stream is opened with utf8 encoding, so every chunk arrives as an already
 * decoded string: multi-byte characters that straddle a read boundary are reassembled
 * by the stream before this function sees them. Splitting on `'\n'` at the string
 * level is therefore safe.
 *
 * Each chunk is scanned for newlines exactly once. Text belonging to a line that has
 * not ended yet is kept as a list of fragments and joined only when its newline
 * arrives, so a single very long line spanning many chunks costs linear (not
 * quadratic) work.
 *
 * Semantics match the readline loop the callers replace:
 *  - every line is yielded IN ORDER, INCLUDING empty lines (so callers tracking a
 *    1-based line number stay correct);
 *  - a trailing `\r` (from a CRLF ending) is stripped, even when the `\r` and `\n`
 *    land in different chunks;
 *  - a final line with no trailing newline is still yielded;
 *  - breaking/returning out of the `for await` destroys the underlying stream via the
 *    generator's `finally`.
 *
 * @param filePath Path of the file to read.
 * @returns An async generator of lines without their line terminator.
 * @throws Whatever the underlying read stream raises (e.g. the file cannot be opened);
 *   the error surfaces from the consumer's `for await`.
 */
export async function* readJsonlLines(filePath: string): AsyncGenerator<string, void, undefined> {
  const stream = createReadStream(filePath, { encoding: 'utf8' });
  // Non-empty fragments of the line currently being assembled across chunks.
  const carried: string[] = [];
  try {
    for await (const chunk of stream) {
      // The stream was opened with an encoding, so chunks are strings, not Buffers.
      const text = chunk as string;
      let lineStart = 0;
      let newlineIndex = text.indexOf('\n');
      while (newlineIndex !== -1) {
        let line = text.slice(lineStart, newlineIndex);
        if (carried.length > 0) {
          // This newline terminates a line that began in an earlier chunk.
          carried.push(line);
          line = carried.join('');
          carried.length = 0;
        }
        yield stripTrailingCr(line);
        lineStart = newlineIndex + 1;
        newlineIndex = text.indexOf('\n', lineStart);
      }
      // Keep the unterminated remainder (if any) for the next chunk.
      if (lineStart < text.length) {
        carried.push(text.slice(lineStart));
      }
    }
    // Honor a final line that has no trailing newline (readline yields it too).
    if (carried.length > 0) {
      yield stripTrailingCr(carried.join(''));
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer breaks out early.
    stream.destroy();
  }
}

/** Removes a single trailing `\r` so CRLF-terminated lines match LF-terminated ones. */
function stripTrailingCr(line: string): string {
  return line.endsWith('\r') ? line.slice(0, -1) : line;
}

View file

@ -0,0 +1,73 @@
import * as fs from 'fs/promises';
import * as os from 'os';
import * as path from 'path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { readJsonlLines } from '../../../src/main/utils/jsonlLineReader';
// Behavioral tests for readJsonlLines: each case writes a real file into a fresh
// temporary directory and compares the yielded lines with the expected sequence.
describe('readJsonlLines', () => {
  let dir: string;

  beforeEach(async () => {
    dir = await fs.mkdtemp(path.join(os.tmpdir(), 'jsonl-line-reader-'));
  });

  afterEach(async () => {
    await fs.rm(dir, { recursive: true, force: true });
  });

  /** Writes `content` as utf8 to `name` inside the per-test temp dir and returns its path. */
  async function write(name: string, content: string): Promise<string> {
    const p = path.join(dir, name);
    await fs.writeFile(p, content, 'utf8');
    return p;
  }

  /** Drains the generator for `filePath` into an array. */
  async function collect(filePath: string): Promise<string[]> {
    const out: string[] = [];
    for await (const line of readJsonlLines(filePath)) {
      out.push(line);
    }
    return out;
  }

  it('yields every line in order, including empty lines', async () => {
    // empty lines must still be yielded so callers tracking line numbers match readline
    const p = await write('a.jsonl', 'a\n\nb\nc\n');
    expect(await collect(p)).toEqual(['a', '', 'b', 'c']);
  });

  it('strips a trailing CR from CRLF endings', async () => {
    const p = await write('crlf.jsonl', 'one\r\ntwo\r\nthree\r\n');
    expect(await collect(p)).toEqual(['one', 'two', 'three']);
  });

  it('yields a final line that has no trailing newline', async () => {
    const p = await write('tail.jsonl', 'first\nlast-no-newline');
    expect(await collect(p)).toEqual(['first', 'last-no-newline']);
  });

  it('returns nothing for an empty file', async () => {
    const p = await write('empty.jsonl', '');
    expect(await collect(p)).toEqual([]);
  });

  it('stops and cleans up when the consumer breaks out of the loop', async () => {
    const p = await write('stop.jsonl', 'l1\nl2\nl3\nl4\n');
    const seen: string[] = [];
    for await (const line of readJsonlLines(p)) {
      seen.push(line);
      if (line === 'l2') break;
    }
    expect(seen).toEqual(['l1', 'l2']);
  });

  it('decodes multi-byte UTF-8 that straddles a read-chunk boundary', async () => {
    // Each `big` line is 80,000 bytes of 2-byte Cyrillic. The first line starts at byte 0,
    // so its characters sit on even offsets; the '\n' after it shifts the second line to
    // odd offsets, which makes one of its characters span the 131072-byte mark (assuming
    // the stream's default 64 KiB read size). Every line, the second one in particular,
    // must therefore be compared in full.
    const big = 'я'.repeat(40_000); // ~80KB
    // Single source of truth for the marker so written and expected values cannot drift.
    const marker = 'МАРКЕР-Ω';
    const p = await write('mb.jsonl', `${big}\n${big}\n${marker}\n`);
    const lines = await collect(p);
    expect(lines).toHaveLength(3);
    expect(lines[0]).toBe(big);
    expect(lines[1]).toBe(big);
    expect(lines[2]).toBe(marker);
  });
});