diff --git a/src/main/services/team/FileContentResolver.ts b/src/main/services/team/FileContentResolver.ts index 9b3e651d..7c7b97aa 100644 --- a/src/main/services/team/FileContentResolver.ts +++ b/src/main/services/team/FileContentResolver.ts @@ -1,12 +1,11 @@ +import { readJsonlLines } from '@main/utils/jsonlLineReader'; import { getHomeDir } from '@main/utils/pathDecoder'; import { createLogger } from '@shared/utils/logger'; import { normalizePathForComparison } from '@shared/utils/platformPath'; import { createHash } from 'crypto'; import { diffLines } from 'diff'; -import { createReadStream } from 'fs'; import { access, readFile } from 'fs/promises'; import * as path from 'path'; -import * as readline from 'readline'; import type { GitDiffFallback } from './GitDiffFallback'; import type { TeamMemberLogsFinder } from './TeamMemberLogsFinder'; @@ -407,10 +406,7 @@ export class FileContentResolver { targetFilePath: string ): Promise { try { - const stream = createReadStream(logPath, { encoding: 'utf8' }); - const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); - - for await (const line of rl) { + for await (const line of readJsonlLines(logPath)) { const trimmed = line.trim(); if (!trimmed) continue; @@ -431,17 +427,12 @@ export class FileContentResolver { const backupFileName = trackedFileBackups[targetFilePath]; if (backupFileName) { - rl.close(); - stream.destroy(); return backupFileName; } } catch { // Skip malformed JSON } } - - rl.close(); - stream.destroy(); } catch { logger.debug(`Не удалось прочитать JSONL для file-history: ${logPath}`); } diff --git a/src/main/services/team/MemberStatsComputer.ts b/src/main/services/team/MemberStatsComputer.ts index 5a868e3a..0d6cf532 100644 --- a/src/main/services/team/MemberStatsComputer.ts +++ b/src/main/services/team/MemberStatsComputer.ts @@ -1,6 +1,5 @@ +import { readJsonlLines } from '@main/utils/jsonlLineReader'; import { createLogger } from '@shared/utils/logger'; -import { createReadStream 
} from 'fs'; -import * as readline from 'readline'; import { type TeamMemberLogsFinder } from './TeamMemberLogsFinder'; import { countLineChanges } from './UnifiedLineCounter'; @@ -179,10 +178,7 @@ export class MemberStatsComputer { }; try { - const stream = createReadStream(filePath, { encoding: 'utf8' }); - const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); - - for await (const line of rl) { + for await (const line of readJsonlLines(filePath)) { const trimmed = line.trim(); if (!trimmed) continue; @@ -332,9 +328,6 @@ export class MemberStatsComputer { // Skip malformed lines } } - - rl.close(); - stream.destroy(); } catch (err) { logger.debug(`Failed to parse file ${filePath}: ${String(err)}`); } diff --git a/src/main/services/team/TaskBoundaryParser.ts b/src/main/services/team/TaskBoundaryParser.ts index dd3a1bd5..a75db4f1 100644 --- a/src/main/services/team/TaskBoundaryParser.ts +++ b/src/main/services/team/TaskBoundaryParser.ts @@ -1,7 +1,6 @@ +import { readJsonlLines } from '@main/utils/jsonlLineReader'; import { createLogger } from '@shared/utils/logger'; -import { createReadStream } from 'fs'; import { stat } from 'fs/promises'; -import * as readline from 'readline'; import { canonicalizeAgentTeamsToolName, @@ -102,10 +101,7 @@ export class TaskBoundaryParser { let detectedMechanism: DetectedMechanism = 'none'; try { - const stream = createReadStream(filePath, { encoding: 'utf8' }); - const rl = readline.createInterface({ input: stream, crlfDelay: Infinity }); - - for await (const line of rl) { + for await (const line of readJsonlLines(filePath)) { lineNumber++; const trimmed = line.trim(); if (!trimmed) continue; @@ -149,9 +145,6 @@ export class TaskBoundaryParser { // Пропускаем невалидные строки } } - - rl.close(); - stream.destroy(); } catch (err) { logger.debug(`Error reading file ${filePath}: ${String(err)}`); } diff --git a/src/main/utils/jsonlLineReader.ts b/src/main/utils/jsonlLineReader.ts new file mode 100644 index 
import { createReadStream } from 'fs';

/**
 * Async generator that yields the lines of a JSONL file, reading it as a chunked
 * utf8 stream and splitting on a plain `\n`. Intended as a drop-in replacement for
 * `for await (const line of readline.createInterface({ input, crlfDelay: Infinity }))`.
 *
 * Why not readline: JSONL is strictly newline-delimited, so splitting on `\n` only is
 * cheaper and will not split on a bare `\r` or a Unicode line/paragraph separator that
 * appears *inside* a JSON string value.
 *
 * The stream is opened with `encoding: 'utf8'`, so chunks arrive as already-decoded
 * strings; multi-byte characters that straddle a read boundary are reassembled by the
 * stream before this function sees them.
 *
 * Contract:
 * - every line is yielded in file order, INCLUDING empty lines (so callers counting
 *   1-based line numbers stay correct);
 * - one trailing `\r` (from a CRLF ending) is stripped from each yielded line, even
 *   when the `\r` and `\n` land in different chunks;
 * - a final line with no trailing newline is still yielded; an empty file yields nothing;
 * - the underlying stream is destroyed in `finally`, so breaking/returning out of the
 *   consumer's `for await`, or a read error, never leaks the file handle.
 *
 * @param filePath Path of the file to read.
 * @returns An async generator of lines without their line terminators.
 * @throws Re-throws any error emitted by the read stream (e.g. a missing file) to the
 *   consumer's `for await`.
 */
export async function* readJsonlLines(
  filePath: string
): AsyncGenerator<string, void, undefined> {
  const stream = createReadStream(filePath, { encoding: 'utf8' });
  // Text of the current, still-unterminated line carried over from earlier chunks.
  let pending = '';
  try {
    for await (const chunk of stream) {
      // With an encoding set, the stream emits strings rather than Buffers.
      const text = chunk as string;

      // Search only the new chunk: `pending` is known to contain no '\n', so
      // rescanning it for every chunk would be quadratic for very long lines.
      let newlineIndex = text.indexOf('\n');
      if (newlineIndex === -1) {
        pending += text;
        continue;
      }

      // The first newline in this chunk terminates the carried-over line.
      yield stripTrailingCr(pending + text.slice(0, newlineIndex));

      // Remaining complete lines live entirely inside this chunk; walk them by
      // offset instead of repeatedly re-slicing the remainder.
      let lineStart = newlineIndex + 1;
      newlineIndex = text.indexOf('\n', lineStart);
      while (newlineIndex !== -1) {
        yield stripTrailingCr(text.slice(lineStart, newlineIndex));
        lineStart = newlineIndex + 1;
        newlineIndex = text.indexOf('\n', lineStart);
      }

      pending = text.slice(lineStart);
    }

    // Honor a final line that has no trailing newline.
    if (pending.length > 0) {
      yield stripTrailingCr(pending);
    }
  } finally {
    // Runs on normal completion, on stream errors, and when the consumer exits early.
    stream.destroy();
  }
}

/** Drops the single trailing `\r` left behind by a CRLF line ending, if present. */
function stripTrailingCr(line: string): string {
  return line.endsWith('\r') ? line.slice(0, -1) : line;
}
'last-no-newline']); + }); + + it('returns nothing for an empty file', async () => { + const p = await write('empty.jsonl', ''); + expect(await collect(p)).toEqual([]); + }); + + it('stops and cleans up when the consumer breaks out of the loop', async () => { + const p = await write('stop.jsonl', 'l1\nl2\nl3\nl4\n'); + const seen: string[] = []; + for await (const line of readJsonlLines(p)) { + seen.push(line); + if (line === 'l2') break; + } + expect(seen).toEqual(['l1', 'l2']); + }); + + it('decodes multi-byte UTF-8 that straddles a read-chunk boundary', async () => { + // >64KB of 2-byte Cyrillic before the marker forces a multi-byte char to span the + // stream's default 64KB chunk boundary; the marker line must still arrive intact. + const big = 'я'.repeat(40_000); // ~80KB + const p = await write('mb.jsonl', `${big}\n${big}\nМАРКЕР-Ω\n`); + const lines = await collect(p); + expect(lines).toHaveLength(3); + expect(lines[0]).toBe(big); + expect(lines[2]).toBe('МАРКЕР-Ω'); + }); +});