From 91b153459a07646f66031117fb7bdc6bff03e715 Mon Sep 17 00:00:00 2001 From: 777genius Date: Sun, 24 May 2026 00:22:48 +0300 Subject: [PATCH] feat: add resilient cross-platform file watching Share watcher fallback behavior across project, todo, team, and task file monitoring. Add polling fallback coverage for watcher-limit and startup failure cases so Linux EMFILE conditions degrade instead of amplifying renderer crashes. --- .../CrossPlatformFileChangeSource.ts | 320 +++++++ .../services/infrastructure/FileWatcher.ts | 826 ++++++----------- .../infrastructure/TeamTaskWatchRegistry.ts | 21 +- .../CrossPlatformFileChangeSource.test.ts | 598 ++++++++++++ .../infrastructure/FileWatcher.test.ts | 862 +++++++++++++++++- 5 files changed, 2047 insertions(+), 580 deletions(-) create mode 100644 src/main/services/infrastructure/CrossPlatformFileChangeSource.ts create mode 100644 test/main/services/infrastructure/CrossPlatformFileChangeSource.test.ts diff --git a/src/main/services/infrastructure/CrossPlatformFileChangeSource.ts b/src/main/services/infrastructure/CrossPlatformFileChangeSource.ts new file mode 100644 index 00000000..636fa274 --- /dev/null +++ b/src/main/services/infrastructure/CrossPlatformFileChangeSource.ts @@ -0,0 +1,320 @@ +import { createLogger } from '@shared/utils/logger'; + +const logger = createLogger('Service:CrossPlatformFileChangeSource'); + +export type PollingChangeEventType = 'rename' | 'change'; + +export interface CloseableWatcher { + close: () => void | Promise; +} + +export interface WatcherLifecycle { + onError: (error: unknown) => void; + onClose: () => void; + isCurrent: () => boolean; +} + +export interface CrossPlatformFileChangeSourceOptions { + name: string; + pollIntervalMs: number; + createWatcher?: (lifecycle: WatcherLifecycle) => Promise | CloseableWatcher; + collectPollSnapshot: () => Promise>; + emitPolledChange: (eventType: PollingChangeEventType, relativePath: string) => void; + isOwnerActive: () => boolean; + isWatchLimitError: (error: unknown) => boolean; + requestRetry: () => void; + onWatcherStartError?: (error: unknown) => void; + onWatcherError?: (error: unknown) => void; + onPollingError?: (error: unknown) => void; +} + +export class CrossPlatformFileChangeSource { + private watcher: CloseableWatcher | null = null; + private pollingTimer: NodeJS.Timeout | null = null; + private pollingGenerationInProgress: number | null = null; + private pollingPrimed = false; + private pollSnapshot = new Map(); + private closedGeneration: number | null = null; + private rejectedGeneration: number | null = null; + private generation = 0; + private startPromise: Promise | null = null; + + constructor(private readonly options: CrossPlatformFileChangeSourceOptions) {} + + get isActive(): boolean { + return this.watcher !== null || this.pollingTimer !== null; + } + + get currentPollingTimer(): NodeJS.Timeout | null { + return this.pollingTimer; + } + + get isPollingPrimed(): boolean { + return this.pollingPrimed; + } + + async start(): Promise { + if (this.isActive) { + return; + } + + if (this.startPromise) { + await this.startPromise; + return; + } + + const createWatcher = this.options.createWatcher; + if (!createWatcher) { + this.startPolling(); + return; + } + + const generation = this.nextGeneration(); + this.closedGeneration = null; + this.rejectedGeneration = null; + const startPromise = this.startWatcher(generation, createWatcher); + this.startPromise = startPromise; + try { + await startPromise; + } finally { + if (this.startPromise === startPromise) { + this.startPromise = null; + } + } + } + + private async startWatcher( + generation: number, + createWatcher: NonNullable + ): Promise { + try { + const watcher = await createWatcher({ + onError: (error) => this.handleWatcherError(error, generation), + onClose: () => this.handleWatcherClose(generation), + isCurrent: () => this.isCurrentGeneration(generation), + }); + + if (!this.isCurrentGeneration(generation)) { + await this.closeWatcher(watcher); + return; + } + + this.watcher = watcher; + } catch (error) { + if (generation !== this.generation || !this.options.isOwnerActive()) { + return; + } + if (this.pollingTimer || this.rejectedGeneration === generation) { + return; + } + if (this.startPollingFallback(error, generation)) { + return; + } + if (this.closedGeneration === generation) { + return; + } + this.rejectedGeneration = generation; + this.options.onWatcherStartError?.(error); + this.options.requestRetry(); + } + } + + startPolling(): void { + if (this.pollingTimer || !this.options.isOwnerActive()) { + return; + } + + const generation = this.nextGeneration(); + this.startPollingForGeneration(generation); + } + + private startPollingForGeneration(generation: number): void { + if (this.pollingTimer || generation !== this.generation || !this.options.isOwnerActive()) { + return; + } + + const watcher = this.watcher; + this.watcher = null; + + const runPoll = (): void => { + void this.pollOnce(generation); + }; + + this.pollingTimer = setInterval(runPoll, this.options.pollIntervalMs); + this.pollingTimer.unref(); + runPoll(); + + if (watcher) { + void this.closeWatcher(watcher); + } + } + + async pollOnce(expectedGeneration = this.generation): Promise { + if ( + expectedGeneration !== this.generation || + !this.options.isOwnerActive() || + this.pollingGenerationInProgress !== null + ) { + return; + } + + this.pollingGenerationInProgress = expectedGeneration; + try { + await this.pollForChanges(expectedGeneration); + } catch (error) { + if (expectedGeneration === this.generation && this.options.isOwnerActive()) { + this.options.onPollingError?.(error); + } + } finally { + if (this.pollingGenerationInProgress === expectedGeneration) { + this.pollingGenerationInProgress = null; + } + } + } + + stop(): void { + this.generation += 1; + this.startPromise = null; + this.closedGeneration = null; + this.rejectedGeneration = null; + this.pollingGenerationInProgress = null; + this.pollingPrimed = false; + this.pollSnapshot.clear(); + + const timer = this.pollingTimer; + this.pollingTimer = null; + if (timer) { + clearInterval(timer); + } + + const watcher = this.watcher; + this.watcher = null; + if (watcher) { + void this.closeWatcher(watcher); + } + } + + private handleWatcherError(error: unknown, generation: number): void { + if ( + generation !== this.generation || + !this.options.isOwnerActive() || + this.rejectedGeneration === generation + ) { + return; + } + + if (this.startPollingFallback(error, generation)) { + return; + } + + if (this.closedGeneration === generation) { + return; + } + + this.rejectedGeneration = generation; + this.options.onWatcherError?.(error); + const watcher = this.watcher; + this.watcher = null; + if (watcher) { + void this.closeWatcher(watcher); + } + if (!this.isActive) { + this.options.requestRetry(); + } + } + + private handleWatcherClose(generation: number): void { + if ( + generation !== this.generation || + !this.options.isOwnerActive() || + this.closedGeneration === generation || + this.rejectedGeneration === generation + ) { + return; + } + + this.closedGeneration = generation; + this.watcher = null; + if (!this.isActive) { + this.options.requestRetry(); + } + } + + private startPollingFallback(error: unknown, generation: number): boolean { + if ( + generation !== this.generation || + !this.options.isOwnerActive() || + !this.options.isWatchLimitError(error) + ) { + return false; + } + + this.rejectedGeneration = generation; + const err = error as NodeJS.ErrnoException; + logger.warn( + `${this.options.name} watcher hit ${err.code ?? 'a platform limit'}; falling back to polling` + ); + + const watcher = this.watcher; + this.watcher = null; + this.startPollingForGeneration(generation); + if (watcher) { + void this.closeWatcher(watcher); + } + return true; + } + + private async pollForChanges(expectedGeneration: number): Promise { + const nextSnapshot = await this.options.collectPollSnapshot(); + if (expectedGeneration !== this.generation || !this.options.isOwnerActive()) { + return; + } + + if (!this.pollingPrimed) { + logger.info(`${this.options.name} polling baseline captured`); + this.pollSnapshot = nextSnapshot; + this.pollingPrimed = true; + return; + } + + for (const [relativePath, fingerprint] of nextSnapshot) { + const previous = this.pollSnapshot.get(relativePath); + if (previous === undefined) { + this.options.emitPolledChange('rename', relativePath); + } else if (previous !== fingerprint) { + this.options.emitPolledChange('change', relativePath); + } + } + + for (const relativePath of this.pollSnapshot.keys()) { + if (!nextSnapshot.has(relativePath)) { + this.options.emitPolledChange('rename', relativePath); + } + } + + this.pollSnapshot = nextSnapshot; + } + + private async closeWatcher(watcher: CloseableWatcher): Promise { + try { + await watcher.close(); + } catch (error) { + logger.debug(`${this.options.name} watcher close failed`, error); + } + } + + private nextGeneration(): number { + this.generation += 1; + return this.generation; + } + + private isCurrentGeneration(generation: number): boolean { + return ( + generation === this.generation && + this.options.isOwnerActive() && + !this.pollingTimer && + this.closedGeneration !== generation && + this.rejectedGeneration !== generation + ); + } +} diff --git a/src/main/services/infrastructure/FileWatcher.ts b/src/main/services/infrastructure/FileWatcher.ts index c318debc..5b37a7e9 100644 --- a/src/main/services/infrastructure/FileWatcher.ts +++ b/src/main/services/infrastructure/FileWatcher.ts @@ -29,6 +29,7 @@ import { projectPathResolver } from '../discovery/ProjectPathResolver'; import { errorDetector } from '../error/ErrorDetector'; import { ConfigManager } from './ConfigManager'; +import { CrossPlatformFileChangeSource } from './CrossPlatformFileChangeSource'; import { type DataCache } from './DataCache'; import { LocalFileSystemProvider } from './LocalFileSystemProvider'; import { type NotificationManager } from './NotificationManager'; @@ -73,25 +74,10 @@ interface ActiveSessionFile { lastObservedAt: number; } -type RecursiveWatcherType = 'projects' | 'todos' | 'teams' | 'tasks'; -type PollingWatcherType = 'teams' | 'tasks'; -interface CloseableWatcher { - close: () => void | Promise; -} +type FileWatcherSourceType = 'projects' | 'todos' | 'teams' | 'tasks'; export class FileWatcher extends EventEmitter { - private projectsWatcher: fs.FSWatcher | null = null; - private todosWatcher: fs.FSWatcher | null = null; - private teamsWatcher: TeamTaskWatchRegistry | null = null; - private tasksWatcher: TeamTaskWatchRegistry | null = null; - private teamsPollingTimer: NodeJS.Timeout | null = null; - private tasksPollingTimer: NodeJS.Timeout | null = null; - private teamsPollingInProgress = false; - private tasksPollingInProgress = false; - private teamsPollingPrimed = false; - private tasksPollingPrimed = false; - private polledTeamFiles = new Map(); - private polledTaskFiles = new Map(); + private readonly changeSources: Record; private retryTimer: NodeJS.Timeout | null = null; private projectsPath: string; private todosPath: string; @@ -116,16 +102,8 @@ export class FileWatcher extends EventEmitter { private catchUpCursor = 0; /** Consecutive catch-up stat timeouts per file. */ private catchUpStatFailures = new Map(); - /** Timer for SSH polling mode (replaces fs.watch) */ - private pollingTimer: NodeJS.Timeout | null = null; - /** Polling interval for SSH mode */ + /** Polling interval for projects fallback and SSH mode. */ private static readonly SSH_POLL_INTERVAL_MS = 3000; - /** Guard to prevent overlapping SSH polling runs */ - private pollingInProgress = false; - /** Indicates whether the first polling baseline snapshot has completed */ - private sshPollPrimed = false; - /** Track file sizes for SSH polling change detection */ - private polledFileSizes = new Map(); /** Files currently being processed (concurrency guard) */ private processingInProgress = new Set(); /** Files that need reprocessing after current processing completes */ @@ -151,6 +129,95 @@ export class FileWatcher extends EventEmitter { this.tasksPath = getTasksBasePath(); this.dataCache = dataCache; this.fsProvider = fsProvider ?? new LocalFileSystemProvider(); + this.changeSources = this.createChangeSources(); + } + + private createChangeSources(): Record { + return { + projects: new CrossPlatformFileChangeSource({ + name: 'projects', + pollIntervalMs: FileWatcher.SSH_POLL_INTERVAL_MS, + createWatcher: ({ onError, onClose, isCurrent }) => { + const watcher = fs.watch( + this.projectsPath, + { recursive: true }, + (eventType, filename) => { + if (filename && isCurrent()) { + this.handleProjectsChange(eventType, filename); + } + } + ); + watcher.on('error', (error) => this.handleNativeWatcherError('projects', error, onError)); + watcher.on('close', onClose); + logger.info(`FileWatcher: Started watching projects at ${this.projectsPath}`); + return watcher; + }, + collectPollSnapshot: () => this.collectProjectsPollSnapshot(), + emitPolledChange: (eventType, relativePath) => + this.handleProjectsChange(eventType, relativePath), + isOwnerActive: () => this.isWatching, + isWatchLimitError: (error) => this.isWatchLimitError(error), + requestRetry: () => this.scheduleWatcherRetry(), + onWatcherStartError: (error) => logger.error('Error starting projects watcher:', error), + onWatcherError: (error) => logger.error('FileWatcher: projects watcher error:', error), + onPollingError: (error) => + logger.error('FileWatcher: Error during projects polling:', error), + }), + todos: new CrossPlatformFileChangeSource({ + name: 'todos', + pollIntervalMs: FileWatcher.SSH_POLL_INTERVAL_MS, + createWatcher: ({ onError, onClose, isCurrent }) => { + const watcher = fs.watch(this.todosPath, (eventType, filename) => { + if (filename && isCurrent()) { + this.handleTodosChange(eventType, filename); + } + }); + watcher.on('error', (error) => this.handleNativeWatcherError('todos', error, onError)); + watcher.on('close', onClose); + logger.info(`FileWatcher: Started watching todos at ${this.todosPath}`); + return watcher; + }, + collectPollSnapshot: () => this.collectTodosPollSnapshot(), + emitPolledChange: (eventType, relativePath) => + this.handleTodosChange(eventType, relativePath), + isOwnerActive: () => this.isWatching, + isWatchLimitError: (error) => this.isWatchLimitError(error), + requestRetry: () => this.scheduleWatcherRetry(), + onWatcherStartError: (error) => logger.error('Error starting todos watcher:', error), + onWatcherError: (error) => logger.error('FileWatcher: todos watcher error:', error), + onPollingError: (error) => logger.error('FileWatcher: Error during todos polling:', error), + }), + teams: new CrossPlatformFileChangeSource({ + name: 'teams', + pollIntervalMs: TEAMS_POLL_INTERVAL_MS, + createWatcher: ({ onError, isCurrent }) => + this.createTeamTaskRegistry('teams', this.teamsPath, onError, isCurrent), + collectPollSnapshot: () => this.collectTeamsPollSnapshot(), + emitPolledChange: (eventType, relativePath) => + this.handleTeamsChange(eventType, relativePath), + isOwnerActive: () => this.isWatching, + isWatchLimitError: (error) => this.isWatchLimitError(error), + requestRetry: () => this.scheduleWatcherRetry(), + onWatcherStartError: (error) => logger.error('Error starting teams watcher:', error), + onWatcherError: (error) => logger.error('FileWatcher: teams watcher error:', error), + onPollingError: (error) => logger.error('FileWatcher: Error during teams polling:', error), + }), + tasks: new CrossPlatformFileChangeSource({ + name: 'tasks', + pollIntervalMs: TASKS_POLL_INTERVAL_MS, + createWatcher: ({ onError, isCurrent }) => + this.createTeamTaskRegistry('tasks', this.tasksPath, onError, isCurrent), + collectPollSnapshot: () => this.collectTasksPollSnapshot(), + emitPolledChange: (eventType, relativePath) => + this.handleTasksChange(eventType, relativePath), + isOwnerActive: () => this.isWatching, + isWatchLimitError: (error) => this.isWatchLimitError(error), + requestRetry: () => this.scheduleWatcherRetry(), + onWatcherStartError: (error) => logger.error('Error starting tasks watcher:', error), + onWatcherError: (error) => logger.error('FileWatcher: tasks watcher error:', error), + onPollingError: (error) => logger.error('FileWatcher: Error during tasks polling:', error), + }), + }; } /** @@ -210,42 +277,10 @@ export class FileWatcher extends EventEmitter { this.retryTimer = null; } - if (this.projectsWatcher) { - this.projectsWatcher.close(); - this.projectsWatcher = null; + for (const source of Object.values(this.changeSources)) { + source.stop(); } - if (this.todosWatcher) { - this.todosWatcher.close(); - this.todosWatcher = null; - } - - if (this.teamsWatcher) { - void this.teamsWatcher.close(); - this.teamsWatcher = null; - } - - if (this.tasksWatcher) { - void this.tasksWatcher.close(); - this.tasksWatcher = null; - } - - if (this.teamsPollingTimer) { - clearInterval(this.teamsPollingTimer); - this.teamsPollingTimer = null; - } - - if (this.tasksPollingTimer) { - clearInterval(this.tasksPollingTimer); - this.tasksPollingTimer = null; - } - this.teamsPollingInProgress = false; - this.tasksPollingInProgress = false; - this.teamsPollingPrimed = false; - this.tasksPollingPrimed = false; - this.polledTeamFiles.clear(); - this.polledTaskFiles.clear(); - // Clear any pending debounce timers for (const timer of this.debounceTimers.values()) { clearTimeout(timer); @@ -258,15 +293,6 @@ export class FileWatcher extends EventEmitter { this.catchUpTimer = null; } - // Clear SSH polling timer - if (this.pollingTimer) { - clearInterval(this.pollingTimer); - this.pollingTimer = null; - } - this.pollingInProgress = false; - this.sshPollPrimed = false; - this.polledFileSizes.clear(); - // Clear error detection tracking this.lastProcessedLineCount.clear(); this.lastProcessedSize.clear(); @@ -314,35 +340,18 @@ export class FileWatcher extends EventEmitter { this.catchUpTimer = null; } - // 5. Clear polling timer (stop() already handles this) - if (this.pollingTimer) { - clearInterval(this.pollingTimer); - this.pollingTimer = null; - } - if (this.teamsPollingTimer) { - clearInterval(this.teamsPollingTimer); - this.teamsPollingTimer = null; - } - if (this.tasksPollingTimer) { - clearInterval(this.tasksPollingTimer); - this.tasksPollingTimer = null; - } - - // 6. Clear all tracking maps (stop() already handles most of these) + // 5. Clear all tracking maps (stop() already handles most of these) this.lastProcessedLineCount.clear(); this.lastProcessedSize.clear(); this.activeSessionFiles.clear(); this.catchUpStatFailures.clear(); - this.polledFileSizes.clear(); - this.polledTeamFiles.clear(); - this.polledTaskFiles.clear(); this.processingInProgress.clear(); this.pendingReprocess.clear(); - // 7. Remove all EventEmitter listeners (MUST be last) + // 6. Remove all EventEmitter listeners (MUST be last) this.removeAllListeners(); - // 8. Mark as disposed + // 7. Mark as disposed this.disposed = true; logger.info('FileWatcher disposed'); @@ -352,164 +361,78 @@ export class FileWatcher extends EventEmitter { * Starts the projects directory watcher. */ private async startProjectsWatcher(): Promise { - if (this.projectsWatcher) { + if (this.changeSources.projects.isActive) { return; } - try { - if (!(await this.pathExists(this.projectsPath))) { - logger.warn(`FileWatcher: Projects directory does not exist: ${this.projectsPath}`); - this.scheduleWatcherRetry(); - return; - } - - // Guard: stop() may have been called while awaiting pathExists - if (!this.isWatching) return; - - this.projectsWatcher = fs.watch( - this.projectsPath, - { recursive: true }, - (eventType, filename) => { - if (filename) { - this.handleProjectsChange(eventType, filename); - } - } - ); - this.attachWatcherRecovery(this.projectsWatcher, 'projects'); - - logger.info(`FileWatcher: Started watching projects at ${this.projectsPath}`); - } catch (error) { - logger.error('Error starting projects watcher:', error); - this.projectsWatcher = null; + if (!(await this.pathExists(this.projectsPath))) { + logger.warn(`FileWatcher: Projects directory does not exist: ${this.projectsPath}`); this.scheduleWatcherRetry(); + return; } + + // Guard: stop() may have been called while awaiting pathExists + if (!this.isWatching) return; + + await this.changeSources.projects.start(); } /** * Starts the todos directory watcher. */ private async startTodosWatcher(): Promise { - if (this.todosWatcher) { + if (this.changeSources.todos.isActive) { return; } - try { - if (!(await this.pathExists(this.todosPath))) { - // Todos directory may not exist yet - that's OK - this.scheduleWatcherRetry(); - return; - } - - // Guard: stop() may have been called while awaiting pathExists - if (!this.isWatching) return; - - this.todosWatcher = fs.watch(this.todosPath, (eventType, filename) => { - if (filename) { - this.handleTodosChange(eventType, filename); - } - }); - this.attachWatcherRecovery(this.todosWatcher, 'todos'); - - logger.info(`FileWatcher: Started watching todos at ${this.todosPath}`); - } catch (error) { - logger.error('Error starting todos watcher:', error); - this.todosWatcher = null; + if (!(await this.pathExists(this.todosPath))) { + // Todos directory may not exist yet - that's OK this.scheduleWatcherRetry(); + return; } + + // Guard: stop() may have been called while awaiting pathExists + if (!this.isWatching) return; + + await this.changeSources.todos.start(); } /** * Starts the teams directory watcher. */ private async startTeamsWatcher(): Promise { - if (this.teamsWatcher || this.teamsPollingTimer) { + if (this.changeSources.teams.isActive) { return; } - try { - if (!(await this.pathExists(this.teamsPath))) { - this.scheduleWatcherRetry(); - return; - } - - // Guard: stop() may have been called while awaiting pathExists - if (!this.isWatching) return; - - // Teams deliberately use TeamTaskWatchRegistry instead of recursive fs.watch. - // Linux recursive watching expands across the whole team runtime tree and can - // hit EMFILE/ENOSPC. The registry keeps the watched surface aligned with - // processTeamsChange(): team root JSON files plus inbox JSON files only. - const registry = new TeamTaskWatchRegistry({ - kind: 'teams', - rootPath: this.teamsPath, - onChange: (eventType, filename) => this.handleTeamsChange(eventType, filename), - onError: (error) => this.handleTeamTaskWatcherError('teams', error), - }); - this.teamsWatcher = registry; - await registry.start(); - if (!this.isWatching || this.teamsWatcher !== registry) { - void registry.close(); - return; - } - logger.info(`FileWatcher: Started watching teams at ${this.teamsPath}`); - } catch (error) { - const registry = this.teamsWatcher; - if (this.startPollingFallbackForRecursiveWatchLimit('teams', error, registry ?? undefined)) { - return; - } - logger.error('Error starting teams watcher:', error); - if (this.teamsWatcher) { - void this.teamsWatcher.close(); - } - this.teamsWatcher = null; + if (!(await this.pathExists(this.teamsPath))) { this.scheduleWatcherRetry(); + return; } + + // Guard: stop() may have been called while awaiting pathExists + if (!this.isWatching) return; + + await this.changeSources.teams.start(); } /** * Starts the tasks directory watcher. */ private async startTasksWatcher(): Promise { - if (this.tasksWatcher || this.tasksPollingTimer) { + if (this.changeSources.tasks.isActive) { return; } - try { - if (!(await this.pathExists(this.tasksPath))) { - this.scheduleWatcherRetry(); - return; - } - - // Guard: stop() may have been called while awaiting pathExists - if (!this.isWatching) return; - - // Tasks share the same shallow registry rule as teams. Keep polling out of - // the normal path here; it is only the known-error fallback below. - const registry = new TeamTaskWatchRegistry({ - kind: 'tasks', - rootPath: this.tasksPath, - onChange: (eventType, filename) => this.handleTasksChange(eventType, filename), - onError: (error) => this.handleTeamTaskWatcherError('tasks', error), - }); - this.tasksWatcher = registry; - await registry.start(); - if (!this.isWatching || this.tasksWatcher !== registry) { - void registry.close(); - return; - } - logger.info(`FileWatcher: Started watching tasks at ${this.tasksPath}`); - } catch (error) { - const registry = this.tasksWatcher; - if (this.startPollingFallbackForRecursiveWatchLimit('tasks', error, registry ?? undefined)) { - return; - } - logger.error('Error starting tasks watcher:', error); - if (this.tasksWatcher) { - void this.tasksWatcher.close(); - } - this.tasksWatcher = null; + if (!(await this.pathExists(this.tasksPath))) { this.scheduleWatcherRetry(); + return; } + + // Guard: stop() may have been called while awaiting pathExists + if (!this.isWatching) return; + + await this.changeSources.tasks.start(); } /** @@ -550,8 +473,8 @@ export class FileWatcher extends EventEmitter { } if ( - !this.projectsWatcher || - !this.todosWatcher || + !this.isWatcherOrPollingActive('projects') || + !this.isWatcherOrPollingActive('todos') || !this.isWatcherOrPollingActive('teams') || !this.isWatcherOrPollingActive('tasks') ) { @@ -570,115 +493,56 @@ export class FileWatcher extends EventEmitter { }, WATCHER_RETRY_MS); } - private attachWatcherRecovery(watcher: fs.FSWatcher, watcherType: RecursiveWatcherType): void { - watcher.on('error', (error: NodeJS.ErrnoException) => { - // Ephemeral .lock files cause harmless ENOENT when the recursive watcher - // tries to scandir a path that was already deleted. Log as debug and skip - // the teardown/retry - the watcher is still healthy. - if (error.code === 'ENOENT' && error.path?.endsWith('.lock')) { - logger.debug(`FileWatcher: ${watcherType} ignoring transient ENOENT on lock file`); - return; - } - if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher)) { - return; - } - logger.error(`FileWatcher: ${watcherType} watcher error:`, error); - if (watcherType === 'projects') { - this.projectsWatcher = null; - } else if (watcherType === 'todos') { - this.todosWatcher = null; - } else if (watcherType === 'teams') { - this.teamsWatcher = null; - } else { - this.tasksWatcher = null; - } - if (!this.isWatcherOrPollingActive(watcherType)) { - this.scheduleWatcherRetry(); - } - }); - - watcher.on('close', () => { - if (!this.isWatching) { - return; - } - if (watcherType === 'projects') { - this.projectsWatcher = null; - } else if (watcherType === 'todos') { - this.todosWatcher = null; - } else if (watcherType === 'teams') { - this.teamsWatcher = null; - } else { - this.tasksWatcher = null; - } - if (!this.isWatcherOrPollingActive(watcherType)) { - this.scheduleWatcherRetry(); - } - }); - } - - private handleTeamTaskWatcherError(watcherType: TeamTaskWatchKind, error: unknown): void { - const watcher = watcherType === 'teams' ? this.teamsWatcher : this.tasksWatcher; - if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher ?? undefined)) { + private handleNativeWatcherError( + watcherType: FileWatcherSourceType, + error: NodeJS.ErrnoException, + onError: (error: unknown) => void + ): void { + // Ephemeral .lock files cause harmless ENOENT when the recursive watcher + // tries to scandir a path that was already deleted. Log as debug and skip + // teardown/retry - the watcher is still healthy. + if (error.code === 'ENOENT' && error.path?.endsWith('.lock')) { + logger.debug(`FileWatcher: ${watcherType} ignoring transient ENOENT on lock file`); return; } - - logger.error(`FileWatcher: ${watcherType} watcher error:`, error); - if (watcherType === 'teams') { - this.teamsWatcher = null; - } else { - this.tasksWatcher = null; - } - void watcher?.close(); - - if (!this.isWatcherOrPollingActive(watcherType)) { - this.scheduleWatcherRetry(); - } + onError(error); } - private isWatcherOrPollingActive(watcherType: RecursiveWatcherType): boolean { - if (watcherType === 'projects') { - return this.projectsWatcher !== null; - } - if (watcherType === 'todos') { - return this.todosWatcher !== null; - } - if (watcherType === 'teams') { - return this.teamsWatcher !== null || this.teamsPollingTimer !== null; - } - return this.tasksWatcher !== null || this.tasksPollingTimer !== null; - } - - private startPollingFallbackForRecursiveWatchLimit( - watcherType: RecursiveWatcherType, - error: unknown, - watcher?: CloseableWatcher - ): boolean { - // Polling fallback is intentionally narrow. Projects/todos keep their native - // watcher retry behavior, while teams/tasks can switch to scoped polling only - // after known OS watcher-limit or platform errors from Chokidar/fs.watch. - if ((watcherType !== 'teams' && watcherType !== 'tasks') || !this.isWatchLimitError(error)) { - return false; - } - - const err = error as NodeJS.ErrnoException; - logger.warn( - `FileWatcher: ${watcherType} watcher hit ${err.code ?? 'a platform limit'}; falling back to polling` - ); - - if (watcherType === 'teams') { - this.teamsWatcher = null; - } else { - this.tasksWatcher = null; - } - - this.startTeamTaskPolling(watcherType); + private async createTeamTaskRegistry( + watcherType: TeamTaskWatchKind, + rootPath: string, + onError: (error: unknown) => void, + isCurrent: () => boolean + ): Promise { + const registry = new TeamTaskWatchRegistry({ + kind: watcherType, + rootPath, + onChange: (eventType, filename) => { + if (!isCurrent()) { + return; + } + if (watcherType === 'teams') { + this.handleTeamsChange(eventType, filename); + } else { + this.handleTasksChange(eventType, filename); + } + }, + onError, + }); try { - void watcher?.close(); - } catch (closeError) { - logger.debug(`FileWatcher: ${watcherType} watcher close after fallback failed`, closeError); + await registry.start(); + } catch (error) { + await registry.close(); + throw error; } - return true; + + logger.info(`FileWatcher: Started watching ${watcherType} at ${rootPath}`); + return registry; + } + + private isWatcherOrPollingActive(watcherType: FileWatcherSourceType): boolean { + return this.changeSources[watcherType].isActive; } private isWatchLimitError(error: unknown): boolean { @@ -691,124 +555,14 @@ export class FileWatcher extends EventEmitter { ); } - private startTeamTaskPolling(watcherType: PollingWatcherType): void { - const existingTimer = watcherType === 'teams' ? this.teamsPollingTimer : this.tasksPollingTimer; - if (existingTimer) { - return; - } - - const runPoll = (): void => { - if (!this.isWatching) { - return; - } - if (watcherType === 'teams') { - if (this.teamsPollingInProgress) { - return; - } - this.teamsPollingInProgress = true; - this.pollTeamsForChanges() - .catch((err) => { - logger.error('FileWatcher: Error during teams polling:', err); - }) - .finally(() => { - this.teamsPollingInProgress = false; - }); - return; - } - - if (this.tasksPollingInProgress) { - return; - } - this.tasksPollingInProgress = true; - this.pollTasksForChanges() - .catch((err) => { - logger.error('FileWatcher: Error during tasks polling:', err); - }) - .finally(() => { - this.tasksPollingInProgress = false; - }); - }; - - runPoll(); - // This is fallback content polling after watcher failure, not the default mode. - // Keep intervals conservative and scoped to the same shallow artifacts as the registry. - const timer = setInterval(runPoll, this.getTeamTaskPollIntervalMs(watcherType)); - timer.unref(); - - if (watcherType === 'teams') { - this.teamsPollingTimer = timer; - } else { - this.tasksPollingTimer = timer; - } - } - - private getTeamTaskPollIntervalMs(watcherType: PollingWatcherType): number { - return watcherType === 'teams' ? TEAMS_POLL_INTERVAL_MS : TASKS_POLL_INTERVAL_MS; - } - - private async pollTeamsForChanges(): Promise { - const nextSnapshot = await this.collectTeamsPollSnapshot(); - if (!this.isWatching) { - return; - } - this.emitPolledChanges( - 'teams', - this.polledTeamFiles, - nextSnapshot, - this.teamsPollingPrimed, - (eventType, relativePath) => this.handleTeamsChange(eventType, relativePath) - ); - this.polledTeamFiles = nextSnapshot; - this.teamsPollingPrimed = true; - } - - private async pollTasksForChanges(): Promise { - const nextSnapshot = await this.collectTasksPollSnapshot(); - if (!this.isWatching) { - return; - } - this.emitPolledChanges( - 'tasks', - this.polledTaskFiles, - nextSnapshot, - this.tasksPollingPrimed, - (eventType, relativePath) => this.handleTasksChange(eventType, relativePath) - ); - this.polledTaskFiles = nextSnapshot; - this.tasksPollingPrimed = true; - } - - private emitPolledChanges( - watcherType: PollingWatcherType, - previousSnapshot: Map, - nextSnapshot: Map, - isPrimed: boolean, - emitChange: (eventType: string, relativePath: string) => void - ): void { - if (!isPrimed) { - logger.info(`FileWatcher: ${watcherType} polling baseline captured`); - return; - } - - for (const [relativePath, fingerprint] of nextSnapshot) { - const previous = previousSnapshot.get(relativePath); - if (previous === undefined) { - emitChange('rename', relativePath); - } else if (previous !== fingerprint) { - emitChange('change', relativePath); - } - } - - for (const relativePath of previousSnapshot.keys()) { - if (!nextSnapshot.has(relativePath)) { - emitChange('rename', relativePath); - } - } + private isNotFoundError(error: unknown): boolean { + const code = (error as { code?: unknown } | undefined)?.code; + return code === 'ENOENT' || code === '2' || code === 2; } private async collectTeamsPollSnapshot(): Promise> { const snapshot = new Map(); - const teamEntries = await this.safeReadDir(this.teamsPath); + const teamEntries = await this.readSnapshotDir(this.teamsPath); // Fallback polling mirrors TeamTaskWatchRegistry. Do not recurse into members, // runtime, .opencode-runtime, logs, or other deep trees from here. @@ -819,15 +573,20 @@ export class FileWatcher extends EventEmitter { const teamName = teamEntry.name; const teamPath = path.join(this.teamsPath, teamName); - await this.collectPolledDirectoryFiles(snapshot, teamPath, teamName, (fileName) => - fileName.endsWith('.json') + await this.collectPolledDirectoryFiles( + snapshot, + teamPath, + teamName, + (fileName) => fileName.endsWith('.json'), + { missingAsEmpty: false } ); await this.collectPolledDirectoryFiles( snapshot, path.join(teamPath, 'inboxes'), `${teamName}/inboxes`, - (fileName) => fileName.endsWith('.json') + (fileName) => fileName.endsWith('.json'), + { missingAsEmpty: true } ); } @@ -836,7 +595,7 @@ export class FileWatcher extends EventEmitter { private async collectTasksPollSnapshot(): Promise> { const snapshot = new Map(); - const teamEntries = await this.safeReadDir(this.tasksPath); + const teamEntries = await this.readSnapshotDir(this.tasksPath); // Keep task fallback scoped to tasks//*.json. Hidden files and nested // runtime directories are intentionally outside the public team-change surface. @@ -850,7 +609,8 @@ export class FileWatcher extends EventEmitter { snapshot, path.join(this.tasksPath, teamName), teamName, - (fileName) => !fileName.startsWith('.') && fileName.endsWith('.json') + (fileName) => !fileName.startsWith('.') && fileName.endsWith('.json'), + { missingAsEmpty: false } ); } @@ -861,9 +621,10 @@ export class FileWatcher extends EventEmitter { snapshot: Map, dirPath: string, relativeRoot: string, - shouldInclude: (fileName: string) => boolean + shouldInclude: (fileName: string) => boolean, + options: { missingAsEmpty?: boolean } = {} ): Promise { - const entries = await this.safeReadDir(dirPath); + const entries = await this.readSnapshotDir(dirPath, options); for (const entry of entries) { if (!entry.isFile() || !shouldInclude(entry.name)) { continue; @@ -881,30 +642,27 @@ export class FileWatcher extends EventEmitter { absolutePath: string, relativePath: string ): Promise { - try { - const stats = await fsp.stat(absolutePath); - if (!stats.isFile()) { - return; - } - snapshot.set( - relativePath, - `${stats.dev}:${stats.ino}:${stats.mtimeMs}:${stats.ctimeMs}:${stats.size}` - ); - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { - logger.debug(`FileWatcher: Unable to stat polled file ${absolutePath}`, error); - } + const stats = await fsp.stat(absolutePath); + if (!stats.isFile()) { + return; } + snapshot.set( + relativePath, + `${stats.dev}:${stats.ino}:${stats.mtimeMs}:${stats.ctimeMs}:${stats.size}` + ); } - private async safeReadDir(dirPath: string): Promise { + private async readSnapshotDir( + dirPath: string, + options: { missingAsEmpty?: boolean } = {} + ): Promise { try { return await fsp.readdir(dirPath, { withFileTypes: true }); } catch (error) { - if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { - logger.debug(`FileWatcher: Unable to read polled directory ${dirPath}`, error); + if (this.isNotFoundError(error) && options.missingAsEmpty) { + return []; } - return []; + throw error; } } @@ -917,99 +675,107 @@ export class FileWatcher extends EventEmitter { * Polls the projects directory for file changes instead of using fs.watch(). */ private startPollingMode(): void { - if (this.pollingTimer) return; - logger.info('FileWatcher: Starting SSH polling mode'); - const runPoll = (): void => { - if (this.pollingInProgress) { - return; - } - - this.pollingInProgress = true; - this.pollForChanges() - .catch((err) => { - logger.error('Error during SSH polling:', err); - }) - .finally(() => { - this.pollingInProgress = false; - }); - }; - - // Prime immediately so newly created sessions appear without waiting a full interval. - runPoll(); - this.pollingTimer = setInterval(runPoll, FileWatcher.SSH_POLL_INTERVAL_MS); - // Polling is a background task and should not keep the process alive. - this.pollingTimer.unref(); + this.changeSources.projects.startPolling(); } - /** - * Polls the projects directory for file changes in SSH mode. - */ - private async pollForChanges(): Promise { - try { - const seenFiles = new Set(); - const projectDirs = await this.fsProvider.readdir(this.projectsPath); - for (const dir of projectDirs) { - if (!dir.isDirectory()) continue; + private async collectProjectsPollSnapshot(): Promise> { + const snapshot = new Map(); + const projectDirs = await this.readProviderSnapshotDir(this.projectsPath); - const projectPath = path.join(this.projectsPath, dir.name); - let entries: FsDirent[]; - try { - entries = await this.fsProvider.readdir(projectPath); - } catch { + for (const projectDir of projectDirs) { + if (!projectDir.isDirectory()) { + continue; + } + + const projectPath = path.join(this.projectsPath, projectDir.name); + const entries = await this.readProviderSnapshotDir(projectPath); + for (const entry of entries) { + const entryPath = path.join(projectPath, entry.name); + if (entry.isFile() && entry.name.endsWith('.jsonl')) { + await this.addProviderPolledFile( + snapshot, + entryPath, + path.join(projectDir.name, entry.name), + entry + ); continue; } - for (const entry of entries) { - if (!entry.isFile() || !entry.name.endsWith('.jsonl')) continue; + if (!entry.isDirectory()) { + continue; + } - const fullPath = path.join(projectPath, entry.name); - seenFiles.add(fullPath); - try { - const observedSize = - typeof entry.size === 'number' - ? entry.size - : (await this.fsProvider.stat(fullPath)).size; - const lastSize = this.polledFileSizes.get(fullPath); - const relativePath = path.join(dir.name, entry.name); - - if (lastSize === undefined) { - // First time seeing this file: after baseline, emit add. - this.polledFileSizes.set(fullPath, observedSize); - if (this.sshPollPrimed) { - this.handleProjectsChange('rename', relativePath); - } - } else if (observedSize !== lastSize) { - // File changed - this.polledFileSizes.set(fullPath, observedSize); - this.handleProjectsChange('change', relativePath); - } - } catch { + const subagentsPath = path.join(entryPath, 'subagents'); + const subagentEntries = await this.readProviderSnapshotDir(subagentsPath, { + missingAsEmpty: true, + }); + for (const subagentEntry of subagentEntries) { + if (!subagentEntry.isFile() || !subagentEntry.name.endsWith('.jsonl')) { continue; } + await this.addProviderPolledFile( + snapshot, + path.join(subagentsPath, subagentEntry.name), + path.join(projectDir.name, entry.name, 'subagents', subagentEntry.name), + subagentEntry + ); } } + } - // Detect deleted files after baseline is established. - if (this.sshPollPrimed) { - const removedFiles: string[] = []; - for (const trackedPath of this.polledFileSizes.keys()) { - if (!seenFiles.has(trackedPath)) { - removedFiles.push(trackedPath); - } - } - for (const removedPath of removedFiles) { - this.polledFileSizes.delete(removedPath); - const relativePath = path.relative(this.projectsPath, removedPath); - if (relativePath && !relativePath.startsWith('..')) { - this.handleProjectsChange('rename', relativePath); - } - } - } else { - this.sshPollPrimed = true; + return snapshot; + } + + private async collectTodosPollSnapshot(): Promise> { + const snapshot = new Map(); + const entries = await this.readProviderSnapshotDir(this.todosPath); + + for (const entry of entries) { + if (!entry.isFile() || !entry.name.endsWith('.json')) { + continue; } - } catch (err) { - logger.error('Error polling for changes:', err); + await this.addProviderPolledFile( + snapshot, + path.join(this.todosPath, entry.name), + entry.name, + entry + ); + } + + return snapshot; + } + + private async addProviderPolledFile( + snapshot: Map, + absolutePath: string, + relativePath: string, + entry?: FsDirent + ): Promise { + const stats = + typeof entry?.size === 'number' && typeof entry.mtimeMs === 'number' + ? entry + : await this.fsProvider.stat(absolutePath); + if (!stats.isFile()) { + return; + } + snapshot.set(relativePath, `${stats.mtimeMs ?? 0}:${stats.size}`); + } + + private async readProviderSnapshotDir( + dirPath: string, + options: { missingAsEmpty?: boolean } = {} + ): Promise { + try { + if (this.fsProvider instanceof LocalFileSystemProvider) { + return await fsp.readdir(dirPath, { withFileTypes: true }); + } + return await this.fsProvider.readdir(dirPath); + } catch (error) { + if (this.isNotFoundError(error) && options.missingAsEmpty) { + return []; + } + throw error; } } diff --git a/src/main/services/infrastructure/TeamTaskWatchRegistry.ts b/src/main/services/infrastructure/TeamTaskWatchRegistry.ts index 2c8437b9..334279b1 100644 --- a/src/main/services/infrastructure/TeamTaskWatchRegistry.ts +++ b/src/main/services/infrastructure/TeamTaskWatchRegistry.ts @@ -22,6 +22,7 @@ const RECONCILE_INTERVAL_MS = 30_000; // If a new team artifact should produce TeamChangeEvent, add it here too. const TEAM_ROOT_FILES = new Set([ 'config.json', + 'kanban-state.json', 'processes.json', 'sentMessages.json', 'team.meta.json', @@ -62,7 +63,7 @@ export class TeamTaskWatchRegistry { if (this.closed) { return; } - await this.reconcileTargets(); + await this.reconcileTargets({ rethrowErrors: true }); if (this.closed || this.reconcileTimer) { return; } @@ -89,11 +90,11 @@ export class TeamTaskWatchRegistry { this.targets.clear(); this.targetKey = ''; if (watcher) { - await watcher.close().catch(() => undefined); + await this.closeWatcher(watcher); } } - private async reconcileTargets(): Promise { + private async reconcileTargets(options: { rethrowErrors?: boolean } = {}): Promise { if (this.closed) { return; } @@ -111,6 +112,9 @@ export class TeamTaskWatchRegistry { await this.rebuildWatcher(targets, nextKey, addedTargets); } } catch (error) { + if (options.rethrowErrors) { + throw error; + } if (!this.closed) { this.options.onError(error); } @@ -135,7 +139,7 @@ export class TeamTaskWatchRegistry { const previousWatcher = this.watcher; this.watcher = null; if (previousWatcher) { - await previousWatcher.close().catch(() => undefined); + await this.closeWatcher(previousWatcher); } if (this.closed || generation !== this.generation) { @@ -264,6 +268,15 @@ export class TeamTaskWatchRegistry { } } + private async closeWatcher(watcher: FSWatcher): Promise { + try { + await watcher.close(); + } catch { + // Best-effort cleanup only. Chokidar close can fail if the underlying + // watcher is already torn down during startup or limit-error recovery. + } + } + private async isDirectory(dirPath: string): Promise { try { return (await fsp.stat(dirPath)).isDirectory(); diff --git a/test/main/services/infrastructure/CrossPlatformFileChangeSource.test.ts b/test/main/services/infrastructure/CrossPlatformFileChangeSource.test.ts new file mode 100644 index 00000000..0ea2b0d1 --- /dev/null +++ b/test/main/services/infrastructure/CrossPlatformFileChangeSource.test.ts @@ -0,0 +1,598 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { + CrossPlatformFileChangeSource, + type WatcherLifecycle, +} from '../../../../src/main/services/infrastructure/CrossPlatformFileChangeSource'; + +vi.mock('@shared/utils/logger', () => ({ + createLogger: () => ({ + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), +})); + +function createSource(options: { + active: () => boolean; + createWatcher?: ( + lifecycle: WatcherLifecycle + ) => Promise<{ close: () => void }> | { close: () => void }; + collectPollSnapshot?: () => Promise>; +}) { + return new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: options.createWatcher, + collectPollSnapshot: options.collectPollSnapshot ?? vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: options.active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry: vi.fn(), + }); +} + +describe('CrossPlatformFileChangeSource', () => { + it('coalesces concurrent watcher starts into one watcher', async () => { + let active = true; + let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined; + const close = vi.fn(); + const createWatcher = vi.fn( + () => + new Promise<{ close: () => void }>((resolve) => { + resolveWatcher = resolve; + }) + ); + const source = createSource({ active: () => active, createWatcher }); + + const firstStart = source.start(); + const secondStart = source.start(); + expect(createWatcher).toHaveBeenCalledTimes(1); + + resolveWatcher?.({ close }); + await Promise.all([firstStart, secondStart]); + + expect(source.isActive).toBe(true); + source.stop(); + active = false; + }); + + it('ignores stale watcher close events after restart', async () => { + let active = true; + const lifecycles: WatcherLifecycle[] = []; + const createWatcher = vi.fn((lifecycle: WatcherLifecycle) => { + lifecycles.push(lifecycle); + return { close: vi.fn() }; + }); + const source = createSource({ active: () => active, createWatcher }); + + await source.start(); + source.stop(); + await source.start(); + + lifecycles[0].onClose(); + + expect(source.isActive).toBe(true); + source.stop(); + active = false; + }); + + it('marks old watcher lifecycles stale after restart', async () => { + let active = true; + const lifecycles: WatcherLifecycle[] = []; + const createWatcher = vi.fn((lifecycle: WatcherLifecycle) => { + lifecycles.push(lifecycle); + return { close: vi.fn() }; + }); + const source = createSource({ active: () => active, createWatcher }); + + await source.start(); + expect(lifecycles[0].isCurrent()).toBe(true); + + source.stop(); + expect(lifecycles[0].isCurrent()).toBe(false); + + await source.start(); + expect(lifecycles[0].isCurrent()).toBe(false); + expect(lifecycles[1].isCurrent()).toBe(true); + + source.stop(); + active = false; + }); + + it('does not keep a watcher that closes during startup', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const close = vi.fn(); + const requestRetry = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + lifecycle.onClose(); + return { close }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry, + }); + + await source.start(); + + expect(source.isActive).toBe(false); + expect(close).toHaveBeenCalled(); + expect(requestRetry).toHaveBeenCalledTimes(1); + active = false; + }); + + it('falls back to polling when a watcher reports a limit error during startup', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const close = vi.fn(); + const requestRetry = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + lifecycle.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' })); + return { close }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + }); + + await source.start(); + + expect(source.currentPollingTimer).not.toBeNull(); + expect(lifecycle?.isCurrent()).toBe(false); + expect(close).toHaveBeenCalled(); + expect(requestRetry).not.toHaveBeenCalled(); + source.stop(); + active = false; + }); + + it('falls back to polling when startup closes before throwing a limit error', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const requestRetry = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + lifecycle.onClose(); + throw Object.assign(new Error('too many open files'), { code: 'EMFILE' }); + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + }); + + await source.start(); + + expect(source.currentPollingTimer).not.toBeNull(); + expect(lifecycle?.isCurrent()).toBe(false); + expect(requestRetry).toHaveBeenCalledTimes(1); + source.stop(); + active = false; + }); + + it('does not retry when startup throws after reporting a limit error', async () => { + let active = true; + const requestRetry = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((lifecycle: WatcherLifecycle) => { + lifecycle.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' })); + throw Object.assign(new Error('startup failed after limit error'), { code: 'EMFILE' }); + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + }); + + await source.start(); + + expect(source.currentPollingTimer).not.toBeNull(); + expect(requestRetry).not.toHaveBeenCalled(); + source.stop(); + active = false; + }); + + it('retries without keeping a watcher that reports a non-limit error during startup', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const close = vi.fn(); + const requestRetry = vi.fn(); + const onWatcherError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + lifecycle.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' })); + return { close }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + onWatcherError, + }); + + await source.start(); + + expect(source.isActive).toBe(false); + expect(lifecycle?.isCurrent()).toBe(false); + expect(close).toHaveBeenCalled(); + expect(onWatcherError).toHaveBeenCalledTimes(1); + expect(requestRetry).toHaveBeenCalledTimes(1); + active = false; + }); + + it('does not retry twice when startup throws after reporting a non-limit error', async () => { + let active = true; + const requestRetry = vi.fn(); + const onWatcherError = vi.fn(); + const onWatcherStartError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((lifecycle: WatcherLifecycle) => { + lifecycle.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' })); + throw Object.assign(new Error('startup failed after permission error'), { code: 'EACCES' }); + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + onWatcherError, + onWatcherStartError, + }); + + await source.start(); + + expect(source.isActive).toBe(false); + expect(onWatcherError).toHaveBeenCalledTimes(1); + expect(onWatcherStartError).not.toHaveBeenCalled(); + expect(requestRetry).toHaveBeenCalledTimes(1); + active = false; + }); + + it('invalidates startup lifecycles after a direct non-limit start failure', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const requestRetry = vi.fn(); + const onWatcherError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + throw Object.assign(new Error('permission denied'), { code: 'EACCES' }); + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + onWatcherError, + }); + + await source.start(); + + expect(source.isActive).toBe(false); + expect(lifecycle?.isCurrent()).toBe(false); + expect(requestRetry).toHaveBeenCalledTimes(1); + + lifecycle?.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' })); + lifecycle?.onClose(); + + expect(source.currentPollingTimer).toBeNull(); + expect(onWatcherError).not.toHaveBeenCalled(); + expect(requestRetry).toHaveBeenCalledTimes(1); + active = false; + }); + + it('does not request retry twice when a watcher closes after an error', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const close = vi.fn(); + const requestRetry = vi.fn(); + const onWatcherError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + return { close }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry, + onWatcherError, + }); + + await source.start(); + lifecycle?.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' })); + lifecycle?.onClose(); + + expect(source.isActive).toBe(false); + expect(close).toHaveBeenCalledTimes(1); + expect(onWatcherError).toHaveBeenCalledTimes(1); + expect(requestRetry).toHaveBeenCalledTimes(1); + active = false; + }); + + it('falls back to polling when a close is followed by a limit error', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const requestRetry = vi.fn(); + const onWatcherError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + return { close: vi.fn() }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE', + requestRetry, + onWatcherError, + }); + + await source.start(); + lifecycle?.onClose(); + lifecycle?.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' })); + + expect(source.currentPollingTimer).not.toBeNull(); + expect(onWatcherError).not.toHaveBeenCalled(); + expect(requestRetry).toHaveBeenCalledTimes(1); + source.stop(); + active = false; + }); + + it('does not request retry when switching an active watcher to polling', async () => { + let active = true; + let lifecycle: WatcherLifecycle | undefined; + const requestRetry = vi.fn(); + const close = vi.fn(() => lifecycle?.onClose()); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => { + lifecycle = nextLifecycle; + return { close }; + }), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry, + }); + + await source.start(); + source.startPolling(); + + expect(source.currentPollingTimer).not.toBeNull(); + expect(lifecycle?.isCurrent()).toBe(false); + expect(close).toHaveBeenCalled(); + expect(requestRetry).not.toHaveBeenCalled(); + source.stop(); + active = false; + }); + + it('closes a late watcher when polling starts during watcher startup', async () => { + let active = true; + let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined; + const close = vi.fn(); + const lifecycles: WatcherLifecycle[] = []; + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn( + (lifecycle: WatcherLifecycle) => + new Promise<{ close: () => void }>((resolve) => { + lifecycles.push(lifecycle); + resolveWatcher = resolve; + }) + ), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + }); + + const start = source.start(); + expect(lifecycles).toHaveLength(1); + expect(lifecycles[0].isCurrent()).toBe(true); + + source.startPolling(); + expect(source.currentPollingTimer).not.toBeNull(); + expect(lifecycles[0].isCurrent()).toBe(false); + + resolveWatcher?.({ close }); + await start; + + expect(close).toHaveBeenCalled(); + source.stop(); + active = false; + }); + + it('closes a stale pending watcher after stop and restart', async () => { + let active = true; + const resolvers: Array<(watcher: { close: () => void }) => void> = []; + const closeOld = vi.fn(); + const closeCurrent = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn( + () => + new Promise<{ close: () => void }>((resolve) => { + resolvers.push(resolve); + }) + ), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + }); + + const firstStart = source.start(); + source.stop(); + const secondStart = source.start(); + + resolvers[1]?.({ close: closeCurrent }); + await secondStart; + expect(source.isActive).toBe(true); + + resolvers[0]?.({ close: closeOld }); + await firstStart; + + expect(closeOld).toHaveBeenCalledTimes(1); + expect(closeCurrent).not.toHaveBeenCalled(); + expect(source.isActive).toBe(true); + source.stop(); + active = false; + }); + + it('swallows synchronous watcher close failures during stale startup cleanup', async () => { + let active = true; + let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined; + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + createWatcher: vi.fn( + () => + new Promise<{ close: () => void }>((resolve) => { + resolveWatcher = resolve; + }) + ), + collectPollSnapshot: vi.fn().mockResolvedValue(new Map()), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + }); + + const start = source.start(); + source.stop(); + resolveWatcher?.({ + close: () => { + throw new Error('close failed'); + }, + }); + + await expect(start).resolves.toBeUndefined(); + expect(source.isActive).toBe(false); + active = false; + }); + + it('ignores stale in-flight polling snapshots after stop and restart', async () => { + let active = true; + const snapshots: Array<() => void> = []; + const emitted: string[] = []; + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + collectPollSnapshot: () => + new Promise>((resolve) => { + snapshots.push(() => resolve(new Map([['old.json', '1']]))); + }), + emitPolledChange: (_eventType, relativePath) => emitted.push(relativePath), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + }); + + source.startPolling(); + expect(snapshots).toHaveLength(1); + source.stop(); + source.startPolling(); + expect(snapshots).toHaveLength(2); + + snapshots[0](); + await Promise.resolve(); + snapshots[1](); + await Promise.resolve(); + + expect(emitted).toEqual([]); + source.stop(); + active = false; + }); + + it('ignores stale in-flight polling errors after stop', async () => { + let active = true; + const onPollingError = vi.fn(); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + collectPollSnapshot: vi.fn().mockRejectedValue(new Error('old polling failure')), + emitPolledChange: vi.fn(), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + onPollingError, + }); + + const poll = source.pollOnce(); + source.stop(); + active = false; + await poll; + + expect(onPollingError).not.toHaveBeenCalled(); + }); + + it('keeps the previous polling snapshot when a poll fails', async () => { + let active = true; + const emitted: Array<[string, string]> = []; + const onPollingError = vi.fn(); + const collectPollSnapshot = vi + .fn() + .mockResolvedValueOnce(new Map([['session.jsonl', '1']])) + .mockRejectedValueOnce(new Error('transient polling failure')) + .mockResolvedValueOnce(new Map([['session.jsonl', '2']])); + const source = new CrossPlatformFileChangeSource({ + name: 'test-source', + pollIntervalMs: 1000, + collectPollSnapshot, + emitPolledChange: (eventType, relativePath) => emitted.push([eventType, relativePath]), + isOwnerActive: () => active, + isWatchLimitError: () => false, + requestRetry: vi.fn(), + onPollingError, + }); + + await source.pollOnce(); + await source.pollOnce(); + await source.pollOnce(); + + expect(onPollingError).toHaveBeenCalledTimes(1); + expect(emitted).toEqual([['change', 'session.jsonl']]); + source.stop(); + active = false; + }); +}); diff --git a/test/main/services/infrastructure/FileWatcher.test.ts b/test/main/services/infrastructure/FileWatcher.test.ts index 5524df2b..497d953c 100644 --- a/test/main/services/infrastructure/FileWatcher.test.ts +++ b/test/main/services/infrastructure/FileWatcher.test.ts @@ -168,6 +168,61 @@ function createMockNotificationManager() { } as unknown as Parameters[0]; } +function createFsDirent( + name: string, + type: 'file' | 'directory', + metadata: { size?: number; mtimeMs?: number } = {} +) { + return { + name, + ...metadata, + isFile: () => type === 'file', + isDirectory: () => type === 'directory', + }; +} + +type TestChangeSourceName = 'projects' | 'todos' | 'teams' | 'tasks'; +interface TestChangeSourceState { + currentPollingTimer: NodeJS.Timeout | null; + isPollingPrimed: boolean; + pollOnce: () => Promise; +} + +function getChangeSource(watcher: FileWatcher, name: TestChangeSourceName): TestChangeSourceState { + return ( + watcher as unknown as { + changeSources: Record; + } + ).changeSources[name]; +} + +function getRetryTimer(watcher: FileWatcher): NodeJS.Timeout | null { + return (watcher as unknown as { retryTimer: NodeJS.Timeout | null }).retryTimer; +} + +function setWatcherActive(watcher: FileWatcher): void { + (watcher as unknown as { isWatching: boolean }).isWatching = true; +} + +type NativeWatchCallback = (eventType: string, filename: string) => void; + +function getNativeWatchCallback( + optionsOrListener: unknown, + maybeListener: unknown +): NativeWatchCallback | undefined { + return ( + typeof optionsOrListener === 'function' ? optionsOrListener : maybeListener + ) as NativeWatchCallback | undefined; +} + +function mockFsWatchImplementation(implementation: (...args: unknown[]) => FsType.FSWatcher): void { + ( + vi.mocked(fs.watch) as unknown as { + mockImplementation: (nextImplementation: (...args: unknown[]) => FsType.FSWatcher) => void; + } + ).mockImplementation(implementation); +} + /** Helper to write a valid JSONL line */ function jsonlLine(uuid: string, text: string): string { return ( @@ -256,6 +311,659 @@ describe('FileWatcher', () => { watcher.stop(); }); + it('ignores stale native watcher callbacks after stop and restart', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-native-stale-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const projectDir = path.join(projectsDir, 'encoded-project'); + fs.mkdirSync(projectDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(tempDir, 'teams', 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tempDir, 'tasks', 'base-1'), { recursive: true }); + fs.writeFileSync(path.join(projectDir, 'old-session.jsonl'), jsonlLine('old', 'old'), 'utf8'); + fs.writeFileSync(path.join(projectDir, 'new-session.jsonl'), jsonlLine('new', 'new'), 'utf8'); + fs.writeFileSync(path.join(todosDir, 'old-todo.json'), '{"items":[]}', 'utf8'); + fs.writeFileSync(path.join(todosDir, 'new-todo.json'), '{"items":[]}', 'utf8'); + useRealAccess(); + + const projectCallbacks: Array<(eventType: string, filename: string) => void> = []; + const todoCallbacks: Array<(eventType: string, filename: string) => void> = []; + mockFsWatchImplementation((targetPath, optionsOrListener, maybeListener) => { + const listener = getNativeWatchCallback(optionsOrListener, maybeListener); + if (String(targetPath) === projectsDir && listener) { + projectCallbacks.push(listener); + } else if (String(targetPath) === todosDir && listener) { + todoCallbacks.push(listener); + } + return createFakeWatcher(); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const fileEvents: unknown[] = []; + const todoEvents: unknown[] = []; + watcher.on('file-change', (event) => fileEvents.push(event)); + watcher.on('todo-change', (event) => todoEvents.push(event)); + + watcher.start(); + await vi.waitFor(() => { + expect(projectCallbacks).toHaveLength(1); + expect(todoCallbacks).toHaveLength(1); + }); + + watcher.stop(); + watcher.start(); + await vi.waitFor(() => { + expect(projectCallbacks).toHaveLength(2); + expect(todoCallbacks).toHaveLength(2); + }); + + projectCallbacks[0]('rename', 'encoded-project/old-session.jsonl'); + todoCallbacks[0]('rename', 'old-todo.json'); + projectCallbacks[1]('rename', 'encoded-project/new-session.jsonl'); + todoCallbacks[1]('rename', 'new-todo.json'); + await vi.advanceTimersByTimeAsync(100); + + await vi.waitFor(() => { + expect(fileEvents).toEqual([ + { + type: 'add', + path: path.join(projectDir, 'new-session.jsonl'), + projectId: 'encoded-project', + sessionId: 'new-session', + isSubagent: false, + }, + ]); + expect(todoEvents).toEqual([ + { + type: 'add', + path: path.join(todosDir, 'new-todo.json'), + sessionId: 'new-todo', + isSubagent: false, + }, + ]); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('falls back to projects polling on EMFILE and still emits session and subagent changes', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-project-emfile-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const projectDir = path.join(projectsDir, 'encoded-project'); + const sessionPath = path.join(projectDir, 'session-1.jsonl'); + fs.mkdirSync(projectDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + fs.writeFileSync(sessionPath, jsonlLine('a1', 'baseline'), 'utf8'); + useRealAccess(); + + const projectWatcher = createFakeWatcher(); + const todoWatcher = createFakeWatcher(); + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation((targetPath) => { + if (String(targetPath) === projectsDir) return projectWatcher; + if (String(targetPath) === todosDir) return todoWatcher; + throw new Error(`Unexpected watch path: ${String(targetPath)}`); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2)); + (projectWatcher as unknown as EventEmitter).emit( + 'error', + Object.assign(new Error('too many open files'), { code: 'EMFILE' }) + ); + await vi.advanceTimersByTimeAsync(0); + + const projectsSource = getChangeSource(watcher, 'projects'); + expect(projectsSource.currentPollingTimer).not.toBeNull(); + expect(getRetryTimer(watcher)).toBeNull(); + expect(projectWatcher.close).toHaveBeenCalled(); + await vi.waitFor(() => expect(projectsSource.isPollingPrimed).toBe(true)); + + fs.appendFileSync(sessionPath, jsonlLine('a2', 'changed'), 'utf8'); + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + await vi.waitFor(() => + expect(events).toContainEqual({ + type: 'change', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }) + ); + + const subagentsDir = path.join(projectDir, 'session-1', 'subagents'); + const subagentPath = path.join(subagentsDir, 'agent-worker.jsonl'); + fs.mkdirSync(subagentsDir, { recursive: true }); + fs.writeFileSync(subagentPath, jsonlLine('s1', 'subagent'), 'utf8'); + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + await vi.waitFor(() => + expect(events).toContainEqual({ + type: 'add', + path: subagentPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: true, + }) + ); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('ignores stale native project callbacks after EMFILE fallback switches to polling', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-project-stale-fallback-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const projectDir = path.join(projectsDir, 'encoded-project'); + fs.mkdirSync(projectDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + fs.writeFileSync(path.join(projectDir, 'stale-session.jsonl'), jsonlLine('stale', 'stale'), 'utf8'); + useRealAccess(); + + const projectWatcher = createFakeWatcher(); + const todoWatcher = createFakeWatcher(); + const projectCallbacks: Array<(eventType: string, filename: string) => void> = []; + mockFsWatchImplementation((targetPath, optionsOrListener, maybeListener) => { + const listener = getNativeWatchCallback(optionsOrListener, maybeListener); + if (String(targetPath) === projectsDir) { + if (listener) { + projectCallbacks.push(listener); + } + return projectWatcher; + } + if (String(targetPath) === todosDir) { + return todoWatcher; + } + throw new Error(`Unexpected watch path: ${String(targetPath)}`); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(projectCallbacks).toHaveLength(1)); + (projectWatcher as unknown as EventEmitter).emit( + 'error', + Object.assign(new Error('too many open files'), { code: 'EMFILE' }) + ); + await vi.advanceTimersByTimeAsync(0); + + const projectsSource = getChangeSource(watcher, 'projects'); + expect(projectsSource.currentPollingTimer).not.toBeNull(); + await vi.waitFor(() => expect(projectsSource.isPollingPrimed).toBe(true)); + + projectCallbacks[0]('rename', 'encoded-project/stale-session.jsonl'); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('falls back to todos polling on EMFILE and still emits todo changes', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-todo-emfile-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const todoPath = path.join(todosDir, 'session-1.json'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + fs.writeFileSync(todoPath, '{"items":[]}', 'utf8'); + useRealAccess(); + + const projectWatcher = createFakeWatcher(); + const todoWatcher = createFakeWatcher(); + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation((targetPath) => { + if (String(targetPath) === projectsDir) return projectWatcher; + if (String(targetPath) === todosDir) return todoWatcher; + throw new Error(`Unexpected watch path: ${String(targetPath)}`); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('todo-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2)); + (todoWatcher as unknown as EventEmitter).emit( + 'error', + Object.assign(new Error('too many open files'), { code: 'EMFILE' }) + ); + await vi.advanceTimersByTimeAsync(0); + + const todosSource = getChangeSource(watcher, 'todos'); + expect(todosSource.currentPollingTimer).not.toBeNull(); + expect(getRetryTimer(watcher)).toBeNull(); + expect(todoWatcher.close).toHaveBeenCalled(); + await vi.waitFor(() => expect(todosSource.isPollingPrimed).toBe(true)); + + fs.writeFileSync(todoPath, '{"items":[{"text":"done"}]}', 'utf8'); + await todosSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + await vi.waitFor(() => + expect(events).toContainEqual({ + type: 'change', + path: todoPath, + sessionId: 'session-1', + isSubagent: false, + }) + ); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('uses a custom local provider for project polling snapshots', async () => { + const projectsDir = '/virtual/projects'; + const todosDir = '/virtual/todos'; + const sessionEntry = createFsDirent('session-1.jsonl', 'file', { size: 10, mtimeMs: 1000 }); + const fsProvider = { + type: 'local' as const, + exists: vi.fn().mockResolvedValue(true), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn().mockResolvedValue({ + size: 10, + mtimeMs: 1000, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + }), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + return [createFsDirent('encoded-project', 'directory')]; + } + if (dirPath === path.join(projectsDir, 'encoded-project')) { + return [sessionEntry]; + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + sessionEntry.size = 12; + sessionEntry.mtimeMs = 2000; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(fsProvider.readdir).toHaveBeenCalledWith(projectsDir); + expect(events).toContainEqual({ + type: 'change', + path: path.join(projectsDir, 'encoded-project', 'session-1.jsonl'), + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + }); + + it('treats SSH not-found subagent directories as empty during project polling', async () => { + const projectsDir = '/remote/projects'; + const todosDir = '/remote/todos'; + const projectDir = path.join(projectsDir, 'encoded-project'); + const sessionPath = path.join(projectDir, 'session-1.jsonl'); + let size = 10; + let mtimeMs = 1000; + const fsProvider = { + type: 'ssh' as const, + exists: vi.fn(async (filePath: string) => filePath === sessionPath), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn(async (filePath: string) => { + if (filePath !== sessionPath) { + throw Object.assign(new Error('not found'), { code: '2' }); + } + return { + size, + mtimeMs, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + }; + }), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + return [createFsDirent('encoded-project', 'directory')]; + } + if (dirPath === projectDir) { + return [ + createFsDirent('session-1.jsonl', 'file'), + createFsDirent('session-1', 'directory'), + ]; + } + if (dirPath === path.join(projectDir, 'session-1', 'subagents')) { + throw Object.assign(new Error('not found'), { code: '2' }); + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + size = 12; + mtimeMs = 2000; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toContainEqual({ + type: 'change', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + }); + + it.each(['EMFILE', 'ENOENT'])( + 'does not emit false project deletes when a polling stat fails with %s', + async (failureCode) => { + const projectsDir = '/virtual/projects'; + const todosDir = '/virtual/todos'; + const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl'); + let statShouldFail = false; + let size = 10; + let mtimeMs = 1000; + const fsProvider = { + type: 'local' as const, + exists: vi.fn().mockResolvedValue(true), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn(async () => { + if (statShouldFail) { + throw Object.assign(new Error(failureCode), { code: failureCode }); + } + return { + size, + mtimeMs, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + }; + }), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + return [createFsDirent('encoded-project', 'directory')]; + } + if (dirPath === path.join(projectsDir, 'encoded-project')) { + return [createFsDirent('session-1.jsonl', 'file')]; + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + statShouldFail = true; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + + statShouldFail = false; + size = 12; + mtimeMs = 2000; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toContainEqual({ + type: 'change', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + } + ); + + it('does not emit false project deletes when the polling root is temporarily missing', async () => { + const projectsDir = '/virtual/projects'; + const todosDir = '/virtual/todos'; + const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl'); + let rootMissing = false; + let size = 10; + let mtimeMs = 1000; + const fsProvider = { + type: 'local' as const, + exists: vi.fn().mockResolvedValue(true), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn().mockImplementation(async () => ({ + size, + mtimeMs, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + })), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + if (rootMissing) { + throw Object.assign(new Error('missing root'), { code: 'ENOENT' }); + } + return [createFsDirent('encoded-project', 'directory')]; + } + if (dirPath === path.join(projectsDir, 'encoded-project')) { + return [createFsDirent('session-1.jsonl', 'file')]; + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + rootMissing = true; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + + rootMissing = false; + size = 12; + mtimeMs = 2000; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toContainEqual({ + type: 'change', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + }); + + it('still emits project deletes when polling no longer lists a file', async () => { + const projectsDir = '/virtual/projects'; + const todosDir = '/virtual/todos'; + const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl'); + let filePresent = true; + const fsProvider = { + type: 'local' as const, + exists: vi.fn(async (filePath: string) => filePath !== sessionPath || filePresent), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn().mockResolvedValue({ + size: 10, + mtimeMs: 1000, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + }), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + return [createFsDirent('encoded-project', 'directory')]; + } + if (dirPath === path.join(projectsDir, 'encoded-project')) { + return filePresent ? [createFsDirent('session-1.jsonl', 'file')] : []; + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + filePresent = false; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toContainEqual({ + type: 'unlink', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + }); + + it('does not emit false project deletes when a listed project dir vanishes during polling', async () => { + const projectsDir = '/virtual/projects'; + const todosDir = '/virtual/todos'; + const projectDir = path.join(projectsDir, 'encoded-project'); + const sessionPath = path.join(projectDir, 'session-1.jsonl'); + let rootListsProject = true; + let projectDirMissing = false; + const fsProvider = { + type: 'local' as const, + exists: vi.fn(async (filePath: string) => filePath !== sessionPath || rootListsProject), + readFile: vi.fn().mockResolvedValue(''), + stat: vi.fn().mockResolvedValue({ + size: 10, + mtimeMs: 1000, + birthtimeMs: 1000, + isFile: () => true, + isDirectory: () => false, + }), + readdir: vi.fn(async (dirPath: string) => { + if (dirPath === projectsDir) { + return rootListsProject ? [createFsDirent('encoded-project', 'directory')] : []; + } + if (dirPath === projectDir) { + if (projectDirMissing) { + throw Object.assign(new Error('missing project dir'), { code: 'ENOENT' }); + } + return [createFsDirent('session-1.jsonl', 'file')]; + } + return []; + }), + createReadStream: vi.fn(() => Readable.from([])), + dispose: vi.fn(), + }; + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider); + const events: unknown[] = []; + watcher.on('file-change', (event) => events.push(event)); + + setWatcherActive(watcher); + const projectsSource = getChangeSource(watcher, 'projects'); + await projectsSource.pollOnce(); + expect(events).toEqual([]); + + projectDirMissing = true; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + + rootListsProject = false; + await projectsSource.pollOnce(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toContainEqual({ + type: 'unlink', + path: sessionPath, + projectId: 'encoded-project', + sessionId: 'session-1', + isSubagent: false, + }); + + watcher.stop(); + }); + it('falls back to teams polling when the chokidar registry hits the file descriptor limit', async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-emfile-')); setClaudeBasePathOverride(tempDir); @@ -303,14 +1011,10 @@ describe('FileWatcher', () => { ); await vi.advanceTimersByTimeAsync(0); - const watcherAny = watcher as unknown as { - retryTimer: NodeJS.Timeout | null; - teamsPollingTimer: NodeJS.Timeout | null; - teamsPollingPrimed: boolean; - }; - expect(watcherAny.teamsPollingTimer).not.toBeNull(); - expect(watcherAny.retryTimer).toBeNull(); - await vi.waitFor(() => expect(watcherAny.teamsPollingPrimed).toBe(true)); + const teamsSource = getChangeSource(watcher, 'teams'); + expect(teamsSource.currentPollingTimer).not.toBeNull(); + expect(getRetryTimer(watcher)).toBeNull(); + await vi.waitFor(() => expect(teamsSource.isPollingPrimed).toBe(true)); await vi.advanceTimersByTimeAsync(100); expect(events).toEqual([]); @@ -350,12 +1054,8 @@ describe('FileWatcher', () => { tasksWatcher.emit('error', Object.assign(new Error(code), { code })); await vi.advanceTimersByTimeAsync(0); - const watcherAny = watcher as unknown as { - retryTimer: NodeJS.Timeout | null; - tasksPollingTimer: NodeJS.Timeout | null; - }; - expect(watcherAny.tasksPollingTimer).not.toBeNull(); - expect(watcherAny.retryTimer).toBeNull(); + expect(getChangeSource(watcher, 'tasks').currentPollingTimer).not.toBeNull(); + expect(getRetryTimer(watcher)).toBeNull(); expect(tasksWatcher.close).toHaveBeenCalled(); watcher.stop(); @@ -391,12 +1091,8 @@ describe('FileWatcher', () => { watcher.start(); await vi.waitFor(() => { - const watcherAny = watcher as unknown as { - retryTimer: NodeJS.Timeout | null; - teamsPollingTimer: NodeJS.Timeout | null; - }; - expect(watcherAny.teamsPollingTimer).not.toBeNull(); - expect(watcherAny.retryTimer).toBeNull(); + expect(getChangeSource(watcher, 'teams').currentPollingTimer).not.toBeNull(); + expect(getRetryTimer(watcher)).toBeNull(); }); expect(chokidarMock.watch).toHaveBeenCalledTimes(2); @@ -406,6 +1102,49 @@ describe('FileWatcher', () => { fs.rmSync(tempDir, { recursive: true, force: true }); }); + it('closes a partially-created teams registry when initial start fails', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-partial-limit-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + const partialWatcher = chokidarMock.createWatcher([path.normalize(teamsDir)], {}); + partialWatcher.on = vi.fn(() => { + throw Object.assign(new Error('watch limit during listener registration'), { + code: 'EMFILE', + }); + }); + chokidarMock.instances.length = 0; + chokidarMock.watch.mockImplementation((targets, options) => { + const targetList = Array.isArray(targets) ? targets : [targets]; + if (targetList.includes(path.normalize(teamsDir))) { + return partialWatcher; + } + return chokidarMock.createWatcher(targets, options); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => { + expect(getChangeSource(watcher, 'teams').currentPollingTimer).not.toBeNull(); + expect(partialWatcher.close).toHaveBeenCalled(); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + it('retries chokidar registry after a non-limit error without enabling polling', async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-nonlimit-')); setClaudeBasePathOverride(tempDir); @@ -432,12 +1171,8 @@ describe('FileWatcher', () => { teamsWatcher.emit('error', Object.assign(new Error('permission denied'), { code: 'EACCES' })); await vi.advanceTimersByTimeAsync(0); - const watcherAny = watcher as unknown as { - retryTimer: NodeJS.Timeout | null; - teamsPollingTimer: NodeJS.Timeout | null; - }; - expect(watcherAny.teamsPollingTimer).toBeNull(); - expect(watcherAny.retryTimer).not.toBeNull(); + expect(getChangeSource(watcher, 'teams').currentPollingTimer).toBeNull(); + expect(getRetryTimer(watcher)).not.toBeNull(); expect(teamsWatcher.close).toHaveBeenCalled(); await vi.advanceTimersByTimeAsync(2000); @@ -472,12 +1207,8 @@ describe('FileWatcher', () => { await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2)); await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); - const watcherAny = watcher as unknown as { - teamsPollingTimer: NodeJS.Timeout | null; - tasksPollingTimer: NodeJS.Timeout | null; - }; - expect(watcherAny.teamsPollingTimer).toBeNull(); - expect(watcherAny.tasksPollingTimer).toBeNull(); + expect(getChangeSource(watcher, 'teams').currentPollingTimer).toBeNull(); + expect(getChangeSource(watcher, 'tasks').currentPollingTimer).toBeNull(); expect(watchMock).not.toHaveBeenCalledWith(teamsDir, expect.anything(), expect.anything()); expect(watchMock).not.toHaveBeenCalledWith(tasksDir, expect.anything(), expect.anything()); expectChokidarOptions(getChokidarWatcherForRoot(teamsDir)); @@ -513,6 +1244,7 @@ describe('FileWatcher', () => { const teamsWatcher = getChokidarWatcherForRoot(teamsDir); teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'kanban-state.json')); teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'inboxes', 'user.json')); teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'sentMessages.json')); teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'processes.json')); @@ -524,6 +1256,7 @@ describe('FileWatcher', () => { expect(events).toEqual([ { type: 'config', teamName: 'base-1', detail: 'config.json' }, + { type: 'config', teamName: 'base-1', detail: 'kanban-state.json' }, { type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }, { type: 'inbox', teamName: 'base-1', detail: 'sentMessages.json' }, { type: 'process', teamName: 'base-1', detail: 'processes.json' }, @@ -782,6 +1515,49 @@ describe('FileWatcher', () => { fs.rmSync(tempDir, { recursive: true, force: true }); }); + it('keeps rebuilding the registry when the previous chokidar close throws synchronously', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-close-throw-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const addedTeamDir = path.join(teamsDir, 'base-2'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.close.mockImplementationOnce(() => { + throw new Error('close failed'); + }); + + fs.mkdirSync(addedTeamDir, { recursive: true }); + fs.writeFileSync(path.join(addedTeamDir, 'config.json'), '{}', 'utf8'); + teamsWatcher.emit('addDir', addedTeamDir); + + await vi.waitFor(() => { + expect(chokidarMock.watch).toHaveBeenCalledTimes(3); + expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedTeamDir)); + expect(events).toEqual([{ type: 'config', teamName: 'base-2', detail: 'config.json' }]); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + it('ignores events from an old chokidar generation after registry rebuild', async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-generation-')); setClaudeBasePathOverride(tempDir); @@ -1048,16 +1824,13 @@ describe('FileWatcher', () => { const events: unknown[] = []; watcher.on('team-change', (event) => events.push(event)); - const watcherAny = watcher as unknown as { - isWatching: boolean; - pollTeamsForChanges: () => Promise; - }; - watcherAny.isWatching = true; - await watcherAny.pollTeamsForChanges(); + setWatcherActive(watcher); + const teamsSource = getChangeSource(watcher, 'teams'); + await teamsSource.pollOnce(); expect(events).toEqual([]); fs.writeFileSync(inboxPath, '[{"messageId":"m1"}]', 'utf8'); - await watcherAny.pollTeamsForChanges(); + await teamsSource.pollOnce(); await vi.advanceTimersByTimeAsync(100); expect(events).toEqual([{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }]); @@ -1083,16 +1856,13 @@ describe('FileWatcher', () => { const events: unknown[] = []; watcher.on('team-change', (event) => events.push(event)); - const watcherAny = watcher as unknown as { - isWatching: boolean; - pollTasksForChanges: () => Promise; - }; - watcherAny.isWatching = true; - await watcherAny.pollTasksForChanges(); + setWatcherActive(watcher); + const tasksSource = getChangeSource(watcher, 'tasks'); + await tasksSource.pollOnce(); expect(events).toEqual([]); fs.writeFileSync(taskPath, '{"status":"running"}', 'utf8'); - await watcherAny.pollTasksForChanges(); + await tasksSource.pollOnce(); await vi.advanceTimersByTimeAsync(100); expect(events).toEqual([