From d59865f300958f09089090fcfeb59eac3eb11479 Mon Sep 17 00:00:00 2001 From: 777genius Date: Sat, 30 May 2026 20:03:45 +0300 Subject: [PATCH] perf(team): reduce runtime telemetry polling --- .../services/team/TeamProvisioningService.ts | 30 ++++- .../stream/BoardTaskLogStreamService.ts | 88 ++++++++++++-- .../team/BoardTaskLogStreamService.test.ts | 113 +++++++++++++++++- .../team/TeamProvisioningService.test.ts | 94 +++++++++++++++ 4 files changed, 310 insertions(+), 15 deletions(-) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index eb4a75ac..9655b0d5 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -3764,7 +3764,8 @@ export class TeamProvisioningService { this.liveTeamAgentRuntimeMetadataCache.delete(teamName); this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName); this.runtimeProcessRowsForUsageSnapshotByTeam.delete(teamName); - this.runtimeProcessUsageStatsCacheByPid.clear(); + // CPU/RSS samples are TTL-bound and do not decide liveness; keeping them + // avoids repeated pidusage forks when launch-state churn invalidates snapshots. } private cloneMemberSpawnStatusesSnapshot( @@ -14232,7 +14233,10 @@ export class TeamProvisioningService { ]; usageStatsByPid = this.buildProcessUsageStatsFromRows(runtimeProcessRows, runtimeUsagePids); const pidsMissingUsageStats = runtimeUsagePids.filter((pid) => !usageStatsByPid.has(pid)); - if (pidsMissingUsageStats.length > 0) { + if ( + pidsMissingUsageStats.length > 0 && + this.shouldSampleMissingRuntimeUsageStatsWithPidusage() + ) { const sampledUsageStats = await this.readProcessUsageStatsByPid(pidsMissingUsageStats); for (const [pid, stats] of sampledUsageStats) { usageStatsByPid.set(pid, stats); @@ -14543,7 +14547,8 @@ export class TeamProvisioningService { !usageStatsByPid.has(rssPid) && isSharedOpenCodeHost && typeof rssPid === 'number' && - rssPid > 0 + rssPid > 0 && + this.isRuntimePidusageTelemetryEnabled() ) { try { const refreshedUsageStats = ( @@ -26258,6 +26263,22 @@ export class TeamProvisioningService { return usageStatsByPid; } + private shouldSampleMissingRuntimeUsageStatsWithPidusage(): boolean { + if (!this.isRuntimePidusageTelemetryEnabled()) { + return false; + } + + // CPU/RSS telemetry already comes from the enriched process table in the + // default path. If this opt-in is enabled, preserve the older fallback for + // missing rows across platforms. + return true; + } + + private isRuntimePidusageTelemetryEnabled(): boolean { + const value = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED?.trim().toLowerCase(); + return value === '1' || value === 'true' || value === 'yes'; + } + private async readProcessUsageStatsByPid( pids: readonly number[], cacheOptions: { ignoreCachedMisses?: boolean } = {} @@ -26290,6 +26311,9 @@ export class TeamProvisioningService { if (pidsToRead.length === 0) { return usageStatsByPid; } + if (!this.isRuntimePidusageTelemetryEnabled()) { + return usageStatsByPid; + } const rememberUsageStats = ( pid: number, diff --git a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts index 3c8976b1..ee88bfed 100644 --- a/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts +++ b/src/main/services/team/taskLogs/stream/BoardTaskLogStreamService.ts @@ -78,6 +78,9 @@ const INFERRED_RECORD_RANGE_AFTER_MS = 60_000; const STREAM_LAYOUT_CACHE_TTL_MS = 1_000; const STREAM_LAYOUT_BUILD_WARN_MS = 3_000; const RUNTIME_FALLBACK_WARN_MS = 3_000; +const OPENCODE_RUNTIME_FALLBACK_HIT_CACHE_TTL_MS = 1_000; +const OPENCODE_RUNTIME_FALLBACK_MISS_CACHE_TTL_MS = 3_000; +const OPENCODE_RUNTIME_FALLBACK_CACHE_MAX_ENTRIES = 256; const INFERRED_CANDIDATE_SELECTION_WARN_COUNT = 100; const HISTORICAL_RAW_PROBE_WARN_MS = 3_000; const HISTORICAL_RAW_PROBE_WARN_FILE_COUNT = 500; @@ -1658,6 +1661,19 @@ export class BoardTaskLogStreamService { } >(); + private readonly openCodeRuntimeFallbackCache = new Map< + string, + { + expiresAt: number; + response: BoardTaskLogStreamResponse | null; + } + >(); + + private readonly openCodeRuntimeFallbackInFlight = new Map< + string, + Promise + >(); + constructor( private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(), private readonly summarySelector: BoardTaskExactLogSummarySelector = new BoardTaskExactLogSummarySelector(), @@ -1684,6 +1700,26 @@ export class BoardTaskLogStreamService { return `${teamName}::${taskId}`; } + private buildOpenCodeRuntimeFallbackCacheKey(teamName: string, taskId: string): string { + return `${teamName}::${taskId}`; + } + + private pruneOpenCodeRuntimeFallbackCache(nowMs: number): void { + for (const [cacheKey, cached] of this.openCodeRuntimeFallbackCache) { + if (cached.expiresAt <= nowMs) { + this.openCodeRuntimeFallbackCache.delete(cacheKey); + } + } + + while (this.openCodeRuntimeFallbackCache.size > OPENCODE_RUNTIME_FALLBACK_CACHE_MAX_ENTRIES) { + const oldestKey = this.openCodeRuntimeFallbackCache.keys().next().value; + if (oldestKey == null) { + break; + } + this.openCodeRuntimeFallbackCache.delete(oldestKey); + } + } + private getTranscriptDiscoveryGeneration(teamName: string): number { const locator = this.transcriptSourceLocator as { getGeneration?: (teamName: string) => number; @@ -2237,17 +2273,49 @@ export class BoardTaskLogStreamService { teamName: string, taskId: string ): Promise { - const startedAt = Date.now(); - const fallback = await this.openCodeRuntimeFallbackSource.getTaskLogStream(teamName, taskId); - const elapsedMs = Date.now() - startedAt; - if (elapsedMs >= RUNTIME_FALLBACK_WARN_MS) { - logger.warn( - `Slow task-log runtime fallback: team=${teamName} task=${taskId} hit=${Boolean( - fallback - )} elapsedMs=${elapsedMs}` - ); + const cacheKey = this.buildOpenCodeRuntimeFallbackCacheKey(teamName, taskId); + const nowMs = Date.now(); + const cached = this.openCodeRuntimeFallbackCache.get(cacheKey); + if (cached && cached.expiresAt > nowMs) { + return cached.response; } - return fallback; + + const existingInFlight = this.openCodeRuntimeFallbackInFlight.get(cacheKey); + if (existingInFlight) { + return existingInFlight; + } + + const startedAt = Date.now(); + const request = this.openCodeRuntimeFallbackSource + .getTaskLogStream(teamName, taskId) + .then((fallback) => { + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= RUNTIME_FALLBACK_WARN_MS) { + logger.warn( + `Slow task-log runtime fallback: team=${teamName} task=${taskId} hit=${Boolean( + fallback + )} elapsedMs=${elapsedMs}` + ); + } + + const cacheTtlMs = fallback + ? OPENCODE_RUNTIME_FALLBACK_HIT_CACHE_TTL_MS + : OPENCODE_RUNTIME_FALLBACK_MISS_CACHE_TTL_MS; + this.openCodeRuntimeFallbackCache.set(cacheKey, { + expiresAt: Date.now() + cacheTtlMs, + response: fallback, + }); + this.pruneOpenCodeRuntimeFallbackCache(Date.now()); + return fallback; + }) + .finally(() => { + if (this.openCodeRuntimeFallbackInFlight.get(cacheKey) === request) { + this.openCodeRuntimeFallbackInFlight.delete(cacheKey); + } + }); + + this.openCodeRuntimeFallbackInFlight.set(cacheKey, request); + return request; } private async loadCodexNativeTraceFallback( diff --git a/test/main/services/team/BoardTaskLogStreamService.test.ts b/test/main/services/team/BoardTaskLogStreamService.test.ts index 510db92b..8ea61127 100644 --- a/test/main/services/team/BoardTaskLogStreamService.test.ts +++ b/test/main/services/team/BoardTaskLogStreamService.test.ts @@ -2,9 +2,9 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import { BoardTaskLogStreamService } from '../../../../src/main/services/team/taskLogs/stream/BoardTaskLogStreamService'; -import type { ParsedMessage } from '../../../../src/main/types'; import type { BoardTaskActivityRecord } from '../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityRecord'; import type { BoardTaskExactLogBundleCandidate } from '../../../../src/main/services/team/taskLogs/exact/BoardTaskExactLogTypes'; +import type { ParsedMessage } from '../../../../src/main/types'; function makeRecord( id: string, @@ -160,7 +160,116 @@ describe('BoardTaskLogStreamService', () => { expect(await service.getTaskLogStreamSummary('demo', 'task-a')).toEqual({ segmentCount: 1, }); - expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(2); + expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1); + }); + + it('dedupes concurrent OpenCode runtime fallback reads for the same task', async () => { + let resolveFallback: (response: { + participants: { + key: string; + label: string; + role: 'member'; + isLead: false; + isSidechain: true; + }[]; + defaultFilter: string; + segments: { + id: string; + participantKey: string; + actor: { + memberName: string; + role: 'member'; + sessionId: string; + isSidechain: true; + }; + startTimestamp: string; + endTimestamp: string; + chunks: { id: string }[]; + }[]; + source: 'opencode_runtime_fallback'; + }) => void; + const fallbackPromise = new Promise<{ + participants: { + key: string; + label: string; + role: 'member'; + isLead: false; + isSidechain: true; + }[]; + defaultFilter: string; + segments: { + id: string; + participantKey: string; + actor: { + memberName: string; + role: 'member'; + sessionId: string; + isSidechain: true; + }; + startTimestamp: string; + endTimestamp: string; + chunks: { id: string }[]; + }[]; + source: 'opencode_runtime_fallback'; + }>((resolve) => { + resolveFallback = resolve; + }); + const runtimeFallbackSource = { + getTaskLogStream: vi.fn(() => fallbackPromise), + }; + + const service = new BoardTaskLogStreamService( + { + getTaskRecords: vi.fn(async () => []), + } as never, + undefined as never, + undefined as never, + undefined as never, + undefined as never, + undefined as never, + undefined as never, + runtimeFallbackSource as never + ); + + const streamPromise = service.getTaskLogStream('demo', 'task-a'); + const summaryPromise = service.getTaskLogStreamSummary('demo', 'task-a'); + await vi.waitFor(() => { + expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1); + }); + + resolveFallback!({ + participants: [ + { + key: 'member:alice', + label: 'alice', + role: 'member' as const, + isLead: false, + isSidechain: true, + }, + ], + defaultFilter: 'member:alice', + segments: [ + { + id: 'opencode:segment-1', + participantKey: 'member:alice', + actor: { + memberName: 'alice', + role: 'member' as const, + sessionId: 'session-opencode', + isSidechain: true, + }, + startTimestamp: '2026-04-21T10:00:00.000Z', + endTimestamp: '2026-04-21T10:01:00.000Z', + chunks: [{ id: 'chunk-1' }], + }, + ], + source: 'opencode_runtime_fallback' as const, + }); + + const [stream, summary] = await Promise.all([streamPromise, summaryPromise]); + expect(stream.segments).toHaveLength(1); + expect(summary).toEqual({ segmentCount: 1 }); + expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1); }); it('merges OpenCode runtime stream when board transcript slices mask member execution', async () => { diff --git a/test/main/services/team/TeamProvisioningService.test.ts b/test/main/services/team/TeamProvisioningService.test.ts index 7d3efcc0..4c0a750c 100644 --- a/test/main/services/team/TeamProvisioningService.test.ts +++ b/test/main/services/team/TeamProvisioningService.test.ts @@ -202,6 +202,34 @@ import pidusage from 'pidusage'; const EXPECTED_RUNTIME_PIDUSAGE_OPTIONS = process.platform === 'win32' ? { maxage: 10_000 } : { maxage: 0 }; +const ORIGINAL_RUNTIME_PIDUSAGE_ENABLED = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED; + +function restoreRuntimePidusageTelemetryEnv() { + if (ORIGINAL_RUNTIME_PIDUSAGE_ENABLED === undefined) { + delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED; + return; + } + process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = ORIGINAL_RUNTIME_PIDUSAGE_ENABLED; +} + +function withRuntimePidusageTelemetryEnv( + value: string | undefined, + callback: () => Promise +): Promise { + const previous = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED; + if (value === undefined) { + delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED; + } else { + process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = value; + } + return callback().finally(() => { + if (previous === undefined) { + delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED; + } else { + process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = previous; + } + }); +} function allowConsoleLogs() { vi.spyOn(console, 'error').mockImplementation(() => {}); @@ -789,7 +817,9 @@ async function waitForFile(filePath: string, timeoutMs = 2_000): Promise { describe('TeamProvisioningService', () => { beforeEach(() => { + process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = '1'; vi.clearAllMocks(); + vi.mocked(pidusage).mockReset(); vi.mocked(killTmuxPaneForCurrentPlatformSync).mockReset(); vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockReset(); vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockResolvedValue([]); @@ -818,6 +848,7 @@ describe('TeamProvisioningService', () => { }); afterEach(() => { + restoreRuntimePidusageTelemetryEnv(); clearAutoResumeService(); vi.useRealTimers(); try { @@ -3657,6 +3688,66 @@ describe('TeamProvisioningService', () => { }); }); + it('skips pidusage by default when process table metrics are missing', async () => { + await withRuntimePidusageTelemetryEnv(undefined, async () => { + const svc = new TeamProvisioningService(); + (svc as any).configReader = { + getConfig: vi.fn(async () => ({ + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'alice', model: 'gpt-5.4-mini' }, + ], + })), + }; + (svc as any).readPersistedRuntimeMembers = vi.fn(() => [ + { + name: 'alice', + agentId: 'alice@runtime-team', + tmuxPaneId: '%1', + backendType: 'tmux', + }, + ]); + (svc as any).aliveRunByTeam.set('runtime-team', 'run-1'); + (svc as any).runs.set('run-1', { + runId: 'run-1', + child: { pid: 111 }, + request: { model: 'gpt-5.4' }, + processKilled: false, + cancelRequested: false, + spawnContext: null, + }); + vi.mocked(listTmuxPaneRuntimeInfoForCurrentPlatform).mockResolvedValueOnce( + new Map([ + [ + '%1', + { + paneId: '%1', + panePid: 222, + currentCommand: 'codex', + }, + ], + ]) + ); + vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockResolvedValue([ + { + pid: 999, + ppid: 1, + command: '/usr/bin/node unrelated.js', + cpuPercent: 1.5, + rssBytes: 12_000_000, + }, + ]); + + const snapshot = await svc.getTeamAgentRuntimeSnapshot('runtime-team'); + + expect(pidusage).not.toHaveBeenCalled(); + expect(snapshot.members['team-lead']).toMatchObject({ pid: 111 }); + expect(snapshot.members['team-lead']?.rssBytes).toBeUndefined(); + expect(snapshot.members.alice).toMatchObject({ pid: 222 }); + expect(snapshot.members.alice?.rssBytes).toBeUndefined(); + }); + }); + it('falls back to pidusage for root pids missing from an otherwise available process table', async () => { const svc = new TeamProvisioningService(); (svc as any).configReader = { @@ -3724,6 +3815,8 @@ describe('TeamProvisioningService', () => { }); it('captures CPU and memory history on runtime snapshots', async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-05-03T12:00:00.000Z')); const svc = new TeamProvisioningService(); (svc as any).configReader = { getConfig: vi.fn(async () => ({ @@ -3769,6 +3862,7 @@ describe('TeamProvisioningService', () => { ]); (svc as any).invalidateRuntimeSnapshotCaches('runtime-team'); + vi.setSystemTime(new Date('2026-05-03T12:00:31.000Z')); vi.mocked(pidusage).mockResolvedValueOnce({ '111': createPidusageStat(111, 130_000_000, 18), } as any);