From ea25e6ba58c228c9c8683afab5489280c852c99a Mon Sep 17 00:00:00 2001 From: 777genius Date: Mon, 27 Apr 2026 12:40:11 +0300 Subject: [PATCH] perf(team): cache board task activity records --- .../activity/BoardTaskActivityRecordSource.ts | 66 ++++++++- .../BoardTaskActivityTranscriptReader.ts | 6 + .../discovery/TeamTranscriptSourceLocator.ts | 75 ++++++++--- .../exact/BoardTaskExactLogStrictParser.ts | 5 +- src/main/utils/jsonl.ts | 4 + .../BoardTaskActivityRecordSource.test.ts | 69 +++++++++- .../BoardTaskActivityTranscriptReader.test.ts | 126 +++++++++++------- .../team/TeamTranscriptSourceLocator.test.ts | 71 ++++++++++ 8 files changed, 340 insertions(+), 82 deletions(-) diff --git a/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordSource.ts b/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordSource.ts index 7af3dc1b..a08675cd 100644 --- a/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordSource.ts +++ b/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordSource.ts @@ -5,16 +5,62 @@ import { BoardTaskActivityRecordBuilder } from './BoardTaskActivityRecordBuilder import { BoardTaskActivityTranscriptReader } from './BoardTaskActivityTranscriptReader'; import type { BoardTaskActivityRecord } from './BoardTaskActivityRecord'; +import type { TeamTask } from '@shared/types'; + +const TASK_ACTIVITY_INDEX_CACHE_TTL_MS = 1_000; + +interface TaskActivityIndex { + expiresAt: number; + tasksById: Map; + recordsByTaskId: Map; +} export class BoardTaskActivityRecordSource { + private readonly indexCache = new Map(); + private readonly indexInFlight = new Map>(); + constructor( private readonly transcriptSourceLocator: TeamTranscriptSourceLocator = new TeamTranscriptSourceLocator(), private readonly taskReader: TeamTaskReader = new TeamTaskReader(), private readonly transcriptReader: BoardTaskActivityTranscriptReader = new BoardTaskActivityTranscriptReader(), - private readonly recordBuilder: BoardTaskActivityRecordBuilder = new BoardTaskActivityRecordBuilder() + private readonly recordBuilder: Pick< + BoardTaskActivityRecordBuilder, + 'buildForTasks' + > = new BoardTaskActivityRecordBuilder() ) {} async getTaskRecords(teamName: string, taskId: string): Promise { + const index = await this.getTaskActivityIndex(teamName); + if (!index.tasksById.has(taskId)) { + return []; + } + return [...(index.recordsByTaskId.get(taskId) ?? [])]; + } + + private async getTaskActivityIndex(teamName: string): Promise { + const cached = this.indexCache.get(teamName); + if (cached && cached.expiresAt > Date.now()) { + return cached; + } + + const existingPromise = this.indexInFlight.get(teamName); + if (existingPromise) { + return await existingPromise; + } + + const promise = this.buildTaskActivityIndex(teamName) + .then((index) => { + this.indexCache.set(teamName, index); + return index; + }) + .finally(() => { + this.indexInFlight.delete(teamName); + }); + this.indexInFlight.set(teamName, promise); + return await promise; + } + + private async buildTaskActivityIndex(teamName: string): Promise { const [activeTasks, deletedTasks, transcriptFiles] = await Promise.all([ this.taskReader.getTasks(teamName), this.taskReader.getDeletedTasks(teamName), @@ -22,17 +68,25 @@ export class BoardTaskActivityRecordSource { ]); const tasks = [...activeTasks, ...deletedTasks]; - const targetTask = tasks.find((task) => task.id === taskId); - if (!targetTask || transcriptFiles.length === 0) { - return []; + const tasksById = new Map(tasks.map((task) => [task.id, task] as const)); + if (tasks.length === 0 || transcriptFiles.length === 0) { + return { + expiresAt: Date.now() + TASK_ACTIVITY_INDEX_CACHE_TTL_MS, + tasksById, + recordsByTaskId: new Map(), + }; } const messages = await this.transcriptReader.readFiles(transcriptFiles); - return this.recordBuilder.buildForTask({ + const recordsByTaskId = this.recordBuilder.buildForTasks({ teamName, - targetTask, tasks, messages, }); + return { + expiresAt: Date.now() + TASK_ACTIVITY_INDEX_CACHE_TTL_MS, + tasksById, + recordsByTaskId, + }; } } diff --git a/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts b/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts index b9a3e9ae..64e92093 100644 --- a/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts +++ b/src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader.ts @@ -123,6 +123,12 @@ export class BoardTaskActivityTranscriptReader { for await (const line of rl) { if (!line.trim()) continue; lineCount += 1; + if (!line.includes('"boardTaskLinks"')) { + if (lineCount % 500 === 0) { + await yieldToEventLoop(); + } + continue; + } try { const parsed = JSON.parse(line) as unknown; diff --git a/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts b/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts index 83620a4d..4cdddef0 100644 --- a/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts +++ b/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts @@ -10,6 +10,7 @@ import type { TeamConfig } from '@shared/types'; const logger = createLogger('Service:TeamTranscriptSourceLocator'); const TRANSCRIPT_DISCOVERY_WARN_MS = 3_000; const TRANSCRIPT_DISCOVERY_FILE_COUNT_WARN = 500; +const TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY = process.platform === 'win32' ? 4 : 8; export interface TeamTranscriptSourceContext { projectDir: string; @@ -19,6 +20,28 @@ export interface TeamTranscriptSourceContext { transcriptFiles: string[]; } +async function mapLimit( + items: readonly T[], + limit: number, + fn: (item: T) => Promise +): Promise { + const results = new Array(items.length); + let index = 0; + const workerCount = Math.max(1, Math.min(limit, items.length)); + const workers = new Array(workerCount).fill(0).map(async () => { + while (true) { + const currentIndex = index; + index += 1; + if (currentIndex >= items.length) { + return; + } + results[currentIndex] = await fn(items[currentIndex]!); + } + }); + await Promise.all(workers); + return results; +} + export class TeamTranscriptSourceLocator { constructor( private readonly projectResolver: TeamTranscriptProjectResolver = new TeamTranscriptProjectResolver() @@ -55,29 +78,41 @@ export class TeamTranscriptSourceLocator { ): Promise { const transcriptFiles = new Set(); - for (const sessionId of sessionIds) { - const mainTranscript = path.join(projectDir, `${sessionId}.jsonl`); - try { - const stat = await fs.stat(mainTranscript); - if (stat.isFile()) { - transcriptFiles.add(mainTranscript); + const filesBySession = await mapLimit( + sessionIds, + TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY, + async (sessionId) => { + const sessionFiles: string[] = []; + const mainTranscript = path.join(projectDir, `${sessionId}.jsonl`); + try { + const stat = await fs.stat(mainTranscript); + if (stat.isFile()) { + sessionFiles.push(mainTranscript); + } + } catch { + // ignore missing root transcript } - } catch { - // ignore missing root transcript - } - const subagentsDir = path.join(projectDir, sessionId, 'subagents'); - try { - const dirEntries = await fs.readdir(subagentsDir, { withFileTypes: true }); - for (const entry of dirEntries) { - if (!entry.isFile()) continue; - if (!entry.name.endsWith('.jsonl')) continue; - if (!entry.name.startsWith('agent-')) continue; - if (entry.name.startsWith('agent-acompact')) continue; - transcriptFiles.add(path.join(subagentsDir, entry.name)); + const subagentsDir = path.join(projectDir, sessionId, 'subagents'); + try { + const dirEntries = await fs.readdir(subagentsDir, { withFileTypes: true }); + for (const entry of dirEntries) { + if (!entry.isFile()) continue; + if (!entry.name.endsWith('.jsonl')) continue; + if (!entry.name.startsWith('agent-')) continue; + if (entry.name.startsWith('agent-acompact')) continue; + sessionFiles.push(path.join(subagentsDir, entry.name)); + } + } catch { + // ignore missing subagent dir } - } catch { - // ignore missing subagent dir + return sessionFiles; + } + ); + + for (const sessionFiles of filesBySession) { + for (const filePath of sessionFiles) { + transcriptFiles.add(filePath); } } diff --git a/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts b/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts index 5aee017b..fd822753 100644 --- a/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts +++ b/src/main/services/team/taskLogs/exact/BoardTaskExactLogStrictParser.ts @@ -1,5 +1,5 @@ import { yieldToEventLoop } from '@main/utils/asyncYield'; -import { parseJsonlLine } from '@main/utils/jsonl'; +import { parseJsonlEntry } from '@main/utils/jsonl'; import { createLogger } from '@shared/utils/logger'; import { createReadStream } from 'fs'; import * as fs from 'fs/promises'; @@ -8,6 +8,7 @@ import * as readline from 'readline'; import { BoardTaskExactLogsParseCache } from './BoardTaskExactLogsParseCache'; import type { ParsedMessage } from '@main/types'; +import type { ChatHistoryEntry } from '@main/types'; const logger = createLogger('Service:BoardTaskExactLogStrictParser'); const EXACT_LOG_PARSE_CONCURRENCY = process.platform === 'win32' ? 4 : 8; @@ -123,7 +124,7 @@ export class BoardTaskExactLogStrictParser { continue; } - const parsed = parseJsonlLine(line); + const parsed = parseJsonlEntry(record as unknown as ChatHistoryEntry); if (parsed) { results.push(parsed); } diff --git a/src/main/utils/jsonl.ts b/src/main/utils/jsonl.ts index 72320061..976f63c6 100644 --- a/src/main/utils/jsonl.ts +++ b/src/main/utils/jsonl.ts @@ -203,6 +203,10 @@ export function parseJsonlLine(line: string): ParsedMessage | null { } const entry = JSON.parse(normalized) as ChatHistoryEntry; + return parseJsonlEntry(entry); +} + +export function parseJsonlEntry(entry: ChatHistoryEntry): ParsedMessage | null { return parseChatHistoryEntry(entry); } diff --git a/test/main/services/team/BoardTaskActivityRecordSource.test.ts b/test/main/services/team/BoardTaskActivityRecordSource.test.ts index af672611..38a3e34c 100644 --- a/test/main/services/team/BoardTaskActivityRecordSource.test.ts +++ b/test/main/services/team/BoardTaskActivityRecordSource.test.ts @@ -31,7 +31,7 @@ describe('BoardTaskActivityRecordSource', () => { readFiles: vi.fn(async () => rawMessages), }; const recordBuilder = { - buildForTask: vi.fn(() => builtRecords), + buildForTasks: vi.fn(() => new Map([['task-a', builtRecords]])), }; const source = new BoardTaskActivityRecordSource( @@ -43,12 +43,12 @@ describe('BoardTaskActivityRecordSource', () => { const result = await source.getTaskRecords('demo', 'task-a'); - expect(result).toBe(builtRecords); + expect(result).toEqual(builtRecords); + expect(result).not.toBe(builtRecords); expect(locator.listTranscriptFiles).toHaveBeenCalledWith('demo'); expect(transcriptReader.readFiles).toHaveBeenCalledWith(transcriptFiles); - expect(recordBuilder.buildForTask).toHaveBeenCalledWith({ + expect(recordBuilder.buildForTasks).toHaveBeenCalledWith({ teamName: 'demo', - targetTask, tasks: [targetTask, deletedTask], messages: rawMessages, }); @@ -66,7 +66,7 @@ describe('BoardTaskActivityRecordSource', () => { readFiles: vi.fn(async () => [{ uuid: 'm1' }]), }; const recordBuilder = { - buildForTask: vi.fn(() => [{ id: 'r1' }]), + buildForTasks: vi.fn(() => new Map([['task-known', [{ id: 'r1' }]]])), }; const source = new BoardTaskActivityRecordSource( @@ -77,6 +77,63 @@ describe('BoardTaskActivityRecordSource', () => { ); await expect(source.getTaskRecords('demo', 'task-missing')).resolves.toEqual([]); - expect(recordBuilder.buildForTask).not.toHaveBeenCalled(); + expect(recordBuilder.buildForTasks).not.toHaveBeenCalled(); + }); + + it('shares one in-flight team index across concurrent task lookups', async () => { + const taskA = { + id: 'task-a', + displayId: 'aaaa1111', + subject: 'A', + status: 'pending', + }; + const taskB = { + id: 'task-b', + displayId: 'bbbb2222', + subject: 'B', + status: 'pending', + }; + const transcriptFiles = ['/tmp/a.jsonl']; + const rawMessages = [{ uuid: 'm1' }]; + const recordsA = [{ id: 'record-a' }]; + const recordsB = [{ id: 'record-b' }]; + + let resolveReadFiles: (messages: typeof rawMessages) => void = () => undefined; + const readFilesPromise = new Promise((resolve) => { + resolveReadFiles = resolve; + }); + const locator = { + listTranscriptFiles: vi.fn(async () => transcriptFiles), + }; + const taskReader = { + getTasks: vi.fn(async () => [taskA, taskB]), + getDeletedTasks: vi.fn(async () => []), + }; + const transcriptReader = { + readFiles: vi.fn(() => readFilesPromise), + }; + const recordBuilder = { + buildForTasks: vi.fn(() => new Map([ + ['task-a', recordsA], + ['task-b', recordsB], + ])), + }; + + const source = new BoardTaskActivityRecordSource( + locator as never, + taskReader as never, + transcriptReader as never, + recordBuilder as never, + ); + + const taskAResult = source.getTaskRecords('demo', 'task-a'); + const taskBResult = source.getTaskRecords('demo', 'task-b'); + resolveReadFiles(rawMessages); + + await expect(taskAResult).resolves.toEqual(recordsA); + await expect(taskBResult).resolves.toEqual(recordsB); + expect(locator.listTranscriptFiles).toHaveBeenCalledTimes(1); + expect(transcriptReader.readFiles).toHaveBeenCalledTimes(1); + expect(recordBuilder.buildForTasks).toHaveBeenCalledTimes(1); }); }); diff --git a/test/main/services/team/BoardTaskActivityTranscriptReader.test.ts b/test/main/services/team/BoardTaskActivityTranscriptReader.test.ts index 0932abcc..0ae91ecd 100644 --- a/test/main/services/team/BoardTaskActivityTranscriptReader.test.ts +++ b/test/main/services/team/BoardTaskActivityTranscriptReader.test.ts @@ -1,67 +1,97 @@ +import { afterEach, describe, expect, it } from 'vitest'; import * as fs from 'fs/promises'; import * as os from 'os'; import * as path from 'path'; -import { afterEach, describe, expect, it } from 'vitest'; - import { BoardTaskActivityTranscriptReader } from '../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader'; -const tempPaths: string[] = []; - -async function createTempTranscript(lines: unknown[]): Promise { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'board-task-activity-')); - const filePath = path.join(dir, 'transcript.jsonl'); - tempPaths.push(dir); - await fs.writeFile( - filePath, - lines.map(line => JSON.stringify(line)).join('\n'), - 'utf8', - ); - return filePath; -} +const tempDirs: string[] = []; afterEach(async () => { await Promise.all( - tempPaths.splice(0).map(dir => fs.rm(dir, { recursive: true, force: true })), + tempDirs.splice(0, tempDirs.length).map(async (dirPath) => { + await fs.rm(dirPath, { recursive: true, force: true }); + }) ); }); describe('BoardTaskActivityTranscriptReader', () => { - it('skips transcript rows without a stable timestamp', async () => { - const filePath = await createTempTranscript([ - { - uuid: 'missing-timestamp', - sessionId: 'session-1', - boardTaskLinks: [ - { - schemaVersion: 1, - task: { ref: 'abcd1234', refKind: 'display' }, - targetRole: 'subject', - linkKind: 'execution', - actorContext: { relation: 'same_task' }, - }, - ], - }, - { - uuid: 'valid-row', - timestamp: '2026-04-12T10:00:00.000Z', - sessionId: 'session-1', - boardTaskLinks: [ - { - schemaVersion: 1, - task: { ref: 'abcd1234', refKind: 'display' }, - targetRole: 'subject', - linkKind: 'execution', - actorContext: { relation: 'same_task' }, - }, - ], - }, - ]); + it('skips non-board and malformed rows while preserving task-linked activity rows', async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'activity-transcript-reader-')); + tempDirs.push(tempDir); + + const filePath = path.join(tempDir, 'session.jsonl'); + await fs.writeFile( + filePath, + [ + '{not-json', + JSON.stringify({ + uuid: 'ordinary-message', + sessionId: 'session-a', + timestamp: '2026-04-20T12:00:00.000Z', + message: { role: 'assistant', content: 'No board task links here' }, + }), + '{"boardTaskLinks":', + JSON.stringify({ + uuid: 'linked-message', + sessionId: 'session-a', + timestamp: '2026-04-20T12:01:00.000Z', + agentId: 'agent-a', + agentName: 'alice', + isSidechain: true, + boardTaskLinks: [ + { + schemaVersion: 1, + task: { ref: '12345678', refKind: 'display', canonicalId: 'task-a' }, + targetRole: 'subject', + linkKind: 'execution', + actorContext: { relation: 'same_task' }, + toolUseId: 'toolu_1', + }, + ], + boardTaskToolActions: [ + { + schemaVersion: 1, + toolUseId: 'toolu_1', + canonicalToolName: 'task_set_status', + input: { status: 'in_progress' }, + }, + ], + }), + ].join('\n'), + 'utf8' + ); const rows = await new BoardTaskActivityTranscriptReader().readFiles([filePath]); expect(rows).toHaveLength(1); - expect(rows[0]?.uuid).toBe('valid-row'); - expect(rows[0]?.timestamp).toBe('2026-04-12T10:00:00.000Z'); + expect(rows[0]).toMatchObject({ + filePath, + uuid: 'linked-message', + sessionId: 'session-a', + timestamp: '2026-04-20T12:01:00.000Z', + agentId: 'agent-a', + agentName: 'alice', + isSidechain: true, + sourceOrder: 1, + boardTaskLinks: [ + { + schemaVersion: 1, + toolUseId: 'toolu_1', + task: { ref: '12345678', refKind: 'display', canonicalId: 'task-a' }, + targetRole: 'subject', + linkKind: 'execution', + actorContext: { relation: 'same_task' }, + }, + ], + boardTaskToolActions: [ + { + schemaVersion: 1, + toolUseId: 'toolu_1', + canonicalToolName: 'task_set_status', + input: { status: 'in_progress' }, + }, + ], + }); }); }); diff --git a/test/main/services/team/TeamTranscriptSourceLocator.test.ts b/test/main/services/team/TeamTranscriptSourceLocator.test.ts index 254ecb8f..7030c3f7 100644 --- a/test/main/services/team/TeamTranscriptSourceLocator.test.ts +++ b/test/main/services/team/TeamTranscriptSourceLocator.test.ts @@ -111,4 +111,75 @@ describe('TeamTranscriptSourceLocator', () => { ); expect(transcriptFiles).not.toContain(path.join(projectRoot, 'unrelated-session.jsonl')); }); + + it('returns the same sorted transcript set across multiple session directories', async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-')); + setClaudeBasePathOverride(tmpDir); + + const teamName = 'bounded-discovery-test'; + const projectPath = '/Users/test/bounded-discovery'; + const projectId = '-Users-test-bounded-discovery'; + const sessionIds = Array.from({ length: 12 }, (_, index) => `member-${index + 1}`); + + await fs.mkdir(path.join(tmpDir, 'teams', teamName), { recursive: true }); + await fs.writeFile( + path.join(tmpDir, 'teams', teamName, 'config.json'), + JSON.stringify( + { + name: teamName, + projectPath, + members: sessionIds.map((sessionId, index) => ({ + name: `member-${index + 1}`, + agentType: 'general-purpose', + sessionId, + cwd: projectPath, + })), + }, + null, + 2 + ), + 'utf8' + ); + + const projectRoot = path.join(tmpDir, 'projects', projectId); + const expectedFiles: string[] = []; + + for (const sessionId of sessionIds) { + const rootTranscript = path.join(projectRoot, `${sessionId}.jsonl`); + const subagentsDir = path.join(projectRoot, sessionId, 'subagents'); + const subagentTranscript = path.join(subagentsDir, 'agent-worker.jsonl'); + + await fs.mkdir(subagentsDir, { recursive: true }); + await fs.writeFile( + rootTranscript, + JSON.stringify({ + timestamp: '2026-04-15T14:02:00.000Z', + type: 'user', + teamName, + message: { role: 'user', content: `Bootstrap ${sessionId} for ${teamName}` }, + }) + '\n', + 'utf8' + ); + await fs.writeFile( + subagentTranscript, + JSON.stringify({ + timestamp: '2026-04-15T14:02:01.000Z', + type: 'user', + message: { role: 'user', content: `Subagent for ${sessionId}` }, + }) + '\n', + 'utf8' + ); + await fs.writeFile( + path.join(subagentsDir, 'agent-acompact-ignore.jsonl'), + '{}\n', + 'utf8' + ); + + expectedFiles.push(rootTranscript, subagentTranscript); + } + + const transcriptFiles = await new TeamTranscriptSourceLocator().listTranscriptFiles(teamName); + + expect(transcriptFiles).toEqual([...expectedFiles].sort((a, b) => a.localeCompare(b))); + }); });