perf(team): cache board task activity records

This commit is contained in:
777genius 2026-04-27 12:40:11 +03:00
parent 6707cc3b60
commit ea25e6ba58
8 changed files with 340 additions and 82 deletions

View file

@ -5,16 +5,62 @@ import { BoardTaskActivityRecordBuilder } from './BoardTaskActivityRecordBuilder
import { BoardTaskActivityTranscriptReader } from './BoardTaskActivityTranscriptReader';
import type { BoardTaskActivityRecord } from './BoardTaskActivityRecord';
import type { TeamTask } from '@shared/types';
const TASK_ACTIVITY_INDEX_CACHE_TTL_MS = 1_000;
interface TaskActivityIndex {
expiresAt: number;
tasksById: Map<string, TeamTask>;
recordsByTaskId: Map<string, BoardTaskActivityRecord[]>;
}
export class BoardTaskActivityRecordSource {
private readonly indexCache = new Map<string, TaskActivityIndex>();
private readonly indexInFlight = new Map<string, Promise<TaskActivityIndex>>();
constructor(
private readonly transcriptSourceLocator: TeamTranscriptSourceLocator = new TeamTranscriptSourceLocator(),
private readonly taskReader: TeamTaskReader = new TeamTaskReader(),
private readonly transcriptReader: BoardTaskActivityTranscriptReader = new BoardTaskActivityTranscriptReader(),
private readonly recordBuilder: BoardTaskActivityRecordBuilder = new BoardTaskActivityRecordBuilder()
private readonly recordBuilder: Pick<
BoardTaskActivityRecordBuilder,
'buildForTasks'
> = new BoardTaskActivityRecordBuilder()
) {}
async getTaskRecords(teamName: string, taskId: string): Promise<BoardTaskActivityRecord[]> {
const index = await this.getTaskActivityIndex(teamName);
if (!index.tasksById.has(taskId)) {
return [];
}
return [...(index.recordsByTaskId.get(taskId) ?? [])];
}
private async getTaskActivityIndex(teamName: string): Promise<TaskActivityIndex> {
const cached = this.indexCache.get(teamName);
if (cached && cached.expiresAt > Date.now()) {
return cached;
}
const existingPromise = this.indexInFlight.get(teamName);
if (existingPromise) {
return await existingPromise;
}
const promise = this.buildTaskActivityIndex(teamName)
.then((index) => {
this.indexCache.set(teamName, index);
return index;
})
.finally(() => {
this.indexInFlight.delete(teamName);
});
this.indexInFlight.set(teamName, promise);
return await promise;
}
private async buildTaskActivityIndex(teamName: string): Promise<TaskActivityIndex> {
const [activeTasks, deletedTasks, transcriptFiles] = await Promise.all([
this.taskReader.getTasks(teamName),
this.taskReader.getDeletedTasks(teamName),
@ -22,17 +68,25 @@ export class BoardTaskActivityRecordSource {
]);
const tasks = [...activeTasks, ...deletedTasks];
const targetTask = tasks.find((task) => task.id === taskId);
if (!targetTask || transcriptFiles.length === 0) {
return [];
const tasksById = new Map(tasks.map((task) => [task.id, task] as const));
if (tasks.length === 0 || transcriptFiles.length === 0) {
return {
expiresAt: Date.now() + TASK_ACTIVITY_INDEX_CACHE_TTL_MS,
tasksById,
recordsByTaskId: new Map(),
};
}
const messages = await this.transcriptReader.readFiles(transcriptFiles);
return this.recordBuilder.buildForTask({
const recordsByTaskId = this.recordBuilder.buildForTasks({
teamName,
targetTask,
tasks,
messages,
});
return {
expiresAt: Date.now() + TASK_ACTIVITY_INDEX_CACHE_TTL_MS,
tasksById,
recordsByTaskId,
};
}
}

View file

@ -123,6 +123,12 @@ export class BoardTaskActivityTranscriptReader {
for await (const line of rl) {
if (!line.trim()) continue;
lineCount += 1;
if (!line.includes('"boardTaskLinks"')) {
if (lineCount % 500 === 0) {
await yieldToEventLoop();
}
continue;
}
try {
const parsed = JSON.parse(line) as unknown;

View file

@ -10,6 +10,7 @@ import type { TeamConfig } from '@shared/types';
const logger = createLogger('Service:TeamTranscriptSourceLocator');
const TRANSCRIPT_DISCOVERY_WARN_MS = 3_000;
const TRANSCRIPT_DISCOVERY_FILE_COUNT_WARN = 500;
const TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY = process.platform === 'win32' ? 4 : 8;
export interface TeamTranscriptSourceContext {
projectDir: string;
@ -19,6 +20,28 @@ export interface TeamTranscriptSourceContext {
transcriptFiles: string[];
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
fn: (item: T) => Promise<R>
): Promise<R[]> {
const results = new Array<R>(items.length);
let index = 0;
const workerCount = Math.max(1, Math.min(limit, items.length));
const workers = new Array(workerCount).fill(0).map(async () => {
while (true) {
const currentIndex = index;
index += 1;
if (currentIndex >= items.length) {
return;
}
results[currentIndex] = await fn(items[currentIndex]!);
}
});
await Promise.all(workers);
return results;
}
export class TeamTranscriptSourceLocator {
constructor(
private readonly projectResolver: TeamTranscriptProjectResolver = new TeamTranscriptProjectResolver()
@ -55,29 +78,41 @@ export class TeamTranscriptSourceLocator {
): Promise<string[]> {
const transcriptFiles = new Set<string>();
for (const sessionId of sessionIds) {
const mainTranscript = path.join(projectDir, `${sessionId}.jsonl`);
try {
const stat = await fs.stat(mainTranscript);
if (stat.isFile()) {
transcriptFiles.add(mainTranscript);
const filesBySession = await mapLimit(
sessionIds,
TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY,
async (sessionId) => {
const sessionFiles: string[] = [];
const mainTranscript = path.join(projectDir, `${sessionId}.jsonl`);
try {
const stat = await fs.stat(mainTranscript);
if (stat.isFile()) {
sessionFiles.push(mainTranscript);
}
} catch {
// ignore missing root transcript
}
} catch {
// ignore missing root transcript
}
const subagentsDir = path.join(projectDir, sessionId, 'subagents');
try {
const dirEntries = await fs.readdir(subagentsDir, { withFileTypes: true });
for (const entry of dirEntries) {
if (!entry.isFile()) continue;
if (!entry.name.endsWith('.jsonl')) continue;
if (!entry.name.startsWith('agent-')) continue;
if (entry.name.startsWith('agent-acompact')) continue;
transcriptFiles.add(path.join(subagentsDir, entry.name));
const subagentsDir = path.join(projectDir, sessionId, 'subagents');
try {
const dirEntries = await fs.readdir(subagentsDir, { withFileTypes: true });
for (const entry of dirEntries) {
if (!entry.isFile()) continue;
if (!entry.name.endsWith('.jsonl')) continue;
if (!entry.name.startsWith('agent-')) continue;
if (entry.name.startsWith('agent-acompact')) continue;
sessionFiles.push(path.join(subagentsDir, entry.name));
}
} catch {
// ignore missing subagent dir
}
} catch {
// ignore missing subagent dir
return sessionFiles;
}
);
for (const sessionFiles of filesBySession) {
for (const filePath of sessionFiles) {
transcriptFiles.add(filePath);
}
}

View file

@ -1,5 +1,5 @@
import { yieldToEventLoop } from '@main/utils/asyncYield';
import { parseJsonlLine } from '@main/utils/jsonl';
import { parseJsonlEntry } from '@main/utils/jsonl';
import { createLogger } from '@shared/utils/logger';
import { createReadStream } from 'fs';
import * as fs from 'fs/promises';
@ -8,6 +8,7 @@ import * as readline from 'readline';
import { BoardTaskExactLogsParseCache } from './BoardTaskExactLogsParseCache';
import type { ParsedMessage } from '@main/types';
import type { ChatHistoryEntry } from '@main/types';
const logger = createLogger('Service:BoardTaskExactLogStrictParser');
const EXACT_LOG_PARSE_CONCURRENCY = process.platform === 'win32' ? 4 : 8;
@ -123,7 +124,7 @@ export class BoardTaskExactLogStrictParser {
continue;
}
const parsed = parseJsonlLine(line);
const parsed = parseJsonlEntry(record as unknown as ChatHistoryEntry);
if (parsed) {
results.push(parsed);
}

View file

@ -203,6 +203,10 @@ export function parseJsonlLine(line: string): ParsedMessage | null {
}
const entry = JSON.parse(normalized) as ChatHistoryEntry;
return parseJsonlEntry(entry);
}
export function parseJsonlEntry(entry: ChatHistoryEntry): ParsedMessage | null {
return parseChatHistoryEntry(entry);
}

View file

@ -31,7 +31,7 @@ describe('BoardTaskActivityRecordSource', () => {
readFiles: vi.fn(async () => rawMessages),
};
const recordBuilder = {
buildForTask: vi.fn(() => builtRecords),
buildForTasks: vi.fn(() => new Map([['task-a', builtRecords]])),
};
const source = new BoardTaskActivityRecordSource(
@ -43,12 +43,12 @@ describe('BoardTaskActivityRecordSource', () => {
const result = await source.getTaskRecords('demo', 'task-a');
expect(result).toBe(builtRecords);
expect(result).toEqual(builtRecords);
expect(result).not.toBe(builtRecords);
expect(locator.listTranscriptFiles).toHaveBeenCalledWith('demo');
expect(transcriptReader.readFiles).toHaveBeenCalledWith(transcriptFiles);
expect(recordBuilder.buildForTask).toHaveBeenCalledWith({
expect(recordBuilder.buildForTasks).toHaveBeenCalledWith({
teamName: 'demo',
targetTask,
tasks: [targetTask, deletedTask],
messages: rawMessages,
});
@ -66,7 +66,7 @@ describe('BoardTaskActivityRecordSource', () => {
readFiles: vi.fn(async () => [{ uuid: 'm1' }]),
};
const recordBuilder = {
buildForTask: vi.fn(() => [{ id: 'r1' }]),
buildForTasks: vi.fn(() => new Map([['task-known', [{ id: 'r1' }]]])),
};
const source = new BoardTaskActivityRecordSource(
@ -77,6 +77,63 @@ describe('BoardTaskActivityRecordSource', () => {
);
await expect(source.getTaskRecords('demo', 'task-missing')).resolves.toEqual([]);
expect(recordBuilder.buildForTask).not.toHaveBeenCalled();
expect(recordBuilder.buildForTasks).not.toHaveBeenCalled();
});
it('shares one in-flight team index across concurrent task lookups', async () => {
const taskA = {
id: 'task-a',
displayId: 'aaaa1111',
subject: 'A',
status: 'pending',
};
const taskB = {
id: 'task-b',
displayId: 'bbbb2222',
subject: 'B',
status: 'pending',
};
const transcriptFiles = ['/tmp/a.jsonl'];
const rawMessages = [{ uuid: 'm1' }];
const recordsA = [{ id: 'record-a' }];
const recordsB = [{ id: 'record-b' }];
let resolveReadFiles: (messages: typeof rawMessages) => void = () => undefined;
const readFilesPromise = new Promise<typeof rawMessages>((resolve) => {
resolveReadFiles = resolve;
});
const locator = {
listTranscriptFiles: vi.fn(async () => transcriptFiles),
};
const taskReader = {
getTasks: vi.fn(async () => [taskA, taskB]),
getDeletedTasks: vi.fn(async () => []),
};
const transcriptReader = {
readFiles: vi.fn(() => readFilesPromise),
};
const recordBuilder = {
buildForTasks: vi.fn(() => new Map([
['task-a', recordsA],
['task-b', recordsB],
])),
};
const source = new BoardTaskActivityRecordSource(
locator as never,
taskReader as never,
transcriptReader as never,
recordBuilder as never,
);
const taskAResult = source.getTaskRecords('demo', 'task-a');
const taskBResult = source.getTaskRecords('demo', 'task-b');
resolveReadFiles(rawMessages);
await expect(taskAResult).resolves.toEqual(recordsA);
await expect(taskBResult).resolves.toEqual(recordsB);
expect(locator.listTranscriptFiles).toHaveBeenCalledTimes(1);
expect(transcriptReader.readFiles).toHaveBeenCalledTimes(1);
expect(recordBuilder.buildForTasks).toHaveBeenCalledTimes(1);
});
});

View file

@ -1,67 +1,97 @@
import { afterEach, describe, expect, it } from 'vitest';
import * as fs from 'fs/promises';
import * as os from 'os';
import * as path from 'path';
import { afterEach, describe, expect, it } from 'vitest';
import { BoardTaskActivityTranscriptReader } from '../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader';
const tempPaths: string[] = [];
async function createTempTranscript(lines: unknown[]): Promise<string> {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'board-task-activity-'));
const filePath = path.join(dir, 'transcript.jsonl');
tempPaths.push(dir);
await fs.writeFile(
filePath,
lines.map(line => JSON.stringify(line)).join('\n'),
'utf8',
);
return filePath;
}
const tempDirs: string[] = [];
afterEach(async () => {
await Promise.all(
tempPaths.splice(0).map(dir => fs.rm(dir, { recursive: true, force: true })),
tempDirs.splice(0, tempDirs.length).map(async (dirPath) => {
await fs.rm(dirPath, { recursive: true, force: true });
})
);
});
describe('BoardTaskActivityTranscriptReader', () => {
it('skips transcript rows without a stable timestamp', async () => {
const filePath = await createTempTranscript([
{
uuid: 'missing-timestamp',
sessionId: 'session-1',
boardTaskLinks: [
{
schemaVersion: 1,
task: { ref: 'abcd1234', refKind: 'display' },
targetRole: 'subject',
linkKind: 'execution',
actorContext: { relation: 'same_task' },
},
],
},
{
uuid: 'valid-row',
timestamp: '2026-04-12T10:00:00.000Z',
sessionId: 'session-1',
boardTaskLinks: [
{
schemaVersion: 1,
task: { ref: 'abcd1234', refKind: 'display' },
targetRole: 'subject',
linkKind: 'execution',
actorContext: { relation: 'same_task' },
},
],
},
]);
it('skips non-board and malformed rows while preserving task-linked activity rows', async () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'activity-transcript-reader-'));
tempDirs.push(tempDir);
const filePath = path.join(tempDir, 'session.jsonl');
await fs.writeFile(
filePath,
[
'{not-json',
JSON.stringify({
uuid: 'ordinary-message',
sessionId: 'session-a',
timestamp: '2026-04-20T12:00:00.000Z',
message: { role: 'assistant', content: 'No board task links here' },
}),
'{"boardTaskLinks":',
JSON.stringify({
uuid: 'linked-message',
sessionId: 'session-a',
timestamp: '2026-04-20T12:01:00.000Z',
agentId: 'agent-a',
agentName: 'alice',
isSidechain: true,
boardTaskLinks: [
{
schemaVersion: 1,
task: { ref: '12345678', refKind: 'display', canonicalId: 'task-a' },
targetRole: 'subject',
linkKind: 'execution',
actorContext: { relation: 'same_task' },
toolUseId: 'toolu_1',
},
],
boardTaskToolActions: [
{
schemaVersion: 1,
toolUseId: 'toolu_1',
canonicalToolName: 'task_set_status',
input: { status: 'in_progress' },
},
],
}),
].join('\n'),
'utf8'
);
const rows = await new BoardTaskActivityTranscriptReader().readFiles([filePath]);
expect(rows).toHaveLength(1);
expect(rows[0]?.uuid).toBe('valid-row');
expect(rows[0]?.timestamp).toBe('2026-04-12T10:00:00.000Z');
expect(rows[0]).toMatchObject({
filePath,
uuid: 'linked-message',
sessionId: 'session-a',
timestamp: '2026-04-20T12:01:00.000Z',
agentId: 'agent-a',
agentName: 'alice',
isSidechain: true,
sourceOrder: 1,
boardTaskLinks: [
{
schemaVersion: 1,
toolUseId: 'toolu_1',
task: { ref: '12345678', refKind: 'display', canonicalId: 'task-a' },
targetRole: 'subject',
linkKind: 'execution',
actorContext: { relation: 'same_task' },
},
],
boardTaskToolActions: [
{
schemaVersion: 1,
toolUseId: 'toolu_1',
canonicalToolName: 'task_set_status',
input: { status: 'in_progress' },
},
],
});
});
});

View file

@ -111,4 +111,75 @@ describe('TeamTranscriptSourceLocator', () => {
);
expect(transcriptFiles).not.toContain(path.join(projectRoot, 'unrelated-session.jsonl'));
});
it('returns the same sorted transcript set across multiple session directories', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-'));
setClaudeBasePathOverride(tmpDir);
const teamName = 'bounded-discovery-test';
const projectPath = '/Users/test/bounded-discovery';
const projectId = '-Users-test-bounded-discovery';
const sessionIds = Array.from({ length: 12 }, (_, index) => `member-${index + 1}`);
await fs.mkdir(path.join(tmpDir, 'teams', teamName), { recursive: true });
await fs.writeFile(
path.join(tmpDir, 'teams', teamName, 'config.json'),
JSON.stringify(
{
name: teamName,
projectPath,
members: sessionIds.map((sessionId, index) => ({
name: `member-${index + 1}`,
agentType: 'general-purpose',
sessionId,
cwd: projectPath,
})),
},
null,
2
),
'utf8'
);
const projectRoot = path.join(tmpDir, 'projects', projectId);
const expectedFiles: string[] = [];
for (const sessionId of sessionIds) {
const rootTranscript = path.join(projectRoot, `${sessionId}.jsonl`);
const subagentsDir = path.join(projectRoot, sessionId, 'subagents');
const subagentTranscript = path.join(subagentsDir, 'agent-worker.jsonl');
await fs.mkdir(subagentsDir, { recursive: true });
await fs.writeFile(
rootTranscript,
JSON.stringify({
timestamp: '2026-04-15T14:02:00.000Z',
type: 'user',
teamName,
message: { role: 'user', content: `Bootstrap ${sessionId} for ${teamName}` },
}) + '\n',
'utf8'
);
await fs.writeFile(
subagentTranscript,
JSON.stringify({
timestamp: '2026-04-15T14:02:01.000Z',
type: 'user',
message: { role: 'user', content: `Subagent for ${sessionId}` },
}) + '\n',
'utf8'
);
await fs.writeFile(
path.join(subagentsDir, 'agent-acompact-ignore.jsonl'),
'{}\n',
'utf8'
);
expectedFiles.push(rootTranscript, subagentTranscript);
}
const transcriptFiles = await new TeamTranscriptSourceLocator().listTranscriptFiles(teamName);
expect(transcriptFiles).toEqual([...expectedFiles].sort((a, b) => a.localeCompare(b)));
});
});