diff --git a/src/main/index.ts b/src/main/index.ts index e7aae28b..1bf686f0 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -138,6 +138,7 @@ import { } from './services/team/TeamControlApiState'; import { TeamInboxReader } from './services/team/TeamInboxReader'; import { getTeamDataWorkerClient } from './services/team/TeamDataWorkerClient'; +import { getTeamFsWorkerClient } from './services/team/TeamFsWorkerClient'; import { TeamMemberRuntimeAdvisoryService } from './services/team/TeamMemberRuntimeAdvisoryService'; import { createTeamReconcileDrainScheduler, @@ -1841,6 +1842,30 @@ function createWindow(): void { updaterService.startPeriodicCheck(60 * 60 * 1000); } + scheduleStartupTask( + () => { + void getTeamFsWorkerClient() + .prewarm() + .catch((error: unknown) => + logger.debug( + `[startup] team-fs-worker prewarm skipped: ${ + error instanceof Error ? error.message : String(error) + }` + ) + ); + void getTeamDataWorkerClient() + .prewarm() + .catch((error: unknown) => + logger.debug( + `[startup] team-data-worker prewarm skipped: ${ + error instanceof Error ? error.message : String(error) + }` + ) + ); + }, + process.platform === 'win32' ? 2500 : 1000 + ); + // Defer non-critical startup work to avoid thread pool contention. // The window is now visible and responsive; these run in the background. scheduleStartupTask(() => { diff --git a/src/main/services/team/TeamDataWorkerClient.ts b/src/main/services/team/TeamDataWorkerClient.ts index f53259a5..ed7398a7 100644 --- a/src/main/services/team/TeamDataWorkerClient.ts +++ b/src/main/services/team/TeamDataWorkerClient.ts @@ -66,6 +66,9 @@ interface PendingEntry { function summarizeWorkerPayload( payload: TeamDataWorkerRequest['payload'] ): Record { + if (!payload) { + return {}; + } if ('taskId' in payload) { return { teamName: payload.teamName, @@ -213,6 +216,21 @@ export class TeamDataWorkerClient { }); } + async prewarm(): Promise { + if (this.worker) { + return; + } + if (!this.isAvailable()) { + return; + } + const startedAt = Date.now(); + await this.call('warmup', {}); + const ms = Date.now() - startedAt; + if (ms >= 1500) { + logger.warn(`worker prewarm slow ms=${ms}`); + } + } + private postBestEffort( op: TeamDataWorkerRequest['op'], payload: TeamDataWorkerRequest['payload'] diff --git a/src/main/services/team/TeamFsWorkerClient.ts b/src/main/services/team/TeamFsWorkerClient.ts index f2089afd..f9991efb 100644 --- a/src/main/services/team/TeamFsWorkerClient.ts +++ b/src/main/services/team/TeamFsWorkerClient.ts @@ -36,6 +36,7 @@ interface GetAllTasksPayload { } type WorkerRequest = + | { id: string; op: 'warmup'; payload?: Record } | { id: string; op: 'listTeams'; payload: ListTeamsPayload } | { id: string; op: 'getAllTasks'; payload: GetAllTasksPayload }; @@ -44,6 +45,9 @@ type WorkerResponse = | { id: string; ok: false; error: string }; function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record { + if (!payload) { + return {}; + } if ('teamsDir' in payload) { return { teamsDir: payload.teamsDir, @@ -52,6 +56,9 @@ function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record void; reject: (e: Error) => void } >(); + private failWorker(worker: Worker, error: Error): void { + if (this.worker !== worker) return; + + this.worker = null; + const pendingEntries = Array.from(this.pending.values()); + this.pending.clear(); + + for (const entry of pendingEntries) { + entry.reject(error); + } + } + isAvailable(): boolean { if (!this.workerPath && !this.warnedUnavailable && shouldWarnUnavailableWorker()) { this.warnedUnavailable = true; @@ -134,8 +153,9 @@ export class TeamFsWorkerClient { return this.worker; } - this.worker = new Worker(this.workerPath); - this.worker.on('message', (msg: WorkerResponse) => { + const worker = new Worker(this.workerPath); + this.worker = worker; + worker.on('message', (msg: WorkerResponse) => { const entry = this.pending.get(msg.id); if (!entry) return; this.pending.delete(msg.id); @@ -145,26 +165,18 @@ export class TeamFsWorkerClient { entry.reject(new Error(msg.error)); } }); - this.worker.on('error', (err) => { + worker.on('error', (err) => { logger.error('Worker error', err); - for (const [, entry] of this.pending) { - entry.reject(err instanceof Error ? err : new Error(String(err))); - } - this.pending.clear(); - this.worker = null; + this.failWorker(worker, err instanceof Error ? err : new Error(String(err))); }); - this.worker.on('exit', (code) => { + worker.on('exit', (code) => { if (code !== 0) { logger.warn(`Worker exited with code ${code}`); } - for (const [, entry] of this.pending) { - entry.reject(new Error(`Worker exited with code ${code}`)); - } - this.pending.clear(); - this.worker = null; + this.failWorker(worker, new Error(`Worker exited with code ${code}`)); }); - return this.worker; + return worker; } private call( @@ -177,21 +189,22 @@ export class TeamFsWorkerClient { const pendingAtStart = this.pending.size; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { - this.pending.delete(id); - try { - // Terminate and recreate on next call — worker may be stuck in native IO. - this.worker?.terminate().catch(() => undefined); - } catch { - // ignore - } finally { - this.worker = null; - } + const timeoutError = new Error( + `Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms (${op})` + ); logger.warn( `worker call timeout op=${op} ms=${Date.now() - startedAt} pendingAtStart=${pendingAtStart} pendingNow=${this.pending.size} payload=${JSON.stringify( summarizeWorkerPayload(payload) )}` ); - reject(new Error(`Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms (${op})`)); + this.failWorker(worker, timeoutError); + try { + // Terminate and recreate on next call - worker may be stuck in native IO. + worker.terminate().catch(() => undefined); + } catch { + // ignore + } + reject(timeoutError); }, WORKER_CALL_TIMEOUT_MS); this.pending.set(id, { @@ -224,6 +237,21 @@ export class TeamFsWorkerClient { }); } + async prewarm(): Promise { + if (this.worker) { + return; + } + if (!this.isAvailable()) { + return; + } + const startedAt = Date.now(); + await this.call('warmup', {}); + const ms = Date.now() - startedAt; + if (ms >= 1500) { + logger.warn(`worker prewarm slow ms=${ms}`); + } + } + async listTeams(options: { largeConfigBytes: number; configHeadBytes: number; diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index 0aae9695..6374cb2e 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -4565,10 +4565,31 @@ export class TeamProvisioningService { string, { expiresAtMs: number; snapshot: TeamAgentRuntimeSnapshot } >(); + private readonly agentRuntimeSnapshotInFlightByTeam = new Map< + string, + { + generationAtStart: number; + runIdAtStart: string | null; + promise: Promise; + } + >(); private readonly liveTeamAgentRuntimeMetadataCache = new Map< string, - { expiresAtMs: number; metadata: Map } + { + expiresAtMs: number; + metadata: Map; + runId: string | null; + } >(); + private readonly liveTeamAgentRuntimeMetadataInFlightByTeam = new Map< + string, + { + generationAtStart: number; + runIdAtStart: string | null; + promise: Promise>; + } + >(); + private readonly runtimeSnapshotCacheGenerationByTeam = new Map(); private readonly launchStateStore = new TeamLaunchStateStore(); private readonly launchStateStoreQueue = new Map>(); private readonly memberLogsFinder: TeamMemberLogsFinder; @@ -4651,6 +4672,35 @@ export class TeamProvisioningService { return { config, teamMeta, metaMembers }; } + private getRuntimeSnapshotCacheGeneration(teamName: string): number { + return this.runtimeSnapshotCacheGenerationByTeam.get(teamName) ?? 0; + } + + private invalidateRuntimeSnapshotCaches(teamName: string): void { + this.runtimeSnapshotCacheGenerationByTeam.set( + teamName, + this.getRuntimeSnapshotCacheGeneration(teamName) + 1 + ); + this.agentRuntimeSnapshotCache.delete(teamName); + this.agentRuntimeSnapshotInFlightByTeam.delete(teamName); + this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName); + } + + private cloneLiveTeamAgentRuntimeMetadata( + metadata: ReadonlyMap + ): Map { + return new Map( + [...metadata.entries()].map(([memberName, entry]) => [ + memberName, + { + ...entry, + ...(entry.diagnostics ? { diagnostics: [...entry.diagnostics] } : {}), + }, + ]) + ); + } + private resolveOpenCodeMemberIdentityFromDirectory( teamName: string, memberName: string, @@ -5463,6 +5513,7 @@ export class TeamProvisioningService { this.runtimeAdapterRunByTeam.delete(teamName); this.aliveRunByTeam.delete(teamName); this.provisioningRunByTeam.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); } } if (cleaned > 0) { @@ -7625,6 +7676,7 @@ export class TeamProvisioningService { private resetTeamScopedTransientStateForNewRun(teamName: string): void { peekAutoResumeService()?.cancelPendingAutoResume(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.retainedClaudeLogsByTeam.delete(teamName); this.persistedTranscriptClaudeLogsCache.delete(teamName); this.leadInboxRelayInFlight.delete(teamName); @@ -9070,8 +9122,7 @@ export class TeamProvisioningService { trackedUpdate.run, this.getMixedSecondaryLaunchPhase(trackedUpdate.run) ); - this.agentRuntimeSnapshotCache.delete(input.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(input.teamName); + this.invalidateRuntimeSnapshotCaches(input.teamName); if (trackedUpdate.changed) { this.teamChangeEmitter?.({ type: 'member-spawn', @@ -9158,8 +9209,7 @@ export class TeamProvisioningService { updatedAt: input.observedAt, }); await this.writeLaunchStateSnapshot(input.teamName, snapshot); - this.agentRuntimeSnapshotCache.delete(input.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(input.teamName); + this.invalidateRuntimeSnapshotCaches(input.teamName); if (shouldEmitMemberSpawnChange) { this.teamChangeEmitter?.({ type: 'member-spawn', @@ -9936,8 +9986,7 @@ export class TeamProvisioningService { ); return; } - this.agentRuntimeSnapshotCache.delete(run.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName); + this.invalidateRuntimeSnapshotCaches(run.teamName); this.setMemberSpawnStatus(run, spawnedMemberName, 'waiting'); this.appendMemberBootstrapDiagnostic( run, @@ -10405,6 +10454,36 @@ export class TeamProvisioningService { return cached.snapshot; } + const generationAtStart = this.getRuntimeSnapshotCacheGeneration(teamName); + const existingRequest = this.agentRuntimeSnapshotInFlightByTeam.get(teamName); + if ( + existingRequest && + existingRequest.generationAtStart === generationAtStart && + existingRequest.runIdAtStart === runId + ) { + return existingRequest.promise; + } + + const request = this.buildTeamAgentRuntimeSnapshot(teamName, runId, generationAtStart).finally( + () => { + if (this.agentRuntimeSnapshotInFlightByTeam.get(teamName)?.promise === request) { + this.agentRuntimeSnapshotInFlightByTeam.delete(teamName); + } + } + ); + this.agentRuntimeSnapshotInFlightByTeam.set(teamName, { + generationAtStart, + runIdAtStart: runId, + promise: request, + }); + return request; + } + + private async buildTeamAgentRuntimeSnapshot( + teamName: string, + runId: string | null, + generationAtStart: number + ): Promise { const updatedAt = nowIso(); const run = runId ? (this.runs.get(runId) ?? null) : null; const currentRuntimeAdapterRun = this.runtimeAdapterRunByTeam.get(teamName); @@ -10627,10 +10706,15 @@ export class TeamProvisioningService { members: snapshotMembers, }; - this.agentRuntimeSnapshotCache.set(teamName, { - expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS, - snapshot, - }); + if ( + this.getRuntimeSnapshotCacheGeneration(teamName) === generationAtStart && + this.getTrackedRunId(teamName) === runId + ) { + this.agentRuntimeSnapshotCache.set(teamName, { + expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS, + snapshot, + }); + } return snapshot; } @@ -11004,8 +11088,7 @@ export class TeamProvisioningService { ); } - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); const liveRuntimeByMember = await this.getLiveTeamAgentRuntimeMetadata(teamName); const livePids = new Set(); let hasAliveRuntimeWithoutPid = false; @@ -11150,8 +11233,7 @@ export class TeamProvisioningService { throw new Error('Lead restart is not supported from member controls'); } - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.resetRuntimeToolActivity(run, memberName); this.clearMemberSpawnToolTracking(run, memberName); this.setMemberSpawnStatus(run, memberName, 'spawning'); @@ -11311,8 +11393,7 @@ export class TeamProvisioningService { : 'Skipped by user for this launch'; if (run && !run.processKilled && !run.cancelRequested) { - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.resetRuntimeToolActivity(run, normalizedMemberName); this.clearMemberSpawnToolTracking(run, normalizedMemberName); this.setMemberSpawnStatus(run, normalizedMemberName, 'skipped', reason); @@ -11374,8 +11455,7 @@ export class TeamProvisioningService { updatedAt, }); await this.writeLaunchStateSnapshot(teamName, nextSnapshot); - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); } private getMutableAliveRunOrThrow(teamName: string): ProvisioningRun { @@ -11489,8 +11569,7 @@ export class TeamProvisioningService { } this.upsertRunAllEffectiveMember(run, memberSpec); - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.resetRuntimeToolActivity(run, memberName); this.clearMemberSpawnToolTracking(run, memberName); run.pendingMemberRestarts.delete(memberName); @@ -11544,8 +11623,7 @@ export class TeamProvisioningService { ); if (laneIndex < 0) { this.removeRunAllEffectiveMember(run, memberName); - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); await this.persistLaunchStateSnapshot(run, this.getMixedSecondaryLaunchPhase(run)); return; } @@ -11553,8 +11631,7 @@ export class TeamProvisioningService { const [lane] = run.mixedSecondaryLanes.splice(laneIndex, 1); await this.stopSingleMixedSecondaryRuntimeLane(run, lane, 'cleanup'); this.removeRunAllEffectiveMember(run, memberName); - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.resetRuntimeToolActivity(run, memberName); this.clearMemberSpawnToolTracking(run, memberName); run.pendingMemberRestarts.delete(memberName); @@ -14429,6 +14506,7 @@ export class TeamProvisioningService { }).catch(() => undefined); this.runtimeAdapterRunByTeam.delete(input.request.teamName); this.aliveRunByTeam.delete(input.request.teamName); + this.invalidateRuntimeSnapshotCaches(input.request.teamName); } else { this.runtimeAdapterRunByTeam.set(input.request.teamName, { runId, @@ -14437,6 +14515,7 @@ export class TeamProvisioningService { members: result.members, }); this.aliveRunByTeam.set(input.request.teamName, runId); + this.invalidateRuntimeSnapshotCaches(input.request.teamName); } if (this.provisioningRunByTeam.get(input.request.teamName) === runId) { this.provisioningRunByTeam.delete(input.request.teamName); @@ -15399,6 +15478,7 @@ export class TeamProvisioningService { if (this.provisioningRunByTeam.get(teamName) === runId) { this.provisioningRunByTeam.delete(teamName); } + this.invalidateRuntimeSnapshotCaches(teamName); this.setRuntimeAdapterProgress({ ...runtimeProgress, state: 'cancelled', @@ -15478,6 +15558,7 @@ export class TeamProvisioningService { if (this.provisioningRunByTeam.get(teamName) === runId) { this.provisioningRunByTeam.delete(teamName); } + this.invalidateRuntimeSnapshotCaches(teamName); } private recordCancelledOpenCodeRuntimeAdapterLaunch( @@ -15490,6 +15571,7 @@ export class TeamProvisioningService { this.provisioningRunByTeam.delete(teamName); this.runtimeAdapterRunByTeam.delete(teamName); this.aliveRunByTeam.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); const progress: TeamProvisioningProgress = { runId, teamName, @@ -17519,12 +17601,44 @@ export class TeamProvisioningService { private async getLiveTeamAgentRuntimeMetadata( teamName: string ): Promise> { + const runId = this.getTrackedRunId(teamName); const cached = this.liveTeamAgentRuntimeMetadataCache.get(teamName); - if (cached && cached.expiresAtMs > Date.now()) { - return cached.metadata; + if (cached && cached.expiresAtMs > Date.now() && cached.runId === runId) { + return this.cloneLiveTeamAgentRuntimeMetadata(cached.metadata); } - const runId = this.getTrackedRunId(teamName); + const generationAtStart = this.getRuntimeSnapshotCacheGeneration(teamName); + const existingRequest = this.liveTeamAgentRuntimeMetadataInFlightByTeam.get(teamName); + if ( + existingRequest && + existingRequest.generationAtStart === generationAtStart && + existingRequest.runIdAtStart === runId + ) { + return this.cloneLiveTeamAgentRuntimeMetadata(await existingRequest.promise); + } + + const request = this.buildLiveTeamAgentRuntimeMetadata( + teamName, + runId, + generationAtStart + ).finally(() => { + if (this.liveTeamAgentRuntimeMetadataInFlightByTeam.get(teamName)?.promise === request) { + this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName); + } + }); + this.liveTeamAgentRuntimeMetadataInFlightByTeam.set(teamName, { + generationAtStart, + runIdAtStart: runId, + promise: request, + }); + return this.cloneLiveTeamAgentRuntimeMetadata(await request); + } + + private async buildLiveTeamAgentRuntimeMetadata( + teamName: string, + runId: string | null, + generationAtStart: number + ): Promise> { const run = runId ? (this.runs.get(runId) ?? null) : null; let configuredMembers: TeamConfig['members'] = []; @@ -17865,10 +17979,16 @@ export class TeamProvisioningService { }); } - this.liveTeamAgentRuntimeMetadataCache.set(teamName, { - expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS, - metadata: metadataByMember, - }); + if ( + this.getRuntimeSnapshotCacheGeneration(teamName) === generationAtStart && + this.getTrackedRunId(teamName) === runId + ) { + this.liveTeamAgentRuntimeMetadataCache.set(teamName, { + expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS, + metadata: this.cloneLiveTeamAgentRuntimeMetadata(metadataByMember), + runId, + }); + } return metadataByMember; } @@ -18884,14 +19004,12 @@ export class TeamProvisioningService { if (filteredSnapshot.teamLaunchState === 'clean_success' && launchPhase !== 'active') { await this.clearPersistedLaunchStateNow(run.teamName); - this.agentRuntimeSnapshotCache.delete(run.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName); + this.invalidateRuntimeSnapshotCaches(run.teamName); return null; } const writtenSnapshot = await this.writeLaunchStateSnapshotNow(run.teamName, filteredSnapshot); - this.agentRuntimeSnapshotCache.delete(run.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName); + this.invalidateRuntimeSnapshotCaches(run.teamName); return writtenSnapshot; } @@ -20772,8 +20890,7 @@ export class TeamProvisioningService { * Always uses SIGKILL via killTeamProcess() to prevent CLI cleanup. */ async stopTeam(teamName: string): Promise { - this.agentRuntimeSnapshotCache.delete(teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); this.stopPersistentTeamMembers(teamName); const runId = this.getTrackedRunId(teamName); @@ -21001,6 +21118,7 @@ export class TeamProvisioningService { this.runtimeAdapterRunByTeam.delete(teamName); this.aliveRunByTeam.delete(teamName); this.provisioningRunByTeam.delete(teamName); + this.invalidateRuntimeSnapshotCaches(teamName); return; } const startedAt = nowIso(); @@ -21019,6 +21137,7 @@ export class TeamProvisioningService { if (this.provisioningRunByTeam.get(teamName) === runId) { this.provisioningRunByTeam.delete(teamName); } + this.invalidateRuntimeSnapshotCaches(teamName); try { await clearOpenCodeRuntimeLaneStorage({ teamsBasePath: getTeamsBasePath(), @@ -21364,8 +21483,7 @@ export class TeamProvisioningService { ); return true; } - this.agentRuntimeSnapshotCache.delete(run.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName); + this.invalidateRuntimeSnapshotCaches(run.teamName); this.setMemberSpawnStatus(run, memberName, 'waiting'); this.appendMemberBootstrapDiagnostic( run, @@ -23760,8 +23878,7 @@ export class TeamProvisioningService { this.clearSecondaryRuntimeRuns(run.teamName); } if (!hasNewerTrackedRun) { - this.agentRuntimeSnapshotCache.delete(run.teamName); - this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName); + this.invalidateRuntimeSnapshotCaches(run.teamName); this.leadInboxRelayInFlight.delete(run.teamName); this.relayedLeadInboxMessageIds.delete(run.teamName); this.pendingCrossTeamFirstReplies.delete(run.teamName); diff --git a/src/main/services/team/teamDataWorkerTypes.ts b/src/main/services/team/teamDataWorkerTypes.ts index 6b14860f..11a75219 100644 --- a/src/main/services/team/teamDataWorkerTypes.ts +++ b/src/main/services/team/teamDataWorkerTypes.ts @@ -56,6 +56,7 @@ export interface TeamDataWorkerDiag { // ── Request / Response ── export type TeamDataWorkerRequest = + | { id: string; op: 'warmup'; payload?: Record } | { id: string; op: 'getTeamData'; payload: GetTeamDataPayload } | { id: string; op: 'getMessagesPage'; payload: GetMessagesPagePayload } | { id: string; op: 'getMemberActivityMeta'; payload: GetMemberActivityMetaPayload } diff --git a/src/main/workers/team-data-worker.ts b/src/main/workers/team-data-worker.ts index 361685c4..931f26ec 100644 --- a/src/main/workers/team-data-worker.ts +++ b/src/main/workers/team-data-worker.ts @@ -39,12 +39,16 @@ parentPort?.on('message', async (msg: TeamDataWorkerRequest) => { const startedAt = Date.now(); const buildDiag = (): NonNullable['diag']> => ({ op: msg.op, - ...('teamName' in msg.payload ? { teamName: msg.payload.teamName } : {}), - ...('taskId' in msg.payload ? { taskId: msg.payload.taskId } : {}), + ...(msg.payload && 'teamName' in msg.payload ? { teamName: msg.payload.teamName } : {}), + ...(msg.payload && 'taskId' in msg.payload ? { taskId: msg.payload.taskId } : {}), totalMs: Date.now() - startedAt, }); try { switch (msg.op) { + case 'warmup': { + respond({ id: msg.id, ok: true, result: null, diag: buildDiag() }); + break; + } case 'getTeamData': { const result = await teamDataService.getTeamData(msg.payload.teamName); respond({ id: msg.id, ok: true, result, diag: buildDiag() }); diff --git a/src/main/workers/team-fs-worker.ts b/src/main/workers/team-fs-worker.ts index aa7f3bf6..b4eb215f 100644 --- a/src/main/workers/team-fs-worker.ts +++ b/src/main/workers/team-fs-worker.ts @@ -33,6 +33,7 @@ interface GetAllTasksPayload { } type WorkerRequest = + | { id: string; op: 'warmup'; payload?: Record } | { id: string; op: 'listTeams'; payload: ListTeamsPayload } | { id: string; op: 'getAllTasks'; payload: GetAllTasksPayload }; @@ -75,6 +76,10 @@ interface ListTeamsDiag { skipped: number; skipReasons: Record; slowest: SlowEntry[]; + cacheHits: number; + cacheMisses: number; + cacheWriteSkips: number; + cacheEvictions: number; totalMs: number; } @@ -87,12 +92,19 @@ interface GetAllTasksDiag { skipped: number; skipReasons: Record; slowestTeams: SlowEntry[]; + cacheHits: number; + cacheMisses: number; + cacheWriteSkips: number; + cacheEvictions: number; totalMs: number; } interface TaskReadDiag { skipped: number; skipReasons: Record; + cacheHits: number; + cacheMisses: number; + cacheWriteSkips: number; } const MAX_LAUNCH_STATE_BYTES = 32 * 1024; @@ -104,6 +116,60 @@ const REVIEW_LIFECYCLE_EVENTS = new Set([ 'review_started', ]); const REVIEW_RESET_STATUSES = new Set(['in_progress', 'deleted']); +const TEAM_SUMMARY_CACHE_MAX_ENTRIES = 1000; +const TASK_FILE_CACHE_MAX_ENTRIES = 10000; +const BOOTSTRAP_STATE_FILE = 'bootstrap-state.json'; +const BOOTSTRAP_JOURNAL_FILE = 'bootstrap-journal.jsonl'; + +interface PathFingerprint { + exists: boolean; + isFile?: boolean; + isDirectory?: boolean; + highResolution?: boolean; + size?: string; + mode?: string; + dev?: string; + ino?: string; + mtimeNs?: string; + ctimeNs?: string; + birthtimeNs?: string; + mtimeMs?: number; + ctimeMs?: number; + birthtimeMs?: number; + errorCode?: string; +} + +interface TeamSummaryCacheEntry { + fingerprint: string; + summary: Record; + teamsDir: string; + optionKey: string; + lastUsedAt: number; +} + +type CachedTaskReadResult = + | { task: Record; skipReason?: undefined } + | { task?: undefined; skipReason: string }; + +interface TaskFileCacheEntry { + fingerprint: string; + result: CachedTaskReadResult; + tasksBase: string; + lastUsedAt: number; +} + +const teamSummaryCache = new Map(); +const taskFileCache = new Map(); + +interface TeamSummaryDependencyFingerprint { + value: string; + cacheSafe: boolean; +} + +interface LaunchStateSummaryRead { + summary: ReturnType | null; + cacheable: boolean; +} // --------------------------------------------------------------------------- // Parsed JSON types (loose shapes from disk) @@ -272,6 +338,319 @@ function pushSlowest(list: SlowEntry[], entry: SlowEntry, maxLen: number): void if (list.length > maxLen) list.length = maxLen; } +function cloneCached(value: T): T { + return typeof structuredClone === 'function' + ? structuredClone(value) + : (JSON.parse(JSON.stringify(value)) as T); +} + +async function statPathFingerprint(filePath: string): Promise { + try { + const stat = await fs.promises.stat(filePath, { bigint: true }); + const mtimeNs = + typeof (stat as fs.BigIntStats & { mtimeNs?: bigint }).mtimeNs === 'bigint' + ? (stat as fs.BigIntStats & { mtimeNs: bigint }).mtimeNs + : undefined; + const ctimeNs = + typeof (stat as fs.BigIntStats & { ctimeNs?: bigint }).ctimeNs === 'bigint' + ? (stat as fs.BigIntStats & { ctimeNs: bigint }).ctimeNs + : undefined; + const birthtimeNs = + typeof (stat as fs.BigIntStats & { birthtimeNs?: bigint }).birthtimeNs === 'bigint' + ? (stat as fs.BigIntStats & { birthtimeNs: bigint }).birthtimeNs + : undefined; + return { + exists: true, + isFile: stat.isFile(), + isDirectory: stat.isDirectory(), + highResolution: typeof mtimeNs === 'bigint' && typeof ctimeNs === 'bigint', + size: stat.size.toString(), + mode: stat.mode.toString(), + dev: stat.dev.toString(), + ino: stat.ino.toString(), + mtimeNs: mtimeNs?.toString(), + ctimeNs: ctimeNs?.toString(), + birthtimeNs: birthtimeNs?.toString(), + mtimeMs: Number(stat.mtimeMs), + ctimeMs: Number(stat.ctimeMs), + birthtimeMs: Number(stat.birthtimeMs), + }; + } catch (error) { + return { + exists: false, + errorCode: + typeof (error as NodeJS.ErrnoException | undefined)?.code === 'string' + ? (error as NodeJS.ErrnoException).code + : undefined, + }; + } +} + +function fingerprintToString(value: unknown): string { + return JSON.stringify(value); +} + +function isCacheSafeFingerprint(fingerprint: PathFingerprint): boolean { + if (fingerprint.exists) { + return fingerprint.highResolution === true; + } + return fingerprint.errorCode === 'ENOENT' || fingerprint.errorCode === 'ENOTDIR'; +} + +function makeTeamSummaryOptionKey(payload: ListTeamsPayload): string { + return fingerprintToString({ + largeConfigBytes: payload.largeConfigBytes, + configHeadBytes: payload.configHeadBytes, + maxConfigBytes: payload.maxConfigBytes, + maxConfigReadMs: payload.maxConfigReadMs, + maxMembersMetaBytes: payload.maxMembersMetaBytes, + maxSessionHistoryInSummary: payload.maxSessionHistoryInSummary, + maxProjectPathHistoryInSummary: payload.maxProjectPathHistoryInSummary, + }); +} + +function makeTeamSummaryCacheKey(teamsDir: string, teamName: string, optionKey: string): string { + return `${teamsDir}\0${teamName}\0${optionKey}`; +} + +function canCacheTeamSummary(summary: Record): boolean { + if (summary.teamLaunchState === 'partial_pending') { + return false; + } + const pendingKeys = [ + 'pendingCount', + 'runtimeAlivePendingCount', + 'shellOnlyPendingCount', + 'runtimeProcessPendingCount', + 'runtimeCandidatePendingCount', + 'noRuntimePendingCount', + 'permissionPendingCount', + ]; + return pendingKeys.every((key) => { + const value = summary[key]; + return typeof value !== 'number' || value <= 0; + }); +} + +async function readInboxNamesFingerprint(inboxDir: string): Promise<{ + dir: PathFingerprint; + names: string[]; + cacheSafe: boolean; +}> { + const dir = await statPathFingerprint(inboxDir); + if (!dir.exists || !dir.isDirectory) { + return { dir, names: [], cacheSafe: isCacheSafeFingerprint(dir) }; + } + try { + const entries = await fs.promises.readdir(inboxDir, { withFileTypes: true }); + const names = entries + .filter((entry) => entry.isFile() && entry.name.endsWith('.json')) + .map((entry) => entry.name) + .sort(); + return { dir, names, cacheSafe: isCacheSafeFingerprint(dir) }; + } catch (error) { + return { + dir: { + ...dir, + errorCode: + typeof (error as NodeJS.ErrnoException | undefined)?.code === 'string' + ? (error as NodeJS.ErrnoException).code + : 'READDIR_FAILED', + }, + names: [], + cacheSafe: false, + }; + } +} + +async function buildTeamSummaryFingerprint( + teamsDir: string, + teamName: string, + optionKey: string +): Promise { + const teamDir = path.join(teamsDir, teamName); + const [ + config, + teamMeta, + membersMeta, + launchState, + launchSummary, + bootstrapState, + bootstrapJournal, + ] = await Promise.all([ + statPathFingerprint(path.join(teamDir, 'config.json')), + statPathFingerprint(path.join(teamDir, 'team.meta.json')), + statPathFingerprint(path.join(teamDir, 'members.meta.json')), + statPathFingerprint(path.join(teamDir, TEAM_LAUNCH_STATE_FILE)), + statPathFingerprint(path.join(teamDir, TEAM_LAUNCH_SUMMARY_FILE)), + statPathFingerprint(path.join(teamDir, BOOTSTRAP_STATE_FILE)), + statPathFingerprint(path.join(teamDir, BOOTSTRAP_JOURNAL_FILE)), + ]); + const inbox = await readInboxNamesFingerprint(path.join(teamDir, 'inboxes')); + + const dependencyFingerprint = { + version: 1, + optionKey, + config, + teamMeta, + membersMeta, + launchState, + launchSummary, + bootstrapState, + bootstrapJournal, + inbox, + }; + + return { + value: fingerprintToString(dependencyFingerprint), + cacheSafe: + [ + config, + teamMeta, + membersMeta, + launchState, + launchSummary, + bootstrapState, + bootstrapJournal, + ].every(isCacheSafeFingerprint) && inbox.cacheSafe, + }; +} + +async function cacheTeamSummaryIfStable( + cacheKey: string, + teamsDir: string, + teamName: string, + optionKey: string, + fingerprintBefore: TeamSummaryDependencyFingerprint, + summary: Record, + cacheAllowed: boolean, + diag: ListTeamsDiag +): Promise { + if (!cacheAllowed) { + teamSummaryCache.delete(cacheKey); + diag.cacheWriteSkips++; + return; + } + if (!canCacheTeamSummary(summary)) { + teamSummaryCache.delete(cacheKey); + diag.cacheWriteSkips++; + return; + } + if (!fingerprintBefore.cacheSafe) { + diag.cacheWriteSkips++; + return; + } + const fingerprintAfter = await buildTeamSummaryFingerprint(teamsDir, teamName, optionKey); + if (!fingerprintAfter.cacheSafe || fingerprintAfter.value !== fingerprintBefore.value) { + diag.cacheWriteSkips++; + return; + } + teamSummaryCache.set(cacheKey, { + fingerprint: fingerprintAfter.value, + summary: cloneCached(summary), + teamsDir, + optionKey, + lastUsedAt: nowMs(), + }); +} + +function pruneTeamSummaryCache( + teamsDir: string, + optionKey: string, + liveTeamNames: ReadonlySet, + diag: ListTeamsDiag +): void { + for (const [key, entry] of teamSummaryCache) { + if (entry.teamsDir === teamsDir && entry.optionKey === optionKey) { + const teamName = key.split('\0')[1] ?? ''; + if (!liveTeamNames.has(teamName)) { + teamSummaryCache.delete(key); + diag.cacheEvictions++; + } + } + } + while (teamSummaryCache.size > TEAM_SUMMARY_CACHE_MAX_ENTRIES) { + const oldest = teamSummaryCache.keys().next().value; + if (typeof oldest !== 'string') break; + teamSummaryCache.delete(oldest); + diag.cacheEvictions++; + } +} + +function makeTaskOptionKey(payload: GetAllTasksPayload): string { + return fingerprintToString({ + maxTaskBytes: payload.maxTaskBytes, + maxTaskReadMs: payload.maxTaskReadMs, + }); +} + +function makeTaskCacheKey( + tasksBase: string, + teamName: string, + fileName: string, + optionKey: string +): string { + return `${tasksBase}\0${teamName}\0${fileName}\0${optionKey}`; +} + +async function cacheTaskReadResultIfStable( + cacheKey: string, + taskPath: string, + tasksBase: string, + fingerprintBefore: string, + fingerprintBeforeCacheSafe: boolean, + result: CachedTaskReadResult, + taskDiag: TaskReadDiag +): Promise { + if (!fingerprintBeforeCacheSafe) { + taskDiag.cacheWriteSkips++; + return; + } + const after = await statPathFingerprint(taskPath); + if (!isCacheSafeFingerprint(after) || fingerprintToString(after) !== fingerprintBefore) { + taskDiag.cacheWriteSkips++; + return; + } + taskFileCache.set(cacheKey, { + fingerprint: fingerprintBefore, + result: cloneCached(result), + tasksBase, + lastUsedAt: nowMs(), + }); +} + +function applyCachedTaskReadResult( + cached: CachedTaskReadResult, + tasks: unknown[], + taskDiag: TaskReadDiag +): void { + if (cached.skipReason) { + taskDiag.skipped++; + bumpSkipReason(taskDiag.skipReasons, cached.skipReason); + return; + } + tasks.push(cloneCached(cached.task)); +} + +function pruneTaskFileCache( + tasksBase: string, + liveCacheKeys: ReadonlySet, + diag: GetAllTasksDiag +): void { + for (const [key, entry] of taskFileCache) { + if (entry.tasksBase === tasksBase && !liveCacheKeys.has(key)) { + taskFileCache.delete(key); + diag.cacheEvictions++; + } + } + while (taskFileCache.size > TASK_FILE_CACHE_MAX_ENTRIES) { + const oldest = taskFileCache.keys().next().value; + if (typeof oldest !== 'string') break; + taskFileCache.delete(oldest); + diag.cacheEvictions++; + } +} + // --------------------------------------------------------------------------- // listTeams // --------------------------------------------------------------------------- @@ -340,7 +719,7 @@ function dropCliProvisionerMembers( async function readLaunchState( teamsDir: string, teamName: string -): Promise> { +): Promise { const bootstrapSnapshot = await readBootstrapLaunchSnapshot(teamName); const launchStatePath = path.join(teamsDir, teamName, TEAM_LAUNCH_STATE_FILE); const launchSummaryPath = path.join(teamsDir, teamName, TEAM_LAUNCH_SUMMARY_FILE); @@ -371,11 +750,24 @@ async function readLaunchState( })(), ]); - return choosePreferredLaunchStateSummary({ + const summary = choosePreferredLaunchStateSummary({ bootstrapSnapshot, launchSnapshot, launchSummaryProjection, }); + if (launchSnapshot) { + return { summary, cacheable: true }; + } + if (!bootstrapSnapshot) { + return { summary, cacheable: true }; + } + if ( + bootstrapSnapshot.launchPhase === 'finished' && + bootstrapSnapshot.teamLaunchState !== 'partial_pending' + ) { + return { summary, cacheable: true }; + } + return { summary, cacheable: false }; } /** @@ -465,6 +857,10 @@ async function listTeams( skipped: 0, skipReasons: {}, slowest: [], + cacheHits: 0, + cacheMisses: 0, + cacheWriteSkips: 0, + cacheEvictions: 0, totalMs: 0, }; @@ -478,11 +874,26 @@ async function listTeams( const teamDirs = entries.filter((e) => e.isDirectory()); diag.totalDirs = teamDirs.length; + const optionKey = makeTeamSummaryOptionKey(payload); + const liveTeamNames = new Set(teamDirs.map((entry) => entry.name)); const perTeam = await mapLimit(teamDirs, payload.concurrency, async (entry) => { const teamName = entry.name; const t0 = nowMs(); const configPath = path.join(payload.teamsDir, teamName, 'config.json'); + const cacheKey = makeTeamSummaryCacheKey(payload.teamsDir, teamName, optionKey); + const dependencyFingerprint = await buildTeamSummaryFingerprint( + payload.teamsDir, + teamName, + optionKey + ); + const cached = teamSummaryCache.get(cacheKey); + if (dependencyFingerprint.cacheSafe && cached?.fingerprint === dependencyFingerprint.value) { + cached.lastUsedAt = nowMs(); + diag.cacheHits++; + return cloneCached(cached.summary); + } + diag.cacheMisses++; const skip = (reason: string): null => { diag.skipped++; @@ -496,12 +907,36 @@ async function listTeams( } catch { // Fallback: check for draft team (team.meta.json without config.json) const draft = await readDraftTeamMeta(payload.teamsDir, teamName, payload); - if (draft) return draft; + if (draft) { + await cacheTeamSummaryIfStable( + cacheKey, + payload.teamsDir, + teamName, + optionKey, + dependencyFingerprint, + draft, + true, + diag + ); + return draft; + } return skip('config_stat_failed'); } if (!stat.isFile()) { const draft = await readDraftTeamMeta(payload.teamsDir, teamName, payload); - if (draft) return draft; + if (draft) { + await cacheTeamSummaryIfStable( + cacheKey, + payload.teamsDir, + teamName, + optionKey, + dependencyFingerprint, + draft, + true, + diag + ); + return draft; + } return skip('config_not_file'); } if (stat.size > payload.maxConfigBytes) return skip('config_too_large'); @@ -692,32 +1127,28 @@ async function listTeams( leadProviderId, members: metaRuntimeMembers, }); - const launchStateSummary = - (await readLaunchState(payload.teamsDir, teamName)) ?? - (() => { - if (suppressLegacyLaunchArtifactHeuristic) { - return null; - } - if ( - !leadSessionId || - expectedTeammateNames.size === 0 || - confirmedArtifactNames.size === 0 - ) { - return null; - } - const missingMembers = Array.from(expectedTeammateNames).filter( - (name) => !confirmedArtifactNames.has(name) - ); - if (missingMembers.length === 0) { - return null; - } - return { - partialLaunchFailure: true as const, - expectedMemberCount: expectedTeammateNames.size, - confirmedMemberCount: confirmedArtifactNames.size, - missingMembers, - }; - })(); + const launchStateRead = await readLaunchState(payload.teamsDir, teamName); + const fallbackLaunchStateSummary = (): ReturnType => { + if (suppressLegacyLaunchArtifactHeuristic) { + return null; + } + if (!leadSessionId || expectedTeammateNames.size === 0 || confirmedArtifactNames.size === 0) { + return null; + } + const missingMembers = Array.from(expectedTeammateNames).filter( + (name) => !confirmedArtifactNames.has(name) + ); + if (missingMembers.length === 0) { + return null; + } + return { + partialLaunchFailure: true as const, + expectedMemberCount: expectedTeammateNames.size, + confirmedMemberCount: confirmedArtifactNames.size, + missingMembers, + }; + }; + const launchStateSummary = launchStateRead.summary ?? fallbackLaunchStateSummary(); const summary = { teamName, displayName, @@ -741,10 +1172,21 @@ async function listTeams( if (ms >= 250) { pushSlowest(diag.slowest, { teamName, ms }, 10); } + await cacheTeamSummaryIfStable( + cacheKey, + payload.teamsDir, + teamName, + optionKey, + dependencyFingerprint, + summary, + launchStateRead.cacheable, + diag + ); return summary; }); const teams = perTeam.filter((t): t is NonNullable => t !== null); + pruneTeamSummaryCache(payload.teamsDir, optionKey, liveTeamNames, diag); diag.returned = teams.length; diag.totalMs = nowMs() - startedAt; return { teams, diag }; @@ -880,19 +1322,27 @@ async function readTasksDirForTeam( tasksDir: string, teamName: string, payload: GetAllTasksPayload -): Promise<{ tasks: unknown[]; taskDiag: TaskReadDiag }> { - const taskDiag: TaskReadDiag = { skipped: 0, skipReasons: {} }; +): Promise<{ tasks: unknown[]; taskDiag: TaskReadDiag; liveCacheKeys: Set }> { + const taskDiag: TaskReadDiag = { + skipped: 0, + skipReasons: {}, + cacheHits: 0, + cacheMisses: 0, + cacheWriteSkips: 0, + }; let entries: string[]; try { entries = await fs.promises.readdir(tasksDir); } catch (error) { if ((error as NodeJS.ErrnoException).code === 'ENOENT') { - return { tasks: [], taskDiag }; + return { tasks: [], taskDiag, liveCacheKeys: new Set() }; } throw error; } const tasks: unknown[] = []; + const liveCacheKeys = new Set(); + const optionKey = makeTaskOptionKey(payload); for (const file of entries) { if ( !file.endsWith('.json') || @@ -904,25 +1354,61 @@ async function readTasksDirForTeam( } const taskPath = path.join(tasksDir, file); + const cacheKey = makeTaskCacheKey(payload.tasksBase, teamName, file, optionKey); + liveCacheKeys.add(cacheKey); try { - const stat = await fs.promises.stat(taskPath); - if (!stat.isFile() || stat.size > payload.maxTaskBytes) { + const pathFingerprint = await statPathFingerprint(taskPath); + const taskSize = Number(pathFingerprint.size ?? Number.NaN); + if ( + !pathFingerprint.isFile || + !Number.isFinite(taskSize) || + taskSize > payload.maxTaskBytes + ) { taskDiag.skipped++; bumpSkipReason(taskDiag.skipReasons, 'task_not_file_or_large'); continue; } + const fingerprint = fingerprintToString(pathFingerprint); + const fingerprintCacheSafe = isCacheSafeFingerprint(pathFingerprint); + const cached = taskFileCache.get(cacheKey); + if (fingerprintCacheSafe && cached?.fingerprint === fingerprint) { + cached.lastUsedAt = nowMs(); + taskDiag.cacheHits++; + applyCachedTaskReadResult(cached.result, tasks, taskDiag); + continue; + } + taskDiag.cacheMisses++; + const stat = await fs.promises.stat(taskPath); const raw = await readFileUtf8WithTimeout(taskPath, payload.maxTaskReadMs); const parsed = JSON.parse(raw) as ParsedTask; const metadata = parsed.metadata; if (metadata?._internal === true) { taskDiag.skipped++; bumpSkipReason(taskDiag.skipReasons, 'task_internal'); + await cacheTaskReadResultIfStable( + cacheKey, + taskPath, + payload.tasksBase, + fingerprint, + fingerprintCacheSafe, + { skipReason: 'task_internal' }, + taskDiag + ); continue; } if (parsed.status === 'deleted') { taskDiag.skipped++; bumpSkipReason(taskDiag.skipReasons, 'task_deleted'); + await cacheTaskReadResultIfStable( + cacheKey, + taskPath, + payload.tasksBase, + fingerprint, + fingerprintCacheSafe, + { skipReason: 'task_deleted' }, + taskDiag + ); continue; } @@ -962,7 +1448,7 @@ async function readTasksDirForTeam( deriveReviewStateFromEvents(historyEvents) ?? normalizeFallbackReviewState(parsed.reviewState, status); - tasks.push({ + const task = { id: typeof parsed.id === 'string' || typeof parsed.id === 'number' ? String(parsed.id) : '', displayId: typeof parsed.displayId === 'string' && parsed.displayId.trim().length > 0 @@ -1018,7 +1504,17 @@ async function readTasksDirForTeam( ? (parsed.sourceMessage as Record) : undefined, teamName, - }); + }; + tasks.push(task); + await cacheTaskReadResultIfStable( + cacheKey, + taskPath, + payload.tasksBase, + fingerprint, + fingerprintCacheSafe, + { task }, + taskDiag + ); } catch (error) { taskDiag.skipped++; const code = (error as NodeJS.ErrnoException).code; @@ -1029,11 +1525,14 @@ async function readTasksDirForTeam( } } } - return { tasks, taskDiag }; + return { tasks, taskDiag, liveCacheKeys }; } function mergeTaskDiag(target: GetAllTasksDiag, source: TaskReadDiag): void { target.skipped += source.skipped; + target.cacheHits += source.cacheHits; + target.cacheMisses += source.cacheMisses; + target.cacheWriteSkips += source.cacheWriteSkips; for (const [reason, count] of Object.entries(source.skipReasons)) { target.skipReasons[reason] = (target.skipReasons[reason] || 0) + count; } @@ -1052,6 +1551,10 @@ async function getAllTasks( skipped: 0, skipReasons: {}, slowestTeams: [], + cacheHits: 0, + cacheMisses: 0, + cacheWriteSkips: 0, + cacheEvictions: 0, totalMs: 0, }; @@ -1068,13 +1571,21 @@ async function getAllTasks( const dirs = entries.filter((e) => e.isDirectory()); diag.teamDirs = dirs.length; + const liveCacheKeys = new Set(); const chunks = await mapLimit(dirs, payload.concurrency, async (entry) => { const teamName = entry.name; const t0 = nowMs(); try { const tasksDir = path.join(payload.tasksBase, teamName); - const { tasks, taskDiag } = await readTasksDirForTeam(tasksDir, teamName, payload); + const { + tasks, + taskDiag, + liveCacheKeys: teamLiveCacheKeys, + } = await readTasksDirForTeam(tasksDir, teamName, payload); + for (const key of teamLiveCacheKeys) { + liveCacheKeys.add(key); + } mergeTaskDiag(diag, taskDiag); const ms = nowMs() - t0; if (ms >= 250) { @@ -1089,6 +1600,7 @@ async function getAllTasks( }); const tasks = chunks.flat(); + pruneTaskFileCache(payload.tasksBase, liveCacheKeys, diag); diag.returned = tasks.length; diag.totalMs = nowMs() - startedAt; return { tasks, diag }; @@ -1105,6 +1617,19 @@ function post(msg: WorkerResponse): void { parentPort?.on('message', async (msg: WorkerRequest) => { const { id, op } = msg; try { + if (op === 'warmup') { + post({ + id, + ok: true, + result: { + ready: true, + teamSummaryCacheEntries: teamSummaryCache.size, + taskFileCacheEntries: taskFileCache.size, + }, + diag: { op, totalMs: 0 }, + }); + return; + } if (op === 'listTeams') { const { teams, diag } = await listTeams(msg.payload); post({ id, ok: true, result: teams, diag }); diff --git a/test/main/services/team/TeamDataWorkerClient.test.ts b/test/main/services/team/TeamDataWorkerClient.test.ts index fac20503..a3cd518a 100644 --- a/test/main/services/team/TeamDataWorkerClient.test.ts +++ b/test/main/services/team/TeamDataWorkerClient.test.ts @@ -93,6 +93,25 @@ describe('TeamDataWorkerClient', () => { client.dispose(); }); + it('does not queue warmup behind an already running worker', async () => { + const { TeamDataWorkerClient } = await import( + '../../../../src/main/services/team/TeamDataWorkerClient' + ); + const client = new TeamDataWorkerClient(); + + await client.getTeamData('my-team'); + await client.prewarm(); + + expect(hoisted.workers).toHaveLength(1); + expect(hoisted.workers[0].messages).toHaveLength(1); + expect(hoisted.workers[0].messages[0]).toMatchObject({ + op: 'getTeamData', + payload: { teamName: 'my-team' }, + }); + + client.dispose(); + }); + it('sends best-effort team config invalidation to the worker', async () => { const { TeamDataWorkerClient } = await import( '../../../../src/main/services/team/TeamDataWorkerClient' diff --git a/test/main/services/team/TeamFsWorker.integration.test.ts b/test/main/services/team/TeamFsWorker.integration.test.ts index 5cacefb2..e2085b93 100644 --- a/test/main/services/team/TeamFsWorker.integration.test.ts +++ b/test/main/services/team/TeamFsWorker.integration.test.ts @@ -11,6 +11,7 @@ interface WorkerResponse { id: string; ok: boolean; result?: unknown; + diag?: unknown; error?: string; } @@ -44,7 +45,11 @@ function createWorker(workerPath: string): Worker { return new Worker(workerPath); } -function callListTeams(worker: Worker, teamsDir: string): Promise { +function callWorker( + worker: Worker, + op: string, + payload: Record = {} +): Promise<{ result: unknown; diag?: unknown }> { const requestId = `req-${Date.now()}`; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { @@ -72,29 +77,56 @@ function callListTeams(worker: Worker, teamsDir: string): Promise { reject(new Error(message.error || 'team-fs-worker returned an unknown error')); return; } - resolve(Array.isArray(message.result) ? message.result : []); + resolve({ result: message.result, diag: message.diag }); }; worker.on('message', onMessage); worker.on('error', onError); - worker.postMessage({ - id: requestId, - op: 'listTeams', - payload: { - teamsDir, - largeConfigBytes: 8 * 1024, - configHeadBytes: 4 * 1024, - maxConfigBytes: 256 * 1024, - maxConfigReadMs: 5_000, - maxMembersMetaBytes: 256 * 1024, - maxSessionHistoryInSummary: 10, - maxProjectPathHistoryInSummary: 10, - concurrency: 2, - }, - }); + worker.postMessage({ id: requestId, op, payload }); }); } +async function callListTeams(worker: Worker, teamsDir: string): Promise<{ + teams: unknown[]; + diag?: Record; +}> { + const { result, diag } = await callWorker(worker, 'listTeams', { + teamsDir, + largeConfigBytes: 8 * 1024, + configHeadBytes: 4 * 1024, + maxConfigBytes: 256 * 1024, + maxConfigReadMs: 5_000, + maxMembersMetaBytes: 256 * 1024, + maxSessionHistoryInSummary: 10, + maxProjectPathHistoryInSummary: 10, + concurrency: 2, + }); + return { + teams: Array.isArray(result) ? result : [], + diag: diag && typeof diag === 'object' ? (diag as Record) : undefined, + }; +} + +async function callGetAllTasks(worker: Worker, tasksBase: string): Promise<{ + tasks: unknown[]; + diag?: Record; +}> { + const { result, diag } = await callWorker(worker, 'getAllTasks', { + tasksBase, + maxTaskBytes: 256 * 1024, + maxTaskReadMs: 5_000, + concurrency: 2, + }); + return { + tasks: Array.isArray(result) ? result : [], + diag: diag && typeof diag === 'object' ? (diag as Record) : undefined, + }; +} + +async function callWarmup(worker: Worker): Promise { + await callWorker(worker, 'warmup'); +} + describe('team-fs-worker integration', () => { let tempDir = ''; @@ -183,7 +215,7 @@ describe('team-fs-worker integration', () => { const worker = createWorker(workerPath); try { - const teams = (await callListTeams(worker, tempDir)) as Array>; + const { teams } = await callListTeams(worker, tempDir); expect(teams).toHaveLength(1); expect(teams[0]).toMatchObject({ teamName, @@ -234,7 +266,7 @@ describe('team-fs-worker integration', () => { const worker = createWorker(workerPath); try { - const teams = (await callListTeams(worker, tempDir)) as Array>; + const { teams } = await callListTeams(worker, tempDir); expect(teams).toHaveLength(1); expect(teams[0]).toMatchObject({ teamName, @@ -247,4 +279,150 @@ describe('team-fs-worker integration', () => { await worker.terminate(); } }); + + it('prewarms and reuses unchanged team summaries by fingerprint', async () => { + const workerPath = await getWorkerPath(); + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-')); + const teamName = 'cached-worker-team'; + const teamDir = path.join(tempDir, teamName); + await fs.mkdir(path.join(teamDir, 'inboxes'), { recursive: true }); + await fs.writeFile( + path.join(teamDir, 'config.json'), + JSON.stringify({ + name: 'Cached Worker Team', + members: [{ name: 'team-lead', agentType: 'team-lead' }], + }), + 'utf8' + ); + await fs.writeFile( + path.join(teamDir, 'members.meta.json'), + JSON.stringify({ version: 1, members: [{ name: 'alice' }] }), + 'utf8' + ); + + const worker = createWorker(workerPath); + try { + await callWarmup(worker); + const first = await callListTeams(worker, tempDir); + expect(first.teams[0]).toMatchObject({ teamName, memberCount: 1 }); + expect(first.diag?.cacheMisses).toBe(1); + + const second = await callListTeams(worker, tempDir); + expect(second.teams[0]).toMatchObject({ teamName, memberCount: 1 }); + expect(second.diag?.cacheHits).toBe(1); + + await fs.writeFile( + path.join(teamDir, 'members.meta.json'), + JSON.stringify({ version: 1, members: [{ name: 'alice' }, { name: 'bob' }] }), + 'utf8' + ); + const changed = await callListTeams(worker, tempDir); + expect(changed.teams[0]).toMatchObject({ teamName, memberCount: 2 }); + expect(changed.diag?.cacheMisses).toBe(1); + } finally { + await worker.terminate(); + } + }); + + it('does not cache pending launch summaries because liveness can change without file writes', async () => { + const workerPath = await getWorkerPath(); + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-')); + const teamName = 'pending-launch-team'; + const teamDir = path.join(tempDir, teamName); + await fs.mkdir(teamDir, { recursive: true }); + await fs.writeFile( + path.join(teamDir, 'config.json'), + JSON.stringify({ + name: 'Pending Launch Team', + members: [{ name: 'team-lead', agentType: 'team-lead' }], + }), + 'utf8' + ); + await fs.writeFile( + path.join(teamDir, 'launch-summary.json'), + JSON.stringify({ + version: 1, + teamName, + updatedAt: '2026-05-02T12:00:00.000Z', + teamLaunchState: 'partial_pending', + expectedMemberCount: 1, + pendingCount: 1, + }), + 'utf8' + ); + + const worker = createWorker(workerPath); + try { + const first = await callListTeams(worker, tempDir); + expect(first.teams[0]).toMatchObject({ + teamName, + teamLaunchState: 'partial_pending', + pendingCount: 1, + }); + expect(first.diag?.cacheMisses).toBe(1); + expect(first.diag?.cacheWriteSkips).toBe(1); + + const second = await callListTeams(worker, tempDir); + expect(second.teams[0]).toMatchObject({ + teamName, + teamLaunchState: 'partial_pending', + pendingCount: 1, + }); + expect(second.diag?.cacheHits).toBe(0); + expect(second.diag?.cacheMisses).toBe(1); + expect(second.diag?.cacheWriteSkips).toBe(1); + } finally { + await worker.terminate(); + } + }); + + it('reuses unchanged parsed tasks and rereads changed task files by fingerprint', async () => { + const workerPath = await getWorkerPath(); + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-')); + const tasksBase = path.join(tempDir, 'tasks'); + const teamName = 'task-cache-team'; + const tasksDir = path.join(tasksBase, teamName); + await fs.mkdir(tasksDir, { recursive: true }); + const taskPath = path.join(tasksDir, '1.json'); + await fs.writeFile( + taskPath, + JSON.stringify({ + id: '1', + subject: 'First subject', + status: 'pending', + createdAt: '2026-05-02T12:00:00.000Z', + }), + 'utf8' + ); + + const worker = createWorker(workerPath); + try { + const first = await callGetAllTasks(worker, tasksBase); + expect(first.tasks[0]).toMatchObject({ teamName, subject: 'First subject' }); + expect(first.diag?.cacheMisses).toBe(1); + + const second = await callGetAllTasks(worker, tasksBase); + expect(second.tasks[0]).toMatchObject({ teamName, subject: 'First subject' }); + expect(second.diag?.cacheHits).toBe(1); + + await fs.writeFile( + taskPath, + JSON.stringify({ + id: '1', + subject: 'Changed subject with a different size', + status: 'pending', + createdAt: '2026-05-02T12:00:00.000Z', + }), + 'utf8' + ); + const changed = await callGetAllTasks(worker, tasksBase); + expect(changed.tasks[0]).toMatchObject({ + teamName, + subject: 'Changed subject with a different size', + }); + expect(changed.diag?.cacheMisses).toBe(1); + } finally { + await worker.terminate(); + } + }); }); diff --git a/test/main/services/team/TeamFsWorkerClient.test.ts b/test/main/services/team/TeamFsWorkerClient.test.ts new file mode 100644 index 00000000..48a3a9f9 --- /dev/null +++ b/test/main/services/team/TeamFsWorkerClient.test.ts @@ -0,0 +1,152 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +const hoisted = vi.hoisted(() => { + const skipResponsesForOps = new Set(); + const workers: Array<{ + messages: unknown[]; + handlers: Map void>; + postMessage: (message: unknown) => void; + on: (event: string, handler: (value: unknown) => void) => void; + terminate: ReturnType; + }> = []; + const createMockWorker = vi.fn().mockImplementation(() => { + const worker = { + messages: [] as unknown[], + handlers: new Map void>(), + postMessage(message: unknown) { + worker.messages.push(message); + const request = message as { id: string; op: string }; + if (skipResponsesForOps.has(request.op)) return; + queueMicrotask(() => { + const handler = worker.handlers.get('message'); + if (!handler) return; + handler({ + id: request.id, + ok: true, + result: request.op === 'listTeams' || request.op === 'getAllTasks' ? [] : null, + diag: { op: request.op, totalMs: 0 }, + }); + }); + }, + on(event: string, handler: (value: unknown) => void) { + worker.handlers.set(event, handler); + }, + terminate: vi.fn(async () => undefined), + }; + workers.push(worker); + return worker; + }); + return { + workers, + createMockWorker, + skipResponsesForOps, + }; +}); + +vi.mock('node:fs', async () => { + const actual = await vi.importActual('node:fs'); + return { + ...actual, + existsSync: vi.fn(() => true), + }; +}); + +vi.mock('node:worker_threads', () => ({ + Worker: hoisted.createMockWorker, + default: { + Worker: hoisted.createMockWorker, + }, +})); + +describe('TeamFsWorkerClient', () => { + afterEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + vi.useRealTimers(); + hoisted.workers.length = 0; + hoisted.skipResponsesForOps.clear(); + }); + + it('prewarms the worker without running a scan', async () => { + const { TeamFsWorkerClient } = await import( + '../../../../src/main/services/team/TeamFsWorkerClient' + ); + const client = new TeamFsWorkerClient(); + + await client.prewarm(); + + expect(hoisted.workers).toHaveLength(1); + expect(hoisted.workers[0].messages).toHaveLength(1); + expect(hoisted.workers[0].messages[0]).toMatchObject({ + op: 'warmup', + payload: {}, + }); + }); + + it('does not queue warmup behind an already running worker', async () => { + const { TeamFsWorkerClient } = await import( + '../../../../src/main/services/team/TeamFsWorkerClient' + ); + const client = new TeamFsWorkerClient(); + + await client.listTeams({ + largeConfigBytes: 8 * 1024, + configHeadBytes: 4 * 1024, + maxConfigBytes: 256 * 1024, + maxMembersMetaBytes: 256 * 1024, + maxSessionHistoryInSummary: 10, + maxProjectPathHistoryInSummary: 10, + }); + await client.prewarm(); + + expect(hoisted.workers).toHaveLength(1); + expect(hoisted.workers[0].messages).toHaveLength(1); + expect(hoisted.workers[0].messages[0]).toMatchObject({ + op: 'listTeams', + }); + }); + + it('ignores stale worker exit after timeout when a replacement worker owns pending work', async () => { + vi.useFakeTimers(); + hoisted.skipResponsesForOps.add('warmup'); + hoisted.skipResponsesForOps.add('listTeams'); + const { TeamFsWorkerClient } = await import( + '../../../../src/main/services/team/TeamFsWorkerClient' + ); + const client = new TeamFsWorkerClient(); + + const prewarmResult = client.prewarm().catch((error: unknown) => error); + await vi.advanceTimersByTimeAsync(20_001); + const prewarmError = await prewarmResult; + expect(prewarmError).toBeInstanceOf(Error); + expect((prewarmError as Error).message).toContain('Worker call timeout'); + expect(hoisted.workers).toHaveLength(1); + + const listPromise = client.listTeams({ + largeConfigBytes: 8 * 1024, + configHeadBytes: 4 * 1024, + maxConfigBytes: 256 * 1024, + maxMembersMetaBytes: 256 * 1024, + maxSessionHistoryInSummary: 10, + maxProjectPathHistoryInSummary: 10, + }); + + expect(hoisted.workers).toHaveLength(2); + const staleWorker = hoisted.workers[0]; + const replacementWorker = hoisted.workers[1]; + const listRequest = replacementWorker.messages[0] as { id: string }; + + staleWorker.handlers.get('exit')?.(1); + replacementWorker.handlers.get('message')?.({ + id: listRequest.id, + ok: true, + result: [{ teamName: 'fresh-team', displayName: 'Fresh Team' }], + diag: { op: 'listTeams', totalMs: 1 }, + }); + + await expect(listPromise).resolves.toEqual({ + teams: [{ teamName: 'fresh-team', displayName: 'Fresh Team' }], + diag: { op: 'listTeams', totalMs: 1 }, + }); + }); +}); diff --git a/test/main/services/team/TeamProvisioningService.test.ts b/test/main/services/team/TeamProvisioningService.test.ts index e0f21a28..02a791b4 100644 --- a/test/main/services/team/TeamProvisioningService.test.ts +++ b/test/main/services/team/TeamProvisioningService.test.ts @@ -206,6 +206,16 @@ function createPidusageStat(pid: number, memory: number) { }; } +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + function writeLaunchConfig( teamName: string, projectPath: string, @@ -619,6 +629,182 @@ describe('TeamProvisioningService', () => { }); describe('getTeamAgentRuntimeSnapshot', () => { + it('dedupes concurrent runtime snapshot probes for the same team', async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', model: 'gpt-5.4-mini' }, + ], + })), + }; + (svc as any).readPersistedRuntimeMembers = vi.fn(() => [ + { + name: 'alice', + agentId: 'alice@runtime-team', + tmuxPaneId: '%1', + backendType: 'tmux', + }, + ]); + (svc as any).aliveRunByTeam.set('runtime-team', 'run-1'); + (svc as any).runs.set('run-1', { + runId: 'run-1', + child: { pid: 111 }, + request: { model: 'gpt-5.4' }, + processKilled: false, + cancelRequested: false, + spawnContext: null, + }); + const paneInfo = createDeferred>(); + vi.mocked(listTmuxPaneRuntimeInfoForCurrentPlatform).mockReturnValueOnce( + paneInfo.promise as ReturnType + ); + vi.mocked(pidusage).mockResolvedValueOnce({ + '111': createPidusageStat(111, 123_000_000), + '222': createPidusageStat(222, 456_000_000), + } as any); + + const first = svc.getTeamAgentRuntimeSnapshot('runtime-team'); + const second = svc.getTeamAgentRuntimeSnapshot('runtime-team'); + paneInfo.resolve( + new Map([ + [ + '%1', + { + paneId: '%1', + panePid: 222, + }, + ], + ]) + ); + const [firstSnapshot, secondSnapshot] = await Promise.all([first, second]); + + expect(listTmuxPaneRuntimeInfoForCurrentPlatform).toHaveBeenCalledTimes(1); + expect(pidusage).toHaveBeenCalledTimes(1); + expect(firstSnapshot.members.alice?.pid).toBe(222); + expect(secondSnapshot.members.alice?.pid).toBe(222); + }); + + it('does not cache live runtime metadata when invalidated while the probe is in flight', async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', model: 'gpt-5.4-mini' }, + ], + })), + }; + const processRows = createDeferred>>(); + vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform) + .mockReturnValueOnce(processRows.promise) + .mockResolvedValueOnce([]); + + const first = (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team') as Promise< + Map + >; + (svc as any).invalidateRuntimeSnapshotCaches('runtime-team'); + processRows.resolve([]); + await first; + + await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team'); + + expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2); + }); + + it('returns cloned live runtime metadata maps from cache', async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', model: 'gpt-5.4-mini' }, + ], + })), + }; + vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform).mockResolvedValueOnce([]); + + const first = (await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team')) as Map< + string, + unknown + >; + expect(first.has('alice')).toBe(true); + first.delete('alice'); + + const second = (await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team')) as Map< + string, + unknown + >; + + expect(second.has('alice')).toBe(true); + expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(1); + }); + + it('clears runtime probe caches when starting a new run for the team', async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', model: 'gpt-5.4-mini' }, + ], + })), + }; + vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform) + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team'); + (svc as any).resetTeamScopedTransientStateForNewRun('runtime-team'); + await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team'); + + expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2); + }); + + it('does not cache a probe that started before runtime adapter evidence was installed', async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', providerId: 'opencode', model: 'gpt-5.4-mini' }, + ], + })), + }; + (svc as any).provisioningRunByTeam.set('runtime-team', 'run-1'); + const processRows = createDeferred>>(); + vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform) + .mockReturnValueOnce(processRows.promise) + .mockResolvedValueOnce([]); + + const first = (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team') as Promise< + Map + >; + (svc as any).runtimeAdapterRunByTeam.set('runtime-team', { + runId: 'run-1', + providerId: 'opencode', + cwd: '/tmp/runtime-project', + members: { + alice: { + providerId: 'opencode', + runtimeAlive: true, + bootstrapConfirmed: false, + runtimePid: 333, + livenessKind: 'runtime_process', + pidSource: 'agent_process_table', + }, + }, + }); + (svc as any).invalidateRuntimeSnapshotCaches('runtime-team'); + processRows.resolve([]); + await first; + + await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team'); + + expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2); + }); + it('uses batched pidusage rss values for lead and teammates', async () => { const svc = new TeamProvisioningService(); (svc as any).configReader = {