fix(watcher): baseline large existing jsonl files

This commit is contained in:
777genius 2026-05-25 22:26:55 +03:00
parent bafd4d7194
commit e64fff8af0
3 changed files with 107 additions and 20 deletions

View file

@ -11,7 +11,11 @@
*/
import { type FileChangeEvent, type ParsedMessage } from '@main/types';
import { parseJsonlFileWithStats, parseJsonlStream } from '@main/utils/jsonl';
import {
countJsonlFileWithStats,
parseJsonlFileWithStats,
parseJsonlStream,
} from '@main/utils/jsonl';
import {
getProjectsBasePath,
getTasksBasePath,
@ -928,6 +932,11 @@ export class FileWatcher extends EventEmitter {
}
const isFirstRead = lastLineCount === 0 && lastSize === 0;
if (isFirstRead && fileStats.birthtimeMs < this.instanceCreatedAt) {
await this.establishPreExistingFileBaseline(filePath, currentSize);
return;
}
const canUseIncrementalAppend = lastSize > 0 && currentSize > lastSize;
let newMessages: ParsedMessage[] = [];
let currentLineCount: number;
@ -952,22 +961,6 @@ export class FileWatcher extends EventEmitter {
return;
}
// On first read (after app restart), establish baseline without detecting errors
// for files that existed BEFORE this FileWatcher started. This prevents flooding
// notifications with historical errors from old sessions.
// Files created AFTER startup are new sessions — detect errors normally.
if (isFirstRead) {
const isPreExistingFile = fileStats.birthtimeMs < this.instanceCreatedAt;
if (isPreExistingFile) {
this.lastProcessedLineCount.set(filePath, currentLineCount);
this.lastProcessedSize.set(filePath, processedSize);
logger.info(
`FileWatcher: Baseline established for ${filePath} (${currentLineCount} lines, ${processedSize} bytes)`
);
return;
}
}
// Detect errors in new messages
// Note: We pass the offset-adjusted line numbers to errorDetector
const errors = await errorDetector.detectErrors(newMessages, sessionId, projectId, filePath);
@ -1009,6 +1002,18 @@ export class FileWatcher extends EventEmitter {
}
}
private async establishPreExistingFileBaseline(
filePath: string,
currentSize: number
): Promise<void> {
const baseline = await countJsonlFileWithStats(filePath, this.fsProvider);
this.lastProcessedLineCount.set(filePath, baseline.parsedLineCount);
this.lastProcessedSize.set(filePath, baseline.consumedBytes);
logger.info(
`FileWatcher: Baseline established for ${filePath} (${baseline.parsedLineCount} lines, ${baseline.consumedBytes}/${currentSize} bytes)`
);
}
/**
* Clears the error detection tracking for a specific file.
* Call this when a file is deleted or to force re-processing.

View file

@ -58,6 +58,10 @@ export interface JsonlParseResult {
consumedBytes: number;
}
interface JsonlStreamParseOptions {
collectMessages?: boolean;
}
/**
* Parse a JSONL file line by line using streaming.
* This avoids loading the entire file into memory.
@ -88,14 +92,38 @@ export async function parseJsonlFileWithStats(
return parseJsonlStream(fsProvider.createReadStream(filePath), filePath);
}
/**
* Count parseable JSONL messages and consumed bytes without retaining message
* objects. Useful for first-read baselines where old transcript contents should
* not be surfaced and can be too large to keep in memory.
*/
export async function countJsonlFileWithStats(
filePath: string,
fsProvider: FileSystemProvider = defaultProvider
): Promise<Omit<JsonlParseResult, 'messages'>> {
if (!(await fsProvider.exists(filePath))) {
return { parsedLineCount: 0, consumedBytes: 0 };
}
const result = await parseJsonlStream(fsProvider.createReadStream(filePath), filePath, {
collectMessages: false,
});
return {
parsedLineCount: result.parsedLineCount,
consumedBytes: result.consumedBytes,
};
}
/**
* Parse JSONL data from a readable stream while tracking how many bytes were
* safely consumed as complete lines.
*/
export async function parseJsonlStream(
stream: Readable,
filePath?: string
filePath?: string,
options: JsonlStreamParseOptions = {}
): Promise<JsonlParseResult> {
const collectMessages = options.collectMessages !== false;
const messages: ParsedMessage[] = [];
let pending = Buffer.alloc(0);
let parsedLineCount = 0;
@ -124,7 +152,9 @@ export async function parseJsonlStream(
try {
const parsed = parseJsonlLine(normalized);
if (parsed) {
messages.push(parsed);
if (collectMessages) {
messages.push(parsed);
}
parsedLineCount += 1;
}
} catch {
@ -165,7 +195,9 @@ export async function parseJsonlStream(
if (looksLikeJsonObjectLine(normalized)) {
const parsed = parseJsonlLine(normalized);
if (parsed) {
messages.push(parsed);
if (collectMessages) {
messages.push(parsed);
}
parsedLineCount += 1;
consumedBytes += pending.length;
}

View file

@ -2542,6 +2542,56 @@ describe('FileWatcher', () => {
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('preserves line offset for oversized pre-existing files without notifications', async () => {
vi.useRealTimers();
useRealExistsSync();
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-large-baseline-'));
const projectsDir = path.join(tempDir, 'projects');
const projectDir = path.join(projectsDir, 'test-project');
fs.mkdirSync(projectDir, { recursive: true });
const filePath = path.join(projectDir, 'session-large.jsonl');
const largeLineCount = 17_000;
const largePayload = 'old data '.repeat(120);
fs.writeFileSync(
filePath,
Array.from({ length: largeLineCount }, (_, index) =>
jsonlLine(`large-${index}`, largePayload)
).join(''),
'utf8'
);
const dataCache = new DataCache(50, 10, false);
const notificationManager = createMockNotificationManager();
const watcher = new FileWatcher(dataCache, projectsDir, path.join(tempDir, 'todos'));
watcher.setNotificationManager(notificationManager);
const watcherAny = watcher as unknown as {
detectErrorsInSessionFile: (
projectId: string,
sessionId: string,
filePath: string
) => Promise<void>;
lastProcessedLineCount: Map<string, number>;
lastProcessedSize: Map<string, number>;
instanceCreatedAt: number;
};
watcherAny.instanceCreatedAt = Date.now() + 60_000;
vi.mocked(errorDetector.detectErrors).mockClear();
await watcherAny.detectErrorsInSessionFile('test-project', 'session-large', filePath);
expect(errorDetector.detectErrors).not.toHaveBeenCalled();
expect(watcherAny.lastProcessedLineCount.get(filePath)).toBe(largeLineCount);
expect(watcherAny.lastProcessedSize.get(filePath)).toBe(fs.statSync(filePath).size);
expect(notificationManager.addError).not.toHaveBeenCalled();
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('detects errors immediately for files created after watcher startup', async () => {
vi.useRealTimers();
useRealExistsSync();