fix(task-logs): cache transcript discovery safely

This commit is contained in:
777genius 2026-05-01 17:46:49 +03:00
parent 50ce94dcaa
commit b584fa0403
6 changed files with 335 additions and 23 deletions

View file

@ -193,6 +193,7 @@ import {
TeamTaskStallNotifier,
TeamTaskStallPolicy,
TeamTaskStallSnapshotSource,
TeamTranscriptSourceLocator,
UpdaterService,
} from './services';
@ -1052,7 +1053,14 @@ async function initializeServices(): Promise<void> {
cliInstallerService = new CliInstallerService();
ptyTerminalService = new PtyTerminalService();
const teamMemberLogsFinder = new TeamMemberLogsFinder();
const boardTaskActivityRecordSource = new BoardTaskActivityRecordSource();
const teamLogSourceTracker = new TeamLogSourceTracker(teamMemberLogsFinder);
const teamTranscriptSourceLocator = new TeamTranscriptSourceLocator();
teamLogSourceTracker.onLogSourceChange((teamName) => {
teamTranscriptSourceLocator.invalidateTeam(teamName);
});
const boardTaskActivityRecordSource = new BoardTaskActivityRecordSource(
teamTranscriptSourceLocator
);
const boardTaskActivityService = new BoardTaskActivityService(boardTaskActivityRecordSource);
const boardTaskActivityDetailService = new BoardTaskActivityDetailService(
boardTaskActivityRecordSource
@ -1061,7 +1069,15 @@ async function initializeServices(): Promise<void> {
const boardTaskExactLogDetailService = new BoardTaskExactLogDetailService(
boardTaskActivityRecordSource
);
const boardTaskLogStreamService = new BoardTaskLogStreamService(boardTaskActivityRecordSource);
const boardTaskLogStreamService = new BoardTaskLogStreamService(
boardTaskActivityRecordSource,
undefined,
undefined,
undefined,
undefined,
undefined,
teamTranscriptSourceLocator
);
const teamMemberRuntimeAdvisoryService = new TeamMemberRuntimeAdvisoryService(
teamMemberLogsFinder
);
@ -1101,10 +1117,9 @@ async function initializeServices(): Promise<void> {
teamProvisioningService.setCrossTeamSender((request) => crossTeamService.send(request));
const taskChangePresenceRepository = new JsonTaskChangePresenceRepository();
const teamLogSourceTracker = new TeamLogSourceTracker(teamMemberLogsFinder);
teamTaskStallMonitor = new TeamTaskStallMonitor(
new ActiveTeamRegistry(teamDataService, teamLogSourceTracker),
new TeamTaskStallSnapshotSource(),
new TeamTaskStallSnapshotSource(teamTranscriptSourceLocator),
new TeamTaskStallPolicy(),
new TeamTaskStallJournal(),
new TeamTaskStallNotifier(teamDataService, teamProvisioningService)

View file

@ -61,6 +61,7 @@ export { BoardTaskActivityRecordSource } from './taskLogs/activity/BoardTaskActi
export { BoardTaskActivityService } from './taskLogs/activity/BoardTaskActivityService';
export { BoardTaskExactLogDetailService } from './taskLogs/exact/BoardTaskExactLogDetailService';
export { BoardTaskExactLogsService } from './taskLogs/exact/BoardTaskExactLogsService';
export { TeamTranscriptSourceLocator } from './taskLogs/discovery/TeamTranscriptSourceLocator';
export { BoardTaskLogStreamService } from './taskLogs/stream/BoardTaskLogStreamService';
export type {
OpenCodeTaskLogAttributionBulkWriteOutcome,

View file

@ -10,6 +10,7 @@ const logger = createLogger('Service:TeamTranscriptSourceLocator');
const TRANSCRIPT_DISCOVERY_WARN_MS = 3_000;
const TRANSCRIPT_DISCOVERY_FILE_COUNT_WARN = 500;
const TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY = process.platform === 'win32' ? 4 : 8;
const TRANSCRIPT_SOURCE_CONTEXT_CACHE_TTL_MS = 3_000;
export interface TeamTranscriptSourceContext {
projectDir: string;
@ -19,6 +20,17 @@ export interface TeamTranscriptSourceContext {
transcriptFiles: string[];
}
interface TeamTranscriptSourceContextCacheEntry {
expiresAt: number;
generation: number;
value: TeamTranscriptSourceContext;
}
interface TeamTranscriptSourceContextInFlightEntry {
generation: number;
promise: Promise<TeamTranscriptSourceContext | null>;
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
@ -42,11 +54,76 @@ async function mapLimit<T, R>(
}
export class TeamTranscriptSourceLocator {
private readonly contextCache = new Map<string, TeamTranscriptSourceContextCacheEntry>();
private readonly contextInFlight = new Map<string, TeamTranscriptSourceContextInFlightEntry>();
private readonly generationByTeam = new Map<string, number>();
constructor(
private readonly projectResolver: TeamTranscriptProjectResolver = new TeamTranscriptProjectResolver()
) {}
async getContext(teamName: string): Promise<TeamTranscriptSourceContext | null> {
getGeneration(teamName: string): number {
return this.generationByTeam.get(teamName) ?? 0;
}
invalidateTeam(teamName: string): void {
this.generationByTeam.set(teamName, this.getGeneration(teamName) + 1);
this.contextCache.delete(teamName);
this.contextInFlight.delete(teamName);
}
clear(): void {
const teamNames = new Set([
...this.contextCache.keys(),
...this.contextInFlight.keys(),
...this.generationByTeam.keys(),
]);
for (const teamName of teamNames) {
this.generationByTeam.set(teamName, this.getGeneration(teamName) + 1);
}
this.contextCache.clear();
this.contextInFlight.clear();
}
async getContext(
teamName: string,
options?: { forceRefresh?: boolean }
): Promise<TeamTranscriptSourceContext | null> {
if (options?.forceRefresh) {
this.invalidateTeam(teamName);
}
const generation = this.getGeneration(teamName);
const cached = this.contextCache.get(teamName);
if (cached && cached.generation === generation && cached.expiresAt > Date.now()) {
return cached.value;
}
const inFlight = this.contextInFlight.get(teamName);
if (inFlight && inFlight.generation === generation) {
return await inFlight.promise;
}
let entry: TeamTranscriptSourceContextInFlightEntry | null = null;
const promise = this.buildContext(teamName, generation).finally(() => {
if (this.contextInFlight.get(teamName) === entry) {
this.contextInFlight.delete(teamName);
}
});
entry = { generation, promise };
this.contextInFlight.set(teamName, entry);
return await promise;
}
async listTranscriptFiles(teamName: string): Promise<string[]> {
const context = await this.getContext(teamName);
return context?.transcriptFiles ?? [];
}
private async buildContext(
teamName: string,
generation: number
): Promise<TeamTranscriptSourceContext | null> {
const context = await this.projectResolver.getContext(teamName);
if (!context) {
return null;
@ -64,13 +141,17 @@ export class TeamTranscriptSourceLocator {
`Large task-log transcript discovery: team=${teamName} sessions=${sessionIds.length} files=${transcriptFiles.length} elapsedMs=${elapsedMs}`
);
}
return { projectDir, projectId, config, sessionIds, transcriptFiles };
const value = { projectDir, projectId, config, sessionIds, transcriptFiles };
if (this.getGeneration(teamName) === generation) {
this.contextCache.set(teamName, {
expiresAt: Date.now() + TRANSCRIPT_SOURCE_CONTEXT_CACHE_TTL_MS,
generation,
value,
});
}
return value;
}
async listTranscriptFiles(teamName: string): Promise<string[]> {
const context = await this.getContext(teamName);
return context?.transcriptFiles ?? [];
}
private async listTranscriptFilesForSessions(
projectDir: string,
sessionIds: string[]

View file

@ -1592,11 +1592,18 @@ export class BoardTaskLogStreamService {
string,
{
expiresAt: number;
generation: number;
layout: StreamLayout;
}
>();
private readonly layoutInFlight = new Map<string, Promise<StreamLayout>>();
private readonly layoutInFlight = new Map<
string,
{
generation: number;
promise: Promise<StreamLayout>;
}
>();
constructor(
private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(),
@ -1615,32 +1622,47 @@ export class BoardTaskLogStreamService {
return `${teamName}::${taskId}`;
}
private getTranscriptDiscoveryGeneration(teamName: string): number {
const locator = this.transcriptSourceLocator as {
getGeneration?: (teamName: string) => number;
};
return locator.getGeneration?.(teamName) ?? 0;
}
private async getStreamLayout(teamName: string, taskId: string): Promise<StreamLayout> {
const cacheKey = this.buildLayoutCacheKey(teamName, taskId);
const generation = this.getTranscriptDiscoveryGeneration(teamName);
const cached = this.layoutCache.get(cacheKey);
if (cached && cached.expiresAt > Date.now()) {
if (cached && cached.generation === generation && cached.expiresAt > Date.now()) {
return cached.layout;
}
const existingPromise = this.layoutInFlight.get(cacheKey);
if (existingPromise) {
return await existingPromise;
const existingInFlight = this.layoutInFlight.get(cacheKey);
if (existingInFlight && existingInFlight.generation === generation) {
return await existingInFlight.promise;
}
const startedAt = Date.now();
let inFlightEntry: { generation: number; promise: Promise<StreamLayout> } | null = null;
const promise = this.buildStreamLayout(teamName, taskId)
.then((layout) => {
this.layoutCache.set(cacheKey, {
expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS,
layout,
});
if (this.getTranscriptDiscoveryGeneration(teamName) === generation) {
this.layoutCache.set(cacheKey, {
expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS,
generation,
layout,
});
}
return layout;
})
.finally(() => {
this.layoutInFlight.delete(cacheKey);
if (this.layoutInFlight.get(cacheKey) === inFlightEntry) {
this.layoutInFlight.delete(cacheKey);
}
});
this.layoutInFlight.set(cacheKey, promise);
inFlightEntry = { generation, promise };
this.layoutInFlight.set(cacheKey, inFlightEntry);
const layout = await promise;
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= STREAM_LAYOUT_BUILD_WARN_MS) {

View file

@ -493,13 +493,35 @@ describe('BoardTaskLogStreamService', () => {
})),
};
const buildBundleChunks = vi.fn((messages: ParsedMessage[]) => [{ id: messages[0]?.uuid }]);
const taskReader = {
getTasks: vi.fn(async () => [
{
id: 'task-a',
displayId: 'abcd1234',
owner: 'tom',
status: 'in_progress',
createdAt: '2026-04-12T15:59:00.000Z',
updatedAt: '2026-04-12T16:05:00.000Z',
},
]),
getDeletedTasks: vi.fn(async () => []),
};
const transcriptSourceLocator = {
getGeneration: vi.fn(() => 0),
getContext: vi.fn(async () => ({
transcriptFiles: [],
config: { members: [] },
})),
};
const service = new BoardTaskLogStreamService(
recordSource as never,
summarySelector as never,
strictParser as never,
detailSelector as never,
{ buildBundleChunks } as never
{ buildBundleChunks } as never,
taskReader as never,
transcriptSourceLocator as never
);
const [summary, response] = await Promise.all([
@ -511,6 +533,81 @@ describe('BoardTaskLogStreamService', () => {
expect(response.segments).toHaveLength(1);
expect(recordSource.getTaskRecords).toHaveBeenCalledTimes(1);
expect(strictParser.parseFiles).toHaveBeenCalledTimes(1);
expect(transcriptSourceLocator.getContext).toHaveBeenCalledTimes(1);
});
it('does not cache a stream layout when transcript discovery changes during build', async () => {
const tom = {
memberName: 'tom',
role: 'member' as const,
sessionId: 'session-tom',
agentId: 'agent-tom',
isSidechain: true,
};
const baseCandidate = makeCandidate(
'c1',
'2026-04-12T16:00:00.000Z',
tom,
'tool-1'
);
const executionRecord: BoardTaskActivityRecord = {
...baseCandidate.records[0]!,
linkKind: 'execution',
};
const candidate: BoardTaskExactLogBundleCandidate = {
...baseCandidate,
records: [executionRecord],
linkKinds: ['execution'],
};
let generation = 0;
let recordReadCount = 0;
const recordSource = {
getTaskRecords: vi.fn(async () => {
recordReadCount += 1;
if (recordReadCount === 1) {
generation += 1;
}
return candidate.records;
}),
};
const summarySelector = {
selectSummaries: vi.fn(() => [candidate]),
};
const strictParser = {
parseFiles: vi.fn(async () => new Map([['/tmp/task.jsonl', []]])),
};
const detailSelector = {
selectDetail: vi.fn(() => ({
id: candidate.id,
timestamp: candidate.timestamp,
actor: candidate.actor,
source: candidate.source,
records: candidate.records,
filteredMessages: [makeMessage(candidate.id, candidate.timestamp, 'native work')],
})),
};
const transcriptSourceLocator = {
getGeneration: vi.fn(() => generation),
getContext: vi.fn(async () => null),
};
const buildBundleChunks = vi.fn((messages: ParsedMessage[]) => [{ id: messages[0]?.uuid }]);
const service = new BoardTaskLogStreamService(
recordSource as never,
summarySelector as never,
strictParser as never,
detailSelector as never,
{ buildBundleChunks } as never,
undefined as never,
transcriptSourceLocator as never
);
await service.getTaskLogStream('demo', 'task-a');
await service.getTaskLogStream('demo', 'task-a');
await service.getTaskLogStream('demo', 'task-a');
expect(recordSource.getTaskRecords).toHaveBeenCalledTimes(2);
expect(buildBundleChunks).toHaveBeenCalledTimes(3);
});
it('merges duplicate message uuids inside one participant segment before chunk building', async () => {

View file

@ -1,6 +1,6 @@
import * as os from 'os';
import * as path from 'path';
import { afterEach, describe, expect, it } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
import * as fs from 'fs/promises';
@ -18,6 +18,30 @@ describe('TeamTranscriptSourceLocator', () => {
}
});
async function writeSessionFixture(projectRoot: string, sessionId: string): Promise<string[]> {
const rootTranscript = path.join(projectRoot, `${sessionId}.jsonl`);
const subagentsDir = path.join(projectRoot, sessionId, 'subagents');
const subagentTranscript = path.join(subagentsDir, 'agent-worker.jsonl');
await fs.mkdir(subagentsDir, { recursive: true });
await fs.writeFile(rootTranscript, '{}\n', 'utf8');
await fs.writeFile(subagentTranscript, '{}\n', 'utf8');
return [rootTranscript, subagentTranscript];
}
function makeResolverContext(projectRoot: string, teamName: string, sessionIds: string[]) {
return {
projectDir: projectRoot,
projectId: '-Users-test-cache',
config: {
name: teamName,
projectPath: '/Users/test/cache',
members: [],
},
sessionIds,
};
}
it('recovers projectPath from member cwd and includes only team-related root sessions', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-'));
setClaudeBasePathOverride(tmpDir);
@ -182,4 +206,76 @@ describe('TeamTranscriptSourceLocator', () => {
expect(transcriptFiles).toEqual([...expectedFiles].sort((a, b) => a.localeCompare(b)));
});
it('shares in-flight context discovery across parallel context and file-list reads', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-'));
const teamName = 'inflight-discovery-test';
const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache');
const expectedFiles = await writeSessionFixture(projectRoot, 'session-a');
const resolver = {
getContext: vi.fn(async () => {
await Promise.resolve();
return makeResolverContext(projectRoot, teamName, ['session-a']);
}),
};
const locator = new TeamTranscriptSourceLocator(resolver as never);
const [context, transcriptFiles] = await Promise.all([
locator.getContext(teamName),
locator.listTranscriptFiles(teamName),
]);
expect(context?.sessionIds).toEqual(['session-a']);
expect(transcriptFiles).toEqual(expectedFiles.sort((a, b) => a.localeCompare(b)));
expect(resolver.getContext).toHaveBeenCalledTimes(1);
});
it('reuses cached context inside the TTL and rebuilds after team invalidation', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-'));
const teamName = 'cached-discovery-test';
const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache');
await writeSessionFixture(projectRoot, 'session-a');
const sessionBFiles = await writeSessionFixture(projectRoot, 'session-b');
let sessionIds = ['session-a'];
const resolver = {
getContext: vi.fn(async () => makeResolverContext(projectRoot, teamName, [...sessionIds])),
};
const locator = new TeamTranscriptSourceLocator(resolver as never);
await locator.listTranscriptFiles(teamName);
await locator.listTranscriptFiles(teamName);
expect(resolver.getContext).toHaveBeenCalledTimes(1);
sessionIds = ['session-a', 'session-b'];
locator.invalidateTeam(teamName);
const transcriptFiles = await locator.listTranscriptFiles(teamName);
expect(resolver.getContext).toHaveBeenCalledTimes(2);
expect(transcriptFiles).toEqual(expect.arrayContaining(sessionBFiles));
});
it('bypasses cached context when forceRefresh is requested', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-'));
const teamName = 'force-refresh-discovery-test';
const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache');
await writeSessionFixture(projectRoot, 'session-a');
let sessionIds = ['session-a'];
const resolver = {
getContext: vi.fn(async () => makeResolverContext(projectRoot, teamName, [...sessionIds])),
};
const locator = new TeamTranscriptSourceLocator(resolver as never);
await locator.getContext(teamName);
sessionIds = ['session-a', 'session-b'];
await locator.getContext(teamName);
expect(resolver.getContext).toHaveBeenCalledTimes(1);
const refreshed = await locator.getContext(teamName, { forceRefresh: true });
expect(refreshed?.sessionIds).toEqual(['session-a', 'session-b']);
expect(resolver.getContext).toHaveBeenCalledTimes(2);
});
});