fix(stall-monitor): isolate hung scans
This commit is contained in:
parent
3f188f9367
commit
cb411adb19
9 changed files with 683 additions and 30 deletions
|
|
@ -1,6 +1,10 @@
|
|||
import { createLogger } from '@shared/utils/logger';
|
||||
|
||||
import type { TeamLogSourceTracker } from '../TeamLogSourceTracker';
|
||||
import type { TeamChangeEvent } from '@shared/types';
|
||||
|
||||
const logger = createLogger('Service:ActiveTeamRegistry');
|
||||
|
||||
interface TeamAliveProcessesReader {
|
||||
listAliveProcessTeams(): Promise<string[]>;
|
||||
}
|
||||
|
|
@ -23,6 +27,8 @@ function unrefBackgroundTimer(timer: ReturnType<typeof setInterval>): void {
|
|||
|
||||
export class ActiveTeamRegistry {
|
||||
private readonly activeTeams = new Set<string>();
|
||||
private readonly activationInFlight = new Set<string>();
|
||||
private activationGeneration = 0;
|
||||
private reconcileTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
constructor(
|
||||
|
|
@ -41,8 +47,7 @@ export class ActiveTeamRegistry {
|
|||
(event.type === 'lead-activity' && event.detail !== 'offline')
|
||||
) {
|
||||
if (!this.activeTeams.has(event.teamName)) {
|
||||
this.activeTeams.add(event.teamName);
|
||||
void this.teamLogSourceTracker.enableTracking(event.teamName, 'stall_monitor');
|
||||
void this.activateTeam(event.teamName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
@ -70,6 +75,7 @@ export class ActiveTeamRegistry {
|
|||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.activationGeneration += 1;
|
||||
if (this.reconcileTimer) {
|
||||
clearInterval(this.reconcileTimer);
|
||||
this.reconcileTimer = null;
|
||||
|
|
@ -85,6 +91,7 @@ export class ActiveTeamRegistry {
|
|||
}
|
||||
|
||||
async reconcile(): Promise<void> {
|
||||
const reconcileGeneration = this.activationGeneration;
|
||||
const aliveTeams = await this.teamDataService.listAliveProcessTeams();
|
||||
const aliveSet = new Set(aliveTeams);
|
||||
|
||||
|
|
@ -92,8 +99,7 @@ export class ActiveTeamRegistry {
|
|||
if (this.activeTeams.has(teamName)) {
|
||||
continue;
|
||||
}
|
||||
this.activeTeams.add(teamName);
|
||||
await this.teamLogSourceTracker.enableTracking(teamName, 'stall_monitor');
|
||||
await this.activateTeam(teamName, reconcileGeneration);
|
||||
}
|
||||
|
||||
for (const teamName of [...this.activeTeams]) {
|
||||
|
|
@ -104,4 +110,41 @@ export class ActiveTeamRegistry {
|
|||
await this.teamLogSourceTracker.disableTracking(teamName, 'stall_monitor');
|
||||
}
|
||||
}
|
||||
|
||||
private async activateTeam(
|
||||
teamName: string,
|
||||
expectedGeneration = this.activationGeneration
|
||||
): Promise<void> {
|
||||
if (expectedGeneration !== this.activationGeneration) {
|
||||
return;
|
||||
}
|
||||
if (this.activeTeams.has(teamName) || this.activationInFlight.has(teamName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.activationInFlight.add(teamName);
|
||||
const activationGeneration = this.activationGeneration;
|
||||
try {
|
||||
await this.teamLogSourceTracker.enableTracking(teamName, 'stall_monitor');
|
||||
if (activationGeneration !== this.activationGeneration) {
|
||||
await this.disableStaleActivation(teamName);
|
||||
return;
|
||||
}
|
||||
this.activeTeams.add(teamName);
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to enable stall-monitor tracking for ${teamName}: ${String(error)}`);
|
||||
} finally {
|
||||
this.activationInFlight.delete(teamName);
|
||||
}
|
||||
}
|
||||
|
||||
private async disableStaleActivation(teamName: string): Promise<void> {
|
||||
try {
|
||||
await this.teamLogSourceTracker.disableTracking(teamName, 'stall_monitor');
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
`Failed to disable stale stall-monitor tracking for ${teamName}: ${String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ import * as path from 'path';
|
|||
import { atomicWriteAsync } from '../atomicWrite';
|
||||
import { withFileLock } from '../fileLock';
|
||||
|
||||
import { getTeamTaskStallAlertCooldownMs } from './featureGates';
|
||||
|
||||
import type {
|
||||
TaskStallEvaluation,
|
||||
TaskStallJournalEntry,
|
||||
|
|
@ -15,7 +17,28 @@ function isValidState(value: unknown): value is TaskStallJournalState {
|
|||
return value === 'suspected' || value === 'alert_ready' || value === 'alerted';
|
||||
}
|
||||
|
||||
function parseTime(value: string | undefined): number | null {
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
const time = new Date(value).getTime();
|
||||
return Number.isFinite(time) ? time : null;
|
||||
}
|
||||
|
||||
export interface TeamTaskStallJournalOptions {
|
||||
alertCooldownMs?: number;
|
||||
}
|
||||
|
||||
export class TeamTaskStallJournal {
|
||||
private readonly alertCooldownMs: number;
|
||||
|
||||
constructor(options: TeamTaskStallJournalOptions = {}) {
|
||||
this.alertCooldownMs =
|
||||
options.alertCooldownMs != null && options.alertCooldownMs > 0
|
||||
? options.alertCooldownMs
|
||||
: getTeamTaskStallAlertCooldownMs();
|
||||
}
|
||||
|
||||
private getFilePath(teamName: string): string {
|
||||
return path.join(getTeamsBasePath(), teamName, 'stall-monitor-journal.json');
|
||||
}
|
||||
|
|
@ -67,6 +90,7 @@ export class TeamTaskStallJournal {
|
|||
epochKey,
|
||||
teamName: args.teamName,
|
||||
taskId: evaluation.taskId,
|
||||
...(evaluation.memberName ? { memberName: evaluation.memberName } : {}),
|
||||
branch: evaluation.branch,
|
||||
signal: evaluation.signal,
|
||||
state: 'suspected',
|
||||
|
|
@ -78,7 +102,23 @@ export class TeamTaskStallJournal {
|
|||
}
|
||||
|
||||
existing.updatedAt = args.now;
|
||||
if (evaluation.memberName) {
|
||||
existing.memberName = evaluation.memberName;
|
||||
}
|
||||
if (existing.state === 'alerted') {
|
||||
const nowMs = parseTime(args.now) ?? Date.now();
|
||||
const alertedAtMs = parseTime(existing.alertedAt);
|
||||
if (
|
||||
alertedAtMs != null &&
|
||||
alertedAtMs <= nowMs &&
|
||||
nowMs - alertedAtMs < this.alertCooldownMs
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
existing.state = 'alert_ready';
|
||||
existing.consecutiveScans += 1;
|
||||
readyEvaluations.push(evaluation);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -138,6 +178,9 @@ export class TeamTaskStallJournal {
|
|||
)
|
||||
.map((entry) => ({
|
||||
...entry,
|
||||
...(typeof entry.memberName === 'string' && entry.memberName.trim()
|
||||
? { memberName: entry.memberName }
|
||||
: {}),
|
||||
...(entry.alertedAt ? { alertedAt: entry.alertedAt } : {}),
|
||||
}));
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -26,6 +26,16 @@ interface TeamObservationState {
|
|||
lastActivationAtMs: number;
|
||||
}
|
||||
|
||||
interface TeamTaskStallMonitorOptions {
|
||||
scanTimeoutMs?: number;
|
||||
}
|
||||
|
||||
interface TeamTaskStallScanRun {
|
||||
cancelled: boolean;
|
||||
}
|
||||
|
||||
const DEFAULT_TEAM_TASK_STALL_SCAN_TIMEOUT_MS = 2 * 60_000;
|
||||
|
||||
function unrefBackgroundTimer(timer: ReturnType<typeof setTimeout>): void {
|
||||
const maybeTimer = timer as { unref?: () => void };
|
||||
maybeTimer.unref?.();
|
||||
|
|
@ -37,14 +47,21 @@ export class TeamTaskStallMonitor {
|
|||
private scanInFlight = false;
|
||||
private started = false;
|
||||
private readonly observationByTeam = new Map<string, TeamObservationState>();
|
||||
private readonly scanTimeoutMs: number;
|
||||
|
||||
constructor(
|
||||
private readonly registry: ActiveTeamRegistry,
|
||||
private readonly snapshotSource: TeamTaskStallSnapshotSource,
|
||||
private readonly policy: TeamTaskStallPolicy,
|
||||
private readonly journal: TeamTaskStallJournal,
|
||||
private readonly notifier: TeamTaskStallNotifier
|
||||
) {}
|
||||
private readonly notifier: TeamTaskStallNotifier,
|
||||
options: TeamTaskStallMonitorOptions = {}
|
||||
) {
|
||||
this.scanTimeoutMs = Math.max(
|
||||
1,
|
||||
options.scanTimeoutMs ?? DEFAULT_TEAM_TASK_STALL_SCAN_TIMEOUT_MS
|
||||
);
|
||||
}
|
||||
|
||||
start(): void {
|
||||
if (!isTeamTaskStallScannerEnabled()) {
|
||||
|
|
@ -127,38 +144,87 @@ export class TeamTaskStallMonitor {
|
|||
return;
|
||||
}
|
||||
this.scanInFlight = true;
|
||||
const scanRun: TeamTaskStallScanRun = { cancelled: false };
|
||||
try {
|
||||
const activeTeams = await this.registry.listActiveTeams();
|
||||
const activeSet = new Set(activeTeams);
|
||||
for (const teamName of [...this.observationByTeam.keys()]) {
|
||||
if (!activeSet.has(teamName)) {
|
||||
this.observationByTeam.delete(teamName);
|
||||
}
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
for (const teamName of activeTeams) {
|
||||
const observation = this.getOrCreateObservation(teamName, now.getTime());
|
||||
const startupAgeMs = now.getTime() - observation.firstSeenAtMs;
|
||||
if (startupAgeMs < getTeamTaskStallStartupGraceMs()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const activationAgeMs = now.getTime() - observation.lastActivationAtMs;
|
||||
if (activationAgeMs < getTeamTaskStallActivationGraceMs()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.scanTeam(teamName, now);
|
||||
}
|
||||
await this.runScanWithTimeout(scanRun);
|
||||
} catch (error) {
|
||||
logger.warn(`Task stall monitor scan failed: ${String(error)}`);
|
||||
} finally {
|
||||
scanRun.cancelled = true;
|
||||
this.scanInFlight = false;
|
||||
this.scheduleNextScan(getTeamTaskStallScanIntervalMs());
|
||||
}
|
||||
}
|
||||
|
||||
private async runScanWithTimeout(scanRun: TeamTaskStallScanRun): Promise<void> {
|
||||
let timeout: ReturnType<typeof setTimeout> | null = null;
|
||||
try {
|
||||
await Promise.race([
|
||||
this.runScanBody(scanRun),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeout = setTimeout(() => {
|
||||
scanRun.cancelled = true;
|
||||
reject(new Error(`task stall monitor scan timed out after ${this.scanTimeoutMs}ms`));
|
||||
}, this.scanTimeoutMs);
|
||||
unrefBackgroundTimer(timeout);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private shouldContinueScan(scanRun: TeamTaskStallScanRun): boolean {
|
||||
return this.started && !scanRun.cancelled;
|
||||
}
|
||||
|
||||
private async runScanBody(scanRun: TeamTaskStallScanRun): Promise<void> {
|
||||
const activeTeams = await this.registry.listActiveTeams();
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
const activeSet = new Set(activeTeams);
|
||||
for (const teamName of [...this.observationByTeam.keys()]) {
|
||||
if (!activeSet.has(teamName)) {
|
||||
this.observationByTeam.delete(teamName);
|
||||
}
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const eligibleTeamNames: string[] = [];
|
||||
for (const teamName of activeTeams) {
|
||||
const observation = this.getOrCreateObservation(teamName, now.getTime());
|
||||
const startupAgeMs = now.getTime() - observation.firstSeenAtMs;
|
||||
if (startupAgeMs < getTeamTaskStallStartupGraceMs()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const activationAgeMs = now.getTime() - observation.lastActivationAtMs;
|
||||
if (activationAgeMs < getTeamTaskStallActivationGraceMs()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
eligibleTeamNames.push(teamName);
|
||||
}
|
||||
|
||||
if (!this.shouldContinueScan(scanRun) || eligibleTeamNames.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
eligibleTeamNames.map((teamName) => this.scanTeam(teamName, now, scanRun))
|
||||
);
|
||||
for (const [index, result] of results.entries()) {
|
||||
if (result.status === 'rejected' && this.shouldContinueScan(scanRun)) {
|
||||
logger.warn(
|
||||
`Task stall monitor scan failed for ${eligibleTeamNames[index]}: ${String(result.reason)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private getOrCreateObservation(teamName: string, nowMs: number): TeamObservationState {
|
||||
const existing = this.observationByTeam.get(teamName);
|
||||
if (existing) {
|
||||
|
|
@ -172,8 +238,15 @@ export class TeamTaskStallMonitor {
|
|||
return created;
|
||||
}
|
||||
|
||||
private async scanTeam(teamName: string, now: Date): Promise<void> {
|
||||
private async scanTeam(
|
||||
teamName: string,
|
||||
now: Date,
|
||||
scanRun: TeamTaskStallScanRun
|
||||
): Promise<void> {
|
||||
const snapshot = await this.snapshotSource.getSnapshot(teamName);
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
if (!snapshot) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -203,6 +276,9 @@ export class TeamTaskStallMonitor {
|
|||
...(scopedTaskIds ? { scopeTaskIds: scopedTaskIds } : {}),
|
||||
now: now.toISOString(),
|
||||
});
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const alerts = readyEvaluations
|
||||
.map((evaluation) => this.buildAlert(snapshot, evaluation))
|
||||
|
|
@ -215,6 +291,9 @@ export class TeamTaskStallMonitor {
|
|||
const alertedEpochKeys = new Set<string>();
|
||||
if (openCodeRemediationEnabled) {
|
||||
const remediatedAlerts = await this.notifier.notifyOpenCodeOwners(teamName, alerts);
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
for (const alert of remediatedAlerts) {
|
||||
alertedEpochKeys.add(alert.epochKey);
|
||||
}
|
||||
|
|
@ -223,6 +302,9 @@ export class TeamTaskStallMonitor {
|
|||
const leadFallbackAlerts = alerts.filter((alert) => !alertedEpochKeys.has(alert.epochKey));
|
||||
if (leadFallbackAlerts.length > 0 && isTeamTaskStallAlertsEnabled()) {
|
||||
await this.notifier.notifyLead(teamName, leadFallbackAlerts);
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
for (const alert of leadFallbackAlerts) {
|
||||
alertedEpochKeys.add(alert.epochKey);
|
||||
}
|
||||
|
|
@ -233,6 +315,9 @@ export class TeamTaskStallMonitor {
|
|||
return;
|
||||
}
|
||||
|
||||
if (!this.shouldContinueScan(scanRun)) {
|
||||
return;
|
||||
}
|
||||
await Promise.all(
|
||||
alerts
|
||||
.filter((alert) => alertedEpochKeys.has(alert.epochKey))
|
||||
|
|
|
|||
|
|
@ -304,6 +304,7 @@ function buildOpenCodeNoProgressEpochKey(args: {
|
|||
|
||||
function buildAlertEvaluation(args: {
|
||||
task: TeamTask;
|
||||
memberName?: string;
|
||||
branch: TaskStallBranch;
|
||||
signal: TaskStallSignal;
|
||||
progressSignal?: TaskProgressSignal;
|
||||
|
|
@ -313,6 +314,7 @@ function buildAlertEvaluation(args: {
|
|||
return {
|
||||
status: 'alert',
|
||||
taskId: args.task.id,
|
||||
...(args.memberName ? { memberName: args.memberName } : {}),
|
||||
branch: args.branch,
|
||||
signal: args.signal,
|
||||
...(args.progressSignal ? { progressSignal: args.progressSignal } : {}),
|
||||
|
|
@ -330,6 +332,7 @@ function buildOpenCodeNoProgressAlertEvaluation(args: {
|
|||
return {
|
||||
status: 'alert',
|
||||
taskId: args.task.id,
|
||||
memberName: args.owner,
|
||||
branch: 'work',
|
||||
signal: 'mid_turn_after_touch',
|
||||
progressSignal: 'unknown',
|
||||
|
|
@ -488,6 +491,7 @@ export class TeamTaskStallPolicy {
|
|||
|
||||
return buildAlertEvaluation({
|
||||
task,
|
||||
memberName: task.owner,
|
||||
branch: 'work',
|
||||
signal,
|
||||
progressSignal: progressClassification.signal,
|
||||
|
|
@ -595,6 +599,7 @@ export class TeamTaskStallPolicy {
|
|||
|
||||
return buildAlertEvaluation({
|
||||
task,
|
||||
memberName: resolvedReviewer.reviewer,
|
||||
branch: 'review',
|
||||
signal,
|
||||
touch: reviewContext.lastMeaningfulTouch,
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ export interface ResolvedReviewer {
|
|||
export interface TaskStallEvaluation {
|
||||
status: TaskStallEvaluationStatus;
|
||||
taskId?: string;
|
||||
memberName?: string;
|
||||
branch?: TaskStallBranch;
|
||||
signal?: TaskStallSignal;
|
||||
progressSignal?: TaskProgressSignal;
|
||||
|
|
@ -135,6 +136,7 @@ export interface TaskStallJournalEntry {
|
|||
epochKey: string;
|
||||
teamName: string;
|
||||
taskId: string;
|
||||
memberName?: string;
|
||||
branch: TaskStallBranch;
|
||||
signal: TaskStallSignal;
|
||||
state: TaskStallJournalState;
|
||||
|
|
|
|||
|
|
@ -55,6 +55,10 @@ export function getTeamTaskStallActivationGraceMs(): number {
|
|||
return readInt(process.env.CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS, 60_000);
|
||||
}
|
||||
|
||||
export function getTeamTaskStallAlertCooldownMs(): number {
|
||||
return readInt(process.env.CLAUDE_TEAM_TASK_STALL_ALERT_COOLDOWN_MS, 10 * 60_000);
|
||||
}
|
||||
|
||||
export function getOpenCodeWeakStartStallThresholdMs(): number {
|
||||
// Shorter OpenCode threshold for "started work" comments that do not contain concrete progress.
|
||||
return readInt(process.env.CLAUDE_TEAM_OPENCODE_WEAK_START_STALL_THRESHOLD_MS, 100_000);
|
||||
|
|
|
|||
|
|
@ -3,6 +3,20 @@ import { describe, expect, it, vi } from 'vitest';
|
|||
import { ActiveTeamRegistry } from '../../../../../src/main/services/team/stallMonitor/ActiveTeamRegistry';
|
||||
|
||||
describe('ActiveTeamRegistry', () => {
|
||||
function createDeferred<T>(): {
|
||||
promise: Promise<T>;
|
||||
resolve: (value: T) => void;
|
||||
reject: (error: unknown) => void;
|
||||
} {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (error: unknown) => void;
|
||||
const promise = new Promise<T>((promiseResolve, promiseReject) => {
|
||||
resolve = promiseResolve;
|
||||
reject = promiseReject;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
it('activates a team on lead-activity and enables stall-monitor tracking', async () => {
|
||||
const tracker = {
|
||||
enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
|
|
@ -99,6 +113,92 @@ describe('ActiveTeamRegistry', () => {
|
|||
await expect(registry.listActiveTeams()).resolves.toEqual(['beta']);
|
||||
});
|
||||
|
||||
it('retries activation when enabling stall-monitor tracking fails', async () => {
|
||||
const tracker = {
|
||||
enableTracking: vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error('tracker unavailable'))
|
||||
.mockResolvedValueOnce({ projectFingerprint: null, logSourceGeneration: null }),
|
||||
disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
};
|
||||
const registry = new ActiveTeamRegistry(
|
||||
{ listAliveProcessTeams: vi.fn(async () => ['demo']) },
|
||||
tracker as never
|
||||
);
|
||||
|
||||
registry.noteTeamChange({
|
||||
type: 'lead-activity',
|
||||
teamName: 'demo',
|
||||
detail: 'active',
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(tracker.enableTracking).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
|
||||
'Failed to enable stall-monitor tracking for demo'
|
||||
);
|
||||
vi.mocked(console.warn).mockClear();
|
||||
await expect(registry.listActiveTeams()).resolves.toEqual([]);
|
||||
|
||||
await registry.reconcile();
|
||||
|
||||
expect(tracker.enableTracking).toHaveBeenCalledTimes(2);
|
||||
await expect(registry.listActiveTeams()).resolves.toEqual(['demo']);
|
||||
});
|
||||
|
||||
it('does not re-add a team when pending activation finishes after stop', async () => {
|
||||
const activation = createDeferred<{
|
||||
projectFingerprint: string | null;
|
||||
logSourceGeneration: string | null;
|
||||
}>();
|
||||
const tracker = {
|
||||
enableTracking: vi.fn(() => activation.promise),
|
||||
disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
};
|
||||
const registry = new ActiveTeamRegistry(
|
||||
{ listAliveProcessTeams: vi.fn(async () => []) },
|
||||
tracker as never
|
||||
);
|
||||
|
||||
registry.noteTeamChange({
|
||||
type: 'lead-activity',
|
||||
teamName: 'demo',
|
||||
detail: 'active',
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(tracker.enableTracking).toHaveBeenCalledWith('demo', 'stall_monitor');
|
||||
});
|
||||
|
||||
await registry.stop();
|
||||
activation.resolve({ projectFingerprint: null, logSourceGeneration: null });
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(tracker.disableTracking).toHaveBeenCalledWith('demo', 'stall_monitor');
|
||||
});
|
||||
await expect(registry.listActiveTeams()).resolves.toEqual([]);
|
||||
});
|
||||
|
||||
it('does not activate a team when a reconcile resumes after stop', async () => {
|
||||
const aliveTeams = createDeferred<string[]>();
|
||||
const tracker = {
|
||||
enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
};
|
||||
const registry = new ActiveTeamRegistry(
|
||||
{ listAliveProcessTeams: vi.fn(() => aliveTeams.promise) },
|
||||
tracker as never
|
||||
);
|
||||
|
||||
const reconcilePromise = registry.reconcile();
|
||||
await registry.stop();
|
||||
aliveTeams.resolve(['demo']);
|
||||
await reconcilePromise;
|
||||
|
||||
expect(tracker.enableTracking).not.toHaveBeenCalled();
|
||||
await expect(registry.listActiveTeams()).resolves.toEqual([]);
|
||||
});
|
||||
|
||||
it('does not re-enable tracking for teams that are already active during reconcile', async () => {
|
||||
const tracker = {
|
||||
enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })),
|
||||
|
|
|
|||
|
|
@ -49,6 +49,99 @@ describe('TeamTaskStallJournal', () => {
|
|||
expect(secondReady).toEqual([evaluation]);
|
||||
});
|
||||
|
||||
it('allows the same stalled epoch to alert again after the cooldown expires', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
await fs.mkdir(path.join(tmpDir, 'teams', 'demo'), { recursive: true });
|
||||
|
||||
const journal = new TeamTaskStallJournal({ alertCooldownMs: 10 * 60_000 });
|
||||
const evaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-a:epoch-1',
|
||||
reason: 'Potential work stall',
|
||||
} as const;
|
||||
|
||||
await journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:00:00.000Z',
|
||||
});
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:01:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([evaluation]);
|
||||
await journal.markAlerted('demo', 'task-a:epoch-1', '2026-04-19T12:01:00.000Z');
|
||||
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:05:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([]);
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:12:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([evaluation]);
|
||||
});
|
||||
|
||||
it('does not suppress a stalled epoch forever when alertedAt is in the future', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
const teamDir = path.join(tmpDir, 'teams', 'demo');
|
||||
await fs.mkdir(teamDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(teamDir, 'stall-monitor-journal.json'),
|
||||
JSON.stringify([
|
||||
{
|
||||
epochKey: 'task-a:epoch-1',
|
||||
teamName: 'demo',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
state: 'alerted',
|
||||
consecutiveScans: 2,
|
||||
createdAt: '2026-04-19T12:00:00.000Z',
|
||||
updatedAt: '2026-04-19T12:01:00.000Z',
|
||||
alertedAt: '2026-04-19T13:00:00.000Z',
|
||||
},
|
||||
]),
|
||||
'utf8'
|
||||
);
|
||||
|
||||
const journal = new TeamTaskStallJournal({ alertCooldownMs: 10 * 60_000 });
|
||||
const evaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-a:epoch-1',
|
||||
reason: 'Potential work stall',
|
||||
} as const;
|
||||
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:05:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([evaluation]);
|
||||
});
|
||||
|
||||
it('does not prune journal entries outside an explicit task scope', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
|
|
@ -102,6 +195,64 @@ describe('TeamTaskStallJournal', () => {
|
|||
expect(saved.map((entry) => entry.epochKey)).toEqual(['task-codex:epoch-1']);
|
||||
});
|
||||
|
||||
it('backfills member name on existing stall entries before alerting', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
const teamDir = path.join(tmpDir, 'teams', 'demo');
|
||||
await fs.mkdir(teamDir, { recursive: true });
|
||||
const journalPath = path.join(teamDir, 'stall-monitor-journal.json');
|
||||
await fs.writeFile(
|
||||
journalPath,
|
||||
JSON.stringify([
|
||||
{
|
||||
epochKey: 'task-a:epoch-1',
|
||||
teamName: 'demo',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
state: 'suspected',
|
||||
consecutiveScans: 1,
|
||||
createdAt: '2026-04-19T12:00:00.000Z',
|
||||
updatedAt: '2026-04-19T12:00:00.000Z',
|
||||
},
|
||||
]),
|
||||
'utf8'
|
||||
);
|
||||
|
||||
const journal = new TeamTaskStallJournal();
|
||||
const evaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-a',
|
||||
memberName: 'bob',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-a:epoch-1',
|
||||
reason: 'Potential work stall',
|
||||
} as const;
|
||||
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:10:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([evaluation]);
|
||||
|
||||
const saved = JSON.parse(await fs.readFile(journalPath, 'utf8')) as Array<{
|
||||
epochKey: string;
|
||||
memberName?: string;
|
||||
state: string;
|
||||
}>;
|
||||
expect(saved).toEqual([
|
||||
expect.objectContaining({
|
||||
epochKey: 'task-a:epoch-1',
|
||||
memberName: 'bob',
|
||||
state: 'alert_ready',
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
it('recovers from an invalid journal file on the next scan', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
|
|
|
|||
|
|
@ -2,6 +2,32 @@ import { afterEach, describe, expect, it, vi } from 'vitest';
|
|||
|
||||
import { TeamTaskStallMonitor } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallMonitor';
|
||||
|
||||
function neverResolves(): Promise<never> {
|
||||
return new Promise(() => undefined);
|
||||
}
|
||||
|
||||
interface Deferred<T> {
|
||||
promise: Promise<T>;
|
||||
resolve: (value: T) => void;
|
||||
reject: (reason?: unknown) => void;
|
||||
}
|
||||
|
||||
function createDeferred<T>(): Deferred<T> {
|
||||
let resolve!: (value: T) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
async function flushAsyncWork(): Promise<void> {
|
||||
for (let i = 0; i < 8; i += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
describe('TeamTaskStallMonitor', () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
|
|
@ -113,6 +139,200 @@ describe('TeamTaskStallMonitor', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('times out a hung scan so later stall scans continue', async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1000');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS', '1');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS', '1');
|
||||
|
||||
const snapshotSource = {
|
||||
getSnapshot: vi.fn().mockImplementationOnce(neverResolves).mockResolvedValueOnce(null),
|
||||
};
|
||||
const monitor = new TeamTaskStallMonitor(
|
||||
{
|
||||
start: vi.fn(),
|
||||
stop: vi.fn(async () => undefined),
|
||||
noteTeamChange: vi.fn(),
|
||||
listActiveTeams: vi.fn(async () => ['demo']),
|
||||
} as never,
|
||||
snapshotSource as never,
|
||||
{ evaluateWork: vi.fn(), evaluateReview: vi.fn() } as never,
|
||||
{ reconcileScan: vi.fn(), markAlerted: vi.fn() } as never,
|
||||
{ notifyLead: vi.fn(), notifyOpenCodeOwners: vi.fn() } as never,
|
||||
{ scanTimeoutMs: 10 }
|
||||
);
|
||||
|
||||
monitor.start();
|
||||
await vi.advanceTimersByTimeAsync(3_010);
|
||||
expect(snapshotSource.getSnapshot).toHaveBeenCalledTimes(1);
|
||||
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
|
||||
'task stall monitor scan timed out after 10ms'
|
||||
);
|
||||
vi.mocked(console.warn).mockClear();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_001);
|
||||
expect(snapshotSource.getSnapshot).toHaveBeenCalledTimes(2);
|
||||
|
||||
await monitor.stop();
|
||||
});
|
||||
|
||||
it('does not let one stuck team block stall scans for other active teams', async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1000');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS', '1');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS', '1');
|
||||
|
||||
const task = {
|
||||
id: 'task-healthy',
|
||||
displayId: 'beef1234',
|
||||
subject: 'Healthy team task',
|
||||
};
|
||||
const readyEvaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-healthy',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-healthy:epoch',
|
||||
reason: 'Potential work stall.',
|
||||
};
|
||||
const snapshotSource = {
|
||||
getSnapshot: vi.fn(async (teamName: string) => {
|
||||
if (teamName === 'stuck') {
|
||||
return neverResolves();
|
||||
}
|
||||
return {
|
||||
teamName: 'healthy',
|
||||
inProgressTasks: [task],
|
||||
reviewOpenTasks: [],
|
||||
allTasksById: new Map([['task-healthy', task]]),
|
||||
};
|
||||
}),
|
||||
};
|
||||
const journal = {
|
||||
reconcileScan: vi.fn(async () => [readyEvaluation]),
|
||||
markAlerted: vi.fn(async () => undefined),
|
||||
};
|
||||
const notifier = {
|
||||
notifyLead: vi.fn(async () => undefined),
|
||||
notifyOpenCodeOwners: vi.fn(async () => []),
|
||||
};
|
||||
const monitor = new TeamTaskStallMonitor(
|
||||
{
|
||||
start: vi.fn(),
|
||||
stop: vi.fn(async () => undefined),
|
||||
noteTeamChange: vi.fn(),
|
||||
listActiveTeams: vi.fn(async () => ['stuck', 'healthy']),
|
||||
} as never,
|
||||
snapshotSource as never,
|
||||
{
|
||||
evaluateWork: vi.fn(() => readyEvaluation),
|
||||
evaluateReview: vi.fn(),
|
||||
} as never,
|
||||
journal as never,
|
||||
notifier as never,
|
||||
{ scanTimeoutMs: 100 }
|
||||
);
|
||||
|
||||
monitor.start();
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
await flushAsyncWork();
|
||||
|
||||
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
|
||||
'task stall monitor scan timed out after 100ms'
|
||||
);
|
||||
vi.mocked(console.warn).mockClear();
|
||||
expect(snapshotSource.getSnapshot).toHaveBeenCalledWith('stuck');
|
||||
expect(snapshotSource.getSnapshot).toHaveBeenCalledWith('healthy');
|
||||
expect(notifier.notifyLead).toHaveBeenCalledWith(
|
||||
'healthy',
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
taskId: 'task-healthy',
|
||||
}),
|
||||
])
|
||||
);
|
||||
expect(journal.markAlerted).toHaveBeenCalledWith(
|
||||
'healthy',
|
||||
'task-healthy:epoch',
|
||||
expect.any(String)
|
||||
);
|
||||
|
||||
await monitor.stop();
|
||||
});
|
||||
|
||||
it('ignores late side effects from a scan that already timed out', async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1000');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS', '1');
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS', '1');
|
||||
|
||||
const staleJournalScan = createDeferred<unknown[]>();
|
||||
const readyEvaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-a:epoch',
|
||||
reason: 'Potential work stall.',
|
||||
};
|
||||
const task = { id: 'task-a', displayId: 'abcd1234', subject: 'Task A' };
|
||||
const notifier = {
|
||||
notifyLead: vi.fn(async () => undefined),
|
||||
notifyOpenCodeOwners: vi.fn(async () => []),
|
||||
};
|
||||
const journal = {
|
||||
reconcileScan: vi
|
||||
.fn()
|
||||
.mockImplementationOnce(() => staleJournalScan.promise)
|
||||
.mockResolvedValueOnce([]),
|
||||
markAlerted: vi.fn(async () => undefined),
|
||||
};
|
||||
const monitor = new TeamTaskStallMonitor(
|
||||
{
|
||||
start: vi.fn(),
|
||||
stop: vi.fn(async () => undefined),
|
||||
noteTeamChange: vi.fn(),
|
||||
listActiveTeams: vi.fn(async () => ['demo']),
|
||||
} as never,
|
||||
{
|
||||
getSnapshot: vi.fn(async () => ({
|
||||
teamName: 'demo',
|
||||
inProgressTasks: [task],
|
||||
reviewOpenTasks: [],
|
||||
allTasksById: new Map([['task-a', task]]),
|
||||
})),
|
||||
} as never,
|
||||
{
|
||||
evaluateWork: vi.fn(() => readyEvaluation),
|
||||
evaluateReview: vi.fn(),
|
||||
} as never,
|
||||
journal as never,
|
||||
notifier as never,
|
||||
{ scanTimeoutMs: 10 }
|
||||
);
|
||||
|
||||
monitor.start();
|
||||
await vi.advanceTimersByTimeAsync(3_010);
|
||||
expect(journal.reconcileScan).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
expect(vi.mocked(console.warn).mock.calls[0]?.join(' ')).toContain(
|
||||
'task stall monitor scan timed out after 10ms'
|
||||
);
|
||||
vi.mocked(console.warn).mockClear();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_001);
|
||||
expect(journal.reconcileScan).toHaveBeenCalledTimes(2);
|
||||
|
||||
staleJournalScan.resolve([readyEvaluation]);
|
||||
await flushAsyncWork();
|
||||
|
||||
expect(notifier.notifyLead).not.toHaveBeenCalled();
|
||||
expect(journal.markAlerted).not.toHaveBeenCalled();
|
||||
|
||||
await monitor.stop();
|
||||
});
|
||||
|
||||
it('defaults to OpenCode owner remediation without duplicate lead alerts when remediation is accepted', async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1000');
|
||||
|
|
|
|||
Loading…
Reference in a new issue