perf(team): reduce runtime telemetry polling

This commit is contained in:
777genius 2026-05-30 20:03:45 +03:00
parent b13ee56359
commit d59865f300
4 changed files with 310 additions and 15 deletions

View file

@ -3764,7 +3764,8 @@ export class TeamProvisioningService {
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
this.liveTeamAgentRuntimeMetadataInFlightByTeam.delete(teamName);
this.runtimeProcessRowsForUsageSnapshotByTeam.delete(teamName);
this.runtimeProcessUsageStatsCacheByPid.clear();
// CPU/RSS samples are TTL-bound and do not decide liveness; keeping them
// avoids repeated pidusage forks when launch-state churn invalidates snapshots.
}
private cloneMemberSpawnStatusesSnapshot(
@ -14232,7 +14233,10 @@ export class TeamProvisioningService {
];
usageStatsByPid = this.buildProcessUsageStatsFromRows(runtimeProcessRows, runtimeUsagePids);
const pidsMissingUsageStats = runtimeUsagePids.filter((pid) => !usageStatsByPid.has(pid));
if (pidsMissingUsageStats.length > 0) {
if (
pidsMissingUsageStats.length > 0 &&
this.shouldSampleMissingRuntimeUsageStatsWithPidusage()
) {
const sampledUsageStats = await this.readProcessUsageStatsByPid(pidsMissingUsageStats);
for (const [pid, stats] of sampledUsageStats) {
usageStatsByPid.set(pid, stats);
@ -14543,7 +14547,8 @@ export class TeamProvisioningService {
!usageStatsByPid.has(rssPid) &&
isSharedOpenCodeHost &&
typeof rssPid === 'number' &&
rssPid > 0
rssPid > 0 &&
this.isRuntimePidusageTelemetryEnabled()
) {
try {
const refreshedUsageStats = (
@ -26258,6 +26263,22 @@ export class TeamProvisioningService {
return usageStatsByPid;
}
private shouldSampleMissingRuntimeUsageStatsWithPidusage(): boolean {
if (!this.isRuntimePidusageTelemetryEnabled()) {
return false;
}
// CPU/RSS telemetry already comes from the enriched process table in the
// default path. If this opt-in is enabled, preserve the older fallback for
// missing rows across platforms.
return true;
}
private isRuntimePidusageTelemetryEnabled(): boolean {
const value = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED?.trim().toLowerCase();
return value === '1' || value === 'true' || value === 'yes';
}
private async readProcessUsageStatsByPid(
pids: readonly number[],
cacheOptions: { ignoreCachedMisses?: boolean } = {}
@ -26290,6 +26311,9 @@ export class TeamProvisioningService {
if (pidsToRead.length === 0) {
return usageStatsByPid;
}
if (!this.isRuntimePidusageTelemetryEnabled()) {
return usageStatsByPid;
}
const rememberUsageStats = (
pid: number,

View file

@ -78,6 +78,9 @@ const INFERRED_RECORD_RANGE_AFTER_MS = 60_000;
const STREAM_LAYOUT_CACHE_TTL_MS = 1_000;
const STREAM_LAYOUT_BUILD_WARN_MS = 3_000;
const RUNTIME_FALLBACK_WARN_MS = 3_000;
const OPENCODE_RUNTIME_FALLBACK_HIT_CACHE_TTL_MS = 1_000;
const OPENCODE_RUNTIME_FALLBACK_MISS_CACHE_TTL_MS = 3_000;
const OPENCODE_RUNTIME_FALLBACK_CACHE_MAX_ENTRIES = 256;
const INFERRED_CANDIDATE_SELECTION_WARN_COUNT = 100;
const HISTORICAL_RAW_PROBE_WARN_MS = 3_000;
const HISTORICAL_RAW_PROBE_WARN_FILE_COUNT = 500;
@ -1658,6 +1661,19 @@ export class BoardTaskLogStreamService {
}
>();
private readonly openCodeRuntimeFallbackCache = new Map<
string,
{
expiresAt: number;
response: BoardTaskLogStreamResponse | null;
}
>();
private readonly openCodeRuntimeFallbackInFlight = new Map<
string,
Promise<BoardTaskLogStreamResponse | null>
>();
constructor(
private readonly recordSource: BoardTaskActivityRecordSource = new BoardTaskActivityRecordSource(),
private readonly summarySelector: BoardTaskExactLogSummarySelector = new BoardTaskExactLogSummarySelector(),
@ -1684,6 +1700,26 @@ export class BoardTaskLogStreamService {
return `${teamName}::${taskId}`;
}
private buildOpenCodeRuntimeFallbackCacheKey(teamName: string, taskId: string): string {
return `${teamName}::${taskId}`;
}
private pruneOpenCodeRuntimeFallbackCache(nowMs: number): void {
for (const [cacheKey, cached] of this.openCodeRuntimeFallbackCache) {
if (cached.expiresAt <= nowMs) {
this.openCodeRuntimeFallbackCache.delete(cacheKey);
}
}
while (this.openCodeRuntimeFallbackCache.size > OPENCODE_RUNTIME_FALLBACK_CACHE_MAX_ENTRIES) {
const oldestKey = this.openCodeRuntimeFallbackCache.keys().next().value;
if (oldestKey == null) {
break;
}
this.openCodeRuntimeFallbackCache.delete(oldestKey);
}
}
private getTranscriptDiscoveryGeneration(teamName: string): number {
const locator = this.transcriptSourceLocator as {
getGeneration?: (teamName: string) => number;
@ -2237,17 +2273,49 @@ export class BoardTaskLogStreamService {
teamName: string,
taskId: string
): Promise<BoardTaskLogStreamResponse | null> {
const startedAt = Date.now();
const fallback = await this.openCodeRuntimeFallbackSource.getTaskLogStream(teamName, taskId);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= RUNTIME_FALLBACK_WARN_MS) {
logger.warn(
`Slow task-log runtime fallback: team=${teamName} task=${taskId} hit=${Boolean(
fallback
)} elapsedMs=${elapsedMs}`
);
const cacheKey = this.buildOpenCodeRuntimeFallbackCacheKey(teamName, taskId);
const nowMs = Date.now();
const cached = this.openCodeRuntimeFallbackCache.get(cacheKey);
if (cached && cached.expiresAt > nowMs) {
return cached.response;
}
return fallback;
const existingInFlight = this.openCodeRuntimeFallbackInFlight.get(cacheKey);
if (existingInFlight) {
return existingInFlight;
}
const startedAt = Date.now();
const request = this.openCodeRuntimeFallbackSource
.getTaskLogStream(teamName, taskId)
.then((fallback) => {
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= RUNTIME_FALLBACK_WARN_MS) {
logger.warn(
`Slow task-log runtime fallback: team=${teamName} task=${taskId} hit=${Boolean(
fallback
)} elapsedMs=${elapsedMs}`
);
}
const cacheTtlMs = fallback
? OPENCODE_RUNTIME_FALLBACK_HIT_CACHE_TTL_MS
: OPENCODE_RUNTIME_FALLBACK_MISS_CACHE_TTL_MS;
this.openCodeRuntimeFallbackCache.set(cacheKey, {
expiresAt: Date.now() + cacheTtlMs,
response: fallback,
});
this.pruneOpenCodeRuntimeFallbackCache(Date.now());
return fallback;
})
.finally(() => {
if (this.openCodeRuntimeFallbackInFlight.get(cacheKey) === request) {
this.openCodeRuntimeFallbackInFlight.delete(cacheKey);
}
});
this.openCodeRuntimeFallbackInFlight.set(cacheKey, request);
return request;
}
private async loadCodexNativeTraceFallback(

View file

@ -2,9 +2,9 @@ import { afterEach, describe, expect, it, vi } from 'vitest';
import { BoardTaskLogStreamService } from '../../../../src/main/services/team/taskLogs/stream/BoardTaskLogStreamService';
import type { ParsedMessage } from '../../../../src/main/types';
import type { BoardTaskActivityRecord } from '../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityRecord';
import type { BoardTaskExactLogBundleCandidate } from '../../../../src/main/services/team/taskLogs/exact/BoardTaskExactLogTypes';
import type { ParsedMessage } from '../../../../src/main/types';
function makeRecord(
id: string,
@ -160,7 +160,116 @@ describe('BoardTaskLogStreamService', () => {
expect(await service.getTaskLogStreamSummary('demo', 'task-a')).toEqual({
segmentCount: 1,
});
expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(2);
expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1);
});
it('dedupes concurrent OpenCode runtime fallback reads for the same task', async () => {
let resolveFallback: (response: {
participants: {
key: string;
label: string;
role: 'member';
isLead: false;
isSidechain: true;
}[];
defaultFilter: string;
segments: {
id: string;
participantKey: string;
actor: {
memberName: string;
role: 'member';
sessionId: string;
isSidechain: true;
};
startTimestamp: string;
endTimestamp: string;
chunks: { id: string }[];
}[];
source: 'opencode_runtime_fallback';
}) => void;
const fallbackPromise = new Promise<{
participants: {
key: string;
label: string;
role: 'member';
isLead: false;
isSidechain: true;
}[];
defaultFilter: string;
segments: {
id: string;
participantKey: string;
actor: {
memberName: string;
role: 'member';
sessionId: string;
isSidechain: true;
};
startTimestamp: string;
endTimestamp: string;
chunks: { id: string }[];
}[];
source: 'opencode_runtime_fallback';
}>((resolve) => {
resolveFallback = resolve;
});
const runtimeFallbackSource = {
getTaskLogStream: vi.fn(() => fallbackPromise),
};
const service = new BoardTaskLogStreamService(
{
getTaskRecords: vi.fn(async () => []),
} as never,
undefined as never,
undefined as never,
undefined as never,
undefined as never,
undefined as never,
undefined as never,
runtimeFallbackSource as never
);
const streamPromise = service.getTaskLogStream('demo', 'task-a');
const summaryPromise = service.getTaskLogStreamSummary('demo', 'task-a');
await vi.waitFor(() => {
expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1);
});
resolveFallback!({
participants: [
{
key: 'member:alice',
label: 'alice',
role: 'member' as const,
isLead: false,
isSidechain: true,
},
],
defaultFilter: 'member:alice',
segments: [
{
id: 'opencode:segment-1',
participantKey: 'member:alice',
actor: {
memberName: 'alice',
role: 'member' as const,
sessionId: 'session-opencode',
isSidechain: true,
},
startTimestamp: '2026-04-21T10:00:00.000Z',
endTimestamp: '2026-04-21T10:01:00.000Z',
chunks: [{ id: 'chunk-1' }],
},
],
source: 'opencode_runtime_fallback' as const,
});
const [stream, summary] = await Promise.all([streamPromise, summaryPromise]);
expect(stream.segments).toHaveLength(1);
expect(summary).toEqual({ segmentCount: 1 });
expect(runtimeFallbackSource.getTaskLogStream).toHaveBeenCalledTimes(1);
});
it('merges OpenCode runtime stream when board transcript slices mask member execution', async () => {

View file

@ -202,6 +202,34 @@ import pidusage from 'pidusage';
const EXPECTED_RUNTIME_PIDUSAGE_OPTIONS =
process.platform === 'win32' ? { maxage: 10_000 } : { maxage: 0 };
const ORIGINAL_RUNTIME_PIDUSAGE_ENABLED = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED;
function restoreRuntimePidusageTelemetryEnv() {
if (ORIGINAL_RUNTIME_PIDUSAGE_ENABLED === undefined) {
delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED;
return;
}
process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = ORIGINAL_RUNTIME_PIDUSAGE_ENABLED;
}
function withRuntimePidusageTelemetryEnv(
value: string | undefined,
callback: () => Promise<void>
): Promise<void> {
const previous = process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED;
if (value === undefined) {
delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED;
} else {
process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = value;
}
return callback().finally(() => {
if (previous === undefined) {
delete process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED;
} else {
process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = previous;
}
});
}
function allowConsoleLogs() {
vi.spyOn(console, 'error').mockImplementation(() => {});
@ -789,7 +817,9 @@ async function waitForFile(filePath: string, timeoutMs = 2_000): Promise<void> {
describe('TeamProvisioningService', () => {
beforeEach(() => {
process.env.CLAUDE_TEAM_RUNTIME_PIDUSAGE_ENABLED = '1';
vi.clearAllMocks();
vi.mocked(pidusage).mockReset();
vi.mocked(killTmuxPaneForCurrentPlatformSync).mockReset();
vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockReset();
vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockResolvedValue([]);
@ -818,6 +848,7 @@ describe('TeamProvisioningService', () => {
});
afterEach(() => {
restoreRuntimePidusageTelemetryEnv();
clearAutoResumeService();
vi.useRealTimers();
try {
@ -3657,6 +3688,66 @@ describe('TeamProvisioningService', () => {
});
});
it('skips pidusage by default when process table metrics are missing', async () => {
await withRuntimePidusageTelemetryEnv(undefined, async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'alice', model: 'gpt-5.4-mini' },
],
})),
};
(svc as any).readPersistedRuntimeMembers = vi.fn(() => [
{
name: 'alice',
agentId: 'alice@runtime-team',
tmuxPaneId: '%1',
backendType: 'tmux',
},
]);
(svc as any).aliveRunByTeam.set('runtime-team', 'run-1');
(svc as any).runs.set('run-1', {
runId: 'run-1',
child: { pid: 111 },
request: { model: 'gpt-5.4' },
processKilled: false,
cancelRequested: false,
spawnContext: null,
});
vi.mocked(listTmuxPaneRuntimeInfoForCurrentPlatform).mockResolvedValueOnce(
new Map([
[
'%1',
{
paneId: '%1',
panePid: 222,
currentCommand: 'codex',
},
],
])
);
vi.mocked(listRuntimeProcessTableForCurrentPlatform).mockResolvedValue([
{
pid: 999,
ppid: 1,
command: '/usr/bin/node unrelated.js',
cpuPercent: 1.5,
rssBytes: 12_000_000,
},
]);
const snapshot = await svc.getTeamAgentRuntimeSnapshot('runtime-team');
expect(pidusage).not.toHaveBeenCalled();
expect(snapshot.members['team-lead']).toMatchObject({ pid: 111 });
expect(snapshot.members['team-lead']?.rssBytes).toBeUndefined();
expect(snapshot.members.alice).toMatchObject({ pid: 222 });
expect(snapshot.members.alice?.rssBytes).toBeUndefined();
});
});
it('falls back to pidusage for root pids missing from an otherwise available process table', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {
@ -3724,6 +3815,8 @@ describe('TeamProvisioningService', () => {
});
it('captures CPU and memory history on runtime snapshots', async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date('2026-05-03T12:00:00.000Z'));
const svc = new TeamProvisioningService();
(svc as any).configReader = {
getConfig: vi.fn(async () => ({
@ -3769,6 +3862,7 @@ describe('TeamProvisioningService', () => {
]);
(svc as any).invalidateRuntimeSnapshotCaches('runtime-team');
vi.setSystemTime(new Date('2026-05-03T12:00:31.000Z'));
vi.mocked(pidusage).mockResolvedValueOnce({
'111': createPidusageStat(111, 130_000_000, 18),
} as any);