Merge branch 'worktree-perf-jsonl-parallel' into dev

# Conflicts:
#	src/main/services/team/ChangeExtractorService.ts
This commit is contained in:
iliya 2026-03-14 23:10:00 +02:00
commit 71045841e1

View file

@ -88,23 +88,15 @@ export class ChangeExtractorService {
const paths = await this.logsFinder.findMemberLogPaths(teamName, memberName);
const projectPath = await this.resolveProjectPath(teamName);
// Собираем все snippets из всех JSONL файлов
const allSnippets: SnippetDiff[] = [];
// Собираем все snippets из всех JSONL файлов параллельно
const parseResults = await this.parseJSONLFilesWithConcurrency(paths);
let latestMtime = 0;
for (const filePath of paths) {
try {
const fileStat = await stat(filePath);
if (fileStat.mtimeMs > latestMtime) {
latestMtime = fileStat.mtimeMs;
}
} catch {
// Файл может быть удалён между обнаружением и чтением
}
const snippets = await this.parseJSONLFile(filePath);
allSnippets.push(...snippets);
const merged: SnippetDiff[] = [];
for (const r of parseResults) {
merged.push(...r.snippets);
if (r.mtime > latestMtime) latestMtime = r.mtime;
}
const allSnippets = this.sortSnippetsChronologically(merged);
const files = this.aggregateByFile(allSnippets, projectPath);
@ -515,11 +507,11 @@ export class ChangeExtractorService {
return false;
};
const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath));
const allowedSnippets: SnippetDiff[] = [];
const toolUseIdsSet = new Set<string>();
for (const ref of logRefs) {
const snippets = await this.parseJSONLFile(ref.filePath);
for (const { snippets } of allParsed) {
for (const s of snippets) {
if (s.isError) continue;
if (!inAnyInterval(s.timestamp)) continue;
@ -528,7 +520,11 @@ export class ChangeExtractorService {
}
}
const files = this.aggregateByFile(allowedSnippets, projectPath, includeDetails);
const files = this.aggregateByFile(
this.sortSnippetsChronologically(allowedSnippets),
projectPath,
includeDetails
);
return {
files,
toolUseIds: [...toolUseIdsSet],
@ -557,19 +553,69 @@ export class ChangeExtractorService {
return (hash >>> 0).toString(36);
}
/** Deterministic sort: timestamp → filePath → toolUseId → originalIndex */
private sortSnippetsChronologically(snippets: SnippetDiff[]): SnippetDiff[] {
return snippets
.map((snippet, originalIndex) => ({ snippet, originalIndex }))
.sort((a, b) => {
const aMs = Date.parse(a.snippet.timestamp);
const bMs = Date.parse(b.snippet.timestamp);
const safeA = Number.isFinite(aMs) ? aMs : Number.MAX_SAFE_INTEGER;
const safeB = Number.isFinite(bMs) ? bMs : Number.MAX_SAFE_INTEGER;
if (safeA !== safeB) return safeA - safeB;
if (a.snippet.filePath !== b.snippet.filePath)
return a.snippet.filePath.localeCompare(b.snippet.filePath);
if (a.snippet.toolUseId !== b.snippet.toolUseId)
return a.snippet.toolUseId.localeCompare(b.snippet.toolUseId);
return a.originalIndex - b.originalIndex;
})
.map(({ snippet }) => snippet);
}
/** Parse multiple JSONL files with bounded concurrency (worker-pool) */
private static readonly JSONL_PARSE_CONCURRENCY = 6;
private async parseJSONLFilesWithConcurrency(
paths: string[]
): Promise<Array<{ snippets: SnippetDiff[]; mtime: number }>> {
if (paths.length === 0) return [];
const results = new Array<{ snippets: SnippetDiff[]; mtime: number }>(paths.length);
let nextIndex = 0;
const worker = async (): Promise<void> => {
while (true) {
const currentIndex = nextIndex++;
if (currentIndex >= paths.length) return;
results[currentIndex] = await this.parseJSONLFile(paths[currentIndex]);
}
};
await Promise.all(
Array.from(
{ length: Math.min(ChangeExtractorService.JSONL_PARSE_CONCURRENCY, paths.length) },
() => worker()
)
);
return results;
}
/** Парсить один JSONL файл и извлечь все snippets (двухпроходный подход) */
private async parseJSONLFile(filePath: string): Promise<SnippetDiff[]> {
private async parseJSONLFile(
filePath: string
): Promise<{ snippets: SnippetDiff[]; mtime: number }> {
let fileMtime = 0;
try {
const fileStat = await stat(filePath);
fileMtime = fileStat.mtimeMs;
const cached = this.parsedSnippetsCache.get(filePath);
if (cached?.mtime === fileMtime && cached.expiresAt > Date.now()) {
return cached.data;
return { snippets: cached.data, mtime: fileMtime };
}
} catch (err) {
logger.debug(`Не удалось stat файла ${filePath}: ${String(err)}`);
return [];
return { snippets: [], mtime: 0 };
}
// Сначала считываем все записи в память для двух проходов
@ -593,7 +639,7 @@ export class ChangeExtractorService {
stream.destroy();
} catch (err) {
logger.debug(`Не удалось прочитать файл ${filePath}: ${String(err)}`);
return [];
return { snippets: [], mtime: 0 };
}
// Проход 1: собираем tool_use_id с ошибками
@ -711,7 +757,7 @@ export class ChangeExtractorService {
expiresAt: Date.now() + this.parsedSnippetsCacheTtl,
});
return snippets;
return { snippets, mtime: fileMtime };
}
/** Извлечь content array из JSONL entry (оба формата: subagent и main) */
@ -790,6 +836,7 @@ export class ChangeExtractorService {
const existing = fileMap.get(normalizedFilePath);
if (existing) {
existing.snippets.push(snippet);
if (snippet.type === 'write-new') existing.isNewFile = true;
} else {
fileMap.set(normalizedFilePath, {
filePath: snippet.filePath,
@ -892,11 +939,10 @@ export class ChangeExtractorService {
projectPath?: string,
includeDetails = true
): Promise<FileChangeSummary[]> {
const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath));
const allSnippets: SnippetDiff[] = [];
for (const ref of logRefs) {
const snippets = await this.parseJSONLFile(ref.filePath);
for (const { snippets } of allParsed) {
if (allowedToolUseIds.size > 0) {
// Фильтруем только по разрешённым tool_use IDs
for (const s of snippets) {
if (allowedToolUseIds.has(s.toolUseId)) {
allSnippets.push(s);
@ -906,7 +952,11 @@ export class ChangeExtractorService {
allSnippets.push(...snippets);
}
}
return this.aggregateByFile(allSnippets, projectPath, includeDetails);
return this.aggregateByFile(
this.sortSnippetsChronologically(allSnippets),
projectPath,
includeDetails
);
}
/** Извлечь все изменения из одного файла */
@ -916,7 +966,7 @@ export class ChangeExtractorService {
projectPath?: string,
includeDetails = true
): Promise<FileChangeSummary[]> {
const snippets = await this.parseJSONLFile(filePath);
const { snippets } = await this.parseJSONLFile(filePath);
return this.aggregateByFile(snippets, projectPath, includeDetails);
}
@ -928,11 +978,8 @@ export class ChangeExtractorService {
projectPath?: string,
includeDetails = true
): Promise<TaskChangeSetV2> {
const allSnippets: SnippetDiff[] = [];
for (const ref of logRefs) {
const snippets = await this.parseJSONLFile(ref.filePath);
allSnippets.push(...snippets);
}
const allParsed = await this.parseJSONLFilesWithConcurrency(logRefs.map((ref) => ref.filePath));
const allSnippets = this.sortSnippetsChronologically(allParsed.flatMap((r) => r.snippets));
const allFiles = this.aggregateByFile(allSnippets, projectPath, includeDetails);
const fallbackScope: TaskChangeScope = {