diff --git a/src/main/services/infrastructure/FileWatcher.ts b/src/main/services/infrastructure/FileWatcher.ts index b1400d3f..3c0428d3 100644 --- a/src/main/services/infrastructure/FileWatcher.ts +++ b/src/main/services/infrastructure/FileWatcher.ts @@ -32,6 +32,7 @@ import { ConfigManager } from './ConfigManager'; import { type DataCache } from './DataCache'; import { LocalFileSystemProvider } from './LocalFileSystemProvider'; import { type NotificationManager } from './NotificationManager'; +import { type TeamTaskWatchKind, TeamTaskWatchRegistry } from './TeamTaskWatchRegistry'; import type { FileSystemProvider, FsDirent } from './FileSystemProvider'; import type { TeamChangeEvent } from '@shared/types'; @@ -42,6 +43,10 @@ const logger = createLogger('Service:FileWatcher'); const DEBOUNCE_MS = 100; /** Retry delay when watched directories are unavailable or watcher errors occur */ const WATCHER_RETRY_MS = 2000; +/** Poll interval for team metadata and inboxes when the teams watcher hits OS watcher limits */ +const TEAMS_POLL_INTERVAL_MS = 1000; +/** Poll interval for task files, which can be much larger than team metadata/inboxes */ +const TASKS_POLL_INTERVAL_MS = 3000; /** Interval for periodic catch-up scan to detect missed fs.watch events */ const CATCH_UP_INTERVAL_MS = 30_000; /** Only catch-up scan files modified within this window */ @@ -68,11 +73,23 @@ interface ActiveSessionFile { lastObservedAt: number; } +type RecursiveWatcherType = 'projects' | 'todos' | 'teams' | 'tasks'; +type PollingWatcherType = 'teams' | 'tasks'; +type CloseableWatcher = { close: () => void | Promise }; + export class FileWatcher extends EventEmitter { private projectsWatcher: fs.FSWatcher | null = null; private todosWatcher: fs.FSWatcher | null = null; - private teamsWatcher: fs.FSWatcher | null = null; - private tasksWatcher: fs.FSWatcher | null = null; + private teamsWatcher: TeamTaskWatchRegistry | null = null; + private tasksWatcher: TeamTaskWatchRegistry | null = null; + private teamsPollingTimer: NodeJS.Timeout | null = null; + private tasksPollingTimer: NodeJS.Timeout | null = null; + private teamsPollingInProgress = false; + private tasksPollingInProgress = false; + private teamsPollingPrimed = false; + private tasksPollingPrimed = false; + private polledTeamFiles = new Map(); + private polledTaskFiles = new Map(); private retryTimer: NodeJS.Timeout | null = null; private projectsPath: string; private todosPath: string; @@ -202,15 +219,31 @@ export class FileWatcher extends EventEmitter { } if (this.teamsWatcher) { - this.teamsWatcher.close(); + void this.teamsWatcher.close(); this.teamsWatcher = null; } if (this.tasksWatcher) { - this.tasksWatcher.close(); + void this.tasksWatcher.close(); this.tasksWatcher = null; } + if (this.teamsPollingTimer) { + clearInterval(this.teamsPollingTimer); + this.teamsPollingTimer = null; + } + + if (this.tasksPollingTimer) { + clearInterval(this.tasksPollingTimer); + this.tasksPollingTimer = null; + } + this.teamsPollingInProgress = false; + this.tasksPollingInProgress = false; + this.teamsPollingPrimed = false; + this.tasksPollingPrimed = false; + this.polledTeamFiles.clear(); + this.polledTaskFiles.clear(); + // Clear any pending debounce timers for (const timer of this.debounceTimers.values()) { clearTimeout(timer); @@ -284,6 +317,14 @@ export class FileWatcher extends EventEmitter { clearInterval(this.pollingTimer); this.pollingTimer = null; } + if (this.teamsPollingTimer) { + clearInterval(this.teamsPollingTimer); + this.teamsPollingTimer = null; + } + if (this.tasksPollingTimer) { + clearInterval(this.tasksPollingTimer); + this.tasksPollingTimer = null; + } // 6. Clear all tracking maps (stop() already handles most of these) this.lastProcessedLineCount.clear(); @@ -291,6 +332,8 @@ export class FileWatcher extends EventEmitter { this.activeSessionFiles.clear(); this.catchUpStatFailures.clear(); this.polledFileSizes.clear(); + this.polledTeamFiles.clear(); + this.polledTaskFiles.clear(); this.processingInProgress.clear(); this.pendingReprocess.clear(); @@ -377,7 +420,7 @@ export class FileWatcher extends EventEmitter { * Starts the teams directory watcher. */ private async startTeamsWatcher(): Promise { - if (this.teamsWatcher) { + if (this.teamsWatcher || this.teamsPollingTimer) { return; } @@ -390,15 +433,28 @@ export class FileWatcher extends EventEmitter { // Guard: stop() may have been called while awaiting pathExists if (!this.isWatching) return; - this.teamsWatcher = fs.watch(this.teamsPath, { recursive: true }, (eventType, filename) => { - if (filename) { - this.handleTeamsChange(eventType, filename); - } + const registry = new TeamTaskWatchRegistry({ + kind: 'teams', + rootPath: this.teamsPath, + onChange: (eventType, filename) => this.handleTeamsChange(eventType, filename), + onError: (error) => this.handleTeamTaskWatcherError('teams', error), }); - this.attachWatcherRecovery(this.teamsWatcher, 'teams'); + this.teamsWatcher = registry; + await registry.start(); + if (!this.isWatching || this.teamsWatcher !== registry) { + void registry.close(); + return; + } logger.info(`FileWatcher: Started watching teams at ${this.teamsPath}`); } catch (error) { + const registry = this.teamsWatcher; + if (this.startPollingFallbackForRecursiveWatchLimit('teams', error, registry ?? undefined)) { + return; + } logger.error('Error starting teams watcher:', error); + if (this.teamsWatcher) { + void this.teamsWatcher.close(); + } this.teamsWatcher = null; this.scheduleWatcherRetry(); } @@ -408,7 +464,7 @@ export class FileWatcher extends EventEmitter { * Starts the tasks directory watcher. */ private async startTasksWatcher(): Promise { - if (this.tasksWatcher) { + if (this.tasksWatcher || this.tasksPollingTimer) { return; } @@ -421,15 +477,28 @@ export class FileWatcher extends EventEmitter { // Guard: stop() may have been called while awaiting pathExists if (!this.isWatching) return; - this.tasksWatcher = fs.watch(this.tasksPath, { recursive: true }, (eventType, filename) => { - if (filename) { - this.handleTasksChange(eventType, filename); - } + const registry = new TeamTaskWatchRegistry({ + kind: 'tasks', + rootPath: this.tasksPath, + onChange: (eventType, filename) => this.handleTasksChange(eventType, filename), + onError: (error) => this.handleTeamTaskWatcherError('tasks', error), }); - this.attachWatcherRecovery(this.tasksWatcher, 'tasks'); + this.tasksWatcher = registry; + await registry.start(); + if (!this.isWatching || this.tasksWatcher !== registry) { + void registry.close(); + return; + } logger.info(`FileWatcher: Started watching tasks at ${this.tasksPath}`); } catch (error) { + const registry = this.tasksWatcher; + if (this.startPollingFallbackForRecursiveWatchLimit('tasks', error, registry ?? undefined)) { + return; + } logger.error('Error starting tasks watcher:', error); + if (this.tasksWatcher) { + void this.tasksWatcher.close(); + } this.tasksWatcher = null; this.scheduleWatcherRetry(); } @@ -472,7 +541,12 @@ export class FileWatcher extends EventEmitter { ]); } - if (!this.projectsWatcher || !this.todosWatcher || !this.teamsWatcher || !this.tasksWatcher) { + if ( + !this.projectsWatcher || + !this.todosWatcher || + !this.isWatcherOrPollingActive('teams') || + !this.isWatcherOrPollingActive('tasks') + ) { this.scheduleWatcherRetry(); } } @@ -488,18 +562,18 @@ export class FileWatcher extends EventEmitter { }, WATCHER_RETRY_MS); } - private attachWatcherRecovery( - watcher: fs.FSWatcher, - watcherType: 'projects' | 'todos' | 'teams' | 'tasks' - ): void { + private attachWatcherRecovery(watcher: fs.FSWatcher, watcherType: RecursiveWatcherType): void { watcher.on('error', (error: NodeJS.ErrnoException) => { // Ephemeral .lock files cause harmless ENOENT when the recursive watcher // tries to scandir a path that was already deleted. Log as debug and skip - // the teardown/retry — the watcher is still healthy. + // the teardown/retry - the watcher is still healthy. if (error.code === 'ENOENT' && error.path?.endsWith('.lock')) { logger.debug(`FileWatcher: ${watcherType} ignoring transient ENOENT on lock file`); return; } + if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher)) { + return; + } logger.error(`FileWatcher: ${watcherType} watcher error:`, error); if (watcherType === 'projects') { this.projectsWatcher = null; @@ -510,7 +584,9 @@ export class FileWatcher extends EventEmitter { } else { this.tasksWatcher = null; } - this.scheduleWatcherRetry(); + if (!this.isWatcherOrPollingActive(watcherType)) { + this.scheduleWatcherRetry(); + } }); watcher.on('close', () => { @@ -526,10 +602,295 @@ export class FileWatcher extends EventEmitter { } else { this.tasksWatcher = null; } - this.scheduleWatcherRetry(); + if (!this.isWatcherOrPollingActive(watcherType)) { + this.scheduleWatcherRetry(); + } }); } + private handleTeamTaskWatcherError(watcherType: TeamTaskWatchKind, error: unknown): void { + const watcher = watcherType === 'teams' ? this.teamsWatcher : this.tasksWatcher; + if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher ?? undefined)) { + return; + } + + logger.error(`FileWatcher: ${watcherType} watcher error:`, error); + if (watcherType === 'teams') { + this.teamsWatcher = null; + } else { + this.tasksWatcher = null; + } + void watcher?.close(); + + if (!this.isWatcherOrPollingActive(watcherType)) { + this.scheduleWatcherRetry(); + } + } + + private isWatcherOrPollingActive(watcherType: RecursiveWatcherType): boolean { + if (watcherType === 'projects') { + return this.projectsWatcher !== null; + } + if (watcherType === 'todos') { + return this.todosWatcher !== null; + } + if (watcherType === 'teams') { + return this.teamsWatcher !== null || this.teamsPollingTimer !== null; + } + return this.tasksWatcher !== null || this.tasksPollingTimer !== null; + } + + private startPollingFallbackForRecursiveWatchLimit( + watcherType: RecursiveWatcherType, + error: unknown, + watcher?: CloseableWatcher + ): boolean { + if ((watcherType !== 'teams' && watcherType !== 'tasks') || !this.isWatchLimitError(error)) { + return false; + } + + const err = error as NodeJS.ErrnoException; + logger.warn( + `FileWatcher: ${watcherType} watcher hit ${err.code ?? 'a platform limit'}; falling back to polling` + ); + + if (watcherType === 'teams') { + this.teamsWatcher = null; + } else { + this.tasksWatcher = null; + } + + this.startTeamTaskPolling(watcherType); + + try { + void watcher?.close(); + } catch (closeError) { + logger.debug(`FileWatcher: ${watcherType} watcher close after fallback failed`, closeError); + } + return true; + } + + private isWatchLimitError(error: unknown): boolean { + const code = (error as NodeJS.ErrnoException | undefined)?.code; + return ( + code === 'EMFILE' || + code === 'ENOSPC' || + code === 'ERR_FS_WATCHER_LIMIT' || + code === 'ERR_FEATURE_UNAVAILABLE_ON_PLATFORM' + ); + } + + private startTeamTaskPolling(watcherType: PollingWatcherType): void { + const existingTimer = watcherType === 'teams' ? this.teamsPollingTimer : this.tasksPollingTimer; + if (existingTimer) { + return; + } + + const runPoll = (): void => { + if (!this.isWatching) { + return; + } + if (watcherType === 'teams') { + if (this.teamsPollingInProgress) { + return; + } + this.teamsPollingInProgress = true; + this.pollTeamsForChanges() + .catch((err) => { + logger.error('FileWatcher: Error during teams polling:', err); + }) + .finally(() => { + this.teamsPollingInProgress = false; + }); + return; + } + + if (this.tasksPollingInProgress) { + return; + } + this.tasksPollingInProgress = true; + this.pollTasksForChanges() + .catch((err) => { + logger.error('FileWatcher: Error during tasks polling:', err); + }) + .finally(() => { + this.tasksPollingInProgress = false; + }); + }; + + runPoll(); + const timer = setInterval(runPoll, this.getTeamTaskPollIntervalMs(watcherType)); + timer.unref(); + + if (watcherType === 'teams') { + this.teamsPollingTimer = timer; + } else { + this.tasksPollingTimer = timer; + } + } + + private getTeamTaskPollIntervalMs(watcherType: PollingWatcherType): number { + return watcherType === 'teams' ? TEAMS_POLL_INTERVAL_MS : TASKS_POLL_INTERVAL_MS; + } + + private async pollTeamsForChanges(): Promise { + const nextSnapshot = await this.collectTeamsPollSnapshot(); + if (!this.isWatching) { + return; + } + this.emitPolledChanges( + 'teams', + this.polledTeamFiles, + nextSnapshot, + this.teamsPollingPrimed, + (eventType, relativePath) => this.handleTeamsChange(eventType, relativePath) + ); + this.polledTeamFiles = nextSnapshot; + this.teamsPollingPrimed = true; + } + + private async pollTasksForChanges(): Promise { + const nextSnapshot = await this.collectTasksPollSnapshot(); + if (!this.isWatching) { + return; + } + this.emitPolledChanges( + 'tasks', + this.polledTaskFiles, + nextSnapshot, + this.tasksPollingPrimed, + (eventType, relativePath) => this.handleTasksChange(eventType, relativePath) + ); + this.polledTaskFiles = nextSnapshot; + this.tasksPollingPrimed = true; + } + + private emitPolledChanges( + watcherType: PollingWatcherType, + previousSnapshot: Map, + nextSnapshot: Map, + isPrimed: boolean, + emitChange: (eventType: string, relativePath: string) => void + ): void { + if (!isPrimed) { + logger.info(`FileWatcher: ${watcherType} polling baseline captured`); + return; + } + + for (const [relativePath, fingerprint] of nextSnapshot) { + const previous = previousSnapshot.get(relativePath); + if (previous === undefined) { + emitChange('rename', relativePath); + } else if (previous !== fingerprint) { + emitChange('change', relativePath); + } + } + + for (const relativePath of previousSnapshot.keys()) { + if (!nextSnapshot.has(relativePath)) { + emitChange('rename', relativePath); + } + } + } + + private async collectTeamsPollSnapshot(): Promise> { + const snapshot = new Map(); + const teamEntries = await this.safeReadDir(this.teamsPath); + + for (const teamEntry of teamEntries) { + if (!teamEntry.isDirectory()) { + continue; + } + + const teamName = teamEntry.name; + const teamPath = path.join(this.teamsPath, teamName); + await this.collectPolledDirectoryFiles(snapshot, teamPath, teamName, (fileName) => + fileName.endsWith('.json') + ); + + await this.collectPolledDirectoryFiles( + snapshot, + path.join(teamPath, 'inboxes'), + `${teamName}/inboxes`, + (fileName) => fileName.endsWith('.json') + ); + } + + return snapshot; + } + + private async collectTasksPollSnapshot(): Promise> { + const snapshot = new Map(); + const teamEntries = await this.safeReadDir(this.tasksPath); + + for (const teamEntry of teamEntries) { + if (!teamEntry.isDirectory()) { + continue; + } + + const teamName = teamEntry.name; + await this.collectPolledDirectoryFiles( + snapshot, + path.join(this.tasksPath, teamName), + teamName, + (fileName) => !fileName.startsWith('.') && fileName.endsWith('.json') + ); + } + + return snapshot; + } + + private async collectPolledDirectoryFiles( + snapshot: Map, + dirPath: string, + relativeRoot: string, + shouldInclude: (fileName: string) => boolean + ): Promise { + const entries = await this.safeReadDir(dirPath); + for (const entry of entries) { + if (!entry.isFile() || !shouldInclude(entry.name)) { + continue; + } + await this.addPolledFile( + snapshot, + path.join(dirPath, entry.name), + `${relativeRoot}/${entry.name}` + ); + } + } + + private async addPolledFile( + snapshot: Map, + absolutePath: string, + relativePath: string + ): Promise { + try { + const stats = await fsp.stat(absolutePath); + if (!stats.isFile()) { + return; + } + snapshot.set( + relativePath, + `${stats.dev}:${stats.ino}:${stats.mtimeMs}:${stats.ctimeMs}:${stats.size}` + ); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + logger.debug(`FileWatcher: Unable to stat polled file ${absolutePath}`, error); + } + } + } + + private async safeReadDir(dirPath: string): Promise { + try { + return await fsp.readdir(dirPath, { withFileTypes: true }); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + logger.debug(`FileWatcher: Unable to read polled directory ${dirPath}`, error); + } + return []; + } + } + // =========================================================================== // SSH Polling Mode // =========================================================================== diff --git a/src/main/services/infrastructure/TeamTaskWatchRegistry.ts b/src/main/services/infrastructure/TeamTaskWatchRegistry.ts new file mode 100644 index 00000000..60409338 --- /dev/null +++ b/src/main/services/infrastructure/TeamTaskWatchRegistry.ts @@ -0,0 +1,290 @@ +import { OPENCODE_TASK_LOG_ATTRIBUTION_FILE } from '@shared/constants/opencodeTaskLogAttribution'; +import { watch } from 'chokidar'; +import * as fsp from 'fs/promises'; +import * as path from 'path'; + +import type { FSWatcher } from 'chokidar'; +import type { Dirent } from 'fs'; + +export type TeamTaskWatchKind = 'teams' | 'tasks'; +export type TeamTaskWatchEventType = 'add' | 'change' | 'unlink' | 'addDir' | 'unlinkDir'; + +export interface TeamTaskWatchRegistryOptions { + kind: TeamTaskWatchKind; + rootPath: string; + onChange: (eventType: TeamTaskWatchEventType, relativePath: string) => void; + onError: (error: unknown) => void; +} + +const RECONCILE_INTERVAL_MS = 30_000; + +const TEAM_ROOT_FILES = new Set([ + 'config.json', + 'processes.json', + 'sentMessages.json', + 'team.meta.json', + 'members.meta.json', + OPENCODE_TASK_LOG_ATTRIBUTION_FILE, +]); + +export class TeamTaskWatchRegistry { + private watcher: FSWatcher | null = null; + private reconcileTimer: NodeJS.Timeout | null = null; + private targets = new Set(); + private targetKey = ''; + private initialTargetsCaptured = false; + private closed = false; + private generation = 0; + private reconcileInProgress = false; + private reconcileAgain = false; + + constructor(private readonly options: TeamTaskWatchRegistryOptions) {} + + async start(): Promise { + if (this.closed) { + return; + } + await this.reconcileTargets(); + if (this.closed || this.reconcileTimer) { + return; + } + + this.reconcileTimer = setInterval(() => { + void this.reconcileTargets(); + }, RECONCILE_INTERVAL_MS); + this.reconcileTimer.unref(); + } + + async close(): Promise { + this.closed = true; + this.generation += 1; + + if (this.reconcileTimer) { + clearInterval(this.reconcileTimer); + this.reconcileTimer = null; + } + + const watcher = this.watcher; + this.watcher = null; + this.targets.clear(); + this.targetKey = ''; + if (watcher) { + await watcher.close().catch(() => undefined); + } + } + + private async reconcileTargets(): Promise { + if (this.closed) { + return; + } + if (this.reconcileInProgress) { + this.reconcileAgain = true; + return; + } + + this.reconcileInProgress = true; + try { + const targets = await this.collectTargets(); + const nextKey = targets.join('\n'); + if (nextKey !== this.targetKey) { + const addedTargets = targets.filter((target) => !this.targets.has(target)); + await this.rebuildWatcher(targets, nextKey, addedTargets); + } + } catch (error) { + if (!this.closed) { + this.options.onError(error); + } + } finally { + this.reconcileInProgress = false; + } + + if (this.reconcileAgain && !this.closed) { + this.reconcileAgain = false; + await this.reconcileTargets(); + } + } + + private async rebuildWatcher( + targets: string[], + nextKey: string, + addedTargets: string[] + ): Promise { + const generation = this.generation + 1; + this.generation = generation; + + const previousWatcher = this.watcher; + this.watcher = null; + if (previousWatcher) { + await previousWatcher.close().catch(() => undefined); + } + + if (this.closed || generation !== this.generation) { + return; + } + + const nextWatcher = watch(targets, { + ignoreInitial: true, + ignorePermissionErrors: true, + followSymlinks: false, + depth: 0, + }); + + this.watcher = nextWatcher; + this.targets = new Set(targets); + this.targetKey = nextKey; + const shouldEmitExistingFiles = this.initialTargetsCaptured; + this.initialTargetsCaptured = true; + + const handleEvent = (eventType: TeamTaskWatchEventType, changedPath?: string): void => { + if (this.closed || generation !== this.generation || !changedPath) { + return; + } + + const relativePath = this.toRelativePath(changedPath); + if (!relativePath) { + return; + } + + if (this.shouldReconcile(eventType, relativePath)) { + void this.reconcileTargets(); + } + + if (!this.shouldEmit(eventType, relativePath)) { + return; + } + + this.options.onChange(eventType, relativePath); + }; + + nextWatcher.on('add', (changedPath) => handleEvent('add', changedPath)); + nextWatcher.on('change', (changedPath) => handleEvent('change', changedPath)); + nextWatcher.on('unlink', (changedPath) => handleEvent('unlink', changedPath)); + nextWatcher.on('addDir', (changedPath) => handleEvent('addDir', changedPath)); + nextWatcher.on('unlinkDir', (changedPath) => handleEvent('unlinkDir', changedPath)); + nextWatcher.on('error', (error) => { + if (!this.closed && generation === this.generation) { + this.options.onError(error); + } + }); + + if (shouldEmitExistingFiles) { + await this.emitExistingFilesForNewTargets(addedTargets, generation); + } + } + + private async emitExistingFilesForNewTargets( + targets: string[], + generation: number + ): Promise { + const normalizedRoot = path.normalize(this.options.rootPath); + for (const targetPath of targets) { + if (this.closed || generation !== this.generation) { + return; + } + if (path.normalize(targetPath) === normalizedRoot) { + continue; + } + const entries = await this.readDirectory(targetPath); + for (const entry of entries) { + if (this.closed || generation !== this.generation) { + return; + } + if (!entry.isFile()) { + continue; + } + const relativePath = this.toRelativePath(path.join(targetPath, entry.name)); + if (relativePath && this.shouldEmit('add', relativePath)) { + this.options.onChange('add', relativePath); + } + } + } + } + + private async collectTargets(): Promise { + const targets = new Set([path.normalize(this.options.rootPath)]); + const rootEntries = await this.readDirectory(this.options.rootPath); + + for (const entry of rootEntries) { + if (!entry.isDirectory()) { + continue; + } + + const teamPath = path.join(this.options.rootPath, entry.name); + targets.add(path.normalize(teamPath)); + + if (this.options.kind === 'teams') { + const inboxPath = path.join(teamPath, 'inboxes'); + if (await this.isDirectory(inboxPath)) { + targets.add(path.normalize(inboxPath)); + } + } + } + + return [...targets].sort((left, right) => left.localeCompare(right)); + } + + private async readDirectory(dirPath: string): Promise { + try { + return await fsp.readdir(dirPath, { withFileTypes: true }); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return []; + } + throw error; + } + } + + private async isDirectory(dirPath: string): Promise { + try { + return (await fsp.stat(dirPath)).isDirectory(); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return false; + } + throw error; + } + } + + private toRelativePath(changedPath: string): string | null { + const absolutePath = path.isAbsolute(changedPath) + ? changedPath + : path.join(this.options.rootPath, changedPath); + const relativePath = path.relative(this.options.rootPath, absolutePath); + + if (!relativePath || relativePath.startsWith('..') || path.isAbsolute(relativePath)) { + return null; + } + + return relativePath.replace(/\\/g, '/'); + } + + private shouldReconcile(eventType: TeamTaskWatchEventType, relativePath: string): boolean { + if (eventType !== 'addDir' && eventType !== 'unlinkDir') { + return false; + } + + const parts = relativePath.split('/').filter(Boolean); + if (parts.length === 1) { + return true; + } + + return this.options.kind === 'teams' && parts.length === 2 && parts[1] === 'inboxes'; + } + + private shouldEmit(eventType: TeamTaskWatchEventType, relativePath: string): boolean { + if (eventType === 'addDir' || eventType === 'unlinkDir') { + return false; + } + + const parts = relativePath.split('/').filter(Boolean); + if (this.options.kind === 'tasks') { + return parts.length === 2 && !parts[1].startsWith('.') && parts[1].endsWith('.json'); + } + + if (parts.length === 2) { + return TEAM_ROOT_FILES.has(parts[1]); + } + + return parts.length === 3 && parts[1] === 'inboxes' && parts[2].endsWith('.json'); + } +} diff --git a/test/main/services/infrastructure/FileWatcher.test.ts b/test/main/services/infrastructure/FileWatcher.test.ts index db3027f3..5524df2b 100644 --- a/test/main/services/infrastructure/FileWatcher.test.ts +++ b/test/main/services/infrastructure/FileWatcher.test.ts @@ -1,10 +1,67 @@ import { EventEmitter } from 'events'; -import type * as FsType from 'fs'; import * as os from 'os'; import * as path from 'path'; import { Readable } from 'stream'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type * as FsType from 'fs'; + +type MockChokidarWatcher = { + targets: string[]; + options: unknown; + handlers: Map void>>; + on: (event: string, handler: (...args: unknown[]) => void) => MockChokidarWatcher; + close: ReturnType; + emit: (event: string, ...args: unknown[]) => void; +}; + +const chokidarMock = vi.hoisted(() => { + const instances: MockChokidarWatcher[] = []; + + const createWatchImplementation = () => (targets: string | string[], options: unknown) => { + const watcher = { + targets: (Array.isArray(targets) ? targets : [targets]).map((target) => String(target)), + options, + handlers: new Map void>>(), + close: vi.fn().mockResolvedValue(undefined), + emit(event: string, ...args: unknown[]) { + for (const handler of watcher.handlers.get(event) ?? []) { + handler(...args); + } + }, + } as MockChokidarWatcher; + + watcher.on = vi.fn((event: string, handler: (...args: unknown[]) => void) => { + const handlers = watcher.handlers.get(event) ?? []; + handlers.push(handler); + watcher.handlers.set(event, handlers); + return watcher; + }); + + instances.push(watcher); + return watcher; + }; + + const watch = vi.fn(createWatchImplementation()); + + return { + instances, + watch, + createWatcher(targets: string | string[], options: unknown): MockChokidarWatcher { + return createWatchImplementation()(targets, options); + }, + reset() { + instances.length = 0; + watch.mockReset(); + watch.mockImplementation(createWatchImplementation()); + }, + }; +}); + +vi.mock('chokidar', () => ({ + watch: chokidarMock.watch, +})); + vi.mock('@shared/utils/logger', () => ({ createLogger: () => ({ debug: vi.fn(), @@ -63,6 +120,7 @@ import * as fsp from 'fs/promises'; import { errorDetector } from '../../../../src/main/services/error/ErrorDetector'; import { DataCache } from '../../../../src/main/services/infrastructure/DataCache'; import { FileWatcher } from '../../../../src/main/services/infrastructure/FileWatcher'; +import { setClaudeBasePathOverride } from '../../../../src/main/utils/pathDecoder'; import { OPENCODE_TASK_LOG_ATTRIBUTION_FILE } from '../../../../src/shared/constants/opencodeTaskLogAttribution'; function createFakeWatcher(): FsType.FSWatcher { @@ -73,12 +131,37 @@ function createFakeWatcher(): FsType.FSWatcher { return emitter as unknown as FsType.FSWatcher; } +function getChokidarWatcherForRoot(rootPath: string): MockChokidarWatcher { + const normalizedRoot = path.normalize(rootPath); + const watcher = [...chokidarMock.instances] + .reverse() + .find((instance) => instance.targets.includes(normalizedRoot)); + if (!watcher) { + throw new Error(`Missing chokidar watcher for ${normalizedRoot}`); + } + return watcher; +} + +function expectChokidarOptions(watcher: MockChokidarWatcher): void { + expect(watcher.options).toEqual({ + ignoreInitial: true, + ignorePermissionErrors: true, + followSymlinks: false, + depth: 0, + }); +} + /** Make existsSync delegate to the real implementation (needed for tests with real temp files) */ function useRealExistsSync() { const realFn = (fs as unknown as { __realExistsSync: typeof fs.existsSync }).__realExistsSync; vi.mocked(fs.existsSync).mockImplementation((p) => realFn(p)); } +function useRealAccess() { + const realFn = (fsp as unknown as { __realAccess: typeof fsp.access }).__realAccess; + vi.mocked(fsp.access).mockImplementation((p, mode) => realFn(p, mode)); +} + function createMockNotificationManager() { return { addError: vi.fn().mockResolvedValue(null), @@ -103,9 +186,12 @@ function jsonlLine(uuid: string, text: string): string { describe('FileWatcher', () => { beforeEach(() => { vi.useFakeTimers(); + chokidarMock.reset(); }); afterEach(() => { + setClaudeBasePathOverride(null); + vi.unstubAllEnvs(); vi.useRealTimers(); vi.restoreAllMocks(); }); @@ -170,6 +256,853 @@ describe('FileWatcher', () => { watcher.stop(); }); + it('falls back to teams polling when the chokidar registry hits the file descriptor limit', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-emfile-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.writeFileSync(path.join(teamsDir, 'base-1', 'inboxes', 'user.json'), '[]', 'utf8'); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchersByPath = new Map([ + [projectsDir, createFakeWatcher()], + [todosDir, createFakeWatcher()], + ]); + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation((targetPath) => { + const watcherForPath = watchersByPath.get(String(targetPath)); + if (!watcherForPath) { + throw new Error(`Unexpected watch path: ${String(targetPath)}`); + } + return watcherForPath; + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2)); + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.emit( + 'error', + Object.assign(new Error('too many open files'), { + code: 'EMFILE', + path: path.join(teamsDir, 'base-1', 'inboxes'), + syscall: 'scandir', + }) + ); + await vi.advanceTimersByTimeAsync(0); + + const watcherAny = watcher as unknown as { + retryTimer: NodeJS.Timeout | null; + teamsPollingTimer: NodeJS.Timeout | null; + teamsPollingPrimed: boolean; + }; + expect(watcherAny.teamsPollingTimer).not.toBeNull(); + expect(watcherAny.retryTimer).toBeNull(); + await vi.waitFor(() => expect(watcherAny.teamsPollingPrimed).toBe(true)); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + + await vi.advanceTimersByTimeAsync(2000); + expect(watchMock).toHaveBeenCalledTimes(2); + expect(teamsWatcher.close).toHaveBeenCalled(); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it.each(['ENOSPC', 'ERR_FS_WATCHER_LIMIT', 'ERR_FEATURE_UNAVAILABLE_ON_PLATFORM'])( + 'falls back to tasks polling when chokidar reports %s', + async (code) => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-task-limit-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const tasksWatcher = getChokidarWatcherForRoot(tasksDir); + tasksWatcher.emit('error', Object.assign(new Error(code), { code })); + await vi.advanceTimersByTimeAsync(0); + + const watcherAny = watcher as unknown as { + retryTimer: NodeJS.Timeout | null; + tasksPollingTimer: NodeJS.Timeout | null; + }; + expect(watcherAny.tasksPollingTimer).not.toBeNull(); + expect(watcherAny.retryTimer).toBeNull(); + expect(tasksWatcher.close).toHaveBeenCalled(); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + } + ); + + it('falls back to polling when chokidar throws a known error during initial teams start', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-sync-limit-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + chokidarMock.watch.mockImplementation((targets, options) => { + const targetList = Array.isArray(targets) ? targets : [targets]; + if (targetList.includes(path.normalize(teamsDir))) { + throw Object.assign(new Error('watch limit'), { code: 'EMFILE' }); + } + return chokidarMock.createWatcher(targets, options); + }); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => { + const watcherAny = watcher as unknown as { + retryTimer: NodeJS.Timeout | null; + teamsPollingTimer: NodeJS.Timeout | null; + }; + expect(watcherAny.teamsPollingTimer).not.toBeNull(); + expect(watcherAny.retryTimer).toBeNull(); + }); + + expect(chokidarMock.watch).toHaveBeenCalledTimes(2); + expect(getChokidarWatcherForRoot(tasksDir)).toBeTruthy(); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('retries chokidar registry after a non-limit error without enabling polling', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-nonlimit-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.emit('error', Object.assign(new Error('permission denied'), { code: 'EACCES' })); + await vi.advanceTimersByTimeAsync(0); + + const watcherAny = watcher as unknown as { + retryTimer: NodeJS.Timeout | null; + teamsPollingTimer: NodeJS.Timeout | null; + }; + expect(watcherAny.teamsPollingTimer).toBeNull(); + expect(watcherAny.retryTimer).not.toBeNull(); + expect(teamsWatcher.close).toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(2000); + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(3)); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('does not allow the legacy env var to force teams and tasks polling', async () => { + vi.stubEnv('AGENT_TEAMS_FILEWATCHER_TEAM_TASK_POLLING', '1'); + + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-chokidar-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2)); + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const watcherAny = watcher as unknown as { + teamsPollingTimer: NodeJS.Timeout | null; + tasksPollingTimer: NodeJS.Timeout | null; + }; + expect(watcherAny.teamsPollingTimer).toBeNull(); + expect(watcherAny.tasksPollingTimer).toBeNull(); + expect(watchMock).not.toHaveBeenCalledWith(teamsDir, expect.anything(), expect.anything()); + expect(watchMock).not.toHaveBeenCalledWith(tasksDir, expect.anything(), expect.anything()); + expectChokidarOptions(getChokidarWatcherForRoot(teamsDir)); + expectChokidarOptions(getChokidarWatcherForRoot(tasksDir)); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits team and task changes from the chokidar registry with stable relative paths', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-events-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'inboxes', 'user.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'sentMessages.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'processes.json')); + + const tasksWatcher = getChokidarWatcherForRoot(tasksDir); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'task-1.json')); + + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([ + { type: 'config', teamName: 'base-1', detail: 'config.json' }, + { type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }, + { type: 'inbox', teamName: 'base-1', detail: 'sentMessages.json' }, + { type: 'process', teamName: 'base-1', detail: 'processes.json' }, + { type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' }, + ]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits unlink events from the chokidar registry with the same relative path contract', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-unlink-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + getChokidarWatcherForRoot(teamsDir).emit( + 'unlink', + path.join(teamsDir, 'base-1', 'config.json') + ); + getChokidarWatcherForRoot(tasksDir).emit( + 'unlink', + path.join(tasksDir, 'base-1', 'task-1.json') + ); + + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([ + { type: 'config', teamName: 'base-1', detail: 'config.json' }, + { type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' }, + ]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('normalizes relative chokidar paths and ignores paths outside the watched root', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-paths-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.emit('change', 'base-1/config.json'); + teamsWatcher.emit('change', path.join(tempDir, 'outside', 'base-2', 'config.json')); + + const tasksWatcher = getChokidarWatcherForRoot(tasksDir); + tasksWatcher.emit('change', 'base-1/task-1.json'); + tasksWatcher.emit('change', path.join(tempDir, 'outside', 'base-2', 'task-2.json')); + + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([ + { type: 'config', teamName: 'base-1', detail: 'config.json' }, + { type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' }, + ]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('does not emit existing files from the initial chokidar registry baseline', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-baseline-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + fs.writeFileSync(path.join(teamsDir, 'base-1', 'config.json'), '{}', 'utf8'); + fs.writeFileSync(path.join(teamsDir, 'base-1', 'inboxes', 'user.json'), '[]', 'utf8'); + fs.writeFileSync(path.join(tasksDir, 'base-1', 'task-1.json'), '{}', 'utf8'); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits existing files once when reconciliation discovers new team and task directories', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-new-files-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + fs.mkdirSync(path.join(teamsDir, 'base-2', 'inboxes'), { recursive: true }); + fs.writeFileSync(path.join(teamsDir, 'base-2', 'config.json'), '{}', 'utf8'); + fs.writeFileSync(path.join(teamsDir, 'base-2', 'inboxes', 'user.json'), '[]', 'utf8'); + fs.mkdirSync(path.join(tasksDir, 'base-2'), { recursive: true }); + fs.writeFileSync(path.join(tasksDir, 'base-2', 'task-2.json'), '{}', 'utf8'); + + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => { + expect(events).toHaveLength(3); + expect(events).toEqual( + expect.arrayContaining([ + { type: 'config', teamName: 'base-2', detail: 'config.json' }, + { type: 'inbox', teamName: 'base-2', detail: 'inboxes/user.json' }, + { type: 'task', teamName: 'base-2', detail: 'task-2.json', taskId: 'task-2' }, + ]) + ); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits existing inbox files once when reconciliation discovers a new inbox directory', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-new-inbox-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const inboxDir = path.join(teamsDir, 'base-1', 'inboxes'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain(path.normalize(inboxDir)); + + fs.mkdirSync(inboxDir, { recursive: true }); + fs.writeFileSync(path.join(inboxDir, 'user.json'), '[]', 'utf8'); + + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => { + expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(inboxDir)); + expect(events).toEqual([ + { type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }, + ]); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('reconciles new team directories immediately after an addDir event', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-adddir-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const addedTeamDir = path.join(teamsDir, 'base-2'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + + fs.mkdirSync(addedTeamDir, { recursive: true }); + fs.writeFileSync(path.join(addedTeamDir, 'config.json'), '{}', 'utf8'); + teamsWatcher.emit('addDir', addedTeamDir); + + await vi.waitFor(() => { + expect(chokidarMock.watch).toHaveBeenCalledTimes(3); + expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedTeamDir)); + expect(events).toEqual([{ type: 'config', teamName: 'base-2', detail: 'config.json' }]); + }); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('ignores events from an old chokidar generation after registry rebuild', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-generation-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + const oldTeamsWatcher = getChokidarWatcherForRoot(teamsDir); + + fs.mkdirSync(path.join(teamsDir, 'base-2'), { recursive: true }); + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(3)); + const newTeamsWatcher = getChokidarWatcherForRoot(teamsDir); + expect(newTeamsWatcher).not.toBe(oldTeamsWatcher); + + oldTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + newTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([{ type: 'config', teamName: 'base-1', detail: 'config.json' }]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('ignores stale chokidar events after stop closes the registry', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-stale-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + const tasksWatcher = getChokidarWatcherForRoot(tasksDir); + watcher.stop(); + + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'task-1.json')); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([]); + expect(teamsWatcher.close).toHaveBeenCalled(); + expect(tasksWatcher.close).toHaveBeenCalled(); + + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('creates fresh chokidar registries after stop and start', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-restart-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + const oldTeamsWatcher = getChokidarWatcherForRoot(teamsDir); + + watcher.stop(); + watcher.start(); + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(4)); + const newTeamsWatcher = getChokidarWatcherForRoot(teamsDir); + expect(newTeamsWatcher).not.toBe(oldTeamsWatcher); + + oldTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + newTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json')); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([{ type: 'config', teamName: 'base-1', detail: 'config.json' }]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('filters irrelevant team and task registry events before they reach FileWatcher handlers', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-filter-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const teamsWatcher = getChokidarWatcherForRoot(teamsDir); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'members', 'member.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', '.opencode-runtime', 'state.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'runtime', 'runtime.json')); + teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'notes.json')); + teamsWatcher.emit('addDir', path.join(teamsDir, 'base-1', 'members')); + + const tasksWatcher = getChokidarWatcherForRoot(tasksDir); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', '.lock')); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', '.highwatermark')); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'notes.txt')); + tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'nested', 'task.json')); + + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('keeps the teams registry shallow and excludes irrelevant runtime directories', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-scope-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const teamDir = path.join(teamsDir, 'base-1'); + const inboxDir = path.join(teamDir, 'inboxes'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(inboxDir, { recursive: true }); + fs.mkdirSync(path.join(teamDir, 'members'), { recursive: true }); + fs.mkdirSync(path.join(teamDir, '.opencode-runtime'), { recursive: true }); + fs.mkdirSync(path.join(teamDir, 'runtime'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + + const targets = getChokidarWatcherForRoot(teamsDir).targets; + expect(targets).toContain(path.normalize(teamsDir)); + expect(targets).toContain(path.normalize(teamDir)); + expect(targets).toContain(path.normalize(inboxDir)); + expect(targets).not.toContain(path.normalize(path.join(teamDir, 'members'))); + expect(targets).not.toContain(path.normalize(path.join(teamDir, '.opencode-runtime'))); + expect(targets).not.toContain(path.normalize(path.join(teamDir, 'runtime'))); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('reconciles new and removed team inbox directories without content polling', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-reconcile-')); + setClaudeBasePathOverride(tempDir); + const projectsDir = path.join(tempDir, 'projects'); + const todosDir = path.join(tempDir, 'todos'); + const teamsDir = path.join(tempDir, 'teams'); + const tasksDir = path.join(tempDir, 'tasks'); + const addedTeamDir = path.join(teamsDir, 'base-2'); + const addedInboxDir = path.join(addedTeamDir, 'inboxes'); + fs.mkdirSync(projectsDir, { recursive: true }); + fs.mkdirSync(todosDir, { recursive: true }); + fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true }); + fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true }); + useRealAccess(); + + const watchMock = vi.mocked(fs.watch); + watchMock.mockImplementation(() => createFakeWatcher()); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher(dataCache, projectsDir, todosDir); + watcher.start(); + + await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2)); + expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain(path.normalize(addedTeamDir)); + + fs.mkdirSync(addedInboxDir, { recursive: true }); + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => + expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedInboxDir)) + ); + + fs.rmSync(addedTeamDir, { recursive: true, force: true }); + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => + expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain( + path.normalize(addedTeamDir) + ) + ); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits team inbox changes from the teams polling fallback', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-poll-')); + setClaudeBasePathOverride(tempDir); + const teamsDir = path.join(tempDir, 'teams'); + const inboxDir = path.join(teamsDir, 'base-1', 'inboxes'); + fs.mkdirSync(inboxDir, { recursive: true }); + const inboxPath = path.join(inboxDir, 'user.json'); + fs.writeFileSync(inboxPath, '[]', 'utf8'); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher( + dataCache, + path.join(tempDir, 'projects'), + path.join(tempDir, 'todos') + ); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + + const watcherAny = watcher as unknown as { + isWatching: boolean; + pollTeamsForChanges: () => Promise; + }; + watcherAny.isWatching = true; + await watcherAny.pollTeamsForChanges(); + expect(events).toEqual([]); + + fs.writeFileSync(inboxPath, '[{"messageId":"m1"}]', 'utf8'); + await watcherAny.pollTeamsForChanges(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('emits task changes from the tasks polling fallback', async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-task-poll-')); + setClaudeBasePathOverride(tempDir); + const taskDir = path.join(tempDir, 'tasks', 'base-1'); + fs.mkdirSync(taskDir, { recursive: true }); + const taskPath = path.join(taskDir, 'task-1.json'); + fs.writeFileSync(taskPath, '{"status":"queued"}', 'utf8'); + + const dataCache = new DataCache(50, 10, false); + const watcher = new FileWatcher( + dataCache, + path.join(tempDir, 'projects'), + path.join(tempDir, 'todos') + ); + const events: unknown[] = []; + watcher.on('team-change', (event) => events.push(event)); + + const watcherAny = watcher as unknown as { + isWatching: boolean; + pollTasksForChanges: () => Promise; + }; + watcherAny.isWatching = true; + await watcherAny.pollTasksForChanges(); + expect(events).toEqual([]); + + fs.writeFileSync(taskPath, '{"status":"running"}', 'utf8'); + await watcherAny.pollTasksForChanges(); + await vi.advanceTimersByTimeAsync(100); + + expect(events).toEqual([ + { type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' }, + ]); + + watcher.stop(); + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + it('emits log-source-change when OpenCode task-log attribution manifest changes', () => { const dataCache = new DataCache(50, 10, false); const watcher = new FileWatcher(dataCache, '/tmp/projects', '/tmp/todos');