fix(team): harden launch state tracking

This commit is contained in:
777genius 2026-05-01 18:41:33 +03:00
parent 511ea75c4f
commit 44f609ab6a
11 changed files with 403 additions and 20 deletions

View file

@ -37,11 +37,27 @@ const LARGE_CONFIG_BYTES = 512 * 1024;
const CONFIG_HEAD_BYTES = 64 * 1024;
const MAX_CONFIG_READ_BYTES = 10 * 1024 * 1024; // 10MB hard limit for full config reads
const PER_TEAM_READ_TIMEOUT_MS = 5_000;
const GET_CONFIG_CACHE_TTL_MS = 750;
const GET_CONFIG_SLOW_READ_WARN_MS = 500;
const MAX_SESSION_HISTORY_IN_SUMMARY = 2000;
const MAX_PROJECT_PATH_HISTORY_IN_SUMMARY = 200;
const MAX_LAUNCH_STATE_BYTES = 32 * 1024;
const TEAM_LAUNCH_STATE_FILE = 'launch-state.json';
interface CachedTeamConfig {
value: TeamConfig;
expiresAt: number;
}
interface ConfigReadTiming {
teamName: string;
size: number | null;
statMs: number | null;
readMs: number | null;
parseMs: number | null;
totalMs: number;
}
function normalizeProjectPathCandidate(value: unknown): string | undefined {
if (typeof value !== 'string') {
return undefined;
@ -155,7 +171,14 @@ function withReadTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
});
}
function cloneConfig(config: TeamConfig): TeamConfig {
return structuredClone(config);
}
export class TeamConfigReader {
private static readonly configCacheByPath = new Map<string, CachedTeamConfig>();
private static readonly configReadInFlightByPath = new Map<string, Promise<TeamConfig | null>>();
constructor(
private readonly membersMetaStore: TeamMembersMetaStore = new TeamMembersMetaStore(),
private readonly teamMetaStore: TeamMetaStore = new TeamMetaStore()
@ -506,8 +529,77 @@ export class TeamConfigReader {
async getConfig(teamName: string): Promise<TeamConfig | null> {
const configPath = path.join(getTeamsBasePath(), teamName, 'config.json');
const now = Date.now();
const cached = TeamConfigReader.configCacheByPath.get(configPath);
if (cached && cached.expiresAt > now) {
return cloneConfig(cached.value);
}
const existingRead = TeamConfigReader.configReadInFlightByPath.get(configPath);
if (existingRead) {
return this.resolveConfigRead(teamName, configPath, existingRead);
}
const readPromise = this.readConfigFromDisk(teamName, configPath).then((config) => {
if (config) {
TeamConfigReader.configCacheByPath.set(configPath, {
value: cloneConfig(config),
expiresAt: Date.now() + GET_CONFIG_CACHE_TTL_MS,
});
}
return config;
});
TeamConfigReader.configReadInFlightByPath.set(configPath, readPromise);
try {
return await this.resolveConfigRead(teamName, configPath, readPromise);
} catch (error) {
return null;
} finally {
if (TeamConfigReader.configReadInFlightByPath.get(configPath) === readPromise) {
TeamConfigReader.configReadInFlightByPath.delete(configPath);
}
}
}
private async resolveConfigRead(
teamName: string,
configPath: string,
readPromise: Promise<TeamConfig | null>
): Promise<TeamConfig | null> {
try {
const config = await readPromise;
return config ? cloneConfig(config) : null;
} catch {
return null;
}
}
private async readConfigFromDisk(
teamName: string,
configPath: string
): Promise<TeamConfig | null> {
const startedAt = performance.now();
let size: number | null = null;
let statMs: number | null = null;
let readMs: number | null = null;
let parseMs: number | null = null;
const buildTiming = (): ConfigReadTiming => ({
teamName,
size,
statMs,
readMs,
parseMs,
totalMs: Math.round(performance.now() - startedAt),
});
try {
const statStartedAt = performance.now();
const stat = await fs.promises.stat(configPath);
statMs = Math.round(performance.now() - statStartedAt);
size = stat.size;
// Safety: refuse special files and huge/binary configs
if (!stat.isFile()) {
return null;
@ -519,19 +611,31 @@ export class TeamConfigReader {
return null;
}
const readStartedAt = performance.now();
const raw = await readFileUtf8WithTimeout(configPath, PER_TEAM_READ_TIMEOUT_MS);
readMs = Math.round(performance.now() - readStartedAt);
const parseStartedAt = performance.now();
const config = JSON.parse(raw) as TeamConfig;
parseMs = Math.round(performance.now() - parseStartedAt);
if (typeof config.name !== 'string' || config.name.trim() === '') {
return null;
}
const resolvedProjectPath = resolveProjectPathFromConfig(config);
return resolvedProjectPath ? { ...config, projectPath: resolvedProjectPath } : config;
const resolvedConfig = resolvedProjectPath
? { ...config, projectPath: resolvedProjectPath }
: config;
const totalMs = performance.now() - startedAt;
if (totalMs >= GET_CONFIG_SLOW_READ_WARN_MS) {
logger.warn(`[getConfig] slow read diag=${JSON.stringify(buildTiming())}`);
}
return resolvedConfig;
} catch (error) {
if (error instanceof FileReadTimeoutError) {
logger.warn(`[getConfig] ${error.message}`);
return null;
logger.warn(`[getConfig] ${error.message} diag=${JSON.stringify(buildTiming())}`);
}
return null;
throw error;
}
}
@ -557,6 +661,10 @@ export class TeamConfigReader {
}
const configPath = path.join(getTeamsBasePath(), teamName, 'config.json');
await fs.promises.writeFile(configPath, JSON.stringify(config, null, 2), 'utf8');
TeamConfigReader.configCacheByPath.set(configPath, {
value: cloneConfig(config),
expiresAt: Date.now() + GET_CONFIG_CACHE_TTL_MS,
});
return config;
}
}

View file

@ -112,6 +112,7 @@ const PROCESS_HEALTH_INTERVAL_MS = 2_000;
const TASK_MAP_YIELD_EVERY = 250;
const TASK_COMMENT_NOTIFICATION_SOURCE = 'system_notification';
const PASSIVE_USER_REPLY_LINK_WINDOW_MS = 15_000;
const MEMBER_RUNTIME_ADVISORY_SNAPSHOT_BUDGET_MS = 750;
const MIXED_TEAM_LIVE_MUTATION_BLOCK_MESSAGE =
'Live roster mutation on a running mixed team is not supported in V1. Stop the team, edit the roster, then relaunch.';
@ -467,6 +468,34 @@ export class TeamDataService {
this.memberRuntimeAdvisoryService = service;
}
private async getMemberRuntimeAdvisoriesForSnapshot(
teamName: string,
members: readonly Pick<TeamMemberSnapshot, 'name' | 'removedAt'>[]
): Promise<Map<string, NonNullable<TeamMemberSnapshot['runtimeAdvisory']>>> {
const request = this.memberRuntimeAdvisoryService.getMemberAdvisories(teamName, members);
const timeoutToken = Symbol('member-runtime-advisory-timeout');
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
const timeout = new Promise<typeof timeoutToken>((resolve) => {
timeoutHandle = setTimeout(resolve, MEMBER_RUNTIME_ADVISORY_SNAPSHOT_BUDGET_MS, timeoutToken);
});
const result = await Promise.race([request, timeout]);
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (result === timeoutToken) {
request.catch(() => {
/* background advisory refresh is best-effort */
});
logger.warn(
`getTeamData team=${teamName} member runtime advisories exceeded ${MEMBER_RUNTIME_ADVISORY_SNAPSHOT_BUDGET_MS}ms budget; continuing without advisories for this snapshot`
);
return new Map();
}
return result;
}
private async synthesizeLeadMemberIfMissing(
teamName: string,
config: TeamConfig,
@ -1300,10 +1329,7 @@ export class TeamDataService {
mark('resolveMembers');
try {
const runtimeAdvisories = await this.memberRuntimeAdvisoryService.getMemberAdvisories(
teamName,
members
);
const runtimeAdvisories = await this.getMemberRuntimeAdvisoriesForSnapshot(teamName, members);
for (const member of members) {
const advisory = runtimeAdvisories.get(member.name);
if (advisory) {

View file

@ -6,9 +6,10 @@ import { TeamMemberLogsFinder } from './TeamMemberLogsFinder';
import type { MemberRuntimeAdvisory, ResolvedTeamMember } from '@shared/types';
const LOOKBACK_MS = 10 * 60 * 1000;
const CACHE_TTL_MS = 5_000;
const CACHE_TTL_MS = 30_000;
const TAIL_BYTES = 64 * 1024;
const BATCH_WARN_MS = 200;
const ADVISORY_FETCH_CONCURRENCY = 2;
const QUOTA_EXHAUSTED_TOKENS = [
'exhausted your capacity',
'capacity exceeded',
@ -94,6 +95,28 @@ function classifyRetryReason(message: string | undefined): MemberRuntimeAdvisory
return 'backend_error';
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
fn: (item: T) => Promise<R>
): Promise<R[]> {
const results = new Array<R>(items.length);
let index = 0;
const workerCount = Math.max(1, Math.min(limit, items.length));
const workers = new Array(workerCount).fill(0).map(async () => {
while (true) {
const currentIndex = index;
index += 1;
if (currentIndex >= items.length) {
return;
}
results[currentIndex] = await fn(items[currentIndex]);
}
});
await Promise.all(workers);
return results;
}
export class TeamMemberRuntimeAdvisoryService {
private readonly memberCache = new Map<string, CachedRuntimeAdvisory>();
private readonly teamBatchCacheByTeam = new Map<string, CachedTeamBatchAdvisories>();
@ -187,11 +210,13 @@ export class TeamMemberRuntimeAdvisoryService {
}
if (membersToFetch.length > 0) {
const fetched = await Promise.all(
membersToFetch.map(async (memberName) => {
const fetched = await mapLimit(
membersToFetch,
ADVISORY_FETCH_CONCURRENCY,
async (memberName) => {
const advisory = await this.findRecentMemberAdvisory(teamName, memberName);
return [memberName, advisory] as const;
})
}
);
const fetchedAt = Date.now();
for (const [memberName, advisory] of fetched) {

View file

@ -5122,6 +5122,21 @@ export class TeamProvisioningService {
return this.provisioningRunByTeam.get(teamName) ?? null;
}
private getResolvableProvisioningRunId(teamName: string): string | null {
const runId = this.getProvisioningRunId(teamName);
if (!runId) {
return null;
}
if (this.runs.has(runId) || this.runtimeAdapterProgressByRunId.has(runId)) {
return runId;
}
if (this.provisioningRunByTeam.get(teamName) === runId) {
this.provisioningRunByTeam.delete(teamName);
}
logger.debug(`[${teamName}] Cleared stale provisioning run id before launch: ${runId}`);
return null;
}
private getAliveRunId(teamName: string): string | null {
return this.aliveRunByTeam.get(teamName) ?? null;
}
@ -13411,7 +13426,7 @@ export class TeamProvisioningService {
onProgress: (progress: TeamProvisioningProgress) => void
): Promise<TeamCreateResponse> {
this.cleanedStoppedTeamOpenCodeRuntimeLanes.delete(request.teamName);
const existingProvisioningRunId = this.getProvisioningRunId(request.teamName);
const existingProvisioningRunId = this.getResolvableProvisioningRunId(request.teamName);
if (existingProvisioningRunId) {
return { runId: existingProvisioningRunId };
}
@ -14311,7 +14326,7 @@ export class TeamProvisioningService {
request: TeamLaunchRequest,
onProgress: (progress: TeamProvisioningProgress) => void
): Promise<TeamLaunchResponse> {
const existingProvisioningRunId = this.getProvisioningRunId(request.teamName);
const existingProvisioningRunId = this.getResolvableProvisioningRunId(request.teamName);
if (existingProvisioningRunId) {
return { runId: existingProvisioningRunId };
}

View file

@ -21,6 +21,16 @@ import type {
const logger = createLogger('Service:TeamTaskReader');
const MAX_TASK_FILE_BYTES = 2 * 1024 * 1024;
const ALL_TASKS_CACHE_TTL_MS = 500;
interface CachedAllTasks {
value: (TeamTask & { teamName: string })[];
expiresAt: number;
}
function cloneTasks<T>(tasks: T[]): T[] {
return structuredClone(tasks);
}
/**
* Normalise escaped newline sequences (`\\n`) that some MCP/CLI sources
@ -63,6 +73,9 @@ function normalizeTaskRefs(value: unknown): TaskRef[] | undefined {
}
export class TeamTaskReader {
private static allTasksCache: CachedAllTasks | null = null;
private static allTasksInFlight: Promise<(TeamTask & { teamName: string })[]> | null = null;
/**
* Returns the next available numeric task ID by scanning ALL task files
* (including _internal ones) to avoid ID collisions.
@ -433,6 +446,32 @@ export class TeamTaskReader {
}
async getAllTasks(): Promise<(TeamTask & { teamName: string })[]> {
const cached = TeamTaskReader.allTasksCache;
if (cached && cached.expiresAt > Date.now()) {
return cloneTasks(cached.value);
}
if (TeamTaskReader.allTasksInFlight) {
return cloneTasks(await TeamTaskReader.allTasksInFlight);
}
const request = this.readAllTasksUncached();
TeamTaskReader.allTasksInFlight = request;
try {
const tasks = await request;
TeamTaskReader.allTasksCache = {
value: cloneTasks(tasks),
expiresAt: Date.now() + ALL_TASKS_CACHE_TTL_MS,
};
return cloneTasks(tasks);
} finally {
if (TeamTaskReader.allTasksInFlight === request) {
TeamTaskReader.allTasksInFlight = null;
}
}
}
private async readAllTasksUncached(): Promise<(TeamTask & { teamName: string })[]> {
const worker = getTeamFsWorkerClient();
if (worker.isAvailable()) {
const startedAt = Date.now();

View file

@ -61,6 +61,14 @@ function shouldBreakExistingLock(lockPath: string, staleTimeoutMs: number): bool
return info.ageMs !== null && info.ageMs > staleTimeoutMs;
}
function removeLockPath(lockPath: string): void {
try {
fs.rmSync(lockPath, { recursive: true, force: true });
} catch {
/* another process may have cleaned it */
}
}
function tryAcquire(lockPath: string, options: Required<FileLockOptions>): boolean {
try {
const dir = path.dirname(lockPath);
@ -72,13 +80,10 @@ function tryAcquire(lockPath: string, options: Required<FileLockOptions>): boole
fs.closeSync(fd);
return true;
} catch (err) {
if ((err as NodeJS.ErrnoException).code === 'EEXIST') {
const code = (err as NodeJS.ErrnoException).code;
if (code === 'EEXIST' || code === 'EISDIR') {
if (shouldBreakExistingLock(lockPath, options.staleTimeoutMs)) {
try {
fs.unlinkSync(lockPath);
} catch {
/* another process may have cleaned it */
}
removeLockPath(lockPath);
}
return false;
}

View file

@ -1,4 +1,5 @@
import * as fs from 'fs/promises';
import * as nodeFs from 'fs';
import * as os from 'os';
import * as path from 'path';
@ -254,4 +255,35 @@ describe('TeamConfigReader', () => {
pendingCreate: true,
});
});
it('shares in-flight getConfig reads and returns cloned cached configs', async () => {
const teamName = 'cached-config-team';
const teamDir = path.join(tempDir, teamName);
await fs.mkdir(teamDir, { recursive: true });
await fs.writeFile(
path.join(teamDir, 'config.json'),
JSON.stringify({
name: 'Cached Config Team',
projectPath: tempDir,
members: [{ name: 'team-lead', agentType: 'team-lead' }],
}),
'utf8'
);
const readFileSpy = vi.spyOn(nodeFs.promises, 'readFile');
const reader = new TeamConfigReader();
const [first, second] = await Promise.all([
reader.getConfig(teamName),
reader.getConfig(teamName),
]);
if (!first) {
throw new Error('Expected config to load.');
}
first.name = 'Mutated In Caller';
const third = await reader.getConfig(teamName);
expect(second?.name).toBe('Cached Config Team');
expect(third?.name).toBe('Cached Config Team');
expect(readFileSpy).toHaveBeenCalledTimes(1);
});
});

View file

@ -4520,6 +4520,30 @@ describe('TeamDataService', () => {
]);
});
it('does not block the team snapshot on slow runtime advisories', async () => {
vi.useFakeTimers();
const deferred = createDeferred<Map<string, unknown>>();
try {
const harness = createGetTeamDataHarness({
resolveMembers: () => [buildResolvedMember('alice')],
getMemberAdvisories: async () => deferred.promise,
});
const pending = harness.service.getTeamData('my-team');
await vi.advanceTimersByTimeAsync(751);
const data = await pending;
expect(harness.advisoryService.getMemberAdvisories).toHaveBeenCalledTimes(1);
expect(data.members).toEqual([expect.objectContaining({ name: 'alice' })]);
expect(data.members[0]?.runtimeAdvisory).toBeUndefined();
deferred.resolve(new Map());
await Promise.resolve();
} finally {
vi.useRealTimers();
}
});
it('synthesizes a team lead from team meta when config and members meta have no lead entry', async () => {
const harness = createGetTeamDataHarness({
config: {

View file

@ -386,6 +386,42 @@ describe('TeamMemberRuntimeAdvisoryService', () => {
expect(Array.from(advisories.keys())).toEqual(['Alice', 'Bob', 'Charlie']);
});
it('limits concurrent member advisory log scans', async () => {
const { service, logsFinder } = createStubbedServiceHarness();
let activeScans = 0;
let maxActiveScans = 0;
const activeGates: Deferred<void>[] = [];
logsFinder.findMemberLogs.mockImplementation(async (_teamName: string, memberName: string) => {
activeScans += 1;
maxActiveScans = Math.max(maxActiveScans, activeScans);
const gate = createDeferred<void>();
activeGates.push(gate);
await gate.promise;
activeScans -= 1;
return [{ filePath: `/logs/${memberName}.jsonl` }];
});
const request = service.getMemberAdvisories('signal-ops', [
buildMember('Alice'),
buildMember('Bob'),
buildMember('Charlie'),
buildMember('Tom'),
]);
await vi.waitFor(() => {
expect(logsFinder.findMemberLogs).toHaveBeenCalledTimes(2);
});
expect(maxActiveScans).toBe(2);
activeGates.splice(0).forEach((gate) => gate.resolve());
await vi.waitFor(() => {
expect(logsFinder.findMemberLogs).toHaveBeenCalledTimes(4);
});
activeGates.splice(0).forEach((gate) => gate.resolve());
await request;
expect(maxActiveScans).toBeLessThanOrEqual(2);
});
it('caches null advisory batches and avoids repeated lookups within ttl', async () => {
const { service, logsFinder } = createStubbedServiceHarness();
logsFinder.findMemberLogs.mockResolvedValue([]);

View file

@ -78,6 +78,65 @@ describe('TeamProvisioningService idempotent launch guards', () => {
expect(response.runId).toBe(aliveRun.runId);
});
it('does not expose unresolved internal provisioning ids', () => {
const teamName = 'team-alpha';
const svc = new TeamProvisioningService();
(svc as any).provisioningRunByTeam.set(teamName, 'pending-stale-run');
expect((svc as any).getResolvableProvisioningRunId(teamName)).toBeNull();
expect((svc as any).provisioningRunByTeam.get(teamName)).toBeUndefined();
});
it('keeps runtime adapter provisioning ids while their progress is still tracked', () => {
const teamName = 'team-alpha';
const runId = 'runtime-adapter-run-1';
const svc = new TeamProvisioningService();
(svc as any).provisioningRunByTeam.set(teamName, runId);
(svc as any).runtimeAdapterProgressByRunId.set(runId, { runId, state: 'launching' });
expect((svc as any).getResolvableProvisioningRunId(teamName)).toBe(runId);
expect((svc as any).provisioningRunByTeam.get(teamName)).toBe(runId);
});
it('clears stale pending provisioning ids before reusing an alive run', async () => {
const teamName = 'team-alpha';
const teamDir = path.join(tempTeamsBase, teamName);
fs.mkdirSync(teamDir, { recursive: true });
fs.writeFileSync(
path.join(teamDir, 'config.json'),
JSON.stringify({
name: teamName,
projectPath: process.cwd(),
members: [{ name: 'team-lead', agentType: 'team-lead' }, { name: 'dev' }],
})
);
const svc = new TeamProvisioningService();
const aliveRun = {
runId: 'alive-run-1',
teamName,
request: { cwd: process.cwd() },
child: Object.assign(new EventEmitter(), {
stdin: { writable: true },
stdout: new EventEmitter(),
stderr: new EventEmitter(),
}),
processKilled: false,
cancelRequested: false,
};
(svc as any).provisioningRunByTeam.set(teamName, 'pending-stale-run');
(svc as any).runs.set(aliveRun.runId, aliveRun);
(svc as any).aliveRunByTeam.set(teamName, aliveRun.runId);
const response = await svc.launchTeam({ teamName, cwd: process.cwd() }, () => {});
expect(response.runId).toBe(aliveRun.runId);
expect((svc as any).provisioningRunByTeam.get(teamName)).toBeUndefined();
});
it('does not reuse an alive run when cwd differs', async () => {
const teamName = 'team-alpha';
const currentCwd = fs.mkdtempSync(path.join(tempClaudeRoot, 'current-'));

View file

@ -67,6 +67,20 @@ describe('withFileLock', () => {
expect(result).toBe('ok');
});
it('removes stale directory lock and acquires', async () => {
const lockPath = `${testFile}.lock`;
fs.mkdirSync(lockPath);
const staleDate = new Date(Date.now() - 60_000);
fs.utimesSync(lockPath, staleDate, staleDate);
const result = await withFileLock(testFile, async () => 'ok', {
staleTimeoutMs: 1_000,
});
expect(result).toBe('ok');
expect(fs.existsSync(lockPath)).toBe(false);
});
it('removes a fresh abandoned lock when the owner process is gone', async () => {
const lockPath = `${testFile}.lock`;
const abandonedPid = 424_242;