perf: cache runtime usage telemetry

This commit is contained in:
777genius 2026-05-29 12:29:37 +03:00
parent 7d21f9bd76
commit 3b0c2ed24b
2 changed files with 101 additions and 10 deletions

View file

@ -772,7 +772,7 @@ interface RuntimeProcessLoadStats extends RuntimeProcessUsageStats {
type RuntimeTelemetryProcessSource = 'native' | 'wsl' | 'windows-host';
interface RuntimeTelemetryProcessTableRow extends RuntimeProcessTableRow {
interface RuntimeTelemetryProcessTableRow extends RuntimeProcessTableRow, RuntimeProcessUsageStats {
runtimeTelemetrySource?: RuntimeTelemetryProcessSource;
}
@ -3294,6 +3294,7 @@ export class TeamProvisioningService {
// NOTE(review): not referenced in the visible code; by name, presumably caps how many
// pids are sampled for one runtime snapshot — confirm against the caller.
private static readonly MAX_RUNTIME_USAGE_PIDS_PER_SNAPSHOT = 512;
// NOTE(review): usage not visible here; presumably the time budget (ms) for reading the
// process table — confirm.
private static readonly RUNTIME_PROCESS_TABLE_TIMEOUT_MS = 1_500;
// NOTE(review): usage not visible here; presumably the Windows-host counterpart of the
// process-table time budget (ms) — confirm.
private static readonly RUNTIME_WINDOWS_PROCESS_TABLE_TIMEOUT_MS = 1_500;
// Lifetime (ms) of an entry in runtimeProcessUsageStatsCacheByPid: readProcessUsageStatsByPid
// stamps each remembered sample with Date.now() plus this value.
private static readonly RUNTIME_PROCESS_USAGE_CACHE_TTL_MS = 30_000;
// Timeout (ms) handed to withRuntimeTelemetryTimeout for the batched pidusage call.
private static readonly RUNTIME_PIDUSAGE_BATCH_TIMEOUT_MS = 2_000;
// NOTE(review): usage not visible here; presumably the timeout (ms) for sampling a single
// pid with pidusage — confirm.
private static readonly RUNTIME_PIDUSAGE_SINGLE_TIMEOUT_MS = 750;
// Chunk size used when readProcessUsageStatsByPid slices the pid list for per-pid sampling.
private static readonly RUNTIME_PIDUSAGE_FALLBACK_CONCURRENCY = 16;
@ -3396,6 +3397,13 @@ export class TeamProvisioningService {
includesWindowsHostRows: boolean;
}
>();
/**
 * Per-pid cache of usage samples, consulted by readProcessUsageStatsByPid before it
 * calls pidusage.
 *
 * An entry is honored only while `expiresAtMs` is later than the current time.
 * `stats: null` records that sampling produced no usable stats for the pid; while such
 * an entry is unexpired the pid is neither re-sampled nor included in the result.
 *
 * NOTE(review): no eviction of expired entries is visible in this view, so entries for
 * exited pids may accumulate — confirm whether the map is pruned elsewhere.
 */
private readonly runtimeProcessUsageStatsCacheByPid = new Map<
number,
{
expiresAtMs: number;
stats: RuntimeProcessUsageStats | null;
}
>();
private readonly agentRuntimeSnapshotInFlightByTeam = new Map<
string,
{
@ -14095,7 +14103,14 @@ export class TeamProvisioningService {
const runtimeUsagePids = [
...new Set([...runtimeUsageTreesByRootPid.values()].flatMap((tree) => tree.pids)),
];
usageStatsByPid = await this.readProcessUsageStatsByPid(runtimeUsagePids);
usageStatsByPid = this.buildProcessUsageStatsFromRows(runtimeProcessRows, runtimeUsagePids);
const pidsMissingUsageStats = runtimeUsagePids.filter((pid) => !usageStatsByPid.has(pid));
if (pidsMissingUsageStats.length > 0) {
const sampledUsageStats = await this.readProcessUsageStatsByPid(pidsMissingUsageStats);
for (const [pid, stats] of sampledUsageStats) {
usageStatsByPid.set(pid, stats);
}
}
} catch (error) {
logger.debug(
`[${teamName}] Runtime telemetry sampling failed; continuing without resource metrics: ${
@ -25663,10 +25678,12 @@ export class TeamProvisioningService {
candidate.runtimeTelemetrySource === 'windows-host'
? candidate.runtimeTelemetrySource
: undefined);
const usageStats = this.normalizeRuntimeProcessUsageStats(candidate);
normalizedRows.push({
pid: Math.floor(pid),
ppid: Math.floor(ppid),
command,
...(usageStats ?? {}),
...(runtimeTelemetrySource ? { runtimeTelemetrySource } : {}),
});
}
@ -26038,6 +26055,28 @@ export class TeamProvisioningService {
}
}
/**
 * Builds a pid -> usage-stats map from process-table rows that were already read.
 *
 * Only rows whose `pid` is among the requested `pids` are considered, and a row
 * contributes an entry only when `normalizeRuntimeProcessUsageStats(row)` returns a
 * truthy value. When several rows share a pid, the last one with stats wins.
 *
 * @param processRows Process-table rows, or `null` when no table is available.
 * @param pids Pids of interest; non-finite and non-positive values are ignored.
 * @returns A new map that may be empty; pids without usable stats are absent.
 */
private buildProcessUsageStatsFromRows(
  processRows: readonly RuntimeTelemetryProcessTableRow[] | null,
  pids: readonly number[]
): Map<number, RuntimeProcessUsageStats> {
  const statsFromRows = new Map<number, RuntimeProcessUsageStats>();

  // Keep only pids that can identify a real process.
  const wantedPids = new Set<number>();
  for (const candidatePid of pids) {
    if (Number.isFinite(candidatePid) && candidatePid > 0) {
      wantedPids.add(candidatePid);
    }
  }

  if (wantedPids.size > 0 && Array.isArray(processRows)) {
    for (const tableRow of processRows) {
      if (wantedPids.has(tableRow.pid)) {
        const rowStats = this.normalizeRuntimeProcessUsageStats(tableRow);
        if (rowStats) {
          statsFromRows.set(tableRow.pid, rowStats);
        }
      }
    }
  }

  return statsFromRows;
}
private async readProcessUsageStatsByPid(
pids: readonly number[]
): Promise<Map<number, RuntimeProcessUsageStats>> {
@ -26048,20 +26087,57 @@ export class TeamProvisioningService {
}
const usageStatsByPid = new Map<number, RuntimeProcessUsageStats>();
const pidsToRead: number[] = [];
const now = Date.now();
for (const pid of uniquePids) {
const cached = this.runtimeProcessUsageStatsCacheByPid.get(pid);
if (cached && cached.expiresAtMs > now) {
if (cached.stats) {
usageStatsByPid.set(pid, { ...cached.stats });
}
continue;
}
pidsToRead.push(pid);
}
if (pidsToRead.length === 0) {
return usageStatsByPid;
}
const rememberUsageStats = (
pid: number,
stats: RuntimeProcessUsageStats | null | undefined
): void => {
const normalized = stats ? { ...stats } : null;
this.runtimeProcessUsageStatsCacheByPid.set(pid, {
expiresAtMs: Date.now() + TeamProvisioningService.RUNTIME_PROCESS_USAGE_CACHE_TTL_MS,
stats: normalized,
});
if (normalized) {
usageStatsByPid.set(pid, { ...normalized });
}
};
const options = RUNTIME_PIDUSAGE_OPTIONS;
try {
const statsByPid = await this.withRuntimeTelemetryTimeout(
pidusage(uniquePids, options),
pidusage(pidsToRead, options),
TeamProvisioningService.RUNTIME_PIDUSAGE_BATCH_TIMEOUT_MS,
'pidusage batch runtime telemetry'
);
const observedPids = new Set<number>();
for (const [rawPid, stat] of Object.entries(
statsByPid && typeof statsByPid === 'object' ? statsByPid : {}
)) {
const pid = Number.parseInt(rawPid, 10);
const usageStats = this.normalizeRuntimeProcessUsageStats(stat);
if (Number.isFinite(pid) && pid > 0 && usageStats) {
usageStatsByPid.set(pid, usageStats);
if (Number.isFinite(pid) && pid > 0) {
observedPids.add(pid);
rememberUsageStats(pid, usageStats);
}
}
for (const pid of pidsToRead) {
if (!observedPids.has(pid)) {
rememberUsageStats(pid, null);
}
}
return usageStatsByPid;
@ -26079,10 +26155,10 @@ export class TeamProvisioningService {
for (
let offset = 0;
offset < uniquePids.length;
offset < pidsToRead.length;
offset += TeamProvisioningService.RUNTIME_PIDUSAGE_FALLBACK_CONCURRENCY
) {
const chunk = uniquePids.slice(
const chunk = pidsToRead.slice(
offset,
offset + TeamProvisioningService.RUNTIME_PIDUSAGE_FALLBACK_CONCURRENCY
);
@ -26095,13 +26171,12 @@ export class TeamProvisioningService {
`pidusage runtime telemetry pid=${pid}`
);
const usageStats = this.normalizeRuntimeProcessUsageStats(stat);
if (usageStats) {
usageStatsByPid.set(pid, usageStats);
}
rememberUsageStats(pid, usageStats);
} catch (error) {
if (error instanceof RuntimeTelemetryTimeoutError) {
logger.debug(error.message);
}
rememberUsageStats(pid, null);
// Process likely exited between discovery and sampling.
}
})

View file

@ -4740,6 +4740,22 @@ describe('TeamProvisioningService', () => {
expect(stats.get(333)).toEqual({ rssBytes: 123_000_000, cpuPercent: 7 });
});
// Two consecutive reads of the same pid must hit pidusage only once: the mock resolves a
// single time, so the second read can only succeed if it is served from the cache.
it('caches runtime process usage stats for repeated reads', async () => {
  const service = new TeamProvisioningService();
  const sampledByPid: Record<string, ReturnType<typeof createPidusageStat>> = {};
  sampledByPid['111'] = createPidusageStat(111, 123_000_000, 7);
  vi.mocked(pidusage).mockResolvedValueOnce(sampledByPid);

  const initialRead = await (service as any).readProcessUsageStatsByPid([111]);
  const repeatedRead = await (service as any).readProcessUsageStatsByPid([111]);

  expect(pidusage).toHaveBeenCalledTimes(1);
  expect(pidusage).toHaveBeenCalledWith([111], EXPECTED_RUNTIME_PIDUSAGE_OPTIONS);
  expect(initialRead.get(111)).toEqual({ rssBytes: 123_000_000, cpuPercent: 7 });
  expect(repeatedRead.get(111)).toEqual({ rssBytes: 123_000_000, cpuPercent: 7 });
});
it('falls back to direct agent process lookup when tmux pane pid lookup is unavailable', async () => {
const svc = new TeamProvisioningService();
(svc as any).configReader = {