perf(team): limit board task log parsing concurrency

This commit is contained in:
777genius 2026-04-27 11:17:26 +03:00
parent 068e473d2d
commit bf51dfd3c5
3 changed files with 135 additions and 7 deletions

View file

@ -14,6 +14,8 @@ import {
import { BoardTaskActivityParseCache } from './BoardTaskActivityParseCache';
const logger = createLogger('Service:BoardTaskActivityTranscriptReader');
const TASK_ACTIVITY_TRANSCRIPT_READ_CONCURRENCY = process.platform === 'win32' ? 4 : 8;
const TASK_ACTIVITY_TRANSCRIPT_READ_WARN_MS = 3_000;
export interface RawTaskActivityMessage {
filePath: string;
@ -28,6 +30,28 @@ export interface RawTaskActivityMessage {
sourceOrder: number;
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
fn: (item: T) => Promise<R>
): Promise<R[]> {
const results = new Array<R>(items.length);
let index = 0;
const workerCount = Math.max(1, Math.min(limit, items.length));
const workers = new Array(workerCount).fill(0).map(async () => {
while (true) {
const currentIndex = index;
index += 1;
if (currentIndex >= items.length) {
return;
}
results[currentIndex] = await fn(items[currentIndex]!);
}
});
await Promise.all(workers);
return results;
}
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === 'object' ? (value as Record<string, unknown>) : null;
}
@ -39,9 +63,21 @@ export class BoardTaskActivityTranscriptReader {
const uniqueFilePaths = [...new Set(filePaths)].sort();
this.cache.retainOnly(new Set(uniqueFilePaths));
const parsedFiles = await Promise.all(
uniqueFilePaths.map((filePath) => this.readFile(filePath))
const startedAt = Date.now();
const parsedFiles = await mapLimit(
uniqueFilePaths,
TASK_ACTIVITY_TRANSCRIPT_READ_CONCURRENCY,
(filePath) => this.readFile(filePath)
);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= TASK_ACTIVITY_TRANSCRIPT_READ_WARN_MS) {
logger.warn(
`Slow task-activity transcript read: files=${uniqueFilePaths.length} records=${parsedFiles.reduce(
(sum, rows) => sum + rows.length,
0
)} elapsedMs=${elapsedMs}`
);
}
return parsedFiles.flat();
}
@ -83,8 +119,10 @@ export class BoardTaskActivityTranscriptReader {
});
let sourceOrder = 0;
let lineCount = 0;
for await (const line of rl) {
if (!line.trim()) continue;
lineCount += 1;
try {
const parsed = JSON.parse(line) as unknown;
@ -116,7 +154,7 @@ export class BoardTaskActivityTranscriptReader {
logger.debug(`Skipping malformed task-activity line in ${filePath}: ${String(error)}`);
}
if (sourceOrder > 0 && sourceOrder % 250 === 0) {
if (lineCount % 500 === 0) {
await yieldToEventLoop();
}
}

View file

@ -10,6 +10,8 @@ import { BoardTaskExactLogsParseCache } from './BoardTaskExactLogsParseCache';
import type { ParsedMessage } from '@main/types';
const logger = createLogger('Service:BoardTaskExactLogStrictParser');
const EXACT_LOG_PARSE_CONCURRENCY = process.platform === 'win32' ? 4 : 8;
const EXACT_LOG_PARSE_WARN_MS = 3_000;
function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === 'object' ? (value as Record<string, unknown>) : null;
@ -22,6 +24,28 @@ function hasStrictTimestamp(record: Record<string, unknown>): boolean {
return Number.isFinite(Date.parse(record.timestamp));
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
fn: (item: T) => Promise<R>
): Promise<R[]> {
const results = new Array<R>(items.length);
let index = 0;
const workerCount = Math.max(1, Math.min(limit, items.length));
const workers = new Array(workerCount).fill(0).map(async () => {
while (true) {
const currentIndex = index;
index += 1;
if (currentIndex >= items.length) {
return;
}
results[currentIndex] = await fn(items[currentIndex]!);
}
});
await Promise.all(workers);
return results;
}
export class BoardTaskExactLogStrictParser {
constructor(
private readonly cache: BoardTaskExactLogsParseCache = new BoardTaskExactLogsParseCache()
@ -31,9 +55,21 @@ export class BoardTaskExactLogStrictParser {
const uniquePaths = [...new Set(filePaths)].sort();
this.cache.retainOnly(new Set(uniquePaths));
const results = await Promise.all(
uniquePaths.map(async (filePath) => [filePath, await this.parseFile(filePath)] as const)
const startedAt = Date.now();
const results = await mapLimit(
uniquePaths,
EXACT_LOG_PARSE_CONCURRENCY,
async (filePath) => [filePath, await this.parseFile(filePath)] as const
);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= EXACT_LOG_PARSE_WARN_MS) {
logger.warn(
`Slow exact-log parse: files=${uniquePaths.length} messages=${results.reduce(
(sum, [, messages]) => sum + messages.length,
0
)} elapsedMs=${elapsedMs}`
);
}
return new Map(results);
}

View file

@ -1,5 +1,6 @@
import { extractToolCalls, extractToolResults } from '@main/utils/toolExtraction';
import { isLeadMember as isLeadMemberCheck } from '@shared/utils/leadDetection';
import { createLogger } from '@shared/utils/logger';
import { getTaskDisplayId } from '@shared/utils/taskIdentity';
import { canonicalizeAgentTeamsToolName } from '../../agentTeamsToolNames';
@ -58,10 +59,13 @@ interface StreamLayout {
visibleSlices: StreamSlice[];
}
const logger = createLogger('Service:BoardTaskLogStreamService');
const INFERRED_WINDOW_GRACE_BEFORE_MS = 30_000;
const INFERRED_WINDOW_GRACE_AFTER_MS = 15_000;
const INFERRED_RECORD_RANGE_BEFORE_MS = 5 * 60_000;
const INFERRED_RECORD_RANGE_AFTER_MS = 60_000;
const STREAM_LAYOUT_CACHE_TTL_MS = 1_000;
const STREAM_LAYOUT_BUILD_WARN_MS = 3_000;
const HISTORICAL_BOARD_LIFECYCLE_TOOL_NAMES = new Set([
'task_complete',
'task_set_status',
@ -1417,6 +1421,16 @@ function countSegmentsFromSlices(visibleSlices: StreamSlice[]): number {
}
export class BoardTaskLogStreamService {
private readonly layoutCache = new Map<
string,
{
expiresAt: number;
layout: StreamLayout;
}
>();
private readonly layoutInFlight = new Map<string, Promise<StreamLayout>>();
constructor(
private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(),
private readonly summarySelector: BoardTaskExactLogSummarySelector = new BoardTaskExactLogSummarySelector(),
@ -1428,6 +1442,46 @@ export class BoardTaskLogStreamService {
private readonly runtimeFallbackSource: OpenCodeTaskLogStreamSource = new OpenCodeTaskLogStreamSource()
) {}
private buildLayoutCacheKey(teamName: string, taskId: string): string {
return `${teamName}::${taskId}`;
}
private async getStreamLayout(teamName: string, taskId: string): Promise<StreamLayout> {
const cacheKey = this.buildLayoutCacheKey(teamName, taskId);
const cached = this.layoutCache.get(cacheKey);
if (cached && cached.expiresAt > Date.now()) {
return cached.layout;
}
const existingPromise = this.layoutInFlight.get(cacheKey);
if (existingPromise) {
return await existingPromise;
}
const startedAt = Date.now();
const promise = this.buildStreamLayout(teamName, taskId)
.then((layout) => {
this.layoutCache.set(cacheKey, {
expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS,
layout,
});
return layout;
})
.finally(() => {
this.layoutInFlight.delete(cacheKey);
});
this.layoutInFlight.set(cacheKey, promise);
const layout = await promise;
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= STREAM_LAYOUT_BUILD_WARN_MS) {
logger.warn(
`Slow task-log stream layout: team=${teamName} task=${taskId} participants=${layout.participants.length} slices=${layout.visibleSlices.length} elapsedMs=${elapsedMs}`
);
}
return layout;
}
private async buildInferredExecutionSlices(
teamName: string,
taskId: string,
@ -1854,7 +1908,7 @@ export class BoardTaskLogStreamService {
return emptySummary();
}
const layout = await this.buildStreamLayout(teamName, taskId);
const layout = await this.getStreamLayout(teamName, taskId);
if (layout.visibleSlices.length === 0) {
return emptySummary();
}
@ -1869,7 +1923,7 @@ export class BoardTaskLogStreamService {
return emptyResponse();
}
const layout = await this.buildStreamLayout(teamName, taskId);
const layout = await this.getStreamLayout(teamName, taskId);
if (layout.visibleSlices.length === 0) {
return (
(await this.runtimeFallbackSource.getTaskLogStream(teamName, taskId)) ?? emptyResponse()