From e64fff8af0851026a026ebea287c1776a6f74a6f Mon Sep 17 00:00:00 2001 From: 777genius Date: Mon, 25 May 2026 22:26:55 +0300 Subject: [PATCH] fix(watcher): baseline large existing jsonl files --- .../services/infrastructure/FileWatcher.ts | 39 ++++++++------- src/main/utils/jsonl.ts | 38 ++++++++++++-- .../infrastructure/FileWatcher.test.ts | 50 +++++++++++++++++++ 3 files changed, 107 insertions(+), 20 deletions(-) diff --git a/src/main/services/infrastructure/FileWatcher.ts b/src/main/services/infrastructure/FileWatcher.ts index 5b37a7e9..fc40281d 100644 --- a/src/main/services/infrastructure/FileWatcher.ts +++ b/src/main/services/infrastructure/FileWatcher.ts @@ -11,7 +11,11 @@ */ import { type FileChangeEvent, type ParsedMessage } from '@main/types'; -import { parseJsonlFileWithStats, parseJsonlStream } from '@main/utils/jsonl'; +import { + countJsonlFileWithStats, + parseJsonlFileWithStats, + parseJsonlStream, +} from '@main/utils/jsonl'; import { getProjectsBasePath, getTasksBasePath, @@ -928,6 +932,11 @@ export class FileWatcher extends EventEmitter { } const isFirstRead = lastLineCount === 0 && lastSize === 0; + if (isFirstRead && fileStats.birthtimeMs < this.instanceCreatedAt) { + await this.establishPreExistingFileBaseline(filePath, currentSize); + return; + } + const canUseIncrementalAppend = lastSize > 0 && currentSize > lastSize; let newMessages: ParsedMessage[] = []; let currentLineCount: number; @@ -952,22 +961,6 @@ export class FileWatcher extends EventEmitter { return; } - // On first read (after app restart), establish baseline without detecting errors - // for files that existed BEFORE this FileWatcher started. This prevents flooding - // notifications with historical errors from old sessions. - // Files created AFTER startup are new sessions — detect errors normally. - if (isFirstRead) { - const isPreExistingFile = fileStats.birthtimeMs < this.instanceCreatedAt; - if (isPreExistingFile) { - this.lastProcessedLineCount.set(filePath, currentLineCount); - this.lastProcessedSize.set(filePath, processedSize); - logger.info( - `FileWatcher: Baseline established for ${filePath} (${currentLineCount} lines, ${processedSize} bytes)` - ); - return; - } - } - // Detect errors in new messages // Note: We pass the offset-adjusted line numbers to errorDetector const errors = await errorDetector.detectErrors(newMessages, sessionId, projectId, filePath); @@ -1009,6 +1002,18 @@ export class FileWatcher extends EventEmitter { } } + private async establishPreExistingFileBaseline( + filePath: string, + currentSize: number + ): Promise { + const baseline = await countJsonlFileWithStats(filePath, this.fsProvider); + this.lastProcessedLineCount.set(filePath, baseline.parsedLineCount); + this.lastProcessedSize.set(filePath, baseline.consumedBytes); + logger.info( + `FileWatcher: Baseline established for ${filePath} (${baseline.parsedLineCount} lines, ${baseline.consumedBytes}/${currentSize} bytes)` + ); + } + /** * Clears the error detection tracking for a specific file. * Call this when a file is deleted or to force re-processing. diff --git a/src/main/utils/jsonl.ts b/src/main/utils/jsonl.ts index e61b1d1e..16852d31 100644 --- a/src/main/utils/jsonl.ts +++ b/src/main/utils/jsonl.ts @@ -58,6 +58,10 @@ export interface JsonlParseResult { consumedBytes: number; } +interface JsonlStreamParseOptions { + collectMessages?: boolean; +} + /** * Parse a JSONL file line by line using streaming. * This avoids loading the entire file into memory. @@ -88,14 +92,38 @@ export async function parseJsonlFileWithStats( return parseJsonlStream(fsProvider.createReadStream(filePath), filePath); } +/** + * Count parseable JSONL messages and consumed bytes without retaining message + * objects. Useful for first-read baselines where old transcript contents should + * not be surfaced and can be too large to keep in memory. + */ +export async function countJsonlFileWithStats( + filePath: string, + fsProvider: FileSystemProvider = defaultProvider +): Promise> { + if (!(await fsProvider.exists(filePath))) { + return { parsedLineCount: 0, consumedBytes: 0 }; + } + + const result = await parseJsonlStream(fsProvider.createReadStream(filePath), filePath, { + collectMessages: false, + }); + return { + parsedLineCount: result.parsedLineCount, + consumedBytes: result.consumedBytes, + }; +} + /** * Parse JSONL data from a readable stream while tracking how many bytes were * safely consumed as complete lines. */ export async function parseJsonlStream( stream: Readable, - filePath?: string + filePath?: string, + options: JsonlStreamParseOptions = {} ): Promise { + const collectMessages = options.collectMessages !== false; const messages: ParsedMessage[] = []; let pending = Buffer.alloc(0); let parsedLineCount = 0; @@ -124,7 +152,9 @@ export async function parseJsonlStream( try { const parsed = parseJsonlLine(normalized); if (parsed) { - messages.push(parsed); + if (collectMessages) { + messages.push(parsed); + } parsedLineCount += 1; } } catch { @@ -165,7 +195,9 @@ export async function parseJsonlStream( if (looksLikeJsonObjectLine(normalized)) { const parsed = parseJsonlLine(normalized); if (parsed) { - messages.push(parsed); + if (collectMessages) { + messages.push(parsed); + } parsedLineCount += 1; consumedBytes += pending.length; } diff --git a/test/main/services/infrastructure/FileWatcher.test.ts b/test/main/services/infrastructure/FileWatcher.test.ts index 497d953c..1401e589 100644 --- a/test/main/services/infrastructure/FileWatcher.test.ts +++ b/test/main/services/infrastructure/FileWatcher.test.ts @@ -2542,6 +2542,56 @@ describe('FileWatcher', () => { fs.rmSync(tempDir, { recursive: true, force: true }); }); + it('preserves line offset for oversized pre-existing files without notifications', async () => { + vi.useRealTimers(); + useRealExistsSync(); + + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-large-baseline-')); + const projectsDir = path.join(tempDir, 'projects'); + const projectDir = path.join(projectsDir, 'test-project'); + fs.mkdirSync(projectDir, { recursive: true }); + + const filePath = path.join(projectDir, 'session-large.jsonl'); + const largeLineCount = 17_000; + const largePayload = 'old data '.repeat(120); + fs.writeFileSync( + filePath, + Array.from({ length: largeLineCount }, (_, index) => + jsonlLine(`large-${index}`, largePayload) + ).join(''), + 'utf8' + ); + + const dataCache = new DataCache(50, 10, false); + const notificationManager = createMockNotificationManager(); + const watcher = new FileWatcher(dataCache, projectsDir, path.join(tempDir, 'todos')); + watcher.setNotificationManager(notificationManager); + + const watcherAny = watcher as unknown as { + detectErrorsInSessionFile: ( + projectId: string, + sessionId: string, + filePath: string + ) => Promise; + lastProcessedLineCount: Map; + lastProcessedSize: Map; + instanceCreatedAt: number; + }; + watcherAny.instanceCreatedAt = Date.now() + 60_000; + + vi.mocked(errorDetector.detectErrors).mockClear(); + + await watcherAny.detectErrorsInSessionFile('test-project', 'session-large', filePath); + + expect(errorDetector.detectErrors).not.toHaveBeenCalled(); + expect(watcherAny.lastProcessedLineCount.get(filePath)).toBe(largeLineCount); + expect(watcherAny.lastProcessedSize.get(filePath)).toBe(fs.statSync(filePath).size); + expect(notificationManager.addError).not.toHaveBeenCalled(); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + it('detects errors immediately for files created after watcher startup', async () => { vi.useRealTimers(); useRealExistsSync();