From bf51dfd3c5908bd463d23d84947f04b92b01e9ea Mon Sep 17 00:00:00 2001 From: 777genius Date: Mon, 27 Apr 2026 11:17:26 +0300 Subject: [PATCH] perf(team): limit board task log parsing concurrency --- .../BoardTaskActivityTranscriptReader.ts | 44 +++++++++++++- .../exact/BoardTaskExactLogStrictParser.ts | 40 ++++++++++++- .../stream/BoardTaskLogStreamService.ts | 58 ++++++++++++++++++- 3 files changed, 135 insertions(+), 7 deletions(-) diff --git a/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts b/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts index ffeeef7d..b9a3e9ae 100644 --- a/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts +++ b/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts @@ -14,6 +14,8 @@ import { import { BoardTaskActivityParseCache } from './BoardTaskActivityParseCache'; const logger = createLogger('Service:BoardTaskActivityTranscriptReader'); +const TASK_ACTIVITY_TRANSCRIPT_READ_CONCURRENCY = process.platform === 'win32' ? 4 : 8; +const TASK_ACTIVITY_TRANSCRIPT_READ_WARN_MS = 3_000; export interface RawTaskActivityMessage { filePath: string; @@ -28,6 +30,28 @@ export interface RawTaskActivityMessage { sourceOrder: number; } +async function mapLimit( + items: readonly T[], + limit: number, + fn: (item: T) => Promise +): Promise { + const results = new Array(items.length); + let index = 0; + const workerCount = Math.max(1, Math.min(limit, items.length)); + const workers = new Array(workerCount).fill(0).map(async () => { + while (true) { + const currentIndex = index; + index += 1; + if (currentIndex >= items.length) { + return; + } + results[currentIndex] = await fn(items[currentIndex]!); + } + }); + await Promise.all(workers); + return results; +} + function asRecord(value: unknown): Record | null { return value && typeof value === 'object' ? (value as Record) : null; } @@ -39,9 +63,21 @@ export class BoardTaskActivityTranscriptReader { const uniqueFilePaths = [...new Set(filePaths)].sort(); this.cache.retainOnly(new Set(uniqueFilePaths)); - const parsedFiles = await Promise.all( - uniqueFilePaths.map((filePath) => this.readFile(filePath)) + const startedAt = Date.now(); + const parsedFiles = await mapLimit( + uniqueFilePaths, + TASK_ACTIVITY_TRANSCRIPT_READ_CONCURRENCY, + (filePath) => this.readFile(filePath) ); + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= TASK_ACTIVITY_TRANSCRIPT_READ_WARN_MS) { + logger.warn( + `Slow task-activity transcript read: files=${uniqueFilePaths.length} records=${parsedFiles.reduce( + (sum, rows) => sum + rows.length, + 0 + )} elapsedMs=${elapsedMs}` + ); + } return parsedFiles.flat(); } @@ -83,8 +119,10 @@ export class BoardTaskActivityTranscriptReader { }); let sourceOrder = 0; + let lineCount = 0; for await (const line of rl) { if (!line.trim()) continue; + lineCount += 1; try { const parsed = JSON.parse(line) as unknown; @@ -116,7 +154,7 @@ export class BoardTaskActivityTranscriptReader { logger.debug(`Skipping malformed task-activity line in ${filePath}: ${String(error)}`); } - if (sourceOrder > 0 && sourceOrder % 250 === 0) { + if (lineCount % 500 === 0) { await yieldToEventLoop(); } } diff --git a/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts b/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts index d52274bf..5aee017b 100644 --- a/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts +++ b/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts @@ -10,6 +10,8 @@ import { BoardTaskExactLogsParseCache } from './BoardTaskExactLogsParseCache'; import type { ParsedMessage } from '@main/types'; const logger = createLogger('Service:BoardTaskExactLogStrictParser'); +const EXACT_LOG_PARSE_CONCURRENCY = process.platform === 'win32' ? 4 : 8; +const EXACT_LOG_PARSE_WARN_MS = 3_000; function asRecord(value: unknown): Record | null { return value && typeof value === 'object' ? (value as Record) : null; @@ -22,6 +24,28 @@ function hasStrictTimestamp(record: Record): boolean { return Number.isFinite(Date.parse(record.timestamp)); } +async function mapLimit( + items: readonly T[], + limit: number, + fn: (item: T) => Promise +): Promise { + const results = new Array(items.length); + let index = 0; + const workerCount = Math.max(1, Math.min(limit, items.length)); + const workers = new Array(workerCount).fill(0).map(async () => { + while (true) { + const currentIndex = index; + index += 1; + if (currentIndex >= items.length) { + return; + } + results[currentIndex] = await fn(items[currentIndex]!); + } + }); + await Promise.all(workers); + return results; +} + export class BoardTaskExactLogStrictParser { constructor( private readonly cache: BoardTaskExactLogsParseCache = new BoardTaskExactLogsParseCache() @@ -31,9 +55,21 @@ export class BoardTaskExactLogStrictParser { const uniquePaths = [...new Set(filePaths)].sort(); this.cache.retainOnly(new Set(uniquePaths)); - const results = await Promise.all( - uniquePaths.map(async (filePath) => [filePath, await this.parseFile(filePath)] as const) + const startedAt = Date.now(); + const results = await mapLimit( + uniquePaths, + EXACT_LOG_PARSE_CONCURRENCY, + async (filePath) => [filePath, await this.parseFile(filePath)] as const ); + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= EXACT_LOG_PARSE_WARN_MS) { + logger.warn( + `Slow exact-log parse: files=${uniquePaths.length} messages=${results.reduce( + (sum, [, messages]) => sum + messages.length, + 0 + )} elapsedMs=${elapsedMs}` + ); + } return new Map(results); } diff --git a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts index c855bd93..49215e41 100644 --- a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts +++ b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts @@ -1,5 +1,6 @@ import { extractToolCalls, extractToolResults } from '@main/utils/toolExtraction'; import { isLeadMember as isLeadMemberCheck } from '@shared/utils/leadDetection'; +import { createLogger } from '@shared/utils/logger'; import { getTaskDisplayId } from '@shared/utils/taskIdentity'; import { canonicalizeAgentTeamsToolName } from '../../agentTeamsToolNames'; @@ -58,10 +59,13 @@ interface StreamLayout { visibleSlices: StreamSlice[]; } +const logger = createLogger('Service:BoardTaskLogStreamService'); const INFERRED_WINDOW_GRACE_BEFORE_MS = 30_000; const INFERRED_WINDOW_GRACE_AFTER_MS = 15_000; const INFERRED_RECORD_RANGE_BEFORE_MS = 5 * 60_000; const INFERRED_RECORD_RANGE_AFTER_MS = 60_000; +const STREAM_LAYOUT_CACHE_TTL_MS = 1_000; +const STREAM_LAYOUT_BUILD_WARN_MS = 3_000; const HISTORICAL_BOARD_LIFECYCLE_TOOL_NAMES = new Set([ 'task_complete', 'task_set_status', @@ -1417,6 +1421,16 @@ function countSegmentsFromSlices(visibleSlices: StreamSlice[]): number { } export class BoardTaskLogStreamService { + private readonly layoutCache = new Map< + string, + { + expiresAt: number; + layout: StreamLayout; + } + >(); + + private readonly layoutInFlight = new Map>(); + constructor( private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(), private readonly summarySelector: BoardTaskExactLogSummarySelector = new BoardTaskExactLogSummarySelector(), @@ -1428,6 +1442,46 @@ export class BoardTaskLogStreamService { private readonly runtimeFallbackSource: OpenCodeTaskLogStreamSource = new OpenCodeTaskLogStreamSource() ) {} + private buildLayoutCacheKey(teamName: string, taskId: string): string { + return `${teamName}::${taskId}`; + } + + private async getStreamLayout(teamName: string, taskId: string): Promise { + const cacheKey = this.buildLayoutCacheKey(teamName, taskId); + const cached = this.layoutCache.get(cacheKey); + if (cached && cached.expiresAt > Date.now()) { + return cached.layout; + } + + const existingPromise = this.layoutInFlight.get(cacheKey); + if (existingPromise) { + return await existingPromise; + } + + const startedAt = Date.now(); + const promise = this.buildStreamLayout(teamName, taskId) + .then((layout) => { + this.layoutCache.set(cacheKey, { + expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS, + layout, + }); + return layout; + }) + .finally(() => { + this.layoutInFlight.delete(cacheKey); + }); + + this.layoutInFlight.set(cacheKey, promise); + const layout = await promise; + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= STREAM_LAYOUT_BUILD_WARN_MS) { + logger.warn( + `Slow task-log stream layout: team=${teamName} task=${taskId} participants=${layout.participants.length} slices=${layout.visibleSlices.length} elapsedMs=${elapsedMs}` + ); + } + return layout; + } + private async buildInferredExecutionSlices( teamName: string, taskId: string, @@ -1854,7 +1908,7 @@ export class BoardTaskLogStreamService { return emptySummary(); } - const layout = await this.buildStreamLayout(teamName, taskId); + const layout = await this.getStreamLayout(teamName, taskId); if (layout.visibleSlices.length === 0) { return emptySummary(); } @@ -1869,7 +1923,7 @@ export class BoardTaskLogStreamService { return emptyResponse(); } - const layout = await this.buildStreamLayout(teamName, taskId); + const layout = await this.getStreamLayout(teamName, taskId); if (layout.visibleSlices.length === 0) { return ( (await this.runtimeFallbackSource.getTaskLogStream(teamName, taskId)) ?? emptyResponse()