From f3399e50888758bd8b59826a7731941c8ac4aed9 Mon Sep 17 00:00:00 2001 From: iliya Date: Sat, 14 Mar 2026 23:06:51 +0200 Subject: [PATCH] perf: parallelize JSONL parsing in ChangeExtractorService - Replace sequential for-loops with bounded-concurrency worker-pool (6 workers) - Add deterministic chronological sort with originalIndex tie-breaker for MultiEdit stability - Change parseJSONLFile return type to { snippets, mtime } eliminating double stat() calls - Fix isNewFile in aggregateByFile to not depend on snippet ordering - Apply to all 4 sequential parsing sites: getAgentChanges, extractIntervalScopedChanges, extractFilteredChanges, fallbackSingleTaskScope --- .../services/team/ChangeExtractorService.ts | 119 ++++++++++++------ 1 file changed, 81 insertions(+), 38 deletions(-) diff --git a/src/main/services/team/ChangeExtractorService.ts b/src/main/services/team/ChangeExtractorService.ts index 8bff0c22..f7cba583 100644 --- a/src/main/services/team/ChangeExtractorService.ts +++ b/src/main/services/team/ChangeExtractorService.ts @@ -73,23 +73,15 @@ export class ChangeExtractorService { const paths = await this.logsFinder.findMemberLogPaths(teamName, memberName); const projectPath = await this.resolveProjectPath(teamName); - // Собираем все snippets из всех JSONL файлов - const allSnippets: SnippetDiff[] = []; + // Собираем все snippets из всех JSONL файлов параллельно + const parseResults = await this.parseJSONLFilesWithConcurrency(paths); let latestMtime = 0; - - for (const filePath of paths) { - try { - const fileStat = await stat(filePath); - if (fileStat.mtimeMs > latestMtime) { - latestMtime = fileStat.mtimeMs; - } - } catch { - // Файл может быть удалён между обнаружением и чтением - } - - const snippets = await this.parseJSONLFile(filePath); - allSnippets.push(...snippets); + const merged: SnippetDiff[] = []; + for (const r of parseResults) { + merged.push(...r.snippets); + if (r.mtime > latestMtime) latestMtime = r.mtime; } + const allSnippets = this.sortSnippetsChronologically(merged); const files = this.aggregateByFile(allSnippets, projectPath); @@ -407,11 +399,11 @@ export class ChangeExtractorService { return false; }; + const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath)); const allowedSnippets: SnippetDiff[] = []; const toolUseIdsSet = new Set(); - for (const ref of logRefs) { - const snippets = await this.parseJSONLFile(ref.filePath); + for (const { snippets } of allParsed) { for (const s of snippets) { if (s.isError) continue; if (!inAnyInterval(s.timestamp)) continue; @@ -420,7 +412,11 @@ export class ChangeExtractorService { } } - const files = this.aggregateByFile(allowedSnippets, projectPath, includeDetails); + const files = this.aggregateByFile( + this.sortSnippetsChronologically(allowedSnippets), + projectPath, + includeDetails + ); return { files, toolUseIds: [...toolUseIdsSet], @@ -449,19 +445,69 @@ export class ChangeExtractorService { return (hash >>> 0).toString(36); } + /** Deterministic sort: timestamp → filePath → toolUseId → originalIndex */ + private sortSnippetsChronologically(snippets: SnippetDiff[]): SnippetDiff[] { + return snippets + .map((snippet, originalIndex) => ({ snippet, originalIndex })) + .sort((a, b) => { + const aMs = Date.parse(a.snippet.timestamp); + const bMs = Date.parse(b.snippet.timestamp); + const safeA = Number.isFinite(aMs) ? aMs : Number.MAX_SAFE_INTEGER; + const safeB = Number.isFinite(bMs) ? bMs : Number.MAX_SAFE_INTEGER; + if (safeA !== safeB) return safeA - safeB; + if (a.snippet.filePath !== b.snippet.filePath) + return a.snippet.filePath.localeCompare(b.snippet.filePath); + if (a.snippet.toolUseId !== b.snippet.toolUseId) + return a.snippet.toolUseId.localeCompare(b.snippet.toolUseId); + return a.originalIndex - b.originalIndex; + }) + .map(({ snippet }) => snippet); + } + + /** Parse multiple JSONL files with bounded concurrency (worker-pool) */ + private static readonly JSONL_PARSE_CONCURRENCY = 6; + + private async parseJSONLFilesWithConcurrency( + paths: string[] + ): Promise> { + if (paths.length === 0) return []; + + const results = new Array<{ snippets: SnippetDiff[]; mtime: number }>(paths.length); + let nextIndex = 0; + + const worker = async (): Promise => { + while (true) { + const currentIndex = nextIndex++; + if (currentIndex >= paths.length) return; + results[currentIndex] = await this.parseJSONLFile(paths[currentIndex]); + } + }; + + await Promise.all( + Array.from( + { length: Math.min(ChangeExtractorService.JSONL_PARSE_CONCURRENCY, paths.length) }, + () => worker() + ) + ); + + return results; + } + /** Парсить один JSONL файл и извлечь все snippets (двухпроходный подход) */ - private async parseJSONLFile(filePath: string): Promise { + private async parseJSONLFile( + filePath: string + ): Promise<{ snippets: SnippetDiff[]; mtime: number }> { let fileMtime = 0; try { const fileStat = await stat(filePath); fileMtime = fileStat.mtimeMs; const cached = this.parsedSnippetsCache.get(filePath); if (cached?.mtime === fileMtime && cached.expiresAt > Date.now()) { - return cached.data; + return { snippets: cached.data, mtime: fileMtime }; } } catch (err) { logger.debug(`Не удалось stat файла ${filePath}: ${String(err)}`); - return []; + return { snippets: [], mtime: 0 }; } // Сначала считываем все записи в память для двух проходов @@ -485,7 +531,7 @@ export class ChangeExtractorService { stream.destroy(); } catch (err) { logger.debug(`Не удалось прочитать файл ${filePath}: ${String(err)}`); - return []; + return { snippets: [], mtime: 0 }; } // Проход 1: собираем tool_use_id с ошибками @@ -602,7 +648,7 @@ export class ChangeExtractorService { expiresAt: Date.now() + this.parsedSnippetsCacheTtl, }); - return snippets; + return { snippets, mtime: fileMtime }; } /** Извлечь content array из JSONL entry (оба формата: subagent и main) */ @@ -677,6 +723,7 @@ export class ChangeExtractorService { const existing = fileMap.get(snippet.filePath); if (existing) { existing.snippets.push(snippet); + if (snippet.type === 'write-new') existing.isNewFile = true; } else { fileMap.set(snippet.filePath, { snippets: [snippet], @@ -818,11 +865,10 @@ export class ChangeExtractorService { projectPath?: string, includeDetails = true ): Promise { + const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath)); const allSnippets: SnippetDiff[] = []; - for (const ref of logRefs) { - const snippets = await this.parseJSONLFile(ref.filePath); + for (const { snippets } of allParsed) { if (allowedToolUseIds.size > 0) { - // Фильтруем только по разрешённым tool_use IDs for (const s of snippets) { if (allowedToolUseIds.has(s.toolUseId)) { allSnippets.push(s); @@ -832,7 +878,11 @@ export class ChangeExtractorService { allSnippets.push(...snippets); } } - return this.aggregateByFile(allSnippets, projectPath, includeDetails); + return this.aggregateByFile( + this.sortSnippetsChronologically(allSnippets), + projectPath, + includeDetails + ); } /** Извлечь все изменения из одного файла */ @@ -842,7 +892,7 @@ export class ChangeExtractorService { projectPath?: string, includeDetails = true ): Promise { - const snippets = await this.parseJSONLFile(filePath); + const { snippets } = await this.parseJSONLFile(filePath); return this.aggregateByFile(snippets, projectPath, includeDetails); } @@ -854,16 +904,9 @@ export class ChangeExtractorService { projectPath?: string, includeDetails = true ): Promise { - const allFiles: FileChangeSummary[] = []; - for (const ref of logRefs) { - const files = await this.extractAllChanges( - ref.filePath, - ref.memberName, - projectPath, - includeDetails - ); - allFiles.push(...files); - } + const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath)); + const allSnippets = this.sortSnippetsChronologically(allParsed.flatMap((r) => r.snippets)); + const allFiles = this.aggregateByFile(allSnippets, projectPath, includeDetails); const fallbackScope: TaskChangeScope = { taskId,