From a8cca6565809bff67ea163a52ea6910d6b2ab4ef Mon Sep 17 00:00:00 2001 From: 777genius Date: Sun, 19 Apr 2026 21:17:09 +0300 Subject: [PATCH] feat(team): add task stall monitor shadow rollout --- src/main/index.ts | 20 + src/main/services/team/TeamDataService.ts | 38 ++ .../services/team/TeamLogSourceTracker.ts | 6 +- src/main/services/team/index.ts | 9 + .../team/stallMonitor/ActiveTeamRegistry.ts | 101 ++++ .../BoardTaskActivityBatchIndexer.ts | 30 ++ .../TeamTaskLogFreshnessReader.ts | 124 +++++ .../TeamTaskStallExactRowReader.ts | 127 +++++ .../team/stallMonitor/TeamTaskStallJournal.ts | 145 +++++ .../team/stallMonitor/TeamTaskStallMonitor.ts | 246 +++++++++ .../stallMonitor/TeamTaskStallNotifier.ts | 32 ++ .../team/stallMonitor/TeamTaskStallPolicy.ts | 508 ++++++++++++++++++ .../TeamTaskStallSnapshotSource.ts | 119 ++++ .../team/stallMonitor/TeamTaskStallTypes.ts | 139 +++++ .../team/stallMonitor/featureGates.ts | 42 ++ .../team/stallMonitor/reviewerResolution.ts | 47 ++ .../BoardTaskActivityRecordBuilder.ts | 131 +++-- .../team/TeamDataService.stallMonitor.test.ts | 133 +++++ .../team/TeamLogSourceTracker.test.ts | 34 ++ .../stallMonitor/ActiveTeamRegistry.test.ts | 127 +++++ .../BoardTaskActivityBatchIndexer.test.ts | 118 ++++ .../TeamTaskLogFreshnessReader.test.ts | 57 ++ .../TeamTaskStallExactRowReader.test.ts | 149 +++++ .../stallMonitor/TeamTaskStallJournal.test.ts | 51 ++ .../stallMonitor/TeamTaskStallMonitor.test.ts | 87 +++ .../stallMonitor/TeamTaskStallPolicy.test.ts | 460 ++++++++++++++++ .../TeamTaskStallSnapshotSource.test.ts | 142 +++++ .../team/stallMonitor/featureGates.test.ts | 37 ++ 28 files changed, 3217 insertions(+), 42 deletions(-) create mode 100644 src/main/services/team/stallMonitor/ActiveTeamRegistry.ts create mode 100644 src/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallExactRowReader.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallJournal.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallMonitor.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallNotifier.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallPolicy.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.ts create mode 100644 src/main/services/team/stallMonitor/TeamTaskStallTypes.ts create mode 100644 src/main/services/team/stallMonitor/featureGates.ts create mode 100644 src/main/services/team/stallMonitor/reviewerResolution.ts create mode 100644 test/main/services/team/TeamDataService.stallMonitor.test.ts create mode 100644 test/main/services/team/stallMonitor/ActiveTeamRegistry.test.ts create mode 100644 test/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskStallExactRowReader.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskStallJournal.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskStallMonitor.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskStallPolicy.test.ts create mode 100644 test/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.test.ts create mode 100644 test/main/services/team/stallMonitor/featureGates.test.ts diff --git a/src/main/index.ts b/src/main/index.ts index 42b9d15a..ff376759 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -111,6 +111,7 @@ import { } from './utils/safeWebContentsSend'; import { syncTelemetryFlag } from './sentry'; import { + ActiveTeamRegistry, BoardTaskActivityDetailService, BoardTaskActivityRecordSource, BoardTaskActivityService, @@ -130,6 +131,11 @@ import { TaskBoundaryParser, TeamDataService, TeamLogSourceTracker, + TeamTaskStallJournal, + TeamTaskStallMonitor, + TeamTaskStallNotifier, + TeamTaskStallPolicy, + TeamTaskStallSnapshotSource, TeammateToolTracker, TeamMemberLogsFinder, TeamProvisioningService, @@ -415,6 +421,7 @@ let cliInstallerService: CliInstallerService; let ptyTerminalService: PtyTerminalService; let httpServer: HttpServer; let schedulerService: SchedulerService; +let teamTaskStallMonitor: TeamTaskStallMonitor | null = null; let skillsWatcherService: SkillsWatcherService | null = null; let teamBackupService: TeamBackupService | null = null; let branchStatusService: BranchStatusService | null = null; @@ -848,6 +855,13 @@ async function initializeServices(): Promise { const taskChangePresenceRepository = new JsonTaskChangePresenceRepository(); const teamLogSourceTracker = new TeamLogSourceTracker(teamMemberLogsFinder); + teamTaskStallMonitor = new TeamTaskStallMonitor( + new ActiveTeamRegistry(teamDataService, teamLogSourceTracker), + new TeamTaskStallSnapshotSource(), + new TeamTaskStallPolicy(), + new TeamTaskStallJournal(), + new TeamTaskStallNotifier(teamDataService) + ); let teammateToolTracker: TeammateToolTracker | null = null; branchStatusService = new BranchStatusService((event) => { safeSendToRenderer(mainWindow, TEAM_PROJECT_BRANCH_CHANGE, event); @@ -930,6 +944,7 @@ async function initializeServices(): Promise { // Allow TeamProvisioningService to trigger team refresh events (e.g. live lead replies). const teamChangeEmitter = (event: TeamChangeEvent): void => { forwardTeamChange(event); + teamTaskStallMonitor?.noteTeamChange(event); if (event.type === 'lead-activity' && event.detail === 'offline') { teammateToolTracker?.handleTeamOffline(event.teamName); } @@ -939,6 +954,7 @@ async function initializeServices(): Promise { teamLogSourceTracker.onLogSourceChange((teamName) => { teammateToolTracker?.handleLogSourceChange(teamName); }); + teamTaskStallMonitor.start(); // Allow SchedulerService to push schedule events to renderer schedulerService.setChangeEmitter((event) => { @@ -1142,6 +1158,10 @@ function shutdownServices(): void { if (teamDataService) { teamDataService.stopProcessHealthPolling(); } + if (teamTaskStallMonitor) { + void teamTaskStallMonitor.stop(); + teamTaskStallMonitor = null; + } branchStatusService?.dispose(); branchStatusService = null; diff --git a/src/main/services/team/TeamDataService.ts b/src/main/services/team/TeamDataService.ts index 0fc2be98..d989b79d 100644 --- a/src/main/services/team/TeamDataService.ts +++ b/src/main/services/team/TeamDataService.ts @@ -511,6 +511,27 @@ export class TeamDataService { return this.configReader.listTeams(); } + async listAliveProcessTeams(): Promise { + const teams = await this.listTeams(); + const alive: string[] = []; + + for (const team of teams) { + if (team.deletedAt) { + continue; + } + try { + const processes = await this.readProcesses(team.teamName); + if (processes.some((process) => !process.stoppedAt)) { + alive.push(team.teamName); + } + } catch { + // best-effort per team + } + } + + return alive.sort((left, right) => left.localeCompare(right)); + } + async getAllTasks(): Promise { const rawTasks = await this.taskReader.getAllTasks(); const teams = await this.configReader.listTeams(); @@ -1741,6 +1762,23 @@ export class TeamDataService { return result; } + async sendSystemNotificationToLead(args: { + teamName: string; + summary: string; + text: string; + taskRefs?: TaskRef[]; + }): Promise { + const leadName = await this.resolveLeadName(args.teamName); + return this.sendMessage(args.teamName, { + member: leadName, + from: 'system', + summary: args.summary, + text: args.text, + ...(args.taskRefs && args.taskRefs.length > 0 ? { taskRefs: args.taskRefs } : {}), + source: TASK_COMMENT_NOTIFICATION_SOURCE, + }); + } + private resolveLeadNameFromConfig(config: TeamConfig | null): string { if (!config) return 'team-lead'; const lead = config.members?.find((m) => m.role?.toLowerCase().includes('lead')); diff --git a/src/main/services/team/TeamLogSourceTracker.ts b/src/main/services/team/TeamLogSourceTracker.ts index 045cc007..0f99a0ce 100644 --- a/src/main/services/team/TeamLogSourceTracker.ts +++ b/src/main/services/team/TeamLogSourceTracker.ts @@ -22,7 +22,11 @@ interface TeamLogSourceSnapshot { logSourceGeneration: string | null; } -export type TeamLogSourceTrackingConsumer = 'change_presence' | 'tool_activity' | 'task_log_stream'; +export type TeamLogSourceTrackingConsumer = + | 'change_presence' + | 'tool_activity' + | 'task_log_stream' + | 'stall_monitor'; interface TrackingState { watcher: FSWatcher | null; diff --git a/src/main/services/team/index.ts b/src/main/services/team/index.ts index f6290a2f..fda988d3 100644 --- a/src/main/services/team/index.ts +++ b/src/main/services/team/index.ts @@ -39,3 +39,12 @@ export { TeamSentMessagesStore } from './TeamSentMessagesStore'; export { TeamTaskReader } from './TeamTaskReader'; export { TeamTaskWriter } from './TeamTaskWriter'; export { countLineChanges } from './UnifiedLineCounter'; +export { ActiveTeamRegistry } from './stallMonitor/ActiveTeamRegistry'; +export { BoardTaskActivityBatchIndexer } from './stallMonitor/BoardTaskActivityBatchIndexer'; +export { TeamTaskLogFreshnessReader } from './stallMonitor/TeamTaskLogFreshnessReader'; +export { TeamTaskStallExactRowReader } from './stallMonitor/TeamTaskStallExactRowReader'; +export { TeamTaskStallJournal } from './stallMonitor/TeamTaskStallJournal'; +export { TeamTaskStallMonitor } from './stallMonitor/TeamTaskStallMonitor'; +export { TeamTaskStallNotifier } from './stallMonitor/TeamTaskStallNotifier'; +export { TeamTaskStallPolicy } from './stallMonitor/TeamTaskStallPolicy'; +export { TeamTaskStallSnapshotSource } from './stallMonitor/TeamTaskStallSnapshotSource'; diff --git a/src/main/services/team/stallMonitor/ActiveTeamRegistry.ts b/src/main/services/team/stallMonitor/ActiveTeamRegistry.ts new file mode 100644 index 00000000..8e838772 --- /dev/null +++ b/src/main/services/team/stallMonitor/ActiveTeamRegistry.ts @@ -0,0 +1,101 @@ +import type { TeamLogSourceTracker } from '../TeamLogSourceTracker'; +import type { TeamChangeEvent } from '@shared/types'; + +interface TeamAliveProcessesReader { + listAliveProcessTeams(): Promise; +} + +interface TeamLogSourceTrackingHandle { + enableTracking( + teamName: string, + consumer: 'stall_monitor' + ): Promise<{ projectFingerprint: string | null; logSourceGeneration: string | null }>; + disableTracking( + teamName: string, + consumer: 'stall_monitor' + ): Promise<{ projectFingerprint: string | null; logSourceGeneration: string | null }>; +} + +export class ActiveTeamRegistry { + private readonly activeTeams = new Set(); + private reconcileTimer: ReturnType | null = null; + + constructor( + private readonly teamDataService: TeamAliveProcessesReader, + private readonly teamLogSourceTracker: Pick< + TeamLogSourceTracker, + 'enableTracking' | 'disableTracking' + > & + TeamLogSourceTrackingHandle, + private readonly reconcileIntervalMs: number = 5 * 60_000 + ) {} + + noteTeamChange(event: TeamChangeEvent): void { + if ( + event.type === 'member-spawn' || + (event.type === 'lead-activity' && event.detail !== 'offline') + ) { + if (!this.activeTeams.has(event.teamName)) { + this.activeTeams.add(event.teamName); + void this.teamLogSourceTracker.enableTracking(event.teamName, 'stall_monitor'); + } + return; + } + + if (event.type === 'task-log-change' || event.type === 'log-source-change') { + if (!this.activeTeams.has(event.teamName)) { + return; + } + } + } + + async listActiveTeams(): Promise { + return [...this.activeTeams].sort((left, right) => left.localeCompare(right)); + } + + start(): void { + if (this.reconcileTimer) { + return; + } + void this.reconcile(); + this.reconcileTimer = setInterval(() => { + void this.reconcile(); + }, this.reconcileIntervalMs); + } + + async stop(): Promise { + if (this.reconcileTimer) { + clearInterval(this.reconcileTimer); + this.reconcileTimer = null; + } + + const teamNames = [...this.activeTeams]; + this.activeTeams.clear(); + await Promise.all( + teamNames.map((teamName) => + this.teamLogSourceTracker.disableTracking(teamName, 'stall_monitor') + ) + ); + } + + async reconcile(): Promise { + const aliveTeams = await this.teamDataService.listAliveProcessTeams(); + const aliveSet = new Set(aliveTeams); + + for (const teamName of aliveTeams) { + if (this.activeTeams.has(teamName)) { + continue; + } + this.activeTeams.add(teamName); + await this.teamLogSourceTracker.enableTracking(teamName, 'stall_monitor'); + } + + for (const teamName of [...this.activeTeams]) { + if (aliveSet.has(teamName)) { + continue; + } + this.activeTeams.delete(teamName); + await this.teamLogSourceTracker.disableTracking(teamName, 'stall_monitor'); + } + } +} diff --git a/src/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.ts b/src/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.ts new file mode 100644 index 00000000..548effb5 --- /dev/null +++ b/src/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.ts @@ -0,0 +1,30 @@ +import { BoardTaskActivityRecordBuilder } from '../taskLogs/activity/BoardTaskActivityRecordBuilder'; + +import type { BoardTaskActivityRecord } from '../taskLogs/activity/BoardTaskActivityRecord'; +import type { RawTaskActivityMessage } from '../taskLogs/activity/BoardTaskActivityTranscriptReader'; +import type { TeamTask } from '@shared/types'; + +export class BoardTaskActivityBatchIndexer { + constructor( + private readonly recordBuilder: Pick< + BoardTaskActivityRecordBuilder, + 'buildForTasks' + > = new BoardTaskActivityRecordBuilder() + ) {} + + buildIndex(args: { + teamName: string; + tasks: TeamTask[]; + messages: RawTaskActivityMessage[]; + }): Map { + if (args.tasks.length === 0 || args.messages.length === 0) { + return new Map(); + } + + return this.recordBuilder.buildForTasks({ + teamName: args.teamName, + tasks: args.tasks, + messages: args.messages, + }); + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.ts b/src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.ts new file mode 100644 index 00000000..326af24e --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.ts @@ -0,0 +1,124 @@ +import * as fs from 'fs/promises'; +import * as path from 'path'; + +import { BoardTaskActivityParseCache } from '../taskLogs/activity/BoardTaskActivityParseCache'; + +import type { TaskLogFreshnessSignal } from './TeamTaskStallTypes'; + +const BOARD_TASK_LOG_FRESHNESS_DIRNAME = '.board-task-log-freshness'; +const BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX = '.json'; + +interface ParsedFreshnessSignal { + taskId: string; + updatedAt: string; + transcriptFileBasename?: string; +} + +function encodeTaskId(taskId: string): string { + return encodeURIComponent(taskId); +} + +function isValidTimestamp(value: unknown): value is string { + return typeof value === 'string' && value.trim().length > 0 && Number.isFinite(Date.parse(value)); +} + +export class TeamTaskLogFreshnessReader { + private readonly cache = new BoardTaskActivityParseCache(); + + async readSignals( + projectDir: string, + taskIds: string[] + ): Promise> { + const uniqueTaskIds = [...new Set(taskIds)].filter((taskId) => taskId.trim().length > 0).sort(); + const signalFilePaths = uniqueTaskIds.map((taskId) => + path.join( + projectDir, + BOARD_TASK_LOG_FRESHNESS_DIRNAME, + `${encodeTaskId(taskId)}${BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX}` + ) + ); + this.cache.retainOnly(new Set(signalFilePaths)); + + const rows = await Promise.all( + uniqueTaskIds.map(async (taskId, index) => { + const filePath = signalFilePaths[index]; + const parsed = await this.readSignal(filePath); + if (!parsed || parsed.taskId !== taskId) { + return null; + } + return [ + taskId, + { + taskId, + updatedAt: parsed.updatedAt, + filePath, + ...(parsed.transcriptFileBasename + ? { transcriptFileBasename: parsed.transcriptFileBasename } + : {}), + } satisfies TaskLogFreshnessSignal, + ] as const; + }) + ); + + return new Map(rows.filter((row): row is NonNullable => row !== null)); + } + + private async readSignal(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + if (!stat.isFile()) { + this.cache.clearForPath(filePath); + return false; + } + + const cached = this.cache.getIfFresh(filePath, stat.mtimeMs, stat.size); + if (cached !== null) { + return cached; + } + + const inFlight = this.cache.getInFlight(filePath); + if (inFlight) { + return inFlight; + } + + const promise = this.parseSignal(filePath); + this.cache.setInFlight(filePath, promise); + try { + const parsed = await promise; + this.cache.set(filePath, stat.mtimeMs, stat.size, parsed); + return parsed; + } finally { + this.cache.clearInFlight(filePath); + } + } catch { + this.cache.clearForPath(filePath); + return false; + } + } + + private async parseSignal(filePath: string): Promise { + const raw = await fs.readFile(filePath, 'utf8'); + const parsed = JSON.parse(raw) as unknown; + if (!parsed || typeof parsed !== 'object') { + return false; + } + + const record = parsed as Record; + const taskId = + typeof record.taskId === 'string' && record.taskId.trim().length > 0 + ? record.taskId.trim() + : null; + const updatedAt = isValidTimestamp(record.updatedAt) ? record.updatedAt : null; + if (!taskId || !updatedAt) { + return false; + } + + return { + taskId, + updatedAt, + ...(typeof record.transcriptFile === 'string' && record.transcriptFile.trim().length > 0 + ? { transcriptFileBasename: path.basename(record.transcriptFile.trim()) } + : {}), + }; + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallExactRowReader.ts b/src/main/services/team/stallMonitor/TeamTaskStallExactRowReader.ts new file mode 100644 index 00000000..b515eb3a --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallExactRowReader.ts @@ -0,0 +1,127 @@ +import { yieldToEventLoop } from '@main/utils/asyncYield'; +import { parseJsonlLine } from '@main/utils/jsonl'; +import { createLogger } from '@shared/utils/logger'; +import { createReadStream } from 'fs'; +import * as fs from 'fs/promises'; +import * as readline from 'readline'; + +import { BoardTaskActivityParseCache } from '../taskLogs/activity/BoardTaskActivityParseCache'; + +import type { TeamTaskStallExactRow } from './TeamTaskStallTypes'; + +const logger = createLogger('Service:TeamTaskStallExactRowReader'); + +function asRecord(value: unknown): Record | null { + return value && typeof value === 'object' ? (value as Record) : null; +} + +function hasStrictTimestamp(record: Record): boolean { + return typeof record.timestamp === 'string' && Number.isFinite(Date.parse(record.timestamp)); +} + +function parseSystemSubtype(record: Record): 'turn_duration' | 'init' | undefined { + return record.subtype === 'turn_duration' || record.subtype === 'init' + ? record.subtype + : undefined; +} + +export class TeamTaskStallExactRowReader { + private readonly cache = new BoardTaskActivityParseCache(); + + async parseFiles(filePaths: string[]): Promise> { + const uniquePaths = [...new Set(filePaths)].sort(); + this.cache.retainOnly(new Set(uniquePaths)); + + const rows = await Promise.all( + uniquePaths.map(async (filePath) => [filePath, await this.parseFile(filePath)] as const) + ); + return new Map(rows); + } + + private async parseFile(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + const cached = this.cache.getIfFresh(filePath, stat.mtimeMs, stat.size); + if (cached !== null) { + return cached; + } + + const inFlight = this.cache.getInFlight(filePath); + if (inFlight) { + return inFlight; + } + + const promise = this.readFile(filePath); + this.cache.setInFlight(filePath, promise); + try { + const parsed = await promise; + this.cache.set(filePath, stat.mtimeMs, stat.size, parsed); + return parsed; + } finally { + this.cache.clearInFlight(filePath); + } + } catch (error) { + logger.debug(`Skipping unreadable stall exact-log transcript ${filePath}: ${String(error)}`); + this.cache.clearForPath(filePath); + return []; + } + } + + private async readFile(filePath: string): Promise { + const rows: TeamTaskStallExactRow[] = []; + const stream = createReadStream(filePath, { encoding: 'utf8' }); + const rl = readline.createInterface({ + input: stream, + crlfDelay: Infinity, + }); + + let lineCount = 0; + let sourceOrder = 0; + + for await (const line of rl) { + if (!line.trim()) { + continue; + } + lineCount += 1; + + try { + const raw = JSON.parse(line) as unknown; + const record = asRecord(raw); + if (!record || !hasStrictTimestamp(record)) { + continue; + } + + const parsed = parseJsonlLine(line); + if (!parsed) { + continue; + } + + sourceOrder += 1; + const systemSubtype = parseSystemSubtype(record); + rows.push({ + filePath, + sourceOrder, + messageUuid: parsed.uuid, + timestamp: record.timestamp as string, + parsedMessage: parsed, + ...(parsed.requestId ? { requestId: parsed.requestId } : {}), + ...(parsed.sourceToolUseID ? { sourceToolUseId: parsed.sourceToolUseID } : {}), + ...(parsed.sourceToolAssistantUUID + ? { sourceToolAssistantUuid: parsed.sourceToolAssistantUUID } + : {}), + ...(systemSubtype ? { systemSubtype } : {}), + toolUseIds: parsed.toolCalls.map((toolCall) => toolCall.id), + toolResultIds: parsed.toolResults.map((toolResult) => toolResult.toolUseId), + }); + } catch (error) { + logger.debug(`Skipping malformed stall exact-log line in ${filePath}: ${String(error)}`); + } + + if (lineCount % 250 === 0) { + await yieldToEventLoop(); + } + } + + return rows; + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallJournal.ts b/src/main/services/team/stallMonitor/TeamTaskStallJournal.ts new file mode 100644 index 00000000..316796e6 --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallJournal.ts @@ -0,0 +1,145 @@ +import { getTeamsBasePath } from '@main/utils/pathDecoder'; +import * as fs from 'fs'; +import * as path from 'path'; + +import { atomicWriteAsync } from '../atomicWrite'; +import { withFileLock } from '../fileLock'; + +import type { + TaskStallEvaluation, + TaskStallJournalEntry, + TaskStallJournalState, +} from './TeamTaskStallTypes'; + +function isValidState(value: unknown): value is TaskStallJournalState { + return value === 'suspected' || value === 'alert_ready' || value === 'alerted'; +} + +export class TeamTaskStallJournal { + private getFilePath(teamName: string): string { + return path.join(getTeamsBasePath(), teamName, 'stall-monitor-journal.json'); + } + + async reconcileScan(args: { + teamName: string; + evaluations: TaskStallEvaluation[]; + activeTaskIds: string[]; + now: string; + }): Promise { + const filePath = this.getFilePath(args.teamName); + let readyEvaluations: TaskStallEvaluation[] = []; + + await withFileLock(filePath, async () => { + const entries = await this.readUnlocked(filePath); + const candidateByEpoch = new Map( + args.evaluations + .filter( + ( + evaluation + ): evaluation is TaskStallEvaluation & + Required> => + evaluation.status === 'alert' && + typeof evaluation.taskId === 'string' && + typeof evaluation.branch === 'string' && + typeof evaluation.signal === 'string' && + typeof evaluation.epochKey === 'string' + ) + .map((evaluation) => [evaluation.epochKey, evaluation] as const) + ); + + const activeTaskIdSet = new Set(args.activeTaskIds); + for (let i = entries.length - 1; i >= 0; i -= 1) { + const entry = entries[i]; + if (!activeTaskIdSet.has(entry.taskId) || !candidateByEpoch.has(entry.epochKey)) { + entries.splice(i, 1); + } + } + + for (const [epochKey, evaluation] of candidateByEpoch) { + const existing = entries.find((entry) => entry.epochKey === epochKey); + if (!existing) { + entries.push({ + epochKey, + teamName: args.teamName, + taskId: evaluation.taskId, + branch: evaluation.branch, + signal: evaluation.signal, + state: 'suspected', + consecutiveScans: 1, + createdAt: args.now, + updatedAt: args.now, + }); + continue; + } + + existing.updatedAt = args.now; + if (existing.state === 'alerted') { + continue; + } + + existing.consecutiveScans += 1; + if (existing.consecutiveScans >= 2) { + existing.state = 'alert_ready'; + readyEvaluations.push(evaluation); + } + } + + await atomicWriteAsync(filePath, JSON.stringify(entries, null, 2)); + }); + + return readyEvaluations; + } + + async markAlerted(teamName: string, epochKey: string, now: string): Promise { + const filePath = this.getFilePath(teamName); + await withFileLock(filePath, async () => { + const entries = await this.readUnlocked(filePath); + const target = entries.find((entry) => entry.epochKey === epochKey); + if (!target) { + return; + } + target.state = 'alerted'; + target.updatedAt = now; + target.alertedAt = now; + await atomicWriteAsync(filePath, JSON.stringify(entries, null, 2)); + }); + } + + private async readUnlocked(filePath: string): Promise { + try { + const raw = await fs.promises.readFile(filePath, 'utf8'); + const parsed = JSON.parse(raw) as unknown; + if (!Array.isArray(parsed)) { + return []; + } + + return parsed + .filter( + (item): item is TaskStallJournalEntry => + item != null && + typeof item === 'object' && + typeof (item as TaskStallJournalEntry).epochKey === 'string' && + typeof (item as TaskStallJournalEntry).teamName === 'string' && + typeof (item as TaskStallJournalEntry).taskId === 'string' && + ((item as TaskStallJournalEntry).branch === 'work' || + (item as TaskStallJournalEntry).branch === 'review') && + ((item as TaskStallJournalEntry).signal === 'turn_ended_after_touch' || + (item as TaskStallJournalEntry).signal === 'mid_turn_after_touch' || + (item as TaskStallJournalEntry).signal === 'touch_then_other_turns') && + isValidState((item as TaskStallJournalEntry).state) && + typeof (item as TaskStallJournalEntry).consecutiveScans === 'number' && + typeof (item as TaskStallJournalEntry).createdAt === 'string' && + typeof (item as TaskStallJournalEntry).updatedAt === 'string' + ) + .map((entry) => ({ + ...entry, + ...(entry.alertedAt ? { alertedAt: entry.alertedAt } : {}), + })); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return []; + } + throw error; + } + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallMonitor.ts b/src/main/services/team/stallMonitor/TeamTaskStallMonitor.ts new file mode 100644 index 00000000..c5cfbe66 --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallMonitor.ts @@ -0,0 +1,246 @@ +import { createLogger } from '@shared/utils/logger'; +import { getTaskDisplayId } from '@shared/utils/taskIdentity'; + +import { ActiveTeamRegistry } from './ActiveTeamRegistry'; +import { + getTeamTaskStallActivationGraceMs, + getTeamTaskStallScanIntervalMs, + getTeamTaskStallStartupGraceMs, + isTeamTaskStallAlertsEnabled, + isTeamTaskStallMonitorEnabled, +} from './featureGates'; + +import type { TeamTaskStallSnapshotSource } from './TeamTaskStallSnapshotSource'; +import type { TeamTaskStallPolicy } from './TeamTaskStallPolicy'; +import type { TeamTaskStallJournal } from './TeamTaskStallJournal'; +import type { TeamTaskStallNotifier } from './TeamTaskStallNotifier'; +import type { TaskStallAlert, TaskStallEvaluation } from './TeamTaskStallTypes'; +import type { TeamChangeEvent } from '@shared/types'; + +const logger = createLogger('Service:TeamTaskStallMonitor'); + +interface TeamObservationState { + firstSeenAtMs: number; + lastActivationAtMs: number; +} + +export class TeamTaskStallMonitor { + private scanTimer: ReturnType | null = null; + private nudgeTimer: ReturnType | null = null; + private scanInFlight = false; + private started = false; + private readonly observationByTeam = new Map(); + + constructor( + private readonly registry: ActiveTeamRegistry, + private readonly snapshotSource: TeamTaskStallSnapshotSource, + private readonly policy: TeamTaskStallPolicy, + private readonly journal: TeamTaskStallJournal, + private readonly notifier: TeamTaskStallNotifier + ) {} + + start(): void { + if (!isTeamTaskStallMonitorEnabled()) { + logger.debug('Task stall monitor disabled by feature gate'); + return; + } + if (this.started) { + return; + } + this.started = true; + this.registry.start(); + this.scheduleNextScan(2_000); + } + + async stop(): Promise { + this.started = false; + if (this.scanTimer) { + clearTimeout(this.scanTimer); + this.scanTimer = null; + } + if (this.nudgeTimer) { + clearTimeout(this.nudgeTimer); + this.nudgeTimer = null; + } + await this.registry.stop(); + } + + noteTeamChange(event: TeamChangeEvent): void { + this.registry.noteTeamChange(event); + if (!isTeamTaskStallMonitorEnabled()) { + return; + } + + if ( + event.type === 'member-spawn' || + (event.type === 'lead-activity' && event.detail !== 'offline') + ) { + const now = Date.now(); + const existing = this.observationByTeam.get(event.teamName); + this.observationByTeam.set(event.teamName, { + firstSeenAtMs: existing?.firstSeenAtMs ?? now, + lastActivationAtMs: now, + }); + this.scheduleNudgedScan(); + return; + } + + if (event.type === 'task-log-change' || event.type === 'log-source-change') { + this.scheduleNudgedScan(); + } + } + + private scheduleNextScan(delayMs: number): void { + if (!this.started) { + return; + } + if (this.scanTimer) { + clearTimeout(this.scanTimer); + } + this.scanTimer = setTimeout(() => { + this.scanTimer = null; + void this.runScan(); + }, delayMs); + } + + private scheduleNudgedScan(): void { + if (!this.started || this.nudgeTimer) { + return; + } + this.nudgeTimer = setTimeout(() => { + this.nudgeTimer = null; + void this.runScan(); + }, 5_000); + } + + private async runScan(): Promise { + if (!this.started || this.scanInFlight) { + return; + } + this.scanInFlight = true; + try { + const activeTeams = await this.registry.listActiveTeams(); + const activeSet = new Set(activeTeams); + for (const teamName of [...this.observationByTeam.keys()]) { + if (!activeSet.has(teamName)) { + this.observationByTeam.delete(teamName); + } + } + + const now = new Date(); + for (const teamName of activeTeams) { + const observation = this.getOrCreateObservation(teamName, now.getTime()); + const startupAgeMs = now.getTime() - observation.firstSeenAtMs; + if (startupAgeMs < getTeamTaskStallStartupGraceMs()) { + continue; + } + + const activationAgeMs = now.getTime() - observation.lastActivationAtMs; + if (activationAgeMs < getTeamTaskStallActivationGraceMs()) { + continue; + } + + await this.scanTeam(teamName, now); + } + } catch (error) { + logger.warn(`Task stall monitor scan failed: ${String(error)}`); + } finally { + this.scanInFlight = false; + this.scheduleNextScan(getTeamTaskStallScanIntervalMs()); + } + } + + private getOrCreateObservation(teamName: string, nowMs: number): TeamObservationState { + const existing = this.observationByTeam.get(teamName); + if (existing) { + return existing; + } + const created = { + firstSeenAtMs: nowMs, + lastActivationAtMs: nowMs, + }; + this.observationByTeam.set(teamName, created); + return created; + } + + private async scanTeam(teamName: string, now: Date): Promise { + const snapshot = await this.snapshotSource.getSnapshot(teamName); + if (!snapshot) { + return; + } + + const evaluations: TaskStallEvaluation[] = []; + for (const task of snapshot.inProgressTasks) { + evaluations.push(this.policy.evaluateWork({ now, task, snapshot })); + } + for (const task of snapshot.reviewOpenTasks) { + evaluations.push(this.policy.evaluateReview({ now, task, snapshot })); + } + + const activeTaskIds = [ + ...new Set([...snapshot.inProgressTasks, ...snapshot.reviewOpenTasks].map((task) => task.id)), + ]; + const readyEvaluations = await this.journal.reconcileScan({ + teamName, + evaluations, + activeTaskIds, + now: now.toISOString(), + }); + + const alerts = readyEvaluations + .map((evaluation) => this.buildAlert(snapshot, evaluation)) + .filter((alert): alert is TaskStallAlert => alert !== null); + + if (alerts.length === 0) { + return; + } + + if (!isTeamTaskStallAlertsEnabled()) { + logger.debug(`Task stall monitor shadow-ready alerts for ${teamName}: ${alerts.length}`); + return; + } + + await this.notifier.notifyLead(teamName, alerts); + await Promise.all( + alerts.map((alert) => this.journal.markAlerted(teamName, alert.epochKey, now.toISOString())) + ); + } + + private buildAlert( + snapshot: Awaited>, + evaluation: TaskStallEvaluation + ): TaskStallAlert | null { + if ( + !snapshot || + evaluation.status !== 'alert' || + !evaluation.taskId || + !evaluation.branch || + !evaluation.signal || + !evaluation.epochKey + ) { + return null; + } + + const task = snapshot.allTasksById.get(evaluation.taskId); + if (!task) { + return null; + } + + const displayId = getTaskDisplayId(task); + return { + teamName: snapshot.teamName, + taskId: task.id, + displayId, + subject: task.subject, + branch: evaluation.branch, + signal: evaluation.signal, + reason: evaluation.reason, + epochKey: evaluation.epochKey, + taskRef: { + taskId: task.id, + displayId, + teamName: snapshot.teamName, + }, + }; + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallNotifier.ts b/src/main/services/team/stallMonitor/TeamTaskStallNotifier.ts new file mode 100644 index 00000000..0f00b766 --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallNotifier.ts @@ -0,0 +1,32 @@ +import { formatTaskDisplayLabel } from '@shared/utils/taskIdentity'; + +import type { TaskStallAlert } from './TeamTaskStallTypes'; +import type { TeamDataService } from '../TeamDataService'; + +function buildLeadAlertText(alerts: TaskStallAlert[]): string { + return alerts + .map( + (alert) => + `- ${formatTaskDisplayLabel({ id: alert.taskId, displayId: alert.displayId })} [${alert.branch}] ${alert.subject} - ${alert.reason}` + ) + .join('\n'); +} + +export class TeamTaskStallNotifier { + constructor( + private readonly teamDataService: Pick + ) {} + + async notifyLead(teamName: string, alerts: TaskStallAlert[]): Promise { + if (alerts.length === 0) { + return; + } + + await this.teamDataService.sendSystemNotificationToLead({ + teamName, + summary: 'Potential stalled tasks detected', + text: buildLeadAlertText(alerts), + taskRefs: alerts.map((alert) => alert.taskRef), + }); + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallPolicy.ts b/src/main/services/team/stallMonitor/TeamTaskStallPolicy.ts new file mode 100644 index 00000000..1d339dec --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallPolicy.ts @@ -0,0 +1,508 @@ +import type { + ReviewTaskContext, + TaskStallBranch, + TaskStallEvaluation, + TaskStallSignal, + TeamTaskStallExactRow, + TeamTaskStallSnapshot, + WorkTaskContext, +} from './TeamTaskStallTypes'; +import type { BoardTaskActivityRecord } from '../taskLogs/activity/BoardTaskActivityRecord'; +import type { TeamTask, TaskWorkInterval, TaskHistoryEvent } from '@shared/types'; + +const WORK_TOUCH_TOOLS = new Set(['task_start', 'task_add_comment', 'task_set_status']); +const REVIEW_TOUCH_TOOLS = new Set(['review_start', 'task_add_comment']); + +const ONE_MINUTE_MS = 60_000; +const WORK_THRESHOLDS_MS: Record = { + turn_ended_after_touch: 8 * ONE_MINUTE_MS, + touch_then_other_turns: 10 * ONE_MINUTE_MS, + mid_turn_after_touch: 20 * ONE_MINUTE_MS, +}; +const REVIEW_THRESHOLDS_MS: Record = { + turn_ended_after_touch: 10 * ONE_MINUTE_MS, + touch_then_other_turns: 10 * ONE_MINUTE_MS, + mid_turn_after_touch: 25 * ONE_MINUTE_MS, +}; + +function skip( + taskId: string, + reason: string, + skipReason: TaskStallEvaluation['skipReason'] +): TaskStallEvaluation { + return { + status: 'skip', + taskId, + reason, + skipReason, + }; +} + +function isAfterOrEqual(timestamp: string, lowerBound: string): boolean { + return Date.parse(timestamp) >= Date.parse(lowerBound); +} + +function getOpenWorkInterval(task: TeamTask): TaskWorkInterval | null { + const intervals = task.workIntervals ?? []; + for (let i = intervals.length - 1; i >= 0; i -= 1) { + const interval = intervals[i]; + if (!interval.completedAt) { + return interval; + } + } + return null; +} + +function getOpenReviewWindowStart(task: TeamTask): string | null { + if (task.reviewState !== 'review' || !task.historyEvents?.length) { + return null; + } + + for (let i = task.historyEvents.length - 1; i >= 0; i -= 1) { + const event = task.historyEvents[i]; + if (event.type === 'review_started') { + return event.timestamp; + } + if ( + event.type === 'review_approved' || + event.type === 'review_changes_requested' || + (event.type === 'status_changed' && event.to === 'in_progress') + ) { + return null; + } + } + return null; +} + +function hasReviewStartedByReviewer( + historyEvents: TaskHistoryEvent[] | undefined, + reviewer: string, + windowStartedAt: string +): boolean { + if (!historyEvents?.length) { + return false; + } + + return historyEvents.some( + (event) => + event.type === 'review_started' && + event.actor === reviewer && + isAfterOrEqual(event.timestamp, windowStartedAt) + ); +} + +function isStrongReviewTouch( + record: BoardTaskActivityRecord, + reviewer: string, + hasExplicitStartedReview: boolean, + windowStartedAt: string +): boolean { + if ( + record.actor.memberName !== reviewer || + !record.action?.canonicalToolName || + !REVIEW_TOUCH_TOOLS.has(record.action.canonicalToolName) || + !isAfterOrEqual(record.timestamp, windowStartedAt) + ) { + return false; + } + + if (record.action.canonicalToolName === 'review_start') { + return true; + } + + if ( + record.actorContext.relation === 'same_task' && + record.actorContext.activePhase === 'review' + ) { + return true; + } + + return hasExplicitStartedReview; +} + +function findLastMeaningfulWorkTouch( + records: BoardTaskActivityRecord[], + owner: string, + intervalStartedAt: string +): BoardTaskActivityRecord | null { + return ( + [...records] + .filter((record) => record.actor.memberName === owner) + .filter((record) => isAfterOrEqual(record.timestamp, intervalStartedAt)) + .filter((record) => WORK_TOUCH_TOOLS.has(record.action?.canonicalToolName ?? '')) + .at(-1) ?? null + ); +} + +function findLastMeaningfulReviewTouch( + records: BoardTaskActivityRecord[], + reviewer: string, + windowStartedAt: string, + hasExplicitStartedReview: boolean +): BoardTaskActivityRecord | null { + return ( + [...records] + .filter((record) => + isStrongReviewTouch(record, reviewer, hasExplicitStartedReview, windowStartedAt) + ) + .at(-1) ?? null + ); +} + +function anchorEvidenceRank(row: TeamTaskStallExactRow, toolUseId: string | undefined): number { + if (!toolUseId || row.parsedMessage.type !== 'assistant') { + return 0; + } + if (row.toolUseIds.includes(toolUseId)) { + return 2; + } + if (row.sourceToolUseId === toolUseId || row.toolResultIds.includes(toolUseId)) { + return 1; + } + return 0; +} + +function deduplicateAssistantRowsByRequestId( + rows: TeamTaskStallExactRow[], + toolUseId: string | undefined +): TeamTaskStallExactRow[] { + const preferredIndexByRequestId = new Map(); + for (let i = 0; i < rows.length; i += 1) { + const row = rows[i]; + if (row.parsedMessage.type !== 'assistant' || !row.requestId) { + continue; + } + const existingIndex = preferredIndexByRequestId.get(row.requestId); + if (existingIndex === undefined) { + preferredIndexByRequestId.set(row.requestId, i); + continue; + } + const existingRank = anchorEvidenceRank(rows[existingIndex], toolUseId); + const nextRank = anchorEvidenceRank(row, toolUseId); + if (nextRank > existingRank || (nextRank === existingRank && i > existingIndex)) { + preferredIndexByRequestId.set(row.requestId, i); + } + } + + if (preferredIndexByRequestId.size === 0) { + return rows; + } + + return rows.filter((row, index) => { + if (row.parsedMessage.type !== 'assistant' || !row.requestId) { + return true; + } + return preferredIndexByRequestId.get(row.requestId) === index; + }); +} + +function findAnchorRowIndex( + rows: TeamTaskStallExactRow[], + messageUuid: string, + toolUseId?: string +): number { + const candidates = rows + .map((row, index) => ({ row, index })) + .filter(({ row }) => row.messageUuid === messageUuid); + if (candidates.length === 0) { + return -1; + } + + if (toolUseId) { + const explicitToolUse = candidates.filter(({ row }) => row.toolUseIds.includes(toolUseId)); + if (explicitToolUse.length > 0) { + return explicitToolUse.at(-1)!.index; + } + + const linkedRows = candidates.filter( + ({ row }) => row.sourceToolUseId === toolUseId || row.toolResultIds.includes(toolUseId) + ); + if (linkedRows.length > 0) { + return linkedRows.at(-1)!.index; + } + } + + return candidates.at(-1)!.index; +} + +function classifyPostTouchState(args: { + rows: TeamTaskStallExactRow[]; + anchorMessageUuid: string; + anchorToolUseId?: string; +}): TaskStallSignal | 'ambiguous' { + const normalizedRows = deduplicateAssistantRowsByRequestId(args.rows, args.anchorToolUseId); + const anchorIndex = findAnchorRowIndex( + normalizedRows, + args.anchorMessageUuid, + args.anchorToolUseId + ); + if (anchorIndex < 0) { + return 'ambiguous'; + } + + let sawTurnEnd = false; + let sawLaterRows = false; + + for (let i = anchorIndex + 1; i < normalizedRows.length; i += 1) { + const row = normalizedRows[i]; + if (row.systemSubtype === 'turn_duration') { + sawTurnEnd = true; + continue; + } + + sawLaterRows = true; + if (sawTurnEnd) { + return 'touch_then_other_turns'; + } + } + + if (sawTurnEnd) { + return 'turn_ended_after_touch'; + } + if (sawLaterRows) { + return 'mid_turn_after_touch'; + } + return 'mid_turn_after_touch'; +} + +function buildEpochKey( + task: TeamTask, + branch: TaskStallBranch, + signal: TaskStallSignal, + touch: BoardTaskActivityRecord +): string { + return [ + task.id, + branch, + signal, + touch.timestamp, + touch.source.filePath, + touch.source.messageUuid, + touch.source.toolUseId ?? 'ambient', + ].join(':'); +} + +function buildAlertEvaluation(args: { + task: TeamTask; + branch: TaskStallBranch; + signal: TaskStallSignal; + touch: BoardTaskActivityRecord; + reason: string; +}): TaskStallEvaluation { + return { + status: 'alert', + taskId: args.task.id, + branch: args.branch, + signal: args.signal, + epochKey: buildEpochKey(args.task, args.branch, args.signal, args.touch), + reason: args.reason, + }; +} + +export class TeamTaskStallPolicy { + evaluateWork(args: { + now: Date; + task: TeamTask; + snapshot: TeamTaskStallSnapshot; + }): TaskStallEvaluation { + const { task, snapshot } = args; + + if (!snapshot.activityReadsEnabled) { + return skip(task.id, 'Task activity reads are disabled', 'activity_reads_disabled'); + } + if (!snapshot.exactReadsEnabled) { + return skip(task.id, 'Exact log reads are disabled', 'exact_reads_disabled'); + } + if (task.status !== 'in_progress') { + return skip(task.id, 'Task is not in progress', 'task_not_in_progress'); + } + if (!task.owner) { + return skip(task.id, 'Task has no owner', 'owner_missing'); + } + if (task.owner === snapshot.leadName) { + return skip(task.id, 'Task owner is the lead', 'owner_is_lead'); + } + if (task.reviewState === 'review') { + return skip(task.id, 'Task is currently under review', 'review_active'); + } + if (task.blockedBy?.length) { + return skip(task.id, 'Task is blocked', 'task_blocked'); + } + if (task.needsClarification) { + return skip(task.id, 'Task is waiting for clarification', 'needs_clarification'); + } + + const openWorkInterval = getOpenWorkInterval(task); + if (!openWorkInterval?.startedAt) { + return skip(task.id, 'Task has no open work interval', 'no_open_work_interval'); + } + + const records = snapshot.recordsByTaskId.get(task.id) ?? []; + if (records.length === 0 && !snapshot.freshnessByTaskId.has(task.id)) { + return skip( + task.id, + 'Task run is not instrumented enough for stall evaluation', + 'non_instrumented_run' + ); + } + + const workContext: WorkTaskContext | null = (() => { + const touch = findLastMeaningfulWorkTouch(records, task.owner!, openWorkInterval.startedAt); + if (!touch) { + return null; + } + return { + owner: task.owner!, + intervalStartedAt: openWorkInterval.startedAt, + lastMeaningfulTouch: touch, + lastMeaningfulTouchAt: touch.timestamp, + }; + })(); + + if (!workContext) { + return skip( + task.id, + 'No positive work touch found in current work interval', + 'no_positive_touch' + ); + } + + const exactRows = snapshot.exactRowsByFilePath.get( + workContext.lastMeaningfulTouch.source.filePath + ); + if (!exactRows?.length) { + return skip(task.id, 'Post-touch exact rows are unavailable', 'ambiguous_state'); + } + + const signal = classifyPostTouchState({ + rows: exactRows, + anchorMessageUuid: workContext.lastMeaningfulTouch.source.messageUuid, + anchorToolUseId: workContext.lastMeaningfulTouch.source.toolUseId, + }); + if (signal === 'ambiguous') { + return skip(task.id, 'Post-touch state is ambiguous', 'ambiguous_state'); + } + + const elapsedMs = args.now.getTime() - Date.parse(workContext.lastMeaningfulTouchAt); + const thresholdMs = WORK_THRESHOLDS_MS[signal]; + if (elapsedMs < thresholdMs) { + return skip( + task.id, + 'Work touch is still below the configured stall threshold', + 'below_threshold' + ); + } + + return buildAlertEvaluation({ + task, + branch: 'work', + signal, + touch: workContext.lastMeaningfulTouch, + reason: `Potential work stall after ${signal.replaceAll('_', ' ')}.`, + }); + } + + evaluateReview(args: { + now: Date; + task: TeamTask; + snapshot: TeamTaskStallSnapshot; + }): TaskStallEvaluation { + const { task, snapshot } = args; + + if (!snapshot.activityReadsEnabled) { + return skip(task.id, 'Task activity reads are disabled', 'activity_reads_disabled'); + } + if (!snapshot.exactReadsEnabled) { + return skip(task.id, 'Exact log reads are disabled', 'exact_reads_disabled'); + } + if (task.reviewState !== 'review') { + return skip(task.id, 'Task is not in an open review window', 'review_terminal'); + } + if (task.needsClarification) { + return skip(task.id, 'Task is waiting for clarification', 'needs_clarification'); + } + + const reviewWindowStartedAt = getOpenReviewWindowStart(task); + if (!reviewWindowStartedAt) { + return skip(task.id, 'Task has no open review window', 'no_open_review_window'); + } + + const resolvedReviewer = snapshot.resolvedReviewersByTaskId.get(task.id) ?? { + reviewer: null, + source: 'none', + }; + if (!resolvedReviewer.reviewer) { + return skip(task.id, 'Reviewer could not be resolved safely', 'reviewer_unresolved'); + } + + const records = snapshot.recordsByTaskId.get(task.id) ?? []; + if (records.length === 0 && !snapshot.freshnessByTaskId.has(task.id)) { + return skip( + task.id, + 'Review run is not instrumented enough for stall evaluation', + 'non_instrumented_run' + ); + } + + const explicitReviewStarted = hasReviewStartedByReviewer( + task.historyEvents, + resolvedReviewer.reviewer, + reviewWindowStartedAt + ); + const reviewContext: ReviewTaskContext | null = (() => { + const touch = findLastMeaningfulReviewTouch( + records, + resolvedReviewer.reviewer!, + reviewWindowStartedAt, + explicitReviewStarted + ); + if (!touch) { + return null; + } + return { + resolvedReviewer, + reviewWindowStartedAt, + lastMeaningfulTouch: touch, + lastMeaningfulTouchAt: touch.timestamp, + }; + })(); + + if (!reviewContext) { + return skip(task.id, 'No explicit started-review evidence was found', 'no_positive_touch'); + } + + const exactRows = snapshot.exactRowsByFilePath.get( + reviewContext.lastMeaningfulTouch.source.filePath + ); + if (!exactRows?.length) { + return skip(task.id, 'Post-review exact rows are unavailable', 'ambiguous_state'); + } + + const signal = classifyPostTouchState({ + rows: exactRows, + anchorMessageUuid: reviewContext.lastMeaningfulTouch.source.messageUuid, + anchorToolUseId: reviewContext.lastMeaningfulTouch.source.toolUseId, + }); + if (signal === 'ambiguous') { + return skip(task.id, 'Post-review state is ambiguous', 'ambiguous_state'); + } + + const elapsedMs = args.now.getTime() - Date.parse(reviewContext.lastMeaningfulTouchAt); + const thresholdMs = REVIEW_THRESHOLDS_MS[signal]; + if (elapsedMs < thresholdMs) { + return skip( + task.id, + 'Review touch is still below the configured stall threshold', + 'below_threshold' + ); + } + + return buildAlertEvaluation({ + task, + branch: 'review', + signal, + touch: reviewContext.lastMeaningfulTouch, + reason: `Potential started-review stall after ${signal.replaceAll('_', ' ')}.`, + }); + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.ts b/src/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.ts new file mode 100644 index 00000000..b6118f28 --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.ts @@ -0,0 +1,119 @@ +import { TeamTaskReader } from '../TeamTaskReader'; +import { TeamKanbanManager } from '../TeamKanbanManager'; +import { TeamTranscriptSourceLocator } from '../taskLogs/discovery/TeamTranscriptSourceLocator'; +import { BoardTaskActivityTranscriptReader } from '../taskLogs/activity/BoardTaskActivityTranscriptReader'; +import { isBoardTaskActivityReadEnabled } from '../taskLogs/activity/featureGates'; +import { isBoardTaskExactLogsReadEnabled } from '../taskLogs/exact/featureGates'; + +import { BoardTaskActivityBatchIndexer } from './BoardTaskActivityBatchIndexer'; +import { TeamTaskLogFreshnessReader } from './TeamTaskLogFreshnessReader'; +import { TeamTaskStallExactRowReader } from './TeamTaskStallExactRowReader'; +import { buildResolvedReviewerIndex } from './reviewerResolution'; + +import type { BoardTaskActivityRecord } from '../taskLogs/activity/BoardTaskActivityRecord'; +import type { TeamTaskStallSnapshot } from './TeamTaskStallTypes'; +import type { TeamConfig, TeamTask } from '@shared/types'; + +function resolveLeadNameFromConfig(config: TeamConfig): string { + const lead = config.members?.find((member) => member.role?.toLowerCase().includes('lead')); + return lead?.name ?? config.members?.[0]?.name ?? 'team-lead'; +} + +export class TeamTaskStallSnapshotSource { + constructor( + private readonly transcriptSourceLocator: TeamTranscriptSourceLocator = new TeamTranscriptSourceLocator(), + private readonly taskReader: TeamTaskReader = new TeamTaskReader(), + private readonly kanbanManager: TeamKanbanManager = new TeamKanbanManager(), + private readonly transcriptReader: BoardTaskActivityTranscriptReader = new BoardTaskActivityTranscriptReader(), + private readonly activityBatchIndexer: BoardTaskActivityBatchIndexer = new BoardTaskActivityBatchIndexer(), + private readonly freshnessReader: TeamTaskLogFreshnessReader = new TeamTaskLogFreshnessReader(), + private readonly exactRowReader: TeamTaskStallExactRowReader = new TeamTaskStallExactRowReader() + ) {} + + async getSnapshot(teamName: string): Promise { + const transcriptContext = await this.transcriptSourceLocator.getContext(teamName); + if (!transcriptContext) { + return null; + } + + const [activeTasks, deletedTasks, kanbanState] = await Promise.all([ + this.taskReader.getTasks(teamName), + this.taskReader.getDeletedTasks(teamName), + this.kanbanManager.getState(teamName), + ]); + const allTasks = [...activeTasks, ...deletedTasks]; + const allTasksById = new Map(allTasks.map((task) => [task.id, task] as const)); + const inProgressTasks = activeTasks.filter( + (task) => task.status === 'in_progress' && task.reviewState !== 'review' + ); + const reviewOpenTasks = activeTasks.filter((task) => task.reviewState === 'review'); + const resolvedReviewersByTaskId = buildResolvedReviewerIndex(activeTasks, kanbanState); + const activityReadsEnabled = isBoardTaskActivityReadEnabled(); + const exactReadsEnabled = isBoardTaskExactLogsReadEnabled(); + + let recordsByTaskId = new Map(); + if ( + activityReadsEnabled && + allTasks.length > 0 && + transcriptContext.transcriptFiles.length > 0 + ) { + const messages = await this.transcriptReader.readFiles(transcriptContext.transcriptFiles); + recordsByTaskId = this.activityBatchIndexer.buildIndex({ + teamName, + tasks: allTasks, + messages, + }); + } + + const relevantMonitorTasks = [...inProgressTasks, ...reviewOpenTasks]; + const relevantExactFiles = this.collectRelevantExactFiles( + relevantMonitorTasks, + recordsByTaskId + ); + const [freshnessByTaskId, exactRowsByFilePath] = await Promise.all([ + this.freshnessReader.readSignals( + transcriptContext.projectDir, + relevantMonitorTasks.map((task) => task.id) + ), + exactReadsEnabled + ? this.exactRowReader.parseFiles(relevantExactFiles) + : Promise.resolve(new Map()), + ]); + + return { + teamName, + scannedAt: new Date().toISOString(), + projectDir: transcriptContext.projectDir, + projectId: transcriptContext.projectId, + leadName: resolveLeadNameFromConfig(transcriptContext.config), + transcriptFiles: transcriptContext.transcriptFiles, + activityReadsEnabled, + exactReadsEnabled, + activeTasks, + deletedTasks, + allTasksById, + inProgressTasks, + reviewOpenTasks, + resolvedReviewersByTaskId, + recordsByTaskId, + freshnessByTaskId, + exactRowsByFilePath, + }; + } + + private collectRelevantExactFiles( + inProgressTasks: TeamTask[], + recordsByTaskId: Map + ): string[] { + const filePaths = new Set(); + + for (const task of inProgressTasks) { + const records = recordsByTaskId.get(task.id) ?? []; + for (const record of records) { + filePaths.add(record.source.filePath); + } + } + + return [...filePaths].sort((left, right) => left.localeCompare(right)); + } +} diff --git a/src/main/services/team/stallMonitor/TeamTaskStallTypes.ts b/src/main/services/team/stallMonitor/TeamTaskStallTypes.ts new file mode 100644 index 00000000..46550e05 --- /dev/null +++ b/src/main/services/team/stallMonitor/TeamTaskStallTypes.ts @@ -0,0 +1,139 @@ +import type { BoardTaskActivityRecord } from '../taskLogs/activity/BoardTaskActivityRecord'; +import type { ParsedMessage } from '@main/types'; +import type { TeamTask } from '@shared/types'; + +export type TaskStallBranch = 'work' | 'review'; + +export type TaskStallSignal = + | 'turn_ended_after_touch' + | 'mid_turn_after_touch' + | 'touch_then_other_turns'; + +export type TaskStallEvaluationStatus = 'skip' | 'suspected' | 'alert'; + +export type TaskStallSkipReason = + | 'task_not_in_progress' + | 'owner_missing' + | 'owner_is_lead' + | 'task_blocked' + | 'needs_clarification' + | 'review_active' + | 'review_terminal' + | 'reviewer_unresolved' + | 'non_instrumented_run' + | 'activity_reads_disabled' + | 'exact_reads_disabled' + | 'no_positive_touch' + | 'no_open_work_interval' + | 'no_open_review_window' + | 'ambiguous_state' + | 'below_threshold' + | 'first_scan_only'; + +export type ResolvedReviewerSource = + | 'kanban_state' + | 'history_review_approved_actor' + | 'history_review_started_actor' + | 'history_review_requested_reviewer' + | 'none'; + +export interface ResolvedReviewer { + reviewer: string | null; + source: ResolvedReviewerSource; +} + +export interface TaskStallEvaluation { + status: TaskStallEvaluationStatus; + taskId?: string; + branch?: TaskStallBranch; + signal?: TaskStallSignal; + epochKey?: string; + reason: string; + skipReason?: TaskStallSkipReason; +} + +export interface TaskLogFreshnessSignal { + taskId: string; + updatedAt: string; + filePath: string; + transcriptFileBasename?: string; +} + +export interface TeamTaskStallExactRow { + filePath: string; + sourceOrder: number; + messageUuid: string; + timestamp: string; + parsedMessage: ParsedMessage; + requestId?: string; + sourceToolUseId?: string; + sourceToolAssistantUuid?: string; + systemSubtype?: 'turn_duration' | 'init'; + toolUseIds: string[]; + toolResultIds: string[]; +} + +export interface TeamTaskStallSnapshot { + teamName: string; + scannedAt: string; + projectDir: string; + projectId: string; + leadName: string; + transcriptFiles: string[]; + activityReadsEnabled: boolean; + exactReadsEnabled: boolean; + activeTasks: TeamTask[]; + deletedTasks: TeamTask[]; + allTasksById: Map; + inProgressTasks: TeamTask[]; + reviewOpenTasks: TeamTask[]; + resolvedReviewersByTaskId: Map; + recordsByTaskId: Map; + freshnessByTaskId: Map; + exactRowsByFilePath: Map; +} + +export interface WorkTaskContext { + owner: string; + intervalStartedAt: string; + lastMeaningfulTouch: BoardTaskActivityRecord; + lastMeaningfulTouchAt: string; +} + +export interface ReviewTaskContext { + resolvedReviewer: ResolvedReviewer; + reviewWindowStartedAt: string; + lastMeaningfulTouch: BoardTaskActivityRecord; + lastMeaningfulTouchAt: string; +} + +export interface TaskStallAlert { + teamName: string; + taskId: string; + displayId: string; + subject: string; + branch: TaskStallBranch; + signal: TaskStallSignal; + reason: string; + epochKey: string; + taskRef: { + taskId: string; + displayId: string; + teamName: string; + }; +} + +export type TaskStallJournalState = 'suspected' | 'alert_ready' | 'alerted'; + +export interface TaskStallJournalEntry { + epochKey: string; + teamName: string; + taskId: string; + branch: TaskStallBranch; + signal: TaskStallSignal; + state: TaskStallJournalState; + consecutiveScans: number; + createdAt: string; + updatedAt: string; + alertedAt?: string; +} diff --git a/src/main/services/team/stallMonitor/featureGates.ts b/src/main/services/team/stallMonitor/featureGates.ts new file mode 100644 index 00000000..f9c24682 --- /dev/null +++ b/src/main/services/team/stallMonitor/featureGates.ts @@ -0,0 +1,42 @@ +function readEnabledFlag(value: string | undefined, defaultValue: boolean): boolean { + if (value == null) { + return defaultValue; + } + + const normalized = value.trim().toLowerCase(); + if (normalized === '0' || normalized === 'false' || normalized === 'off' || normalized === 'no') { + return false; + } + if (normalized === '1' || normalized === 'true' || normalized === 'on' || normalized === 'yes') { + return true; + } + return defaultValue; +} + +function readInt(value: string | undefined, defaultValue: number): number { + if (value == null) { + return defaultValue; + } + const parsed = Number.parseInt(value.trim(), 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : defaultValue; +} + +export function isTeamTaskStallMonitorEnabled(): boolean { + return readEnabledFlag(process.env.CLAUDE_TEAM_TASK_STALL_MONITOR_ENABLED, false); +} + +export function isTeamTaskStallAlertsEnabled(): boolean { + return readEnabledFlag(process.env.CLAUDE_TEAM_TASK_STALL_ALERTS_ENABLED, false); +} + +export function getTeamTaskStallScanIntervalMs(): number { + return readInt(process.env.CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS, 60_000); +} + +export function getTeamTaskStallStartupGraceMs(): number { + return readInt(process.env.CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS, 180_000); +} + +export function getTeamTaskStallActivationGraceMs(): number { + return readInt(process.env.CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS, 120_000); +} diff --git a/src/main/services/team/stallMonitor/reviewerResolution.ts b/src/main/services/team/stallMonitor/reviewerResolution.ts new file mode 100644 index 00000000..962f4f84 --- /dev/null +++ b/src/main/services/team/stallMonitor/reviewerResolution.ts @@ -0,0 +1,47 @@ +import { TeamKanbanManager } from '../TeamKanbanManager'; + +import type { ResolvedReviewer } from './TeamTaskStallTypes'; +import type { TeamTask } from '@shared/types'; + +export function resolveReviewerFromHistory(task: TeamTask): ResolvedReviewer { + if (!task.historyEvents?.length) { + return { reviewer: null, source: 'none' }; + } + + for (let i = task.historyEvents.length - 1; i >= 0; i -= 1) { + const event = task.historyEvents[i]; + if (event.type === 'review_approved' && event.actor) { + return { reviewer: event.actor, source: 'history_review_approved_actor' }; + } + if (event.type === 'review_started' && event.actor) { + return { reviewer: event.actor, source: 'history_review_started_actor' }; + } + if (event.type === 'review_requested' && event.reviewer) { + return { reviewer: event.reviewer, source: 'history_review_requested_reviewer' }; + } + } + + return { reviewer: null, source: 'none' }; +} + +export function buildResolvedReviewerIndex( + tasks: TeamTask[], + kanbanState: Awaited> +): Map { + const resolved = new Map(); + + for (const task of tasks) { + const kanbanReviewer = kanbanState.tasks[task.id]?.reviewer; + if (typeof kanbanReviewer === 'string' && kanbanReviewer.trim().length > 0) { + resolved.set(task.id, { + reviewer: kanbanReviewer.trim(), + source: 'kanban_state', + }); + continue; + } + + resolved.set(task.id, resolveReviewerFromHistory(task)); + } + + return resolved; +} diff --git a/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordBuilder.ts b/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordBuilder.ts index fc58f657..01d780a3 100644 --- a/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordBuilder.ts +++ b/src/main/services/team/taskLogs/activity/BoardTaskActivityRecordBuilder.ts @@ -312,6 +312,21 @@ function compareRecords(left: BoardTaskActivityRecord, right: BoardTaskActivityR return left.id.localeCompare(right.id); } +function resolveCandidateTaskIds(locator: BoardTaskLocator, lookup: TaskLookup): string[] { + const canonicalTask = + (locator.canonicalId && lookup.byId.get(locator.canonicalId)) || + (locator.refKind === 'canonical' ? lookup.byId.get(locator.ref) : undefined) || + (locator.refKind === 'unknown' && looksLikeCanonicalTaskId(locator.ref) + ? lookup.byId.get(locator.ref) + : undefined); + if (canonicalTask) { + return [canonicalTask.id]; + } + + const displayCandidates = lookup.byDisplayId.get(normalizeDisplayRef(locator.ref)) ?? []; + return [...new Set(displayCandidates.map((task) => task.id))]; +} + export class BoardTaskActivityRecordBuilder { buildForTask(args: { teamName: string; @@ -319,64 +334,98 @@ export class BoardTaskActivityRecordBuilder { tasks: TeamTask[]; messages: RawTaskActivityMessage[]; }): BoardTaskActivityRecord[] { + return ( + this.buildForTasks({ + teamName: args.teamName, + tasks: args.tasks, + messages: args.messages, + }).get(args.targetTask.id) ?? [] + ); + } + + buildForTasks(args: { + teamName: string; + tasks: TeamTask[]; + messages: RawTaskActivityMessage[]; + }): Map { const lookup = buildTaskLookup(args.tasks); - const records: BoardTaskActivityRecord[] = []; - const seenIds = new Set(); + const recordsByTaskId = new Map(); + const seenIdsByTaskId = new Map>(); for (const message of args.messages) { const actionMap = buildActionMap(message.boardTaskToolActions); for (const link of message.boardTaskLinks) { const resolvedTask = resolveLocatorToTaskRef(args.teamName, link.task, lookup); - if ( - resolvedTask.taskRef?.taskId !== args.targetTask.id && - !locatorCouldMatchTask(link.task, args.targetTask, lookup) - ) { + const candidateTaskIds = resolveCandidateTaskIds(link.task, lookup); + if (candidateTaskIds.length === 0) { continue; } - const action = link.linkKind === 'execution' || !link.toolUseId ? undefined : actionMap.get(link.toolUseId); - const peerTask = resolvePeerTask( - args.teamName, - link, - message.boardTaskLinks, - args.targetTask, - lookup - ); - const record: BoardTaskActivityRecord = { - id: [ - message.uuid, - link.toolUseId ?? 'ambient', - link.task.ref, - link.targetRole, - link.linkKind, - ].join(':'), - timestamp: message.timestamp, - task: resolvedTask, - linkKind: link.linkKind, - targetRole: link.targetRole, - actor: resolveActivityActor(message), - actorContext: buildActorContext(args.teamName, link.actorContext, lookup), - ...(action ? { action: buildAction({ action, link, peerTask }) } : {}), - source: { - messageUuid: message.uuid, - filePath: message.filePath, - ...(link.toolUseId ? { toolUseId: link.toolUseId } : {}), - sourceOrder: message.sourceOrder, - }, - }; - if (seenIds.has(record.id)) { - continue; + for (const taskId of candidateTaskIds) { + const targetTask = lookup.byId.get(taskId); + if (!targetTask) { + continue; + } + if ( + resolvedTask.taskRef?.taskId !== targetTask.id && + !locatorCouldMatchTask(link.task, targetTask, lookup) + ) { + continue; + } + + const peerTask = resolvePeerTask( + args.teamName, + link, + message.boardTaskLinks, + targetTask, + lookup + ); + const record: BoardTaskActivityRecord = { + id: [ + message.uuid, + link.toolUseId ?? 'ambient', + link.task.ref, + link.targetRole, + link.linkKind, + ].join(':'), + timestamp: message.timestamp, + task: resolvedTask, + linkKind: link.linkKind, + targetRole: link.targetRole, + actor: resolveActivityActor(message), + actorContext: buildActorContext(args.teamName, link.actorContext, lookup), + ...(action ? { action: buildAction({ action, link, peerTask }) } : {}), + source: { + messageUuid: message.uuid, + filePath: message.filePath, + ...(link.toolUseId ? { toolUseId: link.toolUseId } : {}), + sourceOrder: message.sourceOrder, + }, + }; + + const seenIds = seenIdsByTaskId.get(taskId) ?? new Set(); + if (seenIds.has(record.id)) { + continue; + } + seenIds.add(record.id); + seenIdsByTaskId.set(taskId, seenIds); + + const taskRecords = recordsByTaskId.get(taskId) ?? []; + taskRecords.push(record); + recordsByTaskId.set(taskId, taskRecords); } - seenIds.add(record.id); - records.push(record); } } - return records.sort(compareRecords); + for (const [taskId, records] of recordsByTaskId) { + recordsByTaskId.set(taskId, records.sort(compareRecords)); + } + + return recordsByTaskId; } } diff --git a/test/main/services/team/TeamDataService.stallMonitor.test.ts b/test/main/services/team/TeamDataService.stallMonitor.test.ts new file mode 100644 index 00000000..6105e27b --- /dev/null +++ b/test/main/services/team/TeamDataService.stallMonitor.test.ts @@ -0,0 +1,133 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { TeamDataService } from '../../../../src/main/services/team/TeamDataService'; + +import type { SendMessageResult, TaskRef, TeamSummary } from '../../../../src/shared/types'; + +function createService(configReaderOverrides: Record = {}): TeamDataService { + return new TeamDataService( + { + getConfig: vi.fn(async () => null), + listTeams: vi.fn(async () => []), + ...configReaderOverrides, + } as never, + { getTasks: vi.fn(async () => []) } as never, + { listInboxNames: vi.fn(async () => []), getMessages: vi.fn(async () => []) } as never, + {} as never, + {} as never, + { resolveMembers: vi.fn(() => []) } as never, + { getState: vi.fn(async () => ({ teamName: 'demo', reviewers: [], tasks: {} })) } as never, + {} as never, + { getMembers: vi.fn(async () => []), writeMembers: vi.fn(async () => {}) } as never, + { readMessages: vi.fn(async () => []) } as never + ); +} + +describe('TeamDataService stall-monitor helpers', () => { + it('lists alive process teams using non-stopped processes and ignores per-team read errors', async () => { + const teams: TeamSummary[] = [ + { + teamName: 'beta', + displayName: 'beta', + description: '', + memberCount: 0, + taskCount: 0, + lastActivity: null, + }, + { + teamName: 'alpha', + displayName: 'alpha', + description: '', + memberCount: 0, + taskCount: 0, + lastActivity: null, + }, + { + teamName: 'gamma', + displayName: 'gamma', + description: '', + memberCount: 0, + taskCount: 0, + lastActivity: null, + }, + { + teamName: 'deleted', + displayName: 'deleted', + description: '', + memberCount: 0, + taskCount: 0, + lastActivity: null, + deletedAt: '2026-04-19T12:09:00.000Z', + }, + ]; + + const service = createService({ + listTeams: vi.fn(async () => teams), + }); + + const readProcesses = vi.fn(async (teamName: string) => { + if (teamName === 'alpha') { + return [{ id: '1', label: 'alpha', pid: 101, registeredAt: '2026-04-19T12:00:00.000Z' }]; + } + if (teamName === 'beta') { + return [ + { + id: '2', + label: 'beta', + pid: 202, + registeredAt: '2026-04-19T12:00:00.000Z', + stoppedAt: '2026-04-19T12:05:00.000Z', + }, + ]; + } + if (teamName === 'deleted') { + return [{ id: '9', label: 'deleted', pid: 909, registeredAt: '2026-04-19T12:00:00.000Z' }]; + } + throw new Error('boom'); + }); + + (service as unknown as { readProcesses: typeof readProcesses }).readProcesses = readProcesses; + + await expect(service.listAliveProcessTeams()).resolves.toEqual(['alpha']); + expect(readProcesses).not.toHaveBeenCalledWith('deleted'); + }); + + it('routes system notifications to the resolved lead via sendMessage', async () => { + const leadTaskRef: TaskRef = { + taskId: 'task-1', + displayId: '1', + teamName: 'demo', + }; + + const service = createService({ + getConfig: vi.fn(async () => ({ + name: 'demo', + members: [{ name: 'lead', role: 'Team Lead' }], + })), + }); + + const expectedResult = { messageId: 'msg-1' } as SendMessageResult; + const sendMessageSpy = vi.spyOn(service, 'sendMessage').mockResolvedValue(expectedResult); + + await expect( + service.sendSystemNotificationToLead({ + teamName: 'demo', + summary: 'Potential stalled tasks detected', + text: 'Task #1 looks stalled.', + taskRefs: [leadTaskRef], + }) + ).resolves.toBe(expectedResult); + + expect(sendMessageSpy).toHaveBeenCalledWith( + 'demo', + expect.objectContaining({ + member: 'lead', + from: 'system', + summary: 'Potential stalled tasks detected', + text: 'Task #1 looks stalled.', + taskRefs: [leadTaskRef], + source: 'system_notification', + }) + ); + }); +}); diff --git a/test/main/services/team/TeamLogSourceTracker.test.ts b/test/main/services/team/TeamLogSourceTracker.test.ts index 0baa59c6..6008cf55 100644 --- a/test/main/services/team/TeamLogSourceTracker.test.ts +++ b/test/main/services/team/TeamLogSourceTracker.test.ts @@ -116,4 +116,38 @@ describe('TeamLogSourceTracker', () => { await tracker.disableTracking('demo', 'task_log_stream'); await tracker.disableTracking('demo', 'tool_activity'); }); + + it('supports stall_monitor as an independent tracking consumer', async () => { + tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-stall-monitor-')); + + const logsFinder = { + getLogSourceWatchContext: vi.fn(async () => ({ + projectDir: tempDir!, + sessionIds: [], + })), + } as unknown as TeamMemberLogsFinder; + + const tracker = new TeamLogSourceTracker(logsFinder); + const emitter = vi.fn<(event: TeamChangeEvent) => void>(); + tracker.setEmitter(emitter); + + await tracker.enableTracking('demo', 'stall_monitor'); + emitter.mockClear(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const taskId = '323e4567-e89b-12d3-a456-426614174999'; + const signalDir = path.join(tempDir, '.board-task-log-freshness'); + await mkdir(signalDir, { recursive: true }); + await writeFile(path.join(signalDir, `${encodeURIComponent(taskId)}.json`), '{"ok":true}'); + + await vi.waitFor(() => { + expect(emitter).toHaveBeenCalledWith({ + type: 'task-log-change', + teamName: 'demo', + taskId, + }); + }); + + await tracker.disableTracking('demo', 'stall_monitor'); + }); }); diff --git a/test/main/services/team/stallMonitor/ActiveTeamRegistry.test.ts b/test/main/services/team/stallMonitor/ActiveTeamRegistry.test.ts new file mode 100644 index 00000000..e27a7c6f --- /dev/null +++ b/test/main/services/team/stallMonitor/ActiveTeamRegistry.test.ts @@ -0,0 +1,127 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { ActiveTeamRegistry } from '../../../../../src/main/services/team/stallMonitor/ActiveTeamRegistry'; + +describe('ActiveTeamRegistry', () => { + it('activates a team on lead-activity and enables stall-monitor tracking', async () => { + const tracker = { + enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + }; + const registry = new ActiveTeamRegistry( + { listAliveProcessTeams: vi.fn(async () => []) }, + tracker as never + ); + + registry.noteTeamChange({ + type: 'lead-activity', + teamName: 'demo', + detail: 'active', + }); + + await vi.waitFor(() => { + expect(tracker.enableTracking).toHaveBeenCalledWith('demo', 'stall_monitor'); + }); + await expect(registry.listActiveTeams()).resolves.toEqual(['demo']); + }); + + it('does not re-enable tracking for repeated activation events on the same team', async () => { + const tracker = { + enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + }; + const registry = new ActiveTeamRegistry( + { listAliveProcessTeams: vi.fn(async () => []) }, + tracker as never + ); + + registry.noteTeamChange({ + type: 'lead-activity', + teamName: 'demo', + detail: 'active', + }); + registry.noteTeamChange({ + type: 'member-spawn', + teamName: 'demo', + detail: 'alice', + }); + + await vi.waitFor(() => { + expect(tracker.enableTracking).toHaveBeenCalledTimes(1); + }); + await expect(registry.listActiveTeams()).resolves.toEqual(['demo']); + }); + + it('does not cold-activate a team from task-log-change alone', async () => { + const tracker = { + enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + }; + const registry = new ActiveTeamRegistry( + { listAliveProcessTeams: vi.fn(async () => []) }, + tracker as never + ); + + registry.noteTeamChange({ + type: 'task-log-change', + teamName: 'cold-team', + taskId: 'task-1', + }); + + expect(tracker.enableTracking).not.toHaveBeenCalled(); + await expect(registry.listActiveTeams()).resolves.toEqual([]); + }); + + it('reconciles alive teams through TeamDataService helper and tracker consumer', async () => { + const tracker = { + enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + }; + const registry = new ActiveTeamRegistry( + { listAliveProcessTeams: vi.fn(async () => ['beta']) }, + tracker as never + ); + + registry.noteTeamChange({ + type: 'member-spawn', + teamName: 'alpha', + detail: 'alice', + }); + await vi.waitFor(() => { + expect(tracker.enableTracking).toHaveBeenCalledWith('alpha', 'stall_monitor'); + }); + + tracker.enableTracking.mockClear(); + await registry.reconcile(); + + expect(tracker.enableTracking).toHaveBeenCalledWith('beta', 'stall_monitor'); + expect(tracker.disableTracking).toHaveBeenCalledWith('alpha', 'stall_monitor'); + await expect(registry.listActiveTeams()).resolves.toEqual(['beta']); + }); + + it('does not re-enable tracking for teams that are already active during reconcile', async () => { + const tracker = { + enableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + disableTracking: vi.fn(async () => ({ projectFingerprint: null, logSourceGeneration: null })), + }; + const registry = new ActiveTeamRegistry( + { listAliveProcessTeams: vi.fn(async () => ['demo']) }, + tracker as never + ); + + registry.noteTeamChange({ + type: 'lead-activity', + teamName: 'demo', + detail: 'active', + }); + await vi.waitFor(() => { + expect(tracker.enableTracking).toHaveBeenCalledTimes(1); + }); + + tracker.enableTracking.mockClear(); + await registry.reconcile(); + + expect(tracker.enableTracking).not.toHaveBeenCalled(); + await expect(registry.listActiveTeams()).resolves.toEqual(['demo']); + }); +}); diff --git a/test/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.test.ts b/test/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.test.ts new file mode 100644 index 00000000..38b9d603 --- /dev/null +++ b/test/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer.test.ts @@ -0,0 +1,118 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { BoardTaskActivityBatchIndexer } from '../../../../../src/main/services/team/stallMonitor/BoardTaskActivityBatchIndexer'; +import { BoardTaskActivityRecordBuilder } from '../../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityRecordBuilder'; + +import type { RawTaskActivityMessage } from '../../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityTranscriptReader'; +import type { TeamTask } from '../../../../../src/shared/types'; + +describe('BoardTaskActivityBatchIndexer', () => { + it('delegates one batched build through buildForTasks', () => { + const built = new Map([['task-a', [{ id: 'r1' }]]]); + const builder = { + buildForTasks: vi.fn(() => built), + }; + + const indexer = new BoardTaskActivityBatchIndexer(builder as never); + const result = indexer.buildIndex({ + teamName: 'demo', + tasks: [{ id: 'task-a', subject: 'A', status: 'in_progress' } as TeamTask], + messages: [{ uuid: 'm1' } as RawTaskActivityMessage], + }); + + expect(result).toBe(built); + expect(builder.buildForTasks).toHaveBeenCalledTimes(1); + }); + + it('keeps buildForTask behavior consistent with batched build', () => { + const builder = new BoardTaskActivityRecordBuilder(); + const taskA: TeamTask = { + id: 'task-a', + displayId: 'abcd1234', + subject: 'Task A', + status: 'in_progress', + }; + const taskB: TeamTask = { + id: 'task-b', + displayId: 'deadbeef', + subject: 'Task B', + status: 'pending', + }; + const messages: RawTaskActivityMessage[] = [ + { + filePath: '/tmp/session.jsonl', + uuid: 'msg-1', + timestamp: '2026-04-19T12:00:00.000Z', + sessionId: 'session-a', + agentName: 'alice', + isSidechain: true, + sourceOrder: 1, + boardTaskLinks: [ + { + schemaVersion: 1, + toolUseId: 'tool-1', + task: { + ref: 'task-a', + refKind: 'canonical', + canonicalId: 'task-a', + }, + targetRole: 'subject', + linkKind: 'board_action', + actorContext: { + relation: 'same_task', + }, + }, + { + schemaVersion: 1, + toolUseId: 'tool-2', + task: { + ref: 'task-b', + refKind: 'canonical', + canonicalId: 'task-b', + }, + targetRole: 'subject', + linkKind: 'board_action', + actorContext: { + relation: 'same_task', + }, + }, + ], + boardTaskToolActions: [ + { + schemaVersion: 1, + toolUseId: 'tool-1', + canonicalToolName: 'task_start', + }, + { + schemaVersion: 1, + toolUseId: 'tool-2', + canonicalToolName: 'task_add_comment', + }, + ], + }, + ]; + + const recordsByTaskId = builder.buildForTasks({ + teamName: 'demo', + tasks: [taskA, taskB], + messages, + }); + + expect(recordsByTaskId.get('task-a')).toEqual( + builder.buildForTask({ + teamName: 'demo', + targetTask: taskA, + tasks: [taskA, taskB], + messages, + }) + ); + expect(recordsByTaskId.get('task-b')).toEqual( + builder.buildForTask({ + teamName: 'demo', + targetTask: taskB, + tasks: [taskA, taskB], + messages, + }) + ); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.test.ts b/test/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.test.ts new file mode 100644 index 00000000..66bf671d --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskLogFreshnessReader.test.ts @@ -0,0 +1,57 @@ +import * as fs from 'fs/promises'; +import * as os from 'os'; +import * as path from 'path'; +import { afterEach, describe, expect, it } from 'vitest'; + +import { TeamTaskLogFreshnessReader } from '../../../../../src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader'; + +const tempDirs: string[] = []; + +afterEach(async () => { + await Promise.all( + tempDirs.splice(0).map(async (dirPath) => { + await fs.rm(dirPath, { recursive: true, force: true }); + }) + ); +}); + +describe('TeamTaskLogFreshnessReader', () => { + it('reads valid freshness signals and normalizes transcript basename', async () => { + const projectDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-freshness-')); + tempDirs.push(projectDir); + const signalDir = path.join(projectDir, '.board-task-log-freshness'); + await fs.mkdir(signalDir, { recursive: true }); + + await fs.writeFile( + path.join(signalDir, `${encodeURIComponent('task-a')}.json`), + JSON.stringify({ + taskId: 'task-a', + updatedAt: '2026-04-19T12:00:00.000Z', + transcriptFile: '/tmp/nested/session-a.jsonl', + }), + 'utf8' + ); + await fs.writeFile( + path.join(signalDir, `${encodeURIComponent('task-b')}.json`), + JSON.stringify({ + taskId: 'task-b', + updatedAt: 'not-a-date', + }), + 'utf8' + ); + + const signals = await new TeamTaskLogFreshnessReader().readSignals(projectDir, [ + 'task-a', + 'task-b', + 'task-missing', + ]); + + expect([...signals.keys()]).toEqual(['task-a']); + expect(signals.get('task-a')).toEqual({ + taskId: 'task-a', + updatedAt: '2026-04-19T12:00:00.000Z', + filePath: path.join(signalDir, `${encodeURIComponent('task-a')}.json`), + transcriptFileBasename: 'session-a.jsonl', + }); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskStallExactRowReader.test.ts b/test/main/services/team/stallMonitor/TeamTaskStallExactRowReader.test.ts new file mode 100644 index 00000000..8f0c7b61 --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskStallExactRowReader.test.ts @@ -0,0 +1,149 @@ +import { afterEach, describe, expect, it } from 'vitest'; +import * as fs from 'fs/promises'; +import * as os from 'os'; +import * as path from 'path'; + +import { TeamTaskStallExactRowReader } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallExactRowReader'; + +const tempDirs: string[] = []; + +afterEach(async () => { + await Promise.all( + tempDirs.splice(0).map(async (dirPath) => { + await fs.rm(dirPath, { recursive: true, force: true }); + }) + ); +}); + +function createAssistantEntry(args: { + uuid: string; + timestamp: string; + content: unknown[]; + requestId?: string; +}): Record { + return { + type: 'assistant', + uuid: args.uuid, + timestamp: args.timestamp, + sessionId: 'session-a', + teamName: 'demo', + agentName: 'alice', + isSidechain: true, + ...(args.requestId ? { requestId: args.requestId } : {}), + message: { + id: `${args.uuid}-msg`, + role: 'assistant', + model: 'claude-test', + type: 'message', + stop_reason: 'tool_use', + stop_sequence: null, + usage: { + input_tokens: 0, + output_tokens: 0, + }, + content: args.content, + }, + }; +} + +function createUserEntry(args: { + uuid: string; + timestamp: string; + content: unknown[]; + sourceToolUseID?: string; +}): Record { + return { + type: 'user', + uuid: args.uuid, + timestamp: args.timestamp, + sessionId: 'session-a', + teamName: 'demo', + agentName: 'alice', + isSidechain: true, + ...(args.sourceToolUseID ? { sourceToolUseID: args.sourceToolUseID } : {}), + message: { + role: 'user', + content: args.content, + }, + }; +} + +describe('TeamTaskStallExactRowReader', () => { + it('keeps strict rows with subtype and tool ids', async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-exact-rows-')); + tempDirs.push(tempDir); + + const filePath = path.join(tempDir, 'session.jsonl'); + await fs.writeFile( + filePath, + [ + JSON.stringify({ + type: 'system', + uuid: 'sys-init', + timestamp: '2026-04-19T12:00:00.000Z', + sessionId: 'session-a', + teamName: 'demo', + agentName: 'alice', + isSidechain: true, + isMeta: true, + subtype: 'turn_duration', + durationMs: 1234, + }), + JSON.stringify( + createAssistantEntry({ + uuid: 'asst-1', + timestamp: '2026-04-19T12:01:00.000Z', + requestId: 'req-1', + content: [ + { + type: 'tool_use', + id: 'tool-1', + name: 'task_start', + input: { taskId: 'task-a' }, + }, + ], + }) + ), + JSON.stringify( + createUserEntry({ + uuid: 'user-1', + timestamp: '2026-04-19T12:01:01.000Z', + sourceToolUseID: 'tool-1', + content: [{ type: 'tool_result', tool_use_id: 'tool-1', content: 'ok' }], + }) + ), + JSON.stringify({ + uuid: 'bad-ts', + type: 'assistant', + timestamp: 'not-a-date', + message: { role: 'assistant', content: 'bad row' }, + }), + ].join('\n'), + 'utf8' + ); + + const parsed = await new TeamTaskStallExactRowReader().parseFiles([filePath]); + const rows = parsed.get(filePath) ?? []; + + expect(rows).toHaveLength(3); + expect(rows.map((row) => row.messageUuid)).toEqual(['sys-init', 'asst-1', 'user-1']); + expect(rows[0]).toMatchObject({ + systemSubtype: 'turn_duration', + sourceOrder: 1, + toolUseIds: [], + toolResultIds: [], + }); + expect(rows[1]).toMatchObject({ + requestId: 'req-1', + toolUseIds: ['tool-1'], + toolResultIds: [], + sourceOrder: 2, + }); + expect(rows[2]).toMatchObject({ + sourceToolUseId: 'tool-1', + toolUseIds: [], + toolResultIds: ['tool-1'], + sourceOrder: 3, + }); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskStallJournal.test.ts b/test/main/services/team/stallMonitor/TeamTaskStallJournal.test.ts new file mode 100644 index 00000000..5fe89983 --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskStallJournal.test.ts @@ -0,0 +1,51 @@ +import * as os from 'os'; +import * as path from 'path'; +import { afterEach, describe, expect, it } from 'vitest'; +import * as fs from 'fs/promises'; + +import { TeamTaskStallJournal } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallJournal'; +import { setClaudeBasePathOverride } from '../../../../../src/main/utils/pathDecoder'; + +describe('TeamTaskStallJournal', () => { + let tmpDir: string | null = null; + + afterEach(async () => { + setClaudeBasePathOverride(null); + if (tmpDir) { + await fs.rm(tmpDir, { recursive: true, force: true }); + tmpDir = null; + } + }); + + it('requires two scans before returning an alert-ready candidate', async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-')); + setClaudeBasePathOverride(tmpDir); + await fs.mkdir(path.join(tmpDir, 'teams', 'demo'), { recursive: true }); + + const journal = new TeamTaskStallJournal(); + const evaluation = { + status: 'alert', + taskId: 'task-a', + branch: 'work', + signal: 'turn_ended_after_touch', + epochKey: 'task-a:epoch-1', + reason: 'Potential work stall', + } as const; + + const firstReady = await journal.reconcileScan({ + teamName: 'demo', + evaluations: [evaluation], + activeTaskIds: ['task-a'], + now: '2026-04-19T12:10:00.000Z', + }); + const secondReady = await journal.reconcileScan({ + teamName: 'demo', + evaluations: [evaluation], + activeTaskIds: ['task-a'], + now: '2026-04-19T12:11:00.000Z', + }); + + expect(firstReady).toEqual([]); + expect(secondReady).toEqual([evaluation]); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskStallMonitor.test.ts b/test/main/services/team/stallMonitor/TeamTaskStallMonitor.test.ts new file mode 100644 index 00000000..808696f4 --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskStallMonitor.test.ts @@ -0,0 +1,87 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { TeamTaskStallMonitor } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallMonitor'; + +describe('TeamTaskStallMonitor', () => { + afterEach(() => { + vi.useRealTimers(); + vi.unstubAllEnvs(); + }); + + it('runs end-to-end and notifies only after a second confirmed scan', async () => { + vi.useFakeTimers(); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_MONITOR_ENABLED', 'true'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ALERTS_ENABLED', 'true'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1000'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS', '1'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS', '1'); + + const registry = { + start: vi.fn(), + stop: vi.fn(async () => undefined), + noteTeamChange: vi.fn(), + listActiveTeams: vi.fn(async () => ['demo']), + }; + const snapshot = { + teamName: 'demo', + inProgressTasks: [{ id: 'task-a', displayId: 'abcd1234', subject: 'Task A' }], + reviewOpenTasks: [], + allTasksById: new Map([ + ['task-a', { id: 'task-a', displayId: 'abcd1234', subject: 'Task A' }], + ]), + }; + const snapshotSource = { + getSnapshot: vi.fn(async () => snapshot), + }; + const policy = { + evaluateWork: vi.fn(() => ({ + status: 'alert', + taskId: 'task-a', + branch: 'work', + signal: 'turn_ended_after_touch', + epochKey: 'task-a:epoch', + reason: 'Potential work stall.', + })), + evaluateReview: vi.fn(), + }; + const journal = { + reconcileScan: vi + .fn() + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([ + { + status: 'alert', + taskId: 'task-a', + branch: 'work', + signal: 'turn_ended_after_touch', + epochKey: 'task-a:epoch', + reason: 'Potential work stall.', + }, + ]), + markAlerted: vi.fn(async () => undefined), + }; + const notifier = { + notifyLead: vi.fn(async () => undefined), + }; + + const monitor = new TeamTaskStallMonitor( + registry as never, + snapshotSource as never, + policy as never, + journal as never, + notifier as never + ); + + monitor.start(); + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(2_100); + + expect(snapshotSource.getSnapshot).toHaveBeenCalledTimes(2); + expect(notifier.notifyLead).toHaveBeenCalledTimes(1); + expect(journal.markAlerted).toHaveBeenCalledWith( + 'demo', + 'task-a:epoch', + expect.any(String) + ); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskStallPolicy.test.ts b/test/main/services/team/stallMonitor/TeamTaskStallPolicy.test.ts new file mode 100644 index 00000000..ce214ad2 --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskStallPolicy.test.ts @@ -0,0 +1,460 @@ +import { describe, expect, it } from 'vitest'; + +import { TeamTaskStallPolicy } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallPolicy'; + +import type { TeamTaskStallExactRow, TeamTaskStallSnapshot } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallTypes'; +import type { BoardTaskActivityRecord } from '../../../../../src/main/services/team/taskLogs/activity/BoardTaskActivityRecord'; +import type { ParsedMessage } from '../../../../../src/main/types'; +import type { TeamTask } from '../../../../../src/shared/types'; + +function createParsedMessage(overrides: Partial): ParsedMessage { + return { + uuid: 'msg-default', + parentUuid: null, + type: 'assistant', + timestamp: new Date('2026-04-19T12:00:00.000Z'), + content: '', + isSidechain: true, + isMeta: false, + toolCalls: [], + toolResults: [], + ...overrides, + }; +} + +function createExactRow(overrides: Partial = {}): TeamTaskStallExactRow { + return { + filePath: '/tmp/session.jsonl', + sourceOrder: 1, + messageUuid: 'msg-touch', + timestamp: '2026-04-19T12:00:00.000Z', + parsedMessage: createParsedMessage({ uuid: 'msg-touch' }), + toolUseIds: [], + toolResultIds: [], + ...overrides, + }; +} + +function createRecord(overrides: Partial = {}): BoardTaskActivityRecord { + return { + id: 'rec-1', + timestamp: '2026-04-19T12:00:00.000Z', + task: { + locator: { + ref: 'task-a', + refKind: 'canonical', + canonicalId: 'task-a', + }, + resolution: 'resolved', + taskRef: { + taskId: 'task-a', + displayId: 'abcd1234', + teamName: 'demo', + }, + }, + linkKind: 'board_action', + targetRole: 'subject', + actor: { + memberName: 'alice', + role: 'member', + sessionId: 'session-a', + isSidechain: true, + }, + actorContext: { + relation: 'same_task', + }, + action: { + canonicalToolName: 'task_start', + category: 'status', + toolUseId: 'tool-1', + }, + source: { + messageUuid: 'msg-touch', + filePath: '/tmp/session.jsonl', + toolUseId: 'tool-1', + sourceOrder: 1, + }, + ...overrides, + }; +} + +function createSnapshot(overrides: Partial): TeamTaskStallSnapshot { + return { + teamName: 'demo', + scannedAt: '2026-04-19T12:30:00.000Z', + projectDir: '/tmp/project', + projectId: 'project-id', + leadName: 'team-lead', + transcriptFiles: ['/tmp/session.jsonl'], + activityReadsEnabled: true, + exactReadsEnabled: true, + activeTasks: [], + deletedTasks: [], + allTasksById: new Map(), + inProgressTasks: [], + reviewOpenTasks: [], + resolvedReviewersByTaskId: new Map(), + recordsByTaskId: new Map(), + freshnessByTaskId: new Map(), + exactRowsByFilePath: new Map(), + ...overrides, + }; +} + +describe('TeamTaskStallPolicy', () => { + const policy = new TeamTaskStallPolicy(); + + it('alerts for work stall after turn ended and threshold elapsed', () => { + const task: TeamTask = { + id: 'task-a', + displayId: 'abcd1234', + subject: 'Task A', + owner: 'alice', + status: 'in_progress', + workIntervals: [{ startedAt: '2026-04-19T11:50:00.000Z' }], + }; + const record = createRecord(); + const snapshot = createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-a', task]]), + inProgressTasks: [task], + recordsByTaskId: new Map([['task-a', [record]]]), + exactRowsByFilePath: new Map([ + [ + '/tmp/session.jsonl', + [ + createExactRow({ + messageUuid: 'msg-touch', + toolUseIds: ['tool-1'], + }), + createExactRow({ + sourceOrder: 2, + messageUuid: 'msg-turn-end', + systemSubtype: 'turn_duration', + parsedMessage: createParsedMessage({ + uuid: 'msg-turn-end', + type: 'system', + }), + }), + ], + ], + ]), + }); + + const evaluation = policy.evaluateWork({ + now: new Date('2026-04-19T12:30:00.000Z'), + task, + snapshot, + }); + + expect(evaluation).toMatchObject({ + status: 'alert', + taskId: 'task-a', + branch: 'work', + signal: 'turn_ended_after_touch', + }); + }); + + it('fails closed on review branch when review has not started yet', () => { + const task: TeamTask = { + id: 'task-b', + displayId: 'deadbeef', + subject: 'Task B', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-requested', + type: 'review_requested', + timestamp: '2026-04-19T12:00:00.000Z', + from: 'none', + to: 'review', + }, + ], + }; + + const evaluation = policy.evaluateReview({ + now: new Date('2026-04-19T12:30:00.000Z'), + task, + snapshot: createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-b', task]]), + reviewOpenTasks: [task], + }), + }); + + expect(evaluation).toMatchObject({ + status: 'skip', + taskId: 'task-b', + skipReason: 'no_open_review_window', + }); + }); + + it('fails closed on review branch when reviewer cannot be resolved after review has started', () => { + const task: TeamTask = { + id: 'task-b2', + displayId: 'deadbe12', + subject: 'Task B2', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-started', + type: 'review_started', + timestamp: '2026-04-19T12:01:00.000Z', + from: 'review', + to: 'review', + }, + ], + }; + + const evaluation = policy.evaluateReview({ + now: new Date('2026-04-19T12:30:00.000Z'), + task, + snapshot: createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-b2', task]]), + reviewOpenTasks: [task], + }), + }); + + expect(evaluation).toMatchObject({ + status: 'skip', + taskId: 'task-b2', + skipReason: 'reviewer_unresolved', + }); + }); + + it('does not treat review_requested alone as started-review evidence', () => { + const task: TeamTask = { + id: 'task-review-requested-only', + displayId: 'feedbeef', + subject: 'Task review requested only', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-requested', + type: 'review_requested', + timestamp: '2026-04-19T12:00:00.000Z', + from: 'none', + to: 'review', + reviewer: 'bob', + }, + ], + }; + + const evaluation = policy.evaluateReview({ + now: new Date('2026-04-19T12:30:00.000Z'), + task, + snapshot: createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-review-requested-only', task]]), + reviewOpenTasks: [task], + resolvedReviewersByTaskId: new Map([ + [ + 'task-review-requested-only', + { reviewer: 'bob', source: 'history_review_requested_reviewer' }, + ], + ]), + }), + }); + + expect(evaluation).toMatchObject({ + status: 'skip', + taskId: 'task-review-requested-only', + skipReason: 'no_open_review_window', + }); + }); + + it('alerts for started-review stall after explicit review_start evidence', () => { + const task: TeamTask = { + id: 'task-c', + displayId: 'c0ffee12', + subject: 'Task C', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-requested', + type: 'review_requested', + timestamp: '2026-04-19T12:00:00.000Z', + from: 'none', + to: 'review', + reviewer: 'bob', + }, + { + id: 'evt-review-started', + type: 'review_started', + timestamp: '2026-04-19T12:01:00.000Z', + from: 'review', + to: 'review', + actor: 'bob', + }, + ], + }; + const record = createRecord({ + id: 'rec-review', + timestamp: '2026-04-19T12:01:00.000Z', + actor: { + memberName: 'bob', + role: 'member', + sessionId: 'session-b', + isSidechain: true, + }, + actorContext: { + relation: 'same_task', + activePhase: 'review', + }, + action: { + canonicalToolName: 'review_start', + category: 'review', + toolUseId: 'tool-review', + }, + source: { + messageUuid: 'msg-review-touch', + filePath: '/tmp/review.jsonl', + toolUseId: 'tool-review', + sourceOrder: 1, + }, + }); + + const evaluation = policy.evaluateReview({ + now: new Date('2026-04-19T12:20:30.000Z'), + task, + snapshot: createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-c', task]]), + reviewOpenTasks: [task], + resolvedReviewersByTaskId: new Map([ + ['task-c', { reviewer: 'bob', source: 'history_review_started_actor' }], + ]), + recordsByTaskId: new Map([['task-c', [record]]]), + exactRowsByFilePath: new Map([ + [ + '/tmp/review.jsonl', + [ + createExactRow({ + filePath: '/tmp/review.jsonl', + messageUuid: 'msg-review-touch', + toolUseIds: ['tool-review'], + }), + createExactRow({ + filePath: '/tmp/review.jsonl', + sourceOrder: 2, + messageUuid: 'msg-review-turn-end', + systemSubtype: 'turn_duration', + parsedMessage: createParsedMessage({ + uuid: 'msg-review-turn-end', + type: 'system', + }), + }), + ], + ], + ]), + }), + }); + + expect(evaluation).toMatchObject({ + status: 'alert', + taskId: 'task-c', + branch: 'review', + signal: 'turn_ended_after_touch', + }); + }); + + it('alerts for started-review stall when review_started actor is missing but same-task reviewer touch exists after the review start', () => { + const task: TeamTask = { + id: 'task-d', + displayId: 'ddaa5511', + subject: 'Task D', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-requested', + type: 'review_requested', + timestamp: '2026-04-19T12:00:00.000Z', + from: 'none', + to: 'review', + reviewer: 'bob', + }, + { + id: 'evt-review-started', + type: 'review_started', + timestamp: '2026-04-19T12:01:00.000Z', + from: 'review', + to: 'review', + }, + ], + }; + const record = createRecord({ + id: 'rec-review-comment', + timestamp: '2026-04-19T12:02:00.000Z', + actor: { + memberName: 'bob', + role: 'member', + sessionId: 'session-b', + isSidechain: true, + }, + actorContext: { + relation: 'same_task', + activePhase: 'review', + }, + action: { + canonicalToolName: 'task_add_comment', + category: 'comment', + toolUseId: 'tool-review-comment', + }, + source: { + messageUuid: 'msg-review-comment', + filePath: '/tmp/review-missing-actor.jsonl', + toolUseId: 'tool-review-comment', + sourceOrder: 1, + }, + }); + + const evaluation = policy.evaluateReview({ + now: new Date('2026-04-19T12:20:30.000Z'), + task, + snapshot: createSnapshot({ + activeTasks: [task], + allTasksById: new Map([['task-d', task]]), + reviewOpenTasks: [task], + resolvedReviewersByTaskId: new Map([ + ['task-d', { reviewer: 'bob', source: 'history_review_requested_reviewer' }], + ]), + recordsByTaskId: new Map([['task-d', [record]]]), + exactRowsByFilePath: new Map([ + [ + '/tmp/review-missing-actor.jsonl', + [ + createExactRow({ + filePath: '/tmp/review-missing-actor.jsonl', + messageUuid: 'msg-review-comment', + toolUseIds: ['tool-review-comment'], + }), + createExactRow({ + filePath: '/tmp/review-missing-actor.jsonl', + sourceOrder: 2, + messageUuid: 'msg-review-turn-end', + systemSubtype: 'turn_duration', + parsedMessage: createParsedMessage({ + uuid: 'msg-review-turn-end', + type: 'system', + }), + }), + ], + ], + ]), + }), + }); + + expect(evaluation).toMatchObject({ + status: 'alert', + taskId: 'task-d', + branch: 'review', + signal: 'turn_ended_after_touch', + }); + }); +}); diff --git a/test/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.test.ts b/test/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.test.ts new file mode 100644 index 00000000..5ee13bf8 --- /dev/null +++ b/test/main/services/team/stallMonitor/TeamTaskStallSnapshotSource.test.ts @@ -0,0 +1,142 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { TeamTaskStallSnapshotSource } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallSnapshotSource'; + +describe('TeamTaskStallSnapshotSource', () => { + it('returns null when transcript context is unavailable', async () => { + const source = new TeamTaskStallSnapshotSource( + { getContext: vi.fn(async () => null) } as never, + {} as never, + {} as never, + {} as never, + {} as never, + {} as never, + {} as never + ); + + await expect(source.getSnapshot('demo')).resolves.toBeNull(); + }); + + it('builds one batched snapshot and narrows exact/freshness reads to work and started-review candidates', async () => { + const activeTasks = [ + { id: 'task-a', subject: 'A', status: 'in_progress' }, + { + id: 'task-b', + subject: 'B', + status: 'completed', + reviewState: 'review', + historyEvents: [ + { + id: 'evt-review-requested', + type: 'review_requested', + timestamp: '2026-04-19T12:00:00.000Z', + from: 'none', + to: 'review', + reviewer: 'alice', + }, + ], + }, + ]; + const deletedTasks = [{ id: 'task-deleted', subject: 'D', status: 'deleted' }]; + const transcriptContext = { + projectDir: '/tmp/project', + projectId: 'project-id', + config: { + members: [{ name: 'team-lead', role: 'team lead' }], + } as never, + sessionIds: ['session-a'], + transcriptFiles: ['/tmp/project/session-a.jsonl', '/tmp/project/session-b.jsonl'], + }; + const rawMessages = [{ uuid: 'm1' }]; + const recordsByTaskId = new Map([ + [ + 'task-a', + [ + { + id: 'r1', + source: { + filePath: '/tmp/project/session-b.jsonl', + }, + }, + ], + ], + [ + 'task-b', + [ + { + id: 'r2', + source: { + filePath: '/tmp/project/session-a.jsonl', + }, + }, + ], + ], + ]); + const freshnessByTaskId = new Map([ + ['task-a', { taskId: 'task-a', updatedAt: '2026-04-19T12:00:00.000Z', filePath: '/tmp/fresh.json' }], + ]); + const exactRowsByFilePath = new Map([['/tmp/project/session-b.jsonl', []]]); + + const locator = { + getContext: vi.fn(async () => transcriptContext), + }; + const taskReader = { + getTasks: vi.fn(async () => activeTasks), + getDeletedTasks: vi.fn(async () => deletedTasks), + }; + const kanbanManager = { + getState: vi.fn(async () => ({ + teamName: 'demo', + reviewers: ['alice'], + tasks: { + 'task-b': { + column: 'review', + movedAt: '2026-04-19T12:00:00.000Z', + reviewer: 'alice', + }, + }, + })), + }; + const transcriptReader = { + readFiles: vi.fn(async () => rawMessages), + }; + const batchIndexer = { + buildIndex: vi.fn(() => recordsByTaskId), + }; + const freshnessReader = { + readSignals: vi.fn(async () => freshnessByTaskId), + }; + const exactRowReader = { + parseFiles: vi.fn(async () => exactRowsByFilePath), + }; + + const source = new TeamTaskStallSnapshotSource( + locator as never, + taskReader as never, + kanbanManager as never, + transcriptReader as never, + batchIndexer as never, + freshnessReader as never, + exactRowReader as never + ); + + const snapshot = await source.getSnapshot('demo'); + + expect(snapshot).not.toBeNull(); + expect(batchIndexer.buildIndex).toHaveBeenCalledWith({ + teamName: 'demo', + tasks: [...activeTasks, ...deletedTasks], + messages: rawMessages, + }); + expect(freshnessReader.readSignals).toHaveBeenCalledWith('/tmp/project', ['task-a', 'task-b']); + expect(exactRowReader.parseFiles).toHaveBeenCalledWith(['/tmp/project/session-a.jsonl', '/tmp/project/session-b.jsonl']); + expect(snapshot?.inProgressTasks.map((task) => task.id)).toEqual(['task-a']); + expect(snapshot?.reviewOpenTasks.map((task) => task.id)).toEqual(['task-b']); + expect(snapshot?.leadName).toBe('team-lead'); + expect(snapshot?.resolvedReviewersByTaskId.get('task-b')).toEqual({ + reviewer: 'alice', + source: 'kanban_state', + }); + expect(snapshot?.recordsByTaskId).toBe(recordsByTaskId); + }); +}); diff --git a/test/main/services/team/stallMonitor/featureGates.test.ts b/test/main/services/team/stallMonitor/featureGates.test.ts new file mode 100644 index 00000000..49369b57 --- /dev/null +++ b/test/main/services/team/stallMonitor/featureGates.test.ts @@ -0,0 +1,37 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { + getTeamTaskStallActivationGraceMs, + getTeamTaskStallScanIntervalMs, + getTeamTaskStallStartupGraceMs, + isTeamTaskStallAlertsEnabled, + isTeamTaskStallMonitorEnabled, +} from '../../../../../src/main/services/team/stallMonitor/featureGates'; + +afterEach(() => { + vi.unstubAllEnvs(); +}); + +describe('stallMonitor feature gates', () => { + it('defaults both monitor and alerts to disabled', () => { + expect(isTeamTaskStallMonitorEnabled()).toBe(false); + expect(isTeamTaskStallAlertsEnabled()).toBe(false); + expect(getTeamTaskStallScanIntervalMs()).toBe(60_000); + expect(getTeamTaskStallStartupGraceMs()).toBe(180_000); + expect(getTeamTaskStallActivationGraceMs()).toBe(120_000); + }); + + it('parses truthy and falsy environment values', () => { + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_MONITOR_ENABLED', 'true'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ALERTS_ENABLED', 'off'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_SCAN_INTERVAL_MS', '1500'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_STARTUP_GRACE_MS', '2000'); + vi.stubEnv('CLAUDE_TEAM_TASK_STALL_ACTIVATION_GRACE_MS', '3000'); + + expect(isTeamTaskStallMonitorEnabled()).toBe(true); + expect(isTeamTaskStallAlertsEnabled()).toBe(false); + expect(getTeamTaskStallScanIntervalMs()).toBe(1500); + expect(getTeamTaskStallStartupGraceMs()).toBe(2000); + expect(getTeamTaskStallActivationGraceMs()).toBe(3000); + }); +});