perf(team): reduce launch IO pressure

This commit is contained in:
777genius 2026-05-02 23:15:34 +03:00
parent b187bbcdd0
commit fcb9179990
11 changed files with 1380 additions and 127 deletions

View file

@ -138,6 +138,7 @@ import {
} from './services/team/TeamControlApiState';
import { TeamInboxReader } from './services/team/TeamInboxReader';
import { getTeamDataWorkerClient } from './services/team/TeamDataWorkerClient';
import { getTeamFsWorkerClient } from './services/team/TeamFsWorkerClient';
import { TeamMemberRuntimeAdvisoryService } from './services/team/TeamMemberRuntimeAdvisoryService';
import {
createTeamReconcileDrainScheduler,
@ -1841,6 +1842,30 @@ function createWindow(): void {
updaterService.startPeriodicCheck(60 * 60 * 1000);
}
scheduleStartupTask(
() => {
void getTeamFsWorkerClient()
.prewarm()
.catch((error: unknown) =>
logger.debug(
`[startup] team-fs-worker prewarm skipped: ${
error instanceof Error ? error.message : String(error)
}`
)
);
void getTeamDataWorkerClient()
.prewarm()
.catch((error: unknown) =>
logger.debug(
`[startup] team-data-worker prewarm skipped: ${
error instanceof Error ? error.message : String(error)
}`
)
);
},
process.platform === 'win32' ? 2500 : 1000
);
// Defer non-critical startup work to avoid thread pool contention.
// The window is now visible and responsive; these run in the background.
scheduleStartupTask(() => {

View file

@ -66,6 +66,9 @@ interface PendingEntry {
function summarizeWorkerPayload(
payload: TeamDataWorkerRequest['payload']
): Record<string, unknown> {
if (!payload) {
return {};
}
if ('taskId' in payload) {
return {
teamName: payload.teamName,
@ -213,6 +216,21 @@ export class TeamDataWorkerClient {
});
}
/**
 * Issues a no-op `warmup` call so the worker is ready before the first real request.
 *
 * Returns immediately when a worker instance already exists or when
 * `isAvailable()` reports that the worker cannot be used. Logs a warning when
 * the warmup round trip takes 1500 ms or longer.
 *
 * @returns Resolves once the warmup call has completed; rejects if `call` rejects.
 */
async prewarm(): Promise<void> {
  if (this.worker || !this.isAvailable()) {
    return;
  }

  const warmupBeganAt = Date.now();
  await this.call('warmup', {});
  const elapsedMs = Date.now() - warmupBeganAt;

  if (elapsedMs < 1500) {
    return;
  }
  logger.warn(`worker prewarm slow ms=${elapsedMs}`);
}
private postBestEffort(
op: TeamDataWorkerRequest['op'],
payload: TeamDataWorkerRequest['payload']

View file

@ -36,6 +36,7 @@ interface GetAllTasksPayload {
}
/**
 * Request envelope for the team-fs worker, discriminated by `op`.
 *
 * Every variant carries an `id`. `warmup` is the only variant whose payload is
 * optional; when present it must be an empty object (`Record<string, never>`).
 */
type WorkerRequest =
| { id: string; op: 'warmup'; payload?: Record<string, never> }
| { id: string; op: 'listTeams'; payload: ListTeamsPayload }
| { id: string; op: 'getAllTasks'; payload: GetAllTasksPayload };
@ -44,6 +45,9 @@ type WorkerResponse =
| { id: string; ok: false; error: string };
function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record<string, unknown> {
if (!payload) {
return {};
}
if ('teamsDir' in payload) {
return {
teamsDir: payload.teamsDir,
@ -52,6 +56,9 @@ function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record<strin
maxConfigBytes: payload.maxConfigBytes,
};
}
if (!('tasksBase' in payload)) {
return {};
}
return {
tasksBase: payload.tasksBase,
concurrency: payload.concurrency,
@ -108,6 +115,18 @@ export class TeamFsWorkerClient {
{ resolve: (v: { result: unknown; diag?: WorkerDiag }) => void; reject: (e: Error) => void }
>();
/**
 * Detaches `worker` from this client and rejects every pending call with `error`.
 *
 * Does nothing when `worker` is not the client's current worker, so an event
 * from a worker that has already been replaced leaves current state untouched.
 *
 * @param worker - The worker instance that failed.
 * @param error - The error handed to every pending entry's `reject`.
 */
private failWorker(worker: Worker, error: Error): void {
  if (this.worker !== worker) {
    return;
  }

  this.worker = null;

  // Snapshot and empty the map first so it is already clear when the rejections fire.
  const abandonedCalls = Array.from(this.pending.values());
  this.pending.clear();
  abandonedCalls.forEach((pendingCall) => {
    pendingCall.reject(error);
  });
}
isAvailable(): boolean {
if (!this.workerPath && !this.warnedUnavailable && shouldWarnUnavailableWorker()) {
this.warnedUnavailable = true;
@ -134,8 +153,9 @@ export class TeamFsWorkerClient {
return this.worker;
}
this.worker = new Worker(this.workerPath);
this.worker.on('message', (msg: WorkerResponse) => {
const worker = new Worker(this.workerPath);
this.worker = worker;
worker.on('message', (msg: WorkerResponse) => {
const entry = this.pending.get(msg.id);
if (!entry) return;
this.pending.delete(msg.id);
@ -145,26 +165,18 @@ export class TeamFsWorkerClient {
entry.reject(new Error(msg.error));
}
});
this.worker.on('error', (err) => {
worker.on('error', (err) => {
logger.error('Worker error', err);
for (const [, entry] of this.pending) {
entry.reject(err instanceof Error ? err : new Error(String(err)));
}
this.pending.clear();
this.worker = null;
this.failWorker(worker, err instanceof Error ? err : new Error(String(err)));
});
this.worker.on('exit', (code) => {
worker.on('exit', (code) => {
if (code !== 0) {
logger.warn(`Worker exited with code ${code}`);
}
for (const [, entry] of this.pending) {
entry.reject(new Error(`Worker exited with code ${code}`));
}
this.pending.clear();
this.worker = null;
this.failWorker(worker, new Error(`Worker exited with code ${code}`));
});
return this.worker;
return worker;
}
private call(
@ -177,21 +189,22 @@ export class TeamFsWorkerClient {
const pendingAtStart = this.pending.size;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
this.pending.delete(id);
try {
// Terminate and recreate on next call — worker may be stuck in native IO.
this.worker?.terminate().catch(() => undefined);
} catch {
// ignore
} finally {
this.worker = null;
}
const timeoutError = new Error(
`Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms (${op})`
);
logger.warn(
`worker call timeout op=${op} ms=${Date.now() - startedAt} pendingAtStart=${pendingAtStart} pendingNow=${this.pending.size} payload=${JSON.stringify(
summarizeWorkerPayload(payload)
)}`
);
reject(new Error(`Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms (${op})`));
this.failWorker(worker, timeoutError);
try {
// Terminate and recreate on next call - worker may be stuck in native IO.
worker.terminate().catch(() => undefined);
} catch {
// ignore
}
reject(timeoutError);
}, WORKER_CALL_TIMEOUT_MS);
this.pending.set(id, {
@ -224,6 +237,21 @@ export class TeamFsWorkerClient {
});
}
/**
 * Issues a no-op `warmup` call so the worker is ready before the first real request.
 *
 * Returns immediately when a worker instance already exists or when
 * `isAvailable()` reports that the worker cannot be used. Logs a warning when
 * the warmup round trip takes 1500 ms or longer.
 *
 * @returns Resolves once the warmup call has completed; rejects if `call` rejects.
 */
async prewarm(): Promise<void> {
  if (this.worker || !this.isAvailable()) {
    return;
  }

  const warmupBeganAt = Date.now();
  await this.call('warmup', {});
  const elapsedMs = Date.now() - warmupBeganAt;

  if (elapsedMs < 1500) {
    return;
  }
  logger.warn(`worker prewarm slow ms=${elapsedMs}`);
}
async listTeams(options: {
largeConfigBytes: number;
configHeadBytes: number;

View file

@ -4565,10 +4565,31 @@ export class TeamProvisioningService {
string,
{ expiresAtMs: number; snapshot: TeamAgentRuntimeSnapshot }
>();
private readonly agentRuntimeSnapshotInFlightByTeam = new Map<
string,
{
generationAtStart: number;
runIdAtStart: string | null;
promise: Promise<TeamAgentRuntimeSnapshot>;
}
>();
private readonly liveTeamAgentRuntimeMetadataCache = new Map<
string,
{ expiresAtMs: number; metadata: Map<string, LiveTeamAgentRuntimeMetadata> }
{
expiresAtMs: number;
metadata: Map<string, LiveTeamAgentRuntimeMetadata>;
runId: string | null;
}
>();
private readonly liveTeamAgentRuntimeMetadataInFlightByTeam = new Map<
string,
{
generationAtStart: number;
runIdAtStart: string | null;
promise: Promise<Map<string, LiveTeamAgentRuntimeMetadata>>;
}
>();
private readonly runtimeSnapshotCacheGenerationByTeam = new Map<string, number>();
private readonly launchStateStore = new TeamLaunchStateStore();
private readonly launchStateStoreQueue = new Map<string, Promise<unknown>>();
private readonly memberLogsFinder: TeamMemberLogsFinder;
@ -4651,6 +4672,35 @@ export class TeamProvisioningService {
return { config, teamMeta, metaMembers };
}
/**
 * Returns the current runtime-snapshot cache generation for `teamName`.
 *
 * A team with no recorded generation is treated as generation 0.
 */
private getRuntimeSnapshotCacheGeneration(teamName: string): number {
  const recordedGeneration = this.runtimeSnapshotCacheGenerationByTeam.get(teamName);
  return recordedGeneration ?? 0;
}
/**
 * Invalidates all runtime snapshot/metadata state cached for `teamName`.
 *
 * Advances the team's generation counter by one, then drops both the cached
 * results and the in-flight request registrations for the agent runtime
 * snapshot and the live runtime metadata.
 */
private invalidateRuntimeSnapshotCaches(teamName: string): void {
  const nextGeneration = this.getRuntimeSnapshotCacheGeneration(teamName) + 1;
  this.runtimeSnapshotCacheGenerationByTeam.set(teamName, nextGeneration);

  // Cached results.
  this.agentRuntimeSnapshotCache.delete(teamName);
  this.liveTeamAgentRuntimeMetadataCache.delete(teamName);

  // In-flight registrations, so later callers start a fresh build instead of joining a stale one.
  this.agentRuntimeSnapshotInFlightByTeam.delete(teamName);
  this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName);
}
/**
 * Builds an independent copy of a per-member runtime metadata map.
 *
 * Each entry is shallow-copied; when an entry has a truthy `diagnostics`
 * array, that array is copied as well so the clone does not share it with the
 * source. Other nested values are still shared by reference.
 *
 * @param metadata - Source map keyed by member name.
 * @returns A new map with copied entries in the same iteration order.
 */
private cloneLiveTeamAgentRuntimeMetadata(
  metadata: ReadonlyMap<string, LiveTeamAgentRuntimeMetadata>
): Map<string, LiveTeamAgentRuntimeMetadata> {
  const clonedByMember = new Map<string, LiveTeamAgentRuntimeMetadata>();
  for (const [memberName, entry] of metadata) {
    clonedByMember.set(memberName, {
      ...entry,
      ...(entry.diagnostics ? { diagnostics: [...entry.diagnostics] } : {}),
    });
  }
  return clonedByMember;
}
private resolveOpenCodeMemberIdentityFromDirectory(
teamName: string,
memberName: string,
@ -5463,6 +5513,7 @@ export class TeamProvisioningService {
this.runtimeAdapterRunByTeam.delete(teamName);
this.aliveRunByTeam.delete(teamName);
this.provisioningRunByTeam.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
}
}
if (cleaned > 0) {
@ -7625,6 +7676,7 @@ export class TeamProvisioningService {
private resetTeamScopedTransientStateForNewRun(teamName: string): void {
peekAutoResumeService()?.cancelPendingAutoResume(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.retainedClaudeLogsByTeam.delete(teamName);
this.persistedTranscriptClaudeLogsCache.delete(teamName);
this.leadInboxRelayInFlight.delete(teamName);
@ -9070,8 +9122,7 @@ export class TeamProvisioningService {
trackedUpdate.run,
this.getMixedSecondaryLaunchPhase(trackedUpdate.run)
);
this.agentRuntimeSnapshotCache.delete(input.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(input.teamName);
this.invalidateRuntimeSnapshotCaches(input.teamName);
if (trackedUpdate.changed) {
this.teamChangeEmitter?.({
type: 'member-spawn',
@ -9158,8 +9209,7 @@ export class TeamProvisioningService {
updatedAt: input.observedAt,
});
await this.writeLaunchStateSnapshot(input.teamName, snapshot);
this.agentRuntimeSnapshotCache.delete(input.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(input.teamName);
this.invalidateRuntimeSnapshotCaches(input.teamName);
if (shouldEmitMemberSpawnChange) {
this.teamChangeEmitter?.({
type: 'member-spawn',
@ -9936,8 +9986,7 @@ export class TeamProvisioningService {
);
return;
}
this.agentRuntimeSnapshotCache.delete(run.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
this.invalidateRuntimeSnapshotCaches(run.teamName);
this.setMemberSpawnStatus(run, spawnedMemberName, 'waiting');
this.appendMemberBootstrapDiagnostic(
run,
@ -10405,6 +10454,36 @@ export class TeamProvisioningService {
return cached.snapshot;
}
const generationAtStart = this.getRuntimeSnapshotCacheGeneration(teamName);
const existingRequest = this.agentRuntimeSnapshotInFlightByTeam.get(teamName);
if (
existingRequest &&
existingRequest.generationAtStart === generationAtStart &&
existingRequest.runIdAtStart === runId
) {
return existingRequest.promise;
}
const request = this.buildTeamAgentRuntimeSnapshot(teamName, runId, generationAtStart).finally(
() => {
if (this.agentRuntimeSnapshotInFlightByTeam.get(teamName)?.promise === request) {
this.agentRuntimeSnapshotInFlightByTeam.delete(teamName);
}
}
);
this.agentRuntimeSnapshotInFlightByTeam.set(teamName, {
generationAtStart,
runIdAtStart: runId,
promise: request,
});
return request;
}
private async buildTeamAgentRuntimeSnapshot(
teamName: string,
runId: string | null,
generationAtStart: number
): Promise<TeamAgentRuntimeSnapshot> {
const updatedAt = nowIso();
const run = runId ? (this.runs.get(runId) ?? null) : null;
const currentRuntimeAdapterRun = this.runtimeAdapterRunByTeam.get(teamName);
@ -10627,10 +10706,15 @@ export class TeamProvisioningService {
members: snapshotMembers,
};
this.agentRuntimeSnapshotCache.set(teamName, {
expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS,
snapshot,
});
if (
this.getRuntimeSnapshotCacheGeneration(teamName) === generationAtStart &&
this.getTrackedRunId(teamName) === runId
) {
this.agentRuntimeSnapshotCache.set(teamName, {
expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS,
snapshot,
});
}
return snapshot;
}
@ -11004,8 +11088,7 @@ export class TeamProvisioningService {
);
}
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
const liveRuntimeByMember = await this.getLiveTeamAgentRuntimeMetadata(teamName);
const livePids = new Set<number>();
let hasAliveRuntimeWithoutPid = false;
@ -11150,8 +11233,7 @@ export class TeamProvisioningService {
throw new Error('Lead restart is not supported from member controls');
}
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.resetRuntimeToolActivity(run, memberName);
this.clearMemberSpawnToolTracking(run, memberName);
this.setMemberSpawnStatus(run, memberName, 'spawning');
@ -11311,8 +11393,7 @@ export class TeamProvisioningService {
: 'Skipped by user for this launch';
if (run && !run.processKilled && !run.cancelRequested) {
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.resetRuntimeToolActivity(run, normalizedMemberName);
this.clearMemberSpawnToolTracking(run, normalizedMemberName);
this.setMemberSpawnStatus(run, normalizedMemberName, 'skipped', reason);
@ -11374,8 +11455,7 @@ export class TeamProvisioningService {
updatedAt,
});
await this.writeLaunchStateSnapshot(teamName, nextSnapshot);
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
}
private getMutableAliveRunOrThrow(teamName: string): ProvisioningRun {
@ -11489,8 +11569,7 @@ export class TeamProvisioningService {
}
this.upsertRunAllEffectiveMember(run, memberSpec);
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.resetRuntimeToolActivity(run, memberName);
this.clearMemberSpawnToolTracking(run, memberName);
run.pendingMemberRestarts.delete(memberName);
@ -11544,8 +11623,7 @@ export class TeamProvisioningService {
);
if (laneIndex < 0) {
this.removeRunAllEffectiveMember(run, memberName);
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
await this.persistLaunchStateSnapshot(run, this.getMixedSecondaryLaunchPhase(run));
return;
}
@ -11553,8 +11631,7 @@ export class TeamProvisioningService {
const [lane] = run.mixedSecondaryLanes.splice(laneIndex, 1);
await this.stopSingleMixedSecondaryRuntimeLane(run, lane, 'cleanup');
this.removeRunAllEffectiveMember(run, memberName);
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.resetRuntimeToolActivity(run, memberName);
this.clearMemberSpawnToolTracking(run, memberName);
run.pendingMemberRestarts.delete(memberName);
@ -14429,6 +14506,7 @@ export class TeamProvisioningService {
}).catch(() => undefined);
this.runtimeAdapterRunByTeam.delete(input.request.teamName);
this.aliveRunByTeam.delete(input.request.teamName);
this.invalidateRuntimeSnapshotCaches(input.request.teamName);
} else {
this.runtimeAdapterRunByTeam.set(input.request.teamName, {
runId,
@ -14437,6 +14515,7 @@ export class TeamProvisioningService {
members: result.members,
});
this.aliveRunByTeam.set(input.request.teamName, runId);
this.invalidateRuntimeSnapshotCaches(input.request.teamName);
}
if (this.provisioningRunByTeam.get(input.request.teamName) === runId) {
this.provisioningRunByTeam.delete(input.request.teamName);
@ -15399,6 +15478,7 @@ export class TeamProvisioningService {
if (this.provisioningRunByTeam.get(teamName) === runId) {
this.provisioningRunByTeam.delete(teamName);
}
this.invalidateRuntimeSnapshotCaches(teamName);
this.setRuntimeAdapterProgress({
...runtimeProgress,
state: 'cancelled',
@ -15478,6 +15558,7 @@ export class TeamProvisioningService {
if (this.provisioningRunByTeam.get(teamName) === runId) {
this.provisioningRunByTeam.delete(teamName);
}
this.invalidateRuntimeSnapshotCaches(teamName);
}
private recordCancelledOpenCodeRuntimeAdapterLaunch(
@ -15490,6 +15571,7 @@ export class TeamProvisioningService {
this.provisioningRunByTeam.delete(teamName);
this.runtimeAdapterRunByTeam.delete(teamName);
this.aliveRunByTeam.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
const progress: TeamProvisioningProgress = {
runId,
teamName,
@ -17519,12 +17601,44 @@ export class TeamProvisioningService {
private async getLiveTeamAgentRuntimeMetadata(
teamName: string
): Promise<Map<string, LiveTeamAgentRuntimeMetadata>> {
const runId = this.getTrackedRunId(teamName);
const cached = this.liveTeamAgentRuntimeMetadataCache.get(teamName);
if (cached && cached.expiresAtMs > Date.now()) {
return cached.metadata;
if (cached && cached.expiresAtMs > Date.now() && cached.runId === runId) {
return this.cloneLiveTeamAgentRuntimeMetadata(cached.metadata);
}
const runId = this.getTrackedRunId(teamName);
const generationAtStart = this.getRuntimeSnapshotCacheGeneration(teamName);
const existingRequest = this.liveTeamAgentRuntimeMetadataInFlightByTeam.get(teamName);
if (
existingRequest &&
existingRequest.generationAtStart === generationAtStart &&
existingRequest.runIdAtStart === runId
) {
return this.cloneLiveTeamAgentRuntimeMetadata(await existingRequest.promise);
}
const request = this.buildLiveTeamAgentRuntimeMetadata(
teamName,
runId,
generationAtStart
).finally(() => {
if (this.liveTeamAgentRuntimeMetadataInFlightByTeam.get(teamName)?.promise === request) {
this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName);
}
});
this.liveTeamAgentRuntimeMetadataInFlightByTeam.set(teamName, {
generationAtStart,
runIdAtStart: runId,
promise: request,
});
return this.cloneLiveTeamAgentRuntimeMetadata(await request);
}
private async buildLiveTeamAgentRuntimeMetadata(
teamName: string,
runId: string | null,
generationAtStart: number
): Promise<Map<string, LiveTeamAgentRuntimeMetadata>> {
const run = runId ? (this.runs.get(runId) ?? null) : null;
let configuredMembers: TeamConfig['members'] = [];
@ -17865,10 +17979,16 @@ export class TeamProvisioningService {
});
}
this.liveTeamAgentRuntimeMetadataCache.set(teamName, {
expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS,
metadata: metadataByMember,
});
if (
this.getRuntimeSnapshotCacheGeneration(teamName) === generationAtStart &&
this.getTrackedRunId(teamName) === runId
) {
this.liveTeamAgentRuntimeMetadataCache.set(teamName, {
expiresAtMs: Date.now() + TeamProvisioningService.AGENT_RUNTIME_SNAPSHOT_CACHE_TTL_MS,
metadata: this.cloneLiveTeamAgentRuntimeMetadata(metadataByMember),
runId,
});
}
return metadataByMember;
}
@ -18884,14 +19004,12 @@ export class TeamProvisioningService {
if (filteredSnapshot.teamLaunchState === 'clean_success' && launchPhase !== 'active') {
await this.clearPersistedLaunchStateNow(run.teamName);
this.agentRuntimeSnapshotCache.delete(run.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
this.invalidateRuntimeSnapshotCaches(run.teamName);
return null;
}
const writtenSnapshot = await this.writeLaunchStateSnapshotNow(run.teamName, filteredSnapshot);
this.agentRuntimeSnapshotCache.delete(run.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
this.invalidateRuntimeSnapshotCaches(run.teamName);
return writtenSnapshot;
}
@ -20772,8 +20890,7 @@ export class TeamProvisioningService {
* Always uses SIGKILL via killTeamProcess() to prevent CLI cleanup.
*/
async stopTeam(teamName: string): Promise<void> {
this.agentRuntimeSnapshotCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
this.stopPersistentTeamMembers(teamName);
const runId = this.getTrackedRunId(teamName);
@ -21001,6 +21118,7 @@ export class TeamProvisioningService {
this.runtimeAdapterRunByTeam.delete(teamName);
this.aliveRunByTeam.delete(teamName);
this.provisioningRunByTeam.delete(teamName);
this.invalidateRuntimeSnapshotCaches(teamName);
return;
}
const startedAt = nowIso();
@ -21019,6 +21137,7 @@ export class TeamProvisioningService {
if (this.provisioningRunByTeam.get(teamName) === runId) {
this.provisioningRunByTeam.delete(teamName);
}
this.invalidateRuntimeSnapshotCaches(teamName);
try {
await clearOpenCodeRuntimeLaneStorage({
teamsBasePath: getTeamsBasePath(),
@ -21364,8 +21483,7 @@ export class TeamProvisioningService {
);
return true;
}
this.agentRuntimeSnapshotCache.delete(run.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
this.invalidateRuntimeSnapshotCaches(run.teamName);
this.setMemberSpawnStatus(run, memberName, 'waiting');
this.appendMemberBootstrapDiagnostic(
run,
@ -23760,8 +23878,7 @@ export class TeamProvisioningService {
this.clearSecondaryRuntimeRuns(run.teamName);
}
if (!hasNewerTrackedRun) {
this.agentRuntimeSnapshotCache.delete(run.teamName);
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
this.invalidateRuntimeSnapshotCaches(run.teamName);
this.leadInboxRelayInFlight.delete(run.teamName);
this.relayedLeadInboxMessageIds.delete(run.teamName);
this.pendingCrossTeamFirstReplies.delete(run.teamName);

View file

@ -56,6 +56,7 @@ export interface TeamDataWorkerDiag {
// ── Request / Response ──
export type TeamDataWorkerRequest =
| { id: string; op: 'warmup'; payload?: Record<string, never> }
| { id: string; op: 'getTeamData'; payload: GetTeamDataPayload }
| { id: string; op: 'getMessagesPage'; payload: GetMessagesPagePayload }
| { id: string; op: 'getMemberActivityMeta'; payload: GetMemberActivityMetaPayload }

View file

@ -39,12 +39,16 @@ parentPort?.on('message', async (msg: TeamDataWorkerRequest) => {
const startedAt = Date.now();
const buildDiag = (): NonNullable<Extract<TeamDataWorkerResponse, { ok: true }>['diag']> => ({
op: msg.op,
...('teamName' in msg.payload ? { teamName: msg.payload.teamName } : {}),
...('taskId' in msg.payload ? { taskId: msg.payload.taskId } : {}),
...(msg.payload && 'teamName' in msg.payload ? { teamName: msg.payload.teamName } : {}),
...(msg.payload && 'taskId' in msg.payload ? { taskId: msg.payload.taskId } : {}),
totalMs: Date.now() - startedAt,
});
try {
switch (msg.op) {
case 'warmup': {
respond({ id: msg.id, ok: true, result: null, diag: buildDiag() });
break;
}
case 'getTeamData': {
const result = await teamDataService.getTeamData(msg.payload.teamName);
respond({ id: msg.id, ok: true, result, diag: buildDiag() });

View file

@ -33,6 +33,7 @@ interface GetAllTasksPayload {
}
/**
 * Request envelope handled by this worker, discriminated by `op`.
 *
 * Every variant carries an `id`. `warmup` is the only variant whose payload is
 * optional; when present it must be an empty object (`Record<string, never>`).
 */
type WorkerRequest =
| { id: string; op: 'warmup'; payload?: Record<string, never> }
| { id: string; op: 'listTeams'; payload: ListTeamsPayload }
| { id: string; op: 'getAllTasks'; payload: GetAllTasksPayload };
@ -75,6 +76,10 @@ interface ListTeamsDiag {
skipped: number;
skipReasons: Record<string, number>;
slowest: SlowEntry[];
cacheHits: number;
cacheMisses: number;
cacheWriteSkips: number;
cacheEvictions: number;
totalMs: number;
}
@ -87,12 +92,19 @@ interface GetAllTasksDiag {
skipped: number;
skipReasons: Record<string, number>;
slowestTeams: SlowEntry[];
cacheHits: number;
cacheMisses: number;
cacheWriteSkips: number;
cacheEvictions: number;
totalMs: number;
}
interface TaskReadDiag {
skipped: number;
skipReasons: Record<string, number>;
cacheHits: number;
cacheMisses: number;
cacheWriteSkips: number;
}
const MAX_LAUNCH_STATE_BYTES = 32 * 1024;
@ -104,6 +116,60 @@ const REVIEW_LIFECYCLE_EVENTS = new Set([
'review_started',
]);
const REVIEW_RESET_STATUSES = new Set(['in_progress', 'deleted']);
// Size cap for `teamSummaryCache`; entries beyond it are evicted by `pruneTeamSummaryCache`.
const TEAM_SUMMARY_CACHE_MAX_ENTRIES = 1000;
// Size cap for `taskFileCache`; entries beyond it are evicted by `pruneTaskFileCache`.
const TASK_FILE_CACHE_MAX_ENTRIES = 10000;
// Per-team file names that are stat'ed as part of the team summary dependency fingerprint.
const BOOTSTRAP_STATE_FILE = 'bootstrap-state.json';
const BOOTSTRAP_JOURNAL_FILE = 'bootstrap-journal.jsonl';
/**
 * Serializable snapshot of a path's stat metadata, used to detect changes
 * between two reads. Produced by `statPathFingerprint`.
 */
interface PathFingerprint {
// False when the stat call failed for any reason; see `errorCode` for the cause.
exists: boolean;
isFile?: boolean;
isDirectory?: boolean;
// True when both nanosecond mtime and ctime were available from the stat result.
highResolution?: boolean;
// BigInt stat fields rendered as decimal strings.
size?: string;
mode?: string;
dev?: string;
ino?: string;
// Nanosecond timestamps as decimal strings; omitted when the stat result lacks them.
mtimeNs?: string;
ctimeNs?: string;
birthtimeNs?: string;
// Millisecond timestamps converted to numbers.
mtimeMs?: number;
ctimeMs?: number;
birthtimeMs?: number;
// `code` of the filesystem error when one was thrown and its code is a string.
errorCode?: string;
}
/** One cached team summary together with the data needed to validate and prune it. */
interface TeamSummaryCacheEntry {
// Serialized dependency fingerprint the summary was computed against.
fingerprint: string;
// Cloned copy of the summary, stored so later mutation of the original does not affect the cache.
summary: Record<string, unknown>;
// `teamsDir` and `optionKey` scope the entry so pruning only touches entries from the same listing.
teamsDir: string;
optionKey: string;
// Timestamp from `nowMs()` taken when the entry was written.
lastUsedAt: number;
}
/**
 * Cached outcome of reading one task file: either the parsed `task`, or a
 * `skipReason` explaining why the file produced no task. The two fields are
 * mutually exclusive.
 */
type CachedTaskReadResult =
| { task: Record<string, unknown>; skipReason?: undefined }
| { task?: undefined; skipReason: string };
/** One cached task-file read result together with the data needed to validate and prune it. */
interface TaskFileCacheEntry {
// Serialized stat fingerprint of the task file at the time the result was cached.
fingerprint: string;
// Cloned copy of the read outcome.
result: CachedTaskReadResult;
// Base directory the entry belongs to; pruning only considers entries with a matching base.
tasksBase: string;
// Timestamp from `nowMs()` taken when the entry was written.
lastUsedAt: number;
}
// Module-level caches. Keys are built by `makeTeamSummaryCacheKey` and
// `makeTaskCacheKey` respectively.
const teamSummaryCache = new Map<string, TeamSummaryCacheEntry>();
const taskFileCache = new Map<string, TaskFileCacheEntry>();
/** Combined fingerprint of every file a team summary depends on. */
interface TeamSummaryDependencyFingerprint {
// JSON serialization of all dependency fingerprints; compared by string equality.
value: string;
// True only when every individual dependency fingerprint was judged safe to cache against.
cacheSafe: boolean;
}
/** Result of `readLaunchState`: the chosen launch-state summary plus a cacheability flag. */
interface LaunchStateSummaryRead {
summary: ReturnType<typeof choosePreferredLaunchStateSummary> | null;
// NOTE(review): presumably tells the caller whether a summary built from this read may be cached — confirm against readLaunchState's callers.
cacheable: boolean;
}
// ---------------------------------------------------------------------------
// Parsed JSON types (loose shapes from disk)
@ -272,6 +338,319 @@ function pushSlowest(list: SlowEntry[], entry: SlowEntry, maxLen: number): void
if (list.length > maxLen) list.length = maxLen;
}
/**
 * Returns a deep copy of `value` so cached data and caller data never alias.
 *
 * Uses the global `structuredClone` when the runtime provides it and falls
 * back to a JSON round trip otherwise.
 */
function cloneCached<T>(value: T): T {
  if (typeof structuredClone !== 'function') {
    const serialized = JSON.stringify(value);
    return JSON.parse(serialized) as T;
  }
  return structuredClone(value);
}
/**
 * Stats `filePath` with BigInt precision and reduces the result to a
 * serializable fingerprint.
 *
 * Never rejects: any failure of the stat call is reported as
 * `{ exists: false, errorCode }`, where `errorCode` is the error's `code` when
 * that is a string. The property order of the returned object is fixed because
 * the fingerprint is later compared through its JSON serialization.
 *
 * @param filePath - Path to stat (symlinks are followed, as with `fs.stat`).
 */
async function statPathFingerprint(filePath: string): Promise<PathFingerprint> {
  // Nanosecond fields are read defensively: anything that is not a bigint is treated as absent.
  const bigintOrUndefined = (candidate: unknown): bigint | undefined =>
    typeof candidate === 'bigint' ? candidate : undefined;

  try {
    const stat = await fs.promises.stat(filePath, { bigint: true });
    const nanosecondFields = stat as {
      mtimeNs?: unknown;
      ctimeNs?: unknown;
      birthtimeNs?: unknown;
    };
    const mtimeNs = bigintOrUndefined(nanosecondFields.mtimeNs);
    const ctimeNs = bigintOrUndefined(nanosecondFields.ctimeNs);
    const birthtimeNs = bigintOrUndefined(nanosecondFields.birthtimeNs);
    const hasNanosecondTimes = mtimeNs !== undefined && ctimeNs !== undefined;

    return {
      exists: true,
      isFile: stat.isFile(),
      isDirectory: stat.isDirectory(),
      highResolution: hasNanosecondTimes,
      size: stat.size.toString(),
      mode: stat.mode.toString(),
      dev: stat.dev.toString(),
      ino: stat.ino.toString(),
      mtimeNs: mtimeNs?.toString(),
      ctimeNs: ctimeNs?.toString(),
      birthtimeNs: birthtimeNs?.toString(),
      mtimeMs: Number(stat.mtimeMs),
      ctimeMs: Number(stat.ctimeMs),
      birthtimeMs: Number(stat.birthtimeMs),
    };
  } catch (error) {
    const code = (error as NodeJS.ErrnoException | undefined)?.code;
    return {
      exists: false,
      errorCode: typeof code === 'string' ? code : undefined,
    };
  }
}
/**
 * Serializes a fingerprint (or any option object) to its JSON text so two
 * fingerprints can be compared with plain string equality.
 */
function fingerprintToString(value: unknown): string {
  const serialized = JSON.stringify(value);
  return serialized;
}
/**
 * Decides whether a fingerprint is trustworthy enough to cache against.
 *
 * An existing path qualifies only when its fingerprint has high-resolution
 * timestamps. A missing path qualifies only when the stat failure was `ENOENT`
 * or `ENOTDIR`; any other failure (or none recorded) is treated as unsafe.
 */
function isCacheSafeFingerprint(fingerprint: PathFingerprint): boolean {
  if (!fingerprint.exists) {
    const { errorCode } = fingerprint;
    return errorCode === 'ENOENT' || errorCode === 'ENOTDIR';
  }
  return fingerprint.highResolution === true;
}
/**
 * Derives a stable string from the `listTeams` options that are included in
 * the summary cache key, so summaries built under different limits are never
 * served for one another.
 */
function makeTeamSummaryOptionKey(payload: ListTeamsPayload): string {
  const {
    largeConfigBytes,
    configHeadBytes,
    maxConfigBytes,
    maxConfigReadMs,
    maxMembersMetaBytes,
    maxSessionHistoryInSummary,
    maxProjectPathHistoryInSummary,
  } = payload;

  // Property order is part of the key: the object is serialized with JSON.stringify.
  return fingerprintToString({
    largeConfigBytes,
    configHeadBytes,
    maxConfigBytes,
    maxConfigReadMs,
    maxMembersMetaBytes,
    maxSessionHistoryInSummary,
    maxProjectPathHistoryInSummary,
  });
}
/**
 * Builds the `teamSummaryCache` key: `teamsDir`, `teamName` and `optionKey`
 * joined by NUL characters, in that order.
 */
function makeTeamSummaryCacheKey(teamsDir: string, teamName: string, optionKey: string): string {
  const keyParts = [teamsDir, teamName, optionKey];
  return keyParts.join('\0');
}
/**
 * Reports whether a team summary describes a settled state that is safe to cache.
 *
 * A summary is rejected when its `teamLaunchState` is `'partial_pending'`, or
 * when any of the pending counters is a number that is not `<= 0` (positive
 * values and `NaN`). Counters that are absent or not numbers are ignored.
 */
function canCacheTeamSummary(summary: Record<string, unknown>): boolean {
  if (summary.teamLaunchState === 'partial_pending') {
    return false;
  }

  const pendingCounterFields = [
    'pendingCount',
    'runtimeAlivePendingCount',
    'shellOnlyPendingCount',
    'runtimeProcessPendingCount',
    'runtimeCandidatePendingCount',
    'noRuntimePendingCount',
    'permissionPendingCount',
  ];

  for (const field of pendingCounterFields) {
    const count = summary[field];
    // Written as !(count <= 0) rather than count > 0 so NaN also blocks caching.
    if (typeof count === 'number' && !(count <= 0)) {
      return false;
    }
  }
  return true;
}
/**
 * Fingerprints a team's inbox directory: the directory's own stat fingerprint
 * plus the sorted names of the `.json` files directly inside it.
 *
 * - When the path is missing or not a directory, `names` is empty and
 *   `cacheSafe` follows `isCacheSafeFingerprint` for the stat result.
 * - When listing the directory fails, the error code (or `'READDIR_FAILED'`)
 *   is recorded on `dir` and the result is marked not cache-safe.
 */
async function readInboxNamesFingerprint(inboxDir: string): Promise<{
  dir: PathFingerprint;
  names: string[];
  cacheSafe: boolean;
}> {
  const dir = await statPathFingerprint(inboxDir);
  const isListableDirectory = dir.exists && dir.isDirectory;
  if (!isListableDirectory) {
    return { dir, names: [], cacheSafe: isCacheSafeFingerprint(dir) };
  }

  try {
    const entries = await fs.promises.readdir(inboxDir, { withFileTypes: true });
    const names: string[] = [];
    for (const entry of entries) {
      if (entry.isFile() && entry.name.endsWith('.json')) {
        names.push(entry.name);
      }
    }
    // Sorted so the fingerprint does not depend on directory enumeration order.
    names.sort();
    return { dir, names, cacheSafe: isCacheSafeFingerprint(dir) };
  } catch (error) {
    const code = (error as NodeJS.ErrnoException | undefined)?.code;
    const errorCode = typeof code === 'string' ? code : 'READDIR_FAILED';
    return {
      dir: { ...dir, errorCode },
      names: [],
      cacheSafe: false,
    };
  }
}
/**
 * Computes the combined dependency fingerprint for one team's summary.
 *
 * Stats the seven per-team files the summary is derived from (in parallel),
 * then fingerprints the `inboxes` directory, and serializes everything together
 * with `optionKey` and a format `version`.
 *
 * @returns `value` is the serialized fingerprint; `cacheSafe` is true only when
 *   every file fingerprint and the inbox fingerprint are individually cache-safe.
 */
async function buildTeamSummaryFingerprint(
  teamsDir: string,
  teamName: string,
  optionKey: string
): Promise<TeamSummaryDependencyFingerprint> {
  const teamDir = path.join(teamsDir, teamName);
  const fingerprintOf = (fileName: string): Promise<PathFingerprint> =>
    statPathFingerprint(path.join(teamDir, fileName));

  const [
    config,
    teamMeta,
    membersMeta,
    launchState,
    launchSummary,
    bootstrapState,
    bootstrapJournal,
  ] = await Promise.all([
    fingerprintOf('config.json'),
    fingerprintOf('team.meta.json'),
    fingerprintOf('members.meta.json'),
    fingerprintOf(TEAM_LAUNCH_STATE_FILE),
    fingerprintOf(TEAM_LAUNCH_SUMMARY_FILE),
    fingerprintOf(BOOTSTRAP_STATE_FILE),
    fingerprintOf(BOOTSTRAP_JOURNAL_FILE),
  ]);
  const inbox = await readInboxNamesFingerprint(path.join(teamDir, 'inboxes'));

  const everyFileIsCacheSafe = [
    config,
    teamMeta,
    membersMeta,
    launchState,
    launchSummary,
    bootstrapState,
    bootstrapJournal,
  ].every(isCacheSafeFingerprint);

  // Property order is significant: the object is compared through its JSON text.
  const value = fingerprintToString({
    version: 1,
    optionKey,
    config,
    teamMeta,
    membersMeta,
    launchState,
    launchSummary,
    bootstrapState,
    bootstrapJournal,
    inbox,
  });

  return {
    value,
    cacheSafe: everyFileIsCacheSafe && inbox.cacheSafe,
  };
}
/**
 * Stores `summary` in `teamSummaryCache` only when it is safe to do so.
 *
 * The write is skipped (and counted in `diag.cacheWriteSkips`) when:
 * - the caller disallowed caching or the summary shows pending state — any
 *   existing entry for `cacheKey` is also removed in these two cases;
 * - the fingerprint taken before the read was not cache-safe;
 * - a fingerprint taken now is not cache-safe or differs from the one taken
 *   before the read, meaning a dependency changed while the summary was built.
 *
 * @param fingerprintBefore - Dependency fingerprint captured before the summary was built.
 * @param cacheAllowed - Caller's verdict on whether this summary may be cached at all.
 * @param diag - Diagnostics accumulator; only `cacheWriteSkips` is updated here.
 */
async function cacheTeamSummaryIfStable(
  cacheKey: string,
  teamsDir: string,
  teamName: string,
  optionKey: string,
  fingerprintBefore: TeamSummaryDependencyFingerprint,
  summary: Record<string, unknown>,
  cacheAllowed: boolean,
  diag: ListTeamsDiag
): Promise<void> {
  const summaryIsCacheable = cacheAllowed && canCacheTeamSummary(summary);
  if (!summaryIsCacheable) {
    // A previously cached summary must not outlive a state we refuse to cache.
    teamSummaryCache.delete(cacheKey);
    diag.cacheWriteSkips++;
    return;
  }

  if (!fingerprintBefore.cacheSafe) {
    diag.cacheWriteSkips++;
    return;
  }

  const fingerprintAfter = await buildTeamSummaryFingerprint(teamsDir, teamName, optionKey);
  const dependenciesUnchanged =
    fingerprintAfter.cacheSafe && fingerprintAfter.value === fingerprintBefore.value;
  if (!dependenciesUnchanged) {
    diag.cacheWriteSkips++;
    return;
  }

  teamSummaryCache.set(cacheKey, {
    fingerprint: fingerprintAfter.value,
    summary: cloneCached(summary),
    teamsDir,
    optionKey,
    lastUsedAt: nowMs(),
  });
}
/**
 * Removes team summary cache entries that are no longer useful.
 *
 * 1. Within the scope of this listing (`teamsDir` + `optionKey`), drops every
 *    entry whose team is not in `liveTeamNames`.
 * 2. If the cache is still above `TEAM_SUMMARY_CACHE_MAX_ENTRIES`, evicts the
 *    entries with the smallest `lastUsedAt` until the cap is met.
 *
 * Step 2 orders by `lastUsedAt` rather than by Map insertion order: `Map.set`
 * on an existing key keeps its original position, so insertion order would
 * evict long-lived entries that were just refreshed ahead of stale ones.
 *
 * @param liveTeamNames - Team names seen in the current listing.
 * @param diag - Diagnostics accumulator; `cacheEvictions` counts every removal.
 */
function pruneTeamSummaryCache(
  teamsDir: string,
  optionKey: string,
  liveTeamNames: ReadonlySet<string>,
  diag: ListTeamsDiag
): void {
  for (const [key, entry] of teamSummaryCache) {
    if (entry.teamsDir !== teamsDir || entry.optionKey !== optionKey) {
      continue;
    }
    // Cache keys are `${teamsDir}\0${teamName}\0${optionKey}`.
    const teamName = key.split('\0')[1] ?? '';
    if (!liveTeamNames.has(teamName)) {
      teamSummaryCache.delete(key);
      diag.cacheEvictions++;
    }
  }

  const overflow = teamSummaryCache.size - TEAM_SUMMARY_CACHE_MAX_ENTRIES;
  if (overflow <= 0) {
    return;
  }

  // Stable sort: entries with equal timestamps keep insertion order, so ties fall back to oldest-inserted first.
  const leastRecentlyUsedFirst = Array.from(teamSummaryCache.entries()).sort(
    ([, left], [, right]) => left.lastUsedAt - right.lastUsedAt
  );
  for (const [key] of leastRecentlyUsedFirst.slice(0, overflow)) {
    teamSummaryCache.delete(key);
    diag.cacheEvictions++;
  }
}
/**
 * Derives a stable string from the `getAllTasks` read limits that are included
 * in the task cache key.
 */
function makeTaskOptionKey(payload: GetAllTasksPayload): string {
  const { maxTaskBytes, maxTaskReadMs } = payload;
  // Property order is part of the key: the object is serialized with JSON.stringify.
  return fingerprintToString({ maxTaskBytes, maxTaskReadMs });
}
/**
 * Builds the `taskFileCache` key: `tasksBase`, `teamName`, `fileName` and
 * `optionKey` joined by NUL characters, in that order.
 */
function makeTaskCacheKey(
  tasksBase: string,
  teamName: string,
  fileName: string,
  optionKey: string
): string {
  const keyParts = [tasksBase, teamName, fileName, optionKey];
  return keyParts.join('\0');
}
/**
 * Stores a task read result in `taskFileCache` only when the task file did not
 * change while it was being read.
 *
 * The write is skipped (and counted in `taskDiag.cacheWriteSkips`) when the
 * fingerprint taken before the read was not cache-safe, or when a fresh stat of
 * `taskPath` is not cache-safe or serializes differently from `fingerprintBefore`.
 *
 * @param fingerprintBefore - Serialized stat fingerprint captured before the read.
 * @param fingerprintBeforeCacheSafe - Whether that earlier fingerprint was cache-safe.
 */
async function cacheTaskReadResultIfStable(
  cacheKey: string,
  taskPath: string,
  tasksBase: string,
  fingerprintBefore: string,
  fingerprintBeforeCacheSafe: boolean,
  result: CachedTaskReadResult,
  taskDiag: TaskReadDiag
): Promise<void> {
  if (!fingerprintBeforeCacheSafe) {
    taskDiag.cacheWriteSkips++;
    return;
  }

  const fingerprintAfter = await statPathFingerprint(taskPath);
  const fileUnchanged =
    isCacheSafeFingerprint(fingerprintAfter) &&
    fingerprintToString(fingerprintAfter) === fingerprintBefore;
  if (!fileUnchanged) {
    taskDiag.cacheWriteSkips++;
    return;
  }

  taskFileCache.set(cacheKey, {
    fingerprint: fingerprintBefore,
    result: cloneCached(result),
    tasksBase,
    lastUsedAt: nowMs(),
  });
}
/**
 * Replays a cached task read outcome into the current scan.
 *
 * A cached skip is re-counted in `taskDiag` under its original reason; otherwise
 * a clone of the cached task is appended to `tasks`.
 *
 * @param cached - Cached outcome of an earlier read of the same file.
 * @param tasks - Output list of the current scan; mutated in place.
 * @param taskDiag - Per-directory diagnostics; `skipped`/`skipReasons` updated for cached skips.
 */
function applyCachedTaskReadResult(
  cached: CachedTaskReadResult,
  tasks: unknown[],
  taskDiag: TaskReadDiag
): void {
  if (!cached.skipReason) {
    // Clone so callers cannot mutate the cached object through the returned task.
    tasks.push(cloneCached(cached.task));
    return;
  }
  taskDiag.skipped++;
  bumpSkipReason(taskDiag.skipReasons, cached.skipReason);
}
/**
 * Trims the task-file cache after a getAllTasks scan.
 *
 * Pass 1 removes entries recorded for `tasksBase` whose key was not seen by the
 * current scan. Pass 2 runs only when the cache still exceeds
 * TASK_FILE_CACHE_MAX_ENTRIES and evicts the least recently used entries
 * (smallest `lastUsedAt`) until the cache fits. Every removal increments
 * `diag.cacheEvictions`.
 *
 * @param tasksBase - Tasks root whose entries are checked against `liveCacheKeys`.
 * @param liveCacheKeys - Cache keys of every task file visited by the current scan.
 * @param diag - Scan diagnostics; `cacheEvictions` is bumped once per removed entry.
 */
function pruneTaskFileCache(
  tasksBase: string,
  liveCacheKeys: ReadonlySet<string>,
  diag: GetAllTasksDiag
): void {
  // Deleting from a Map while iterating it is safe in JavaScript.
  for (const [key, entry] of taskFileCache) {
    if (entry.tasksBase === tasksBase && !liveCacheKeys.has(key)) {
      taskFileCache.delete(key);
      diag.cacheEvictions++;
    }
  }

  const overflow = taskFileCache.size - TASK_FILE_CACHE_MAX_ENTRIES;
  if (overflow <= 0) {
    return;
  }

  // Map iteration order is insertion order, and neither refreshing `lastUsedAt`
  // nor re-setting an existing key moves an entry. Taking "the first key" would
  // evict entries that are still being hit, so order candidates by `lastUsedAt`.
  // The sort is stable, which keeps insertion order as the tie-breaker.
  const leastRecentlyUsedFirst = Array.from(taskFileCache, ([key, entry]) => ({
    key,
    lastUsedAt: entry.lastUsedAt,
  })).sort((a, b) => a.lastUsedAt - b.lastUsedAt);

  for (const { key } of leastRecentlyUsedFirst.slice(0, overflow)) {
    taskFileCache.delete(key);
    diag.cacheEvictions++;
  }
}
// ---------------------------------------------------------------------------
// listTeams
// ---------------------------------------------------------------------------
@ -340,7 +719,7 @@ function dropCliProvisionerMembers(
async function readLaunchState(
teamsDir: string,
teamName: string
): Promise<ReturnType<typeof choosePreferredLaunchStateSummary>> {
): Promise<LaunchStateSummaryRead> {
const bootstrapSnapshot = await readBootstrapLaunchSnapshot(teamName);
const launchStatePath = path.join(teamsDir, teamName, TEAM_LAUNCH_STATE_FILE);
const launchSummaryPath = path.join(teamsDir, teamName, TEAM_LAUNCH_SUMMARY_FILE);
@ -371,11 +750,24 @@ async function readLaunchState(
})(),
]);
return choosePreferredLaunchStateSummary({
const summary = choosePreferredLaunchStateSummary({
bootstrapSnapshot,
launchSnapshot,
launchSummaryProjection,
});
if (launchSnapshot) {
return { summary, cacheable: true };
}
if (!bootstrapSnapshot) {
return { summary, cacheable: true };
}
if (
bootstrapSnapshot.launchPhase === 'finished' &&
bootstrapSnapshot.teamLaunchState !== 'partial_pending'
) {
return { summary, cacheable: true };
}
return { summary, cacheable: false };
}
/**
@ -465,6 +857,10 @@ async function listTeams(
skipped: 0,
skipReasons: {},
slowest: [],
cacheHits: 0,
cacheMisses: 0,
cacheWriteSkips: 0,
cacheEvictions: 0,
totalMs: 0,
};
@ -478,11 +874,26 @@ async function listTeams(
const teamDirs = entries.filter((e) => e.isDirectory());
diag.totalDirs = teamDirs.length;
const optionKey = makeTeamSummaryOptionKey(payload);
const liveTeamNames = new Set(teamDirs.map((entry) => entry.name));
const perTeam = await mapLimit(teamDirs, payload.concurrency, async (entry) => {
const teamName = entry.name;
const t0 = nowMs();
const configPath = path.join(payload.teamsDir, teamName, 'config.json');
const cacheKey = makeTeamSummaryCacheKey(payload.teamsDir, teamName, optionKey);
const dependencyFingerprint = await buildTeamSummaryFingerprint(
payload.teamsDir,
teamName,
optionKey
);
const cached = teamSummaryCache.get(cacheKey);
if (dependencyFingerprint.cacheSafe && cached?.fingerprint === dependencyFingerprint.value) {
cached.lastUsedAt = nowMs();
diag.cacheHits++;
return cloneCached(cached.summary);
}
diag.cacheMisses++;
const skip = (reason: string): null => {
diag.skipped++;
@ -496,12 +907,36 @@ async function listTeams(
} catch {
// Fallback: check for draft team (team.meta.json without config.json)
const draft = await readDraftTeamMeta(payload.teamsDir, teamName, payload);
if (draft) return draft;
if (draft) {
await cacheTeamSummaryIfStable(
cacheKey,
payload.teamsDir,
teamName,
optionKey,
dependencyFingerprint,
draft,
true,
diag
);
return draft;
}
return skip('config_stat_failed');
}
if (!stat.isFile()) {
const draft = await readDraftTeamMeta(payload.teamsDir, teamName, payload);
if (draft) return draft;
if (draft) {
await cacheTeamSummaryIfStable(
cacheKey,
payload.teamsDir,
teamName,
optionKey,
dependencyFingerprint,
draft,
true,
diag
);
return draft;
}
return skip('config_not_file');
}
if (stat.size > payload.maxConfigBytes) return skip('config_too_large');
@ -692,32 +1127,28 @@ async function listTeams(
leadProviderId,
members: metaRuntimeMembers,
});
const launchStateSummary =
(await readLaunchState(payload.teamsDir, teamName)) ??
(() => {
if (suppressLegacyLaunchArtifactHeuristic) {
return null;
}
if (
!leadSessionId ||
expectedTeammateNames.size === 0 ||
confirmedArtifactNames.size === 0
) {
return null;
}
const missingMembers = Array.from(expectedTeammateNames).filter(
(name) => !confirmedArtifactNames.has(name)
);
if (missingMembers.length === 0) {
return null;
}
return {
partialLaunchFailure: true as const,
expectedMemberCount: expectedTeammateNames.size,
confirmedMemberCount: confirmedArtifactNames.size,
missingMembers,
};
})();
const launchStateRead = await readLaunchState(payload.teamsDir, teamName);
const fallbackLaunchStateSummary = (): ReturnType<typeof choosePreferredLaunchStateSummary> => {
if (suppressLegacyLaunchArtifactHeuristic) {
return null;
}
if (!leadSessionId || expectedTeammateNames.size === 0 || confirmedArtifactNames.size === 0) {
return null;
}
const missingMembers = Array.from(expectedTeammateNames).filter(
(name) => !confirmedArtifactNames.has(name)
);
if (missingMembers.length === 0) {
return null;
}
return {
partialLaunchFailure: true as const,
expectedMemberCount: expectedTeammateNames.size,
confirmedMemberCount: confirmedArtifactNames.size,
missingMembers,
};
};
const launchStateSummary = launchStateRead.summary ?? fallbackLaunchStateSummary();
const summary = {
teamName,
displayName,
@ -741,10 +1172,21 @@ async function listTeams(
if (ms >= 250) {
pushSlowest(diag.slowest, { teamName, ms }, 10);
}
await cacheTeamSummaryIfStable(
cacheKey,
payload.teamsDir,
teamName,
optionKey,
dependencyFingerprint,
summary,
launchStateRead.cacheable,
diag
);
return summary;
});
const teams = perTeam.filter((t): t is NonNullable<typeof t> => t !== null);
pruneTeamSummaryCache(payload.teamsDir, optionKey, liveTeamNames, diag);
diag.returned = teams.length;
diag.totalMs = nowMs() - startedAt;
return { teams, diag };
@ -880,19 +1322,27 @@ async function readTasksDirForTeam(
tasksDir: string,
teamName: string,
payload: GetAllTasksPayload
): Promise<{ tasks: unknown[]; taskDiag: TaskReadDiag }> {
const taskDiag: TaskReadDiag = { skipped: 0, skipReasons: {} };
): Promise<{ tasks: unknown[]; taskDiag: TaskReadDiag; liveCacheKeys: Set<string> }> {
const taskDiag: TaskReadDiag = {
skipped: 0,
skipReasons: {},
cacheHits: 0,
cacheMisses: 0,
cacheWriteSkips: 0,
};
let entries: string[];
try {
entries = await fs.promises.readdir(tasksDir);
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return { tasks: [], taskDiag };
return { tasks: [], taskDiag, liveCacheKeys: new Set() };
}
throw error;
}
const tasks: unknown[] = [];
const liveCacheKeys = new Set<string>();
const optionKey = makeTaskOptionKey(payload);
for (const file of entries) {
if (
!file.endsWith('.json') ||
@ -904,25 +1354,61 @@ async function readTasksDirForTeam(
}
const taskPath = path.join(tasksDir, file);
const cacheKey = makeTaskCacheKey(payload.tasksBase, teamName, file, optionKey);
liveCacheKeys.add(cacheKey);
try {
const stat = await fs.promises.stat(taskPath);
if (!stat.isFile() || stat.size > payload.maxTaskBytes) {
const pathFingerprint = await statPathFingerprint(taskPath);
const taskSize = Number(pathFingerprint.size ?? Number.NaN);
if (
!pathFingerprint.isFile ||
!Number.isFinite(taskSize) ||
taskSize > payload.maxTaskBytes
) {
taskDiag.skipped++;
bumpSkipReason(taskDiag.skipReasons, 'task_not_file_or_large');
continue;
}
const fingerprint = fingerprintToString(pathFingerprint);
const fingerprintCacheSafe = isCacheSafeFingerprint(pathFingerprint);
const cached = taskFileCache.get(cacheKey);
if (fingerprintCacheSafe && cached?.fingerprint === fingerprint) {
cached.lastUsedAt = nowMs();
taskDiag.cacheHits++;
applyCachedTaskReadResult(cached.result, tasks, taskDiag);
continue;
}
taskDiag.cacheMisses++;
const stat = await fs.promises.stat(taskPath);
const raw = await readFileUtf8WithTimeout(taskPath, payload.maxTaskReadMs);
const parsed = JSON.parse(raw) as ParsedTask;
const metadata = parsed.metadata;
if (metadata?._internal === true) {
taskDiag.skipped++;
bumpSkipReason(taskDiag.skipReasons, 'task_internal');
await cacheTaskReadResultIfStable(
cacheKey,
taskPath,
payload.tasksBase,
fingerprint,
fingerprintCacheSafe,
{ skipReason: 'task_internal' },
taskDiag
);
continue;
}
if (parsed.status === 'deleted') {
taskDiag.skipped++;
bumpSkipReason(taskDiag.skipReasons, 'task_deleted');
await cacheTaskReadResultIfStable(
cacheKey,
taskPath,
payload.tasksBase,
fingerprint,
fingerprintCacheSafe,
{ skipReason: 'task_deleted' },
taskDiag
);
continue;
}
@ -962,7 +1448,7 @@ async function readTasksDirForTeam(
deriveReviewStateFromEvents(historyEvents) ??
normalizeFallbackReviewState(parsed.reviewState, status);
tasks.push({
const task = {
id: typeof parsed.id === 'string' || typeof parsed.id === 'number' ? String(parsed.id) : '',
displayId:
typeof parsed.displayId === 'string' && parsed.displayId.trim().length > 0
@ -1018,7 +1504,17 @@ async function readTasksDirForTeam(
? (parsed.sourceMessage as Record<string, unknown>)
: undefined,
teamName,
});
};
tasks.push(task);
await cacheTaskReadResultIfStable(
cacheKey,
taskPath,
payload.tasksBase,
fingerprint,
fingerprintCacheSafe,
{ task },
taskDiag
);
} catch (error) {
taskDiag.skipped++;
const code = (error as NodeJS.ErrnoException).code;
@ -1029,11 +1525,14 @@ async function readTasksDirForTeam(
}
}
}
return { tasks, taskDiag };
return { tasks, taskDiag, liveCacheKeys };
}
function mergeTaskDiag(target: GetAllTasksDiag, source: TaskReadDiag): void {
target.skipped += source.skipped;
target.cacheHits += source.cacheHits;
target.cacheMisses += source.cacheMisses;
target.cacheWriteSkips += source.cacheWriteSkips;
for (const [reason, count] of Object.entries(source.skipReasons)) {
target.skipReasons[reason] = (target.skipReasons[reason] || 0) + count;
}
@ -1052,6 +1551,10 @@ async function getAllTasks(
skipped: 0,
skipReasons: {},
slowestTeams: [],
cacheHits: 0,
cacheMisses: 0,
cacheWriteSkips: 0,
cacheEvictions: 0,
totalMs: 0,
};
@ -1068,13 +1571,21 @@ async function getAllTasks(
const dirs = entries.filter((e) => e.isDirectory());
diag.teamDirs = dirs.length;
const liveCacheKeys = new Set<string>();
const chunks = await mapLimit(dirs, payload.concurrency, async (entry) => {
const teamName = entry.name;
const t0 = nowMs();
try {
const tasksDir = path.join(payload.tasksBase, teamName);
const { tasks, taskDiag } = await readTasksDirForTeam(tasksDir, teamName, payload);
const {
tasks,
taskDiag,
liveCacheKeys: teamLiveCacheKeys,
} = await readTasksDirForTeam(tasksDir, teamName, payload);
for (const key of teamLiveCacheKeys) {
liveCacheKeys.add(key);
}
mergeTaskDiag(diag, taskDiag);
const ms = nowMs() - t0;
if (ms >= 250) {
@ -1089,6 +1600,7 @@ async function getAllTasks(
});
const tasks = chunks.flat();
pruneTaskFileCache(payload.tasksBase, liveCacheKeys, diag);
diag.returned = tasks.length;
diag.totalMs = nowMs() - startedAt;
return { tasks, diag };
@ -1105,6 +1617,19 @@ function post(msg: WorkerResponse): void {
parentPort?.on('message', async (msg: WorkerRequest) => {
const { id, op } = msg;
try {
if (op === 'warmup') {
post({
id,
ok: true,
result: {
ready: true,
teamSummaryCacheEntries: teamSummaryCache.size,
taskFileCacheEntries: taskFileCache.size,
},
diag: { op, totalMs: 0 },
});
return;
}
if (op === 'listTeams') {
const { teams, diag } = await listTeams(msg.payload);
post({ id, ok: true, result: teams, diag });

View file

@ -93,6 +93,25 @@ describe('TeamDataWorkerClient', () => {
client.dispose();
});
// A regular request starts the worker first; the following prewarm() must then be a
// no-op: still exactly one worker, and the only message it received is the
// original 'getTeamData' request (no 'warmup' message was posted).
it('does not queue warmup behind an already running worker', async () => {
const { TeamDataWorkerClient } = await import(
'../../../../src/main/services/team/TeamDataWorkerClient'
);
const client = new TeamDataWorkerClient();
await client.getTeamData('my-team');
await client.prewarm();
expect(hoisted.workers).toHaveLength(1);
expect(hoisted.workers[0].messages).toHaveLength(1);
expect(hoisted.workers[0].messages[0]).toMatchObject({
op: 'getTeamData',
payload: { teamName: 'my-team' },
});
client.dispose();
});
it('sends best-effort team config invalidation to the worker', async () => {
const { TeamDataWorkerClient } = await import(
'../../../../src/main/services/team/TeamDataWorkerClient'

View file

@ -11,6 +11,7 @@ interface WorkerResponse {
id: string;
ok: boolean;
result?: unknown;
diag?: unknown;
error?: string;
}
@ -44,7 +45,11 @@ function createWorker(workerPath: string): Worker {
return new Worker(workerPath);
}
function callListTeams(worker: Worker, teamsDir: string): Promise<unknown[]> {
function callWorker(
worker: Worker,
op: string,
payload: Record<string, unknown> = {}
): Promise<{ result: unknown; diag?: unknown }> {
const requestId = `req-${Date.now()}`;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
@ -72,29 +77,56 @@ function callListTeams(worker: Worker, teamsDir: string): Promise<unknown[]> {
reject(new Error(message.error || 'team-fs-worker returned an unknown error'));
return;
}
resolve(Array.isArray(message.result) ? message.result : []);
resolve({ result: message.result, diag: message.diag });
};
worker.on('message', onMessage);
worker.on('error', onError);
worker.postMessage({
id: requestId,
op: 'listTeams',
payload: {
teamsDir,
largeConfigBytes: 8 * 1024,
configHeadBytes: 4 * 1024,
maxConfigBytes: 256 * 1024,
maxConfigReadMs: 5_000,
maxMembersMetaBytes: 256 * 1024,
maxSessionHistoryInSummary: 10,
maxProjectPathHistoryInSummary: 10,
concurrency: 2,
},
});
worker.postMessage({ id: requestId, op, payload });
});
}
/**
 * Sends a 'listTeams' request with fixed test limits and normalizes the reply.
 *
 * @param worker - Worker under test.
 * @param teamsDir - Directory the worker should scan for teams.
 * @returns `teams` (the result when it is an array, otherwise `[]`) and `diag`
 *   (the diagnostics when they are an object, otherwise `undefined`).
 */
async function callListTeams(worker: Worker, teamsDir: string): Promise<{
  teams: unknown[];
  diag?: Record<string, unknown>;
}> {
  const listTeamsPayload = {
    teamsDir,
    largeConfigBytes: 8 * 1024,
    configHeadBytes: 4 * 1024,
    maxConfigBytes: 256 * 1024,
    maxConfigReadMs: 5_000,
    maxMembersMetaBytes: 256 * 1024,
    maxSessionHistoryInSummary: 10,
    maxProjectPathHistoryInSummary: 10,
    concurrency: 2,
  };
  const response = await callWorker(worker, 'listTeams', listTeamsPayload);

  const teams: unknown[] = Array.isArray(response.result) ? response.result : [];
  const diag =
    response.diag && typeof response.diag === 'object'
      ? (response.diag as Record<string, unknown>)
      : undefined;
  return { teams, diag };
}
/**
 * Sends a 'getAllTasks' request with fixed test limits and normalizes the reply.
 *
 * @param worker - Worker under test.
 * @param tasksBase - Root directory holding per-team task directories.
 * @returns `tasks` (the result when it is an array, otherwise `[]`) and `diag`
 *   (the diagnostics when they are an object, otherwise `undefined`).
 */
async function callGetAllTasks(worker: Worker, tasksBase: string): Promise<{
  tasks: unknown[];
  diag?: Record<string, unknown>;
}> {
  const getAllTasksPayload = {
    tasksBase,
    maxTaskBytes: 256 * 1024,
    maxTaskReadMs: 5_000,
    concurrency: 2,
  };
  const response = await callWorker(worker, 'getAllTasks', getAllTasksPayload);

  const tasks: unknown[] = Array.isArray(response.result) ? response.result : [];
  const diag =
    response.diag && typeof response.diag === 'object'
      ? (response.diag as Record<string, unknown>)
      : undefined;
  return { tasks, diag };
}
/** Sends a 'warmup' request with the default empty payload and discards the reply. */
async function callWarmup(worker: Worker): Promise<void> {
  const warmupOp = 'warmup';
  await callWorker(worker, warmupOp);
}
describe('team-fs-worker integration', () => {
let tempDir = '';
@ -183,7 +215,7 @@ describe('team-fs-worker integration', () => {
const worker = createWorker(workerPath);
try {
const teams = (await callListTeams(worker, tempDir)) as Array<Record<string, unknown>>;
const { teams } = await callListTeams(worker, tempDir);
expect(teams).toHaveLength(1);
expect(teams[0]).toMatchObject({
teamName,
@ -234,7 +266,7 @@ describe('team-fs-worker integration', () => {
const worker = createWorker(workerPath);
try {
const teams = (await callListTeams(worker, tempDir)) as Array<Record<string, unknown>>;
const { teams } = await callListTeams(worker, tempDir);
expect(teams).toHaveLength(1);
expect(teams[0]).toMatchObject({
teamName,
@ -247,4 +279,150 @@ describe('team-fs-worker integration', () => {
await worker.terminate();
}
});
// End-to-end check of the team summary cache against a real worker thread:
// warmup, then miss -> hit -> miss as the team's files stay the same and then change.
it('prewarms and reuses unchanged team summaries by fingerprint', async () => {
const workerPath = await getWorkerPath();
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-'));
const teamName = 'cached-worker-team';
const teamDir = path.join(tempDir, teamName);
await fs.mkdir(path.join(teamDir, 'inboxes'), { recursive: true });
await fs.writeFile(
path.join(teamDir, 'config.json'),
JSON.stringify({
name: 'Cached Worker Team',
members: [{ name: 'team-lead', agentType: 'team-lead' }],
}),
'utf8'
);
await fs.writeFile(
path.join(teamDir, 'members.meta.json'),
JSON.stringify({ version: 1, members: [{ name: 'alice' }] }),
'utf8'
);
const worker = createWorker(workerPath);
try {
await callWarmup(worker);
// First scan: nothing cached yet, so the single team is a miss.
const first = await callListTeams(worker, tempDir);
expect(first.teams[0]).toMatchObject({ teamName, memberCount: 1 });
expect(first.diag?.cacheMisses).toBe(1);
// Second scan with untouched files: served from the cache.
const second = await callListTeams(worker, tempDir);
expect(second.teams[0]).toMatchObject({ teamName, memberCount: 1 });
expect(second.diag?.cacheHits).toBe(1);
// Rewriting members.meta.json with an extra member must invalidate the entry.
await fs.writeFile(
path.join(teamDir, 'members.meta.json'),
JSON.stringify({ version: 1, members: [{ name: 'alice' }, { name: 'bob' }] }),
'utf8'
);
const changed = await callListTeams(worker, tempDir);
expect(changed.teams[0]).toMatchObject({ teamName, memberCount: 2 });
expect(changed.diag?.cacheMisses).toBe(1);
} finally {
await worker.terminate();
}
});
// A team whose launch-summary.json reports 'partial_pending' must never be served
// from the cache: both scans are expected to be misses with a skipped cache write.
it('does not cache pending launch summaries because liveness can change without file writes', async () => {
const workerPath = await getWorkerPath();
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-'));
const teamName = 'pending-launch-team';
const teamDir = path.join(tempDir, teamName);
await fs.mkdir(teamDir, { recursive: true });
await fs.writeFile(
path.join(teamDir, 'config.json'),
JSON.stringify({
name: 'Pending Launch Team',
members: [{ name: 'team-lead', agentType: 'team-lead' }],
}),
'utf8'
);
// Only the launch summary file is written here; it marks the launch as still pending.
await fs.writeFile(
path.join(teamDir, 'launch-summary.json'),
JSON.stringify({
version: 1,
teamName,
updatedAt: '2026-05-02T12:00:00.000Z',
teamLaunchState: 'partial_pending',
expectedMemberCount: 1,
pendingCount: 1,
}),
'utf8'
);
const worker = createWorker(workerPath);
try {
const first = await callListTeams(worker, tempDir);
expect(first.teams[0]).toMatchObject({
teamName,
teamLaunchState: 'partial_pending',
pendingCount: 1,
});
expect(first.diag?.cacheMisses).toBe(1);
expect(first.diag?.cacheWriteSkips).toBe(1);
// Files are unchanged, yet the second scan must recompute instead of hitting the cache.
const second = await callListTeams(worker, tempDir);
expect(second.teams[0]).toMatchObject({
teamName,
teamLaunchState: 'partial_pending',
pendingCount: 1,
});
expect(second.diag?.cacheHits).toBe(0);
expect(second.diag?.cacheMisses).toBe(1);
expect(second.diag?.cacheWriteSkips).toBe(1);
} finally {
await worker.terminate();
}
});
// End-to-end check of the per-file task cache against a real worker thread:
// miss -> hit -> miss as the single task file stays the same and is then rewritten.
it('reuses unchanged parsed tasks and rereads changed task files by fingerprint', async () => {
const workerPath = await getWorkerPath();
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-'));
const tasksBase = path.join(tempDir, 'tasks');
const teamName = 'task-cache-team';
const tasksDir = path.join(tasksBase, teamName);
await fs.mkdir(tasksDir, { recursive: true });
const taskPath = path.join(tasksDir, '1.json');
await fs.writeFile(
taskPath,
JSON.stringify({
id: '1',
subject: 'First subject',
status: 'pending',
createdAt: '2026-05-02T12:00:00.000Z',
}),
'utf8'
);
const worker = createWorker(workerPath);
try {
// First read: nothing cached yet.
const first = await callGetAllTasks(worker, tasksBase);
expect(first.tasks[0]).toMatchObject({ teamName, subject: 'First subject' });
expect(first.diag?.cacheMisses).toBe(1);
// Second read with the file untouched: served from the cache.
const second = await callGetAllTasks(worker, tasksBase);
expect(second.tasks[0]).toMatchObject({ teamName, subject: 'First subject' });
expect(second.diag?.cacheHits).toBe(1);
// The new subject is longer, so the rewritten file also differs in size.
await fs.writeFile(
taskPath,
JSON.stringify({
id: '1',
subject: 'Changed subject with a different size',
status: 'pending',
createdAt: '2026-05-02T12:00:00.000Z',
}),
'utf8'
);
const changed = await callGetAllTasks(worker, tasksBase);
expect(changed.tasks[0]).toMatchObject({
teamName,
subject: 'Changed subject with a different size',
});
expect(changed.diag?.cacheMisses).toBe(1);
} finally {
await worker.terminate();
}
});
});

View file

@ -0,0 +1,152 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
// Shared mock state, created via vi.hoisted so the vi.mock factories below can reference it.
// - workers: every mock worker constructed so far, in creation order.
// - createMockWorker: stand-in for the worker_threads Worker constructor.
// - skipResponsesForOps: ops for which the mock worker records the message but never replies.
const hoisted = vi.hoisted(() => {
const skipResponsesForOps = new Set<string>();
const workers: Array<{
messages: unknown[];
handlers: Map<string, (value: unknown) => void>;
postMessage: (message: unknown) => void;
on: (event: string, handler: (value: unknown) => void) => void;
terminate: ReturnType<typeof vi.fn>;
}> = [];
const createMockWorker = vi.fn().mockImplementation(() => {
const worker = {
messages: [] as unknown[],
handlers: new Map<string, (value: unknown) => void>(),
// Records the request, then (unless the op is muted) answers on a microtask with a
// successful response: [] for listTeams/getAllTasks, null for every other op.
postMessage(message: unknown) {
worker.messages.push(message);
const request = message as { id: string; op: string };
if (skipResponsesForOps.has(request.op)) return;
queueMicrotask(() => {
const handler = worker.handlers.get('message');
if (!handler) return;
handler({
id: request.id,
ok: true,
result: request.op === 'listTeams' || request.op === 'getAllTasks' ? [] : null,
diag: { op: request.op, totalMs: 0 },
});
});
},
// Keeps a single handler per event name; a later registration replaces the earlier one.
on(event: string, handler: (value: unknown) => void) {
worker.handlers.set(event, handler);
},
terminate: vi.fn(async () => undefined),
};
workers.push(worker);
return worker;
});
return {
workers,
createMockWorker,
skipResponsesForOps,
};
});
// Keep the real node:fs but force existsSync to return true.
// NOTE(review): presumably this lets the client's worker-file availability check pass
// without a built worker on disk — confirm against TeamFsWorkerClient.
vi.mock('node:fs', async () => {
const actual = await vi.importActual<typeof import('node:fs')>('node:fs');
return {
...actual,
existsSync: vi.fn(() => true),
};
});
// Replace Worker with the mock factory on both the named and the default export,
// so either import style in the module under test gets the mock.
vi.mock('node:worker_threads', () => ({
Worker: hoisted.createMockWorker,
default: {
Worker: hoisted.createMockWorker,
},
}));
// Unit tests for TeamFsWorkerClient against the mock worker defined above.
describe('TeamFsWorkerClient', () => {
// Reset module cache, mocks, timers and the shared mock state so each test
// imports a fresh client module and starts with no workers.
afterEach(() => {
vi.resetModules();
vi.clearAllMocks();
vi.useRealTimers();
hoisted.workers.length = 0;
hoisted.skipResponsesForOps.clear();
});
// prewarm() on a fresh client creates one worker and posts exactly one 'warmup'
// message with an empty payload.
it('prewarms the worker without running a scan', async () => {
const { TeamFsWorkerClient } = await import(
'../../../../src/main/services/team/TeamFsWorkerClient'
);
const client = new TeamFsWorkerClient();
await client.prewarm();
expect(hoisted.workers).toHaveLength(1);
expect(hoisted.workers[0].messages).toHaveLength(1);
expect(hoisted.workers[0].messages[0]).toMatchObject({
op: 'warmup',
payload: {},
});
});
// When listTeams() already started the worker, prewarm() must not post anything:
// the worker's only recorded message stays the 'listTeams' request.
it('does not queue warmup behind an already running worker', async () => {
const { TeamFsWorkerClient } = await import(
'../../../../src/main/services/team/TeamFsWorkerClient'
);
const client = new TeamFsWorkerClient();
await client.listTeams({
largeConfigBytes: 8 * 1024,
configHeadBytes: 4 * 1024,
maxConfigBytes: 256 * 1024,
maxMembersMetaBytes: 256 * 1024,
maxSessionHistoryInSummary: 10,
maxProjectPathHistoryInSummary: 10,
});
await client.prewarm();
expect(hoisted.workers).toHaveLength(1);
expect(hoisted.workers[0].messages).toHaveLength(1);
expect(hoisted.workers[0].messages[0]).toMatchObject({
op: 'listTeams',
});
});
// Scenario: the first worker never answers, prewarm times out, a second worker is
// created for listTeams, and only then does the first worker's 'exit' fire.
// The late exit must not fail the request owned by the replacement worker.
it('ignores stale worker exit after timeout when a replacement worker owns pending work', async () => {
vi.useFakeTimers();
// Mute replies so both calls stay pending until driven manually below.
hoisted.skipResponsesForOps.add('warmup');
hoisted.skipResponsesForOps.add('listTeams');
const { TeamFsWorkerClient } = await import(
'../../../../src/main/services/team/TeamFsWorkerClient'
);
const client = new TeamFsWorkerClient();
// Capture the rejection as a value so it can be asserted after advancing the timers.
const prewarmResult = client.prewarm().catch((error: unknown) => error);
await vi.advanceTimersByTimeAsync(20_001);
const prewarmError = await prewarmResult;
expect(prewarmError).toBeInstanceOf(Error);
expect((prewarmError as Error).message).toContain('Worker call timeout');
expect(hoisted.workers).toHaveLength(1);
const listPromise = client.listTeams({
largeConfigBytes: 8 * 1024,
configHeadBytes: 4 * 1024,
maxConfigBytes: 256 * 1024,
maxMembersMetaBytes: 256 * 1024,
maxSessionHistoryInSummary: 10,
maxProjectPathHistoryInSummary: 10,
});
// The new request had to spin up a second worker.
expect(hoisted.workers).toHaveLength(2);
const staleWorker = hoisted.workers[0];
const replacementWorker = hoisted.workers[1];
const listRequest = replacementWorker.messages[0] as { id: string };
// Fire the old worker's exit handler first, then let the replacement answer.
staleWorker.handlers.get('exit')?.(1);
replacementWorker.handlers.get('message')?.({
id: listRequest.id,
ok: true,
result: [{ teamName: 'fresh-team', displayName: 'Fresh Team' }],
diag: { op: 'listTeams', totalMs: 1 },
});
await expect(listPromise).resolves.toEqual({
teams: [{ teamName: 'fresh-team', displayName: 'Fresh Team' }],
diag: { op: 'listTeams', totalMs: 1 },
});
});
});

View file

@ -206,6 +206,16 @@ function createPidusageStat(pid: number, memory: number) {
};
}
/**
 * Creates a promise together with the functions that settle it, so a test can
 * decide from the outside when (and how) the promise completes.
 *
 * @returns The pending `promise` plus its `resolve` and `reject` functions.
 */
function createDeferred<T>() {
  type Settlers = {
    resolve: (value: T | PromiseLike<T>) => void;
    reject: (reason?: unknown) => void;
  };
  // The Promise executor runs synchronously, so both fields are filled in
  // before this function returns.
  const settlers = {} as Settlers;
  const promise = new Promise<T>((onFulfil, onReject) => {
    settlers.resolve = onFulfil;
    settlers.reject = onReject;
  });
  return { promise, resolve: settlers.resolve, reject: settlers.reject };
}
function writeLaunchConfig(
teamName: string,
projectPath: string,
@ -619,6 +629,182 @@ describe('TeamProvisioningService', () => {
});
describe('getTeamAgentRuntimeSnapshot', () => {
// Two overlapping getTeamAgentRuntimeSnapshot() calls for the same team must share
// one underlying probe: the tmux pane listing and pidusage are each called once.
it('dedupes concurrent runtime snapshot probes for the same team', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', model: 'gpt-5.4-mini' },
],
})),
};
(svc as any).readPersistedRuntimeMembers = vi.fn(() => [
{
name: 'alice',
agentId: 'alice@runtime-team',
tmuxPaneId: '%1',
backendType: 'tmux',
},
]);
// Register a live run for the team directly on the service's private state.
(svc as any).aliveRunByTeam.set('runtime-team', 'run-1');
(svc as any).runs.set('run-1', {
runId: 'run-1',
child: { pid: 111 },
request: { model: 'gpt-5.4' },
processKilled: false,
cancelRequested: false,
spawnContext: null,
});
// Hold the pane listing open so the second call starts while the first is in flight.
const paneInfo = createDeferred<Map<string, { paneId: string; panePid: number }>>();
vi.mocked(listTmuxPaneRuntimeInfoForCurrentPlatform).mockReturnValueOnce(
paneInfo.promise as ReturnType<typeof listTmuxPaneRuntimeInfoForCurrentPlatform>
);
vi.mocked(pidusage).mockResolvedValueOnce({
'111': createPidusageStat(111, 123_000_000),
'222': createPidusageStat(222, 456_000_000),
} as any);
const first = svc.getTeamAgentRuntimeSnapshot('runtime-team');
const second = svc.getTeamAgentRuntimeSnapshot('runtime-team');
paneInfo.resolve(
new Map([
[
'%1',
{
paneId: '%1',
panePid: 222,
},
],
])
);
const [firstSnapshot, secondSnapshot] = await Promise.all([first, second]);
expect(listTmuxPaneRuntimeInfoForCurrentPlatform).toHaveBeenCalledTimes(1);
expect(pidusage).toHaveBeenCalledTimes(1);
// Both callers see alice mapped to the pane's pid.
expect(firstSnapshot.members.alice?.pid).toBe(222);
expect(secondSnapshot.members.alice?.pid).toBe(222);
});
// If the caches are invalidated while a probe is still running, that probe's result
// must not be cached: the next call has to list runtime processes again (2 calls total).
it('does not cache live runtime metadata when invalidated while the probe is in flight', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', model: 'gpt-5.4-mini' },
],
})),
};
// First process listing is held open; the second resolves immediately.
const processRows = createDeferred<Awaited<ReturnType<typeof listRuntimeProcessesForCurrentTmuxPlatform>>>();
vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform)
.mockReturnValueOnce(processRows.promise)
.mockResolvedValueOnce([]);
const first = (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team') as Promise<
Map<string, unknown>
>;
// Invalidate before the in-flight probe is allowed to finish.
(svc as any).invalidateRuntimeSnapshotCaches('runtime-team');
processRows.resolve([]);
await first;
await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team');
expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2);
});
// Mutating a returned metadata Map must not affect later callers: the second call is
// served without another process listing and still contains 'alice'.
it('returns cloned live runtime metadata maps from cache', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', model: 'gpt-5.4-mini' },
],
})),
};
vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform).mockResolvedValueOnce([]);
const first = (await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team')) as Map<
string,
unknown
>;
expect(first.has('alice')).toBe(true);
// Tamper with the caller's copy only.
first.delete('alice');
const second = (await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team')) as Map<
string,
unknown
>;
expect(second.has('alice')).toBe(true);
expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(1);
});
// resetTeamScopedTransientStateForNewRun() must drop cached runtime metadata, so the
// call after the reset performs a second process listing.
it('clears runtime probe caches when starting a new run for the team', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', model: 'gpt-5.4-mini' },
],
})),
};
vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform)
.mockResolvedValueOnce([])
.mockResolvedValueOnce([]);
await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team');
(svc as any).resetTeamScopedTransientStateForNewRun('runtime-team');
await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team');
expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2);
});
// A probe that began before runtime adapter state was installed (and the caches were
// invalidated) must not populate the cache: the follow-up call probes again (2 calls total).
it('does not cache a probe that started before runtime adapter evidence was installed', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', providerId: 'opencode', model: 'gpt-5.4-mini' },
],
})),
};
(svc as any).provisioningRunByTeam.set('runtime-team', 'run-1');
// First process listing is held open; the second resolves immediately.
const processRows = createDeferred<Awaited<ReturnType<typeof listRuntimeProcessesForCurrentTmuxPlatform>>>();
vi.mocked(listRuntimeProcessesForCurrentTmuxPlatform)
.mockReturnValueOnce(processRows.promise)
.mockResolvedValueOnce([]);
const first = (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team') as Promise<
Map<string, unknown>
>;
// While the first probe is pending, install adapter run state for the team and invalidate.
(svc as any).runtimeAdapterRunByTeam.set('runtime-team', {
runId: 'run-1',
providerId: 'opencode',
cwd: '/tmp/runtime-project',
members: {
alice: {
providerId: 'opencode',
runtimeAlive: true,
bootstrapConfirmed: false,
runtimePid: 333,
livenessKind: 'runtime_process',
pidSource: 'agent_process_table',
},
},
});
(svc as any).invalidateRuntimeSnapshotCaches('runtime-team');
processRows.resolve([]);
await first;
await (svc as any).getLiveTeamAgentRuntimeMetadata('runtime-team');
expect(listRuntimeProcessesForCurrentTmuxPlatform).toHaveBeenCalledTimes(2);
});
it('uses batched pidusage rss values for lead and teammates', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {