diff --git a/src/main/index.ts b/src/main/index.ts index 3e1fca2a..a7ab29af 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -193,6 +193,7 @@ import { TeamTaskStallNotifier, TeamTaskStallPolicy, TeamTaskStallSnapshotSource, + TeamTranscriptSourceLocator, UpdaterService, } from './services'; @@ -1052,7 +1053,14 @@ async function initializeServices(): Promise { cliInstallerService = new CliInstallerService(); ptyTerminalService = new PtyTerminalService(); const teamMemberLogsFinder = new TeamMemberLogsFinder(); - const boardTaskActivityRecordSource = new BoardTaskActivityRecordSource(); + const teamLogSourceTracker = new TeamLogSourceTracker(teamMemberLogsFinder); + const teamTranscriptSourceLocator = new TeamTranscriptSourceLocator(); + teamLogSourceTracker.onLogSourceChange((teamName) => { + teamTranscriptSourceLocator.invalidateTeam(teamName); + }); + const boardTaskActivityRecordSource = new BoardTaskActivityRecordSource( + teamTranscriptSourceLocator + ); const boardTaskActivityService = new BoardTaskActivityService(boardTaskActivityRecordSource); const boardTaskActivityDetailService = new BoardTaskActivityDetailService( boardTaskActivityRecordSource @@ -1061,7 +1069,15 @@ async function initializeServices(): Promise { const boardTaskExactLogDetailService = new BoardTaskExactLogDetailService( boardTaskActivityRecordSource ); - const boardTaskLogStreamService = new BoardTaskLogStreamService(boardTaskActivityRecordSource); + const boardTaskLogStreamService = new BoardTaskLogStreamService( + boardTaskActivityRecordSource, + undefined, + undefined, + undefined, + undefined, + undefined, + teamTranscriptSourceLocator + ); const teamMemberRuntimeAdvisoryService = new TeamMemberRuntimeAdvisoryService( teamMemberLogsFinder ); @@ -1101,10 +1117,9 @@ async function initializeServices(): Promise { teamProvisioningService.setCrossTeamSender((request) => crossTeamService.send(request)); const taskChangePresenceRepository = new JsonTaskChangePresenceRepository(); - const teamLogSourceTracker = new TeamLogSourceTracker(teamMemberLogsFinder); teamTaskStallMonitor = new TeamTaskStallMonitor( new ActiveTeamRegistry(teamDataService, teamLogSourceTracker), - new TeamTaskStallSnapshotSource(), + new TeamTaskStallSnapshotSource(teamTranscriptSourceLocator), new TeamTaskStallPolicy(), new TeamTaskStallJournal(), new TeamTaskStallNotifier(teamDataService, teamProvisioningService) diff --git a/src/main/services/team/index.ts b/src/main/services/team/index.ts index 63c45d22..908f879e 100644 --- a/src/main/services/team/index.ts +++ b/src/main/services/team/index.ts @@ -61,6 +61,7 @@ export { BoardTaskActivityRecordSource } from './taskLogs/activity/BoardTaskActi export { BoardTaskActivityService } from './taskLogs/activity/BoardTaskActivityService'; export { BoardTaskExactLogDetailService } from './taskLogs/exact/BoardTaskExactLogDetailService'; export { BoardTaskExactLogsService } from './taskLogs/exact/BoardTaskExactLogsService'; +export { TeamTranscriptSourceLocator } from './taskLogs/discovery/TeamTranscriptSourceLocator'; export { BoardTaskLogStreamService } from './taskLogs/stream/BoardTaskLogStreamService'; export type { OpenCodeTaskLogAttributionBulkWriteOutcome, diff --git a/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts b/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts index e133b615..0c40164f 100644 --- a/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts +++ b/src/main/services/team/taskLogs/discovery/TeamTranscriptSourceLocator.ts @@ -10,6 +10,7 @@ const logger = createLogger('Service:TeamTranscriptSourceLocator'); const TRANSCRIPT_DISCOVERY_WARN_MS = 3_000; const TRANSCRIPT_DISCOVERY_FILE_COUNT_WARN = 500; const TRANSCRIPT_DISCOVERY_SESSION_CONCURRENCY = process.platform === 'win32' ? 4 : 8; +const TRANSCRIPT_SOURCE_CONTEXT_CACHE_TTL_MS = 3_000; export interface TeamTranscriptSourceContext { projectDir: string; @@ -19,6 +20,17 @@ export interface TeamTranscriptSourceContext { transcriptFiles: string[]; } +interface TeamTranscriptSourceContextCacheEntry { + expiresAt: number; + generation: number; + value: TeamTranscriptSourceContext; +} + +interface TeamTranscriptSourceContextInFlightEntry { + generation: number; + promise: Promise; +} + async function mapLimit( items: readonly T[], limit: number, @@ -42,11 +54,76 @@ async function mapLimit( } export class TeamTranscriptSourceLocator { + private readonly contextCache = new Map(); + private readonly contextInFlight = new Map(); + private readonly generationByTeam = new Map(); + constructor( private readonly projectResolver: TeamTranscriptProjectResolver = new TeamTranscriptProjectResolver() ) {} - async getContext(teamName: string): Promise { + getGeneration(teamName: string): number { + return this.generationByTeam.get(teamName) ?? 0; + } + + invalidateTeam(teamName: string): void { + this.generationByTeam.set(teamName, this.getGeneration(teamName) + 1); + this.contextCache.delete(teamName); + this.contextInFlight.delete(teamName); + } + + clear(): void { + const teamNames = new Set([ + ...this.contextCache.keys(), + ...this.contextInFlight.keys(), + ...this.generationByTeam.keys(), + ]); + for (const teamName of teamNames) { + this.generationByTeam.set(teamName, this.getGeneration(teamName) + 1); + } + this.contextCache.clear(); + this.contextInFlight.clear(); + } + + async getContext( + teamName: string, + options?: { forceRefresh?: boolean } + ): Promise { + if (options?.forceRefresh) { + this.invalidateTeam(teamName); + } + + const generation = this.getGeneration(teamName); + const cached = this.contextCache.get(teamName); + if (cached && cached.generation === generation && cached.expiresAt > Date.now()) { + return cached.value; + } + + const inFlight = this.contextInFlight.get(teamName); + if (inFlight && inFlight.generation === generation) { + return await inFlight.promise; + } + + let entry: TeamTranscriptSourceContextInFlightEntry | null = null; + const promise = this.buildContext(teamName, generation).finally(() => { + if (this.contextInFlight.get(teamName) === entry) { + this.contextInFlight.delete(teamName); + } + }); + entry = { generation, promise }; + this.contextInFlight.set(teamName, entry); + return await promise; + } + + async listTranscriptFiles(teamName: string): Promise { + const context = await this.getContext(teamName); + return context?.transcriptFiles ?? []; + } + + private async buildContext( + teamName: string, + generation: number + ): Promise { const context = await this.projectResolver.getContext(teamName); if (!context) { return null; @@ -64,13 +141,17 @@ export class TeamTranscriptSourceLocator { `Large task-log transcript discovery: team=${teamName} sessions=${sessionIds.length} files=${transcriptFiles.length} elapsedMs=${elapsedMs}` ); } - return { projectDir, projectId, config, sessionIds, transcriptFiles }; + const value = { projectDir, projectId, config, sessionIds, transcriptFiles }; + if (this.getGeneration(teamName) === generation) { + this.contextCache.set(teamName, { + expiresAt: Date.now() + TRANSCRIPT_SOURCE_CONTEXT_CACHE_TTL_MS, + generation, + value, + }); + } + return value; } - async listTranscriptFiles(teamName: string): Promise { - const context = await this.getContext(teamName); - return context?.transcriptFiles ?? []; - } private async listTranscriptFilesForSessions( projectDir: string, sessionIds: string[] diff --git a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts index ef20647f..06fd9210 100644 --- a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts +++ b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts @@ -1592,11 +1592,18 @@ export class BoardTaskLogStreamService { string, { expiresAt: number; + generation: number; layout: StreamLayout; } >(); - private readonly layoutInFlight = new Map>(); + private readonly layoutInFlight = new Map< + string, + { + generation: number; + promise: Promise; + } + >(); constructor( private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(), @@ -1615,32 +1622,47 @@ export class BoardTaskLogStreamService { return `${teamName}::${taskId}`; } + private getTranscriptDiscoveryGeneration(teamName: string): number { + const locator = this.transcriptSourceLocator as { + getGeneration?: (teamName: string) => number; + }; + return locator.getGeneration?.(teamName) ?? 0; + } + private async getStreamLayout(teamName: string, taskId: string): Promise { const cacheKey = this.buildLayoutCacheKey(teamName, taskId); + const generation = this.getTranscriptDiscoveryGeneration(teamName); const cached = this.layoutCache.get(cacheKey); - if (cached && cached.expiresAt > Date.now()) { + if (cached && cached.generation === generation && cached.expiresAt > Date.now()) { return cached.layout; } - const existingPromise = this.layoutInFlight.get(cacheKey); - if (existingPromise) { - return await existingPromise; + const existingInFlight = this.layoutInFlight.get(cacheKey); + if (existingInFlight && existingInFlight.generation === generation) { + return await existingInFlight.promise; } const startedAt = Date.now(); + let inFlightEntry: { generation: number; promise: Promise } | null = null; const promise = this.buildStreamLayout(teamName, taskId) .then((layout) => { - this.layoutCache.set(cacheKey, { - expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS, - layout, - }); + if (this.getTranscriptDiscoveryGeneration(teamName) === generation) { + this.layoutCache.set(cacheKey, { + expiresAt: Date.now() + STREAM_LAYOUT_CACHE_TTL_MS, + generation, + layout, + }); + } return layout; }) .finally(() => { - this.layoutInFlight.delete(cacheKey); + if (this.layoutInFlight.get(cacheKey) === inFlightEntry) { + this.layoutInFlight.delete(cacheKey); + } }); - this.layoutInFlight.set(cacheKey, promise); + inFlightEntry = { generation, promise }; + this.layoutInFlight.set(cacheKey, inFlightEntry); const layout = await promise; const elapsedMs = Date.now() - startedAt; if (elapsedMs >= STREAM_LAYOUT_BUILD_WARN_MS) { diff --git a/test/main/services/team/BoardTaskLogStreamService.test.ts b/test/main/services/team/BoardTaskLogStreamService.test.ts index 3863ff04..b7dd9927 100644 --- a/test/main/services/team/BoardTaskLogStreamService.test.ts +++ b/test/main/services/team/BoardTaskLogStreamService.test.ts @@ -493,13 +493,35 @@ describe('BoardTaskLogStreamService', () => { })), }; const buildBundleChunks = vi.fn((messages: ParsedMessage[]) => [{ id: messages[0]?.uuid }]); + const taskReader = { + getTasks: vi.fn(async () => [ + { + id: 'task-a', + displayId: 'abcd1234', + owner: 'tom', + status: 'in_progress', + createdAt: '2026-04-12T15:59:00.000Z', + updatedAt: '2026-04-12T16:05:00.000Z', + }, + ]), + getDeletedTasks: vi.fn(async () => []), + }; + const transcriptSourceLocator = { + getGeneration: vi.fn(() => 0), + getContext: vi.fn(async () => ({ + transcriptFiles: [], + config: { members: [] }, + })), + }; const service = new BoardTaskLogStreamService( recordSource as never, summarySelector as never, strictParser as never, detailSelector as never, - { buildBundleChunks } as never + { buildBundleChunks } as never, + taskReader as never, + transcriptSourceLocator as never ); const [summary, response] = await Promise.all([ @@ -511,6 +533,81 @@ describe('BoardTaskLogStreamService', () => { expect(response.segments).toHaveLength(1); expect(recordSource.getTaskRecords).toHaveBeenCalledTimes(1); expect(strictParser.parseFiles).toHaveBeenCalledTimes(1); + expect(transcriptSourceLocator.getContext).toHaveBeenCalledTimes(1); + }); + + it('does not cache a stream layout when transcript discovery changes during build', async () => { + const tom = { + memberName: 'tom', + role: 'member' as const, + sessionId: 'session-tom', + agentId: 'agent-tom', + isSidechain: true, + }; + const baseCandidate = makeCandidate( + 'c1', + '2026-04-12T16:00:00.000Z', + tom, + 'tool-1' + ); + const executionRecord: BoardTaskActivityRecord = { + ...baseCandidate.records[0]!, + linkKind: 'execution', + }; + const candidate: BoardTaskExactLogBundleCandidate = { + ...baseCandidate, + records: [executionRecord], + linkKinds: ['execution'], + }; + let generation = 0; + let recordReadCount = 0; + const recordSource = { + getTaskRecords: vi.fn(async () => { + recordReadCount += 1; + if (recordReadCount === 1) { + generation += 1; + } + return candidate.records; + }), + }; + const summarySelector = { + selectSummaries: vi.fn(() => [candidate]), + }; + const strictParser = { + parseFiles: vi.fn(async () => new Map([['/tmp/task.jsonl', []]])), + }; + const detailSelector = { + selectDetail: vi.fn(() => ({ + id: candidate.id, + timestamp: candidate.timestamp, + actor: candidate.actor, + source: candidate.source, + records: candidate.records, + filteredMessages: [makeMessage(candidate.id, candidate.timestamp, 'native work')], + })), + }; + const transcriptSourceLocator = { + getGeneration: vi.fn(() => generation), + getContext: vi.fn(async () => null), + }; + const buildBundleChunks = vi.fn((messages: ParsedMessage[]) => [{ id: messages[0]?.uuid }]); + + const service = new BoardTaskLogStreamService( + recordSource as never, + summarySelector as never, + strictParser as never, + detailSelector as never, + { buildBundleChunks } as never, + undefined as never, + transcriptSourceLocator as never + ); + + await service.getTaskLogStream('demo', 'task-a'); + await service.getTaskLogStream('demo', 'task-a'); + await service.getTaskLogStream('demo', 'task-a'); + + expect(recordSource.getTaskRecords).toHaveBeenCalledTimes(2); + expect(buildBundleChunks).toHaveBeenCalledTimes(3); }); it('merges duplicate message uuids inside one participant segment before chunk building', async () => { diff --git a/test/main/services/team/TeamTranscriptSourceLocator.test.ts b/test/main/services/team/TeamTranscriptSourceLocator.test.ts index 7030c3f7..66bff155 100644 --- a/test/main/services/team/TeamTranscriptSourceLocator.test.ts +++ b/test/main/services/team/TeamTranscriptSourceLocator.test.ts @@ -1,6 +1,6 @@ import * as os from 'os'; import * as path from 'path'; -import { afterEach, describe, expect, it } from 'vitest'; +import { afterEach, describe, expect, it, vi } from 'vitest'; import * as fs from 'fs/promises'; @@ -18,6 +18,30 @@ describe('TeamTranscriptSourceLocator', () => { } }); + async function writeSessionFixture(projectRoot: string, sessionId: string): Promise { + const rootTranscript = path.join(projectRoot, `${sessionId}.jsonl`); + const subagentsDir = path.join(projectRoot, sessionId, 'subagents'); + const subagentTranscript = path.join(subagentsDir, 'agent-worker.jsonl'); + + await fs.mkdir(subagentsDir, { recursive: true }); + await fs.writeFile(rootTranscript, '{}\n', 'utf8'); + await fs.writeFile(subagentTranscript, '{}\n', 'utf8'); + return [rootTranscript, subagentTranscript]; + } + + function makeResolverContext(projectRoot: string, teamName: string, sessionIds: string[]) { + return { + projectDir: projectRoot, + projectId: '-Users-test-cache', + config: { + name: teamName, + projectPath: '/Users/test/cache', + members: [], + }, + sessionIds, + }; + } + it('recovers projectPath from member cwd and includes only team-related root sessions', async () => { tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-')); setClaudeBasePathOverride(tmpDir); @@ -182,4 +206,76 @@ describe('TeamTranscriptSourceLocator', () => { expect(transcriptFiles).toEqual([...expectedFiles].sort((a, b) => a.localeCompare(b))); }); + + it('shares in-flight context discovery across parallel context and file-list reads', async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-')); + + const teamName = 'inflight-discovery-test'; + const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache'); + const expectedFiles = await writeSessionFixture(projectRoot, 'session-a'); + const resolver = { + getContext: vi.fn(async () => { + await Promise.resolve(); + return makeResolverContext(projectRoot, teamName, ['session-a']); + }), + }; + const locator = new TeamTranscriptSourceLocator(resolver as never); + + const [context, transcriptFiles] = await Promise.all([ + locator.getContext(teamName), + locator.listTranscriptFiles(teamName), + ]); + + expect(context?.sessionIds).toEqual(['session-a']); + expect(transcriptFiles).toEqual(expectedFiles.sort((a, b) => a.localeCompare(b))); + expect(resolver.getContext).toHaveBeenCalledTimes(1); + }); + + it('reuses cached context inside the TTL and rebuilds after team invalidation', async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-')); + + const teamName = 'cached-discovery-test'; + const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache'); + await writeSessionFixture(projectRoot, 'session-a'); + const sessionBFiles = await writeSessionFixture(projectRoot, 'session-b'); + let sessionIds = ['session-a']; + const resolver = { + getContext: vi.fn(async () => makeResolverContext(projectRoot, teamName, [...sessionIds])), + }; + const locator = new TeamTranscriptSourceLocator(resolver as never); + + await locator.listTranscriptFiles(teamName); + await locator.listTranscriptFiles(teamName); + expect(resolver.getContext).toHaveBeenCalledTimes(1); + + sessionIds = ['session-a', 'session-b']; + locator.invalidateTeam(teamName); + const transcriptFiles = await locator.listTranscriptFiles(teamName); + + expect(resolver.getContext).toHaveBeenCalledTimes(2); + expect(transcriptFiles).toEqual(expect.arrayContaining(sessionBFiles)); + }); + + it('bypasses cached context when forceRefresh is requested', async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-transcripts-')); + + const teamName = 'force-refresh-discovery-test'; + const projectRoot = path.join(tmpDir, 'projects', '-Users-test-cache'); + await writeSessionFixture(projectRoot, 'session-a'); + let sessionIds = ['session-a']; + const resolver = { + getContext: vi.fn(async () => makeResolverContext(projectRoot, teamName, [...sessionIds])), + }; + const locator = new TeamTranscriptSourceLocator(resolver as never); + + await locator.getContext(teamName); + sessionIds = ['session-a', 'session-b']; + await locator.getContext(teamName); + expect(resolver.getContext).toHaveBeenCalledTimes(1); + + const refreshed = await locator.getContext(teamName, { forceRefresh: true }); + + expect(refreshed?.sessionIds).toEqual(['session-a', 'session-b']); + expect(resolver.getContext).toHaveBeenCalledTimes(2); + }); });