fix(file-watcher): avoid recursive team task watching

This commit is contained in:
777genius 2026-05-21 16:41:06 +03:00
parent ec7b1dbd18
commit 420ace3c15
3 changed files with 1609 additions and 25 deletions

View file

@ -32,6 +32,7 @@ import { ConfigManager } from './ConfigManager';
import { type DataCache } from './DataCache';
import { LocalFileSystemProvider } from './LocalFileSystemProvider';
import { type NotificationManager } from './NotificationManager';
import { type TeamTaskWatchKind, TeamTaskWatchRegistry } from './TeamTaskWatchRegistry';
import type { FileSystemProvider, FsDirent } from './FileSystemProvider';
import type { TeamChangeEvent } from '@shared/types';
@ -42,6 +43,10 @@ const logger = createLogger('Service:FileWatcher');
const DEBOUNCE_MS = 100;
/** Retry delay when watched directories are unavailable or watcher errors occur */
const WATCHER_RETRY_MS = 2000;
/** Poll interval for team metadata and inboxes when the teams watcher hits OS watcher limits */
const TEAMS_POLL_INTERVAL_MS = 1000;
/** Poll interval for task files, which can be much larger than team metadata/inboxes */
const TASKS_POLL_INTERVAL_MS = 3000;
/** Interval for periodic catch-up scan to detect missed fs.watch events */
const CATCH_UP_INTERVAL_MS = 30_000;
/** Only catch-up scan files modified within this window */
@ -68,11 +73,23 @@ interface ActiveSessionFile {
lastObservedAt: number;
}
type RecursiveWatcherType = 'projects' | 'todos' | 'teams' | 'tasks';
type PollingWatcherType = 'teams' | 'tasks';
type CloseableWatcher = { close: () => void | Promise<void> };
export class FileWatcher extends EventEmitter {
private projectsWatcher: fs.FSWatcher | null = null;
private todosWatcher: fs.FSWatcher | null = null;
private teamsWatcher: fs.FSWatcher | null = null;
private tasksWatcher: fs.FSWatcher | null = null;
private teamsWatcher: TeamTaskWatchRegistry | null = null;
private tasksWatcher: TeamTaskWatchRegistry | null = null;
private teamsPollingTimer: NodeJS.Timeout | null = null;
private tasksPollingTimer: NodeJS.Timeout | null = null;
private teamsPollingInProgress = false;
private tasksPollingInProgress = false;
private teamsPollingPrimed = false;
private tasksPollingPrimed = false;
private polledTeamFiles = new Map<string, string>();
private polledTaskFiles = new Map<string, string>();
private retryTimer: NodeJS.Timeout | null = null;
private projectsPath: string;
private todosPath: string;
@ -202,15 +219,31 @@ export class FileWatcher extends EventEmitter {
}
if (this.teamsWatcher) {
this.teamsWatcher.close();
void this.teamsWatcher.close();
this.teamsWatcher = null;
}
if (this.tasksWatcher) {
this.tasksWatcher.close();
void this.tasksWatcher.close();
this.tasksWatcher = null;
}
if (this.teamsPollingTimer) {
clearInterval(this.teamsPollingTimer);
this.teamsPollingTimer = null;
}
if (this.tasksPollingTimer) {
clearInterval(this.tasksPollingTimer);
this.tasksPollingTimer = null;
}
this.teamsPollingInProgress = false;
this.tasksPollingInProgress = false;
this.teamsPollingPrimed = false;
this.tasksPollingPrimed = false;
this.polledTeamFiles.clear();
this.polledTaskFiles.clear();
// Clear any pending debounce timers
for (const timer of this.debounceTimers.values()) {
clearTimeout(timer);
@ -284,6 +317,14 @@ export class FileWatcher extends EventEmitter {
clearInterval(this.pollingTimer);
this.pollingTimer = null;
}
if (this.teamsPollingTimer) {
clearInterval(this.teamsPollingTimer);
this.teamsPollingTimer = null;
}
if (this.tasksPollingTimer) {
clearInterval(this.tasksPollingTimer);
this.tasksPollingTimer = null;
}
// 6. Clear all tracking maps (stop() already handles most of these)
this.lastProcessedLineCount.clear();
@ -291,6 +332,8 @@ export class FileWatcher extends EventEmitter {
this.activeSessionFiles.clear();
this.catchUpStatFailures.clear();
this.polledFileSizes.clear();
this.polledTeamFiles.clear();
this.polledTaskFiles.clear();
this.processingInProgress.clear();
this.pendingReprocess.clear();
@ -377,7 +420,7 @@ export class FileWatcher extends EventEmitter {
* Starts the teams directory watcher.
*/
private async startTeamsWatcher(): Promise<void> {
if (this.teamsWatcher) {
if (this.teamsWatcher || this.teamsPollingTimer) {
return;
}
@ -390,15 +433,28 @@ export class FileWatcher extends EventEmitter {
// Guard: stop() may have been called while awaiting pathExists
if (!this.isWatching) return;
this.teamsWatcher = fs.watch(this.teamsPath, { recursive: true }, (eventType, filename) => {
if (filename) {
this.handleTeamsChange(eventType, filename);
}
const registry = new TeamTaskWatchRegistry({
kind: 'teams',
rootPath: this.teamsPath,
onChange: (eventType, filename) => this.handleTeamsChange(eventType, filename),
onError: (error) => this.handleTeamTaskWatcherError('teams', error),
});
this.attachWatcherRecovery(this.teamsWatcher, 'teams');
this.teamsWatcher = registry;
await registry.start();
if (!this.isWatching || this.teamsWatcher !== registry) {
void registry.close();
return;
}
logger.info(`FileWatcher: Started watching teams at ${this.teamsPath}`);
} catch (error) {
const registry = this.teamsWatcher;
if (this.startPollingFallbackForRecursiveWatchLimit('teams', error, registry ?? undefined)) {
return;
}
logger.error('Error starting teams watcher:', error);
if (this.teamsWatcher) {
void this.teamsWatcher.close();
}
this.teamsWatcher = null;
this.scheduleWatcherRetry();
}
@ -408,7 +464,7 @@ export class FileWatcher extends EventEmitter {
* Starts the tasks directory watcher.
*/
private async startTasksWatcher(): Promise<void> {
if (this.tasksWatcher) {
if (this.tasksWatcher || this.tasksPollingTimer) {
return;
}
@ -421,15 +477,28 @@ export class FileWatcher extends EventEmitter {
// Guard: stop() may have been called while awaiting pathExists
if (!this.isWatching) return;
this.tasksWatcher = fs.watch(this.tasksPath, { recursive: true }, (eventType, filename) => {
if (filename) {
this.handleTasksChange(eventType, filename);
}
const registry = new TeamTaskWatchRegistry({
kind: 'tasks',
rootPath: this.tasksPath,
onChange: (eventType, filename) => this.handleTasksChange(eventType, filename),
onError: (error) => this.handleTeamTaskWatcherError('tasks', error),
});
this.attachWatcherRecovery(this.tasksWatcher, 'tasks');
this.tasksWatcher = registry;
await registry.start();
if (!this.isWatching || this.tasksWatcher !== registry) {
void registry.close();
return;
}
logger.info(`FileWatcher: Started watching tasks at ${this.tasksPath}`);
} catch (error) {
const registry = this.tasksWatcher;
if (this.startPollingFallbackForRecursiveWatchLimit('tasks', error, registry ?? undefined)) {
return;
}
logger.error('Error starting tasks watcher:', error);
if (this.tasksWatcher) {
void this.tasksWatcher.close();
}
this.tasksWatcher = null;
this.scheduleWatcherRetry();
}
@ -472,7 +541,12 @@ export class FileWatcher extends EventEmitter {
]);
}
if (!this.projectsWatcher || !this.todosWatcher || !this.teamsWatcher || !this.tasksWatcher) {
if (
!this.projectsWatcher ||
!this.todosWatcher ||
!this.isWatcherOrPollingActive('teams') ||
!this.isWatcherOrPollingActive('tasks')
) {
this.scheduleWatcherRetry();
}
}
@ -488,18 +562,18 @@ export class FileWatcher extends EventEmitter {
}, WATCHER_RETRY_MS);
}
private attachWatcherRecovery(
watcher: fs.FSWatcher,
watcherType: 'projects' | 'todos' | 'teams' | 'tasks'
): void {
private attachWatcherRecovery(watcher: fs.FSWatcher, watcherType: RecursiveWatcherType): void {
watcher.on('error', (error: NodeJS.ErrnoException) => {
// Ephemeral .lock files cause harmless ENOENT when the recursive watcher
// tries to scandir a path that was already deleted. Log as debug and skip
// the teardown/retry the watcher is still healthy.
// the teardown/retry - the watcher is still healthy.
if (error.code === 'ENOENT' && error.path?.endsWith('.lock')) {
logger.debug(`FileWatcher: ${watcherType} ignoring transient ENOENT on lock file`);
return;
}
if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher)) {
return;
}
logger.error(`FileWatcher: ${watcherType} watcher error:`, error);
if (watcherType === 'projects') {
this.projectsWatcher = null;
@ -510,7 +584,9 @@ export class FileWatcher extends EventEmitter {
} else {
this.tasksWatcher = null;
}
this.scheduleWatcherRetry();
if (!this.isWatcherOrPollingActive(watcherType)) {
this.scheduleWatcherRetry();
}
});
watcher.on('close', () => {
@ -526,10 +602,295 @@ export class FileWatcher extends EventEmitter {
} else {
this.tasksWatcher = null;
}
this.scheduleWatcherRetry();
if (!this.isWatcherOrPollingActive(watcherType)) {
this.scheduleWatcherRetry();
}
});
}
private handleTeamTaskWatcherError(watcherType: TeamTaskWatchKind, error: unknown): void {
const watcher = watcherType === 'teams' ? this.teamsWatcher : this.tasksWatcher;
if (this.startPollingFallbackForRecursiveWatchLimit(watcherType, error, watcher ?? undefined)) {
return;
}
logger.error(`FileWatcher: ${watcherType} watcher error:`, error);
if (watcherType === 'teams') {
this.teamsWatcher = null;
} else {
this.tasksWatcher = null;
}
void watcher?.close();
if (!this.isWatcherOrPollingActive(watcherType)) {
this.scheduleWatcherRetry();
}
}
private isWatcherOrPollingActive(watcherType: RecursiveWatcherType): boolean {
if (watcherType === 'projects') {
return this.projectsWatcher !== null;
}
if (watcherType === 'todos') {
return this.todosWatcher !== null;
}
if (watcherType === 'teams') {
return this.teamsWatcher !== null || this.teamsPollingTimer !== null;
}
return this.tasksWatcher !== null || this.tasksPollingTimer !== null;
}
private startPollingFallbackForRecursiveWatchLimit(
watcherType: RecursiveWatcherType,
error: unknown,
watcher?: CloseableWatcher
): boolean {
if ((watcherType !== 'teams' && watcherType !== 'tasks') || !this.isWatchLimitError(error)) {
return false;
}
const err = error as NodeJS.ErrnoException;
logger.warn(
`FileWatcher: ${watcherType} watcher hit ${err.code ?? 'a platform limit'}; falling back to polling`
);
if (watcherType === 'teams') {
this.teamsWatcher = null;
} else {
this.tasksWatcher = null;
}
this.startTeamTaskPolling(watcherType);
try {
void watcher?.close();
} catch (closeError) {
logger.debug(`FileWatcher: ${watcherType} watcher close after fallback failed`, closeError);
}
return true;
}
private isWatchLimitError(error: unknown): boolean {
const code = (error as NodeJS.ErrnoException | undefined)?.code;
return (
code === 'EMFILE' ||
code === 'ENOSPC' ||
code === 'ERR_FS_WATCHER_LIMIT' ||
code === 'ERR_FEATURE_UNAVAILABLE_ON_PLATFORM'
);
}
private startTeamTaskPolling(watcherType: PollingWatcherType): void {
const existingTimer = watcherType === 'teams' ? this.teamsPollingTimer : this.tasksPollingTimer;
if (existingTimer) {
return;
}
const runPoll = (): void => {
if (!this.isWatching) {
return;
}
if (watcherType === 'teams') {
if (this.teamsPollingInProgress) {
return;
}
this.teamsPollingInProgress = true;
this.pollTeamsForChanges()
.catch((err) => {
logger.error('FileWatcher: Error during teams polling:', err);
})
.finally(() => {
this.teamsPollingInProgress = false;
});
return;
}
if (this.tasksPollingInProgress) {
return;
}
this.tasksPollingInProgress = true;
this.pollTasksForChanges()
.catch((err) => {
logger.error('FileWatcher: Error during tasks polling:', err);
})
.finally(() => {
this.tasksPollingInProgress = false;
});
};
runPoll();
const timer = setInterval(runPoll, this.getTeamTaskPollIntervalMs(watcherType));
timer.unref();
if (watcherType === 'teams') {
this.teamsPollingTimer = timer;
} else {
this.tasksPollingTimer = timer;
}
}
private getTeamTaskPollIntervalMs(watcherType: PollingWatcherType): number {
return watcherType === 'teams' ? TEAMS_POLL_INTERVAL_MS : TASKS_POLL_INTERVAL_MS;
}
private async pollTeamsForChanges(): Promise<void> {
const nextSnapshot = await this.collectTeamsPollSnapshot();
if (!this.isWatching) {
return;
}
this.emitPolledChanges(
'teams',
this.polledTeamFiles,
nextSnapshot,
this.teamsPollingPrimed,
(eventType, relativePath) => this.handleTeamsChange(eventType, relativePath)
);
this.polledTeamFiles = nextSnapshot;
this.teamsPollingPrimed = true;
}
private async pollTasksForChanges(): Promise<void> {
const nextSnapshot = await this.collectTasksPollSnapshot();
if (!this.isWatching) {
return;
}
this.emitPolledChanges(
'tasks',
this.polledTaskFiles,
nextSnapshot,
this.tasksPollingPrimed,
(eventType, relativePath) => this.handleTasksChange(eventType, relativePath)
);
this.polledTaskFiles = nextSnapshot;
this.tasksPollingPrimed = true;
}
private emitPolledChanges(
watcherType: PollingWatcherType,
previousSnapshot: Map<string, string>,
nextSnapshot: Map<string, string>,
isPrimed: boolean,
emitChange: (eventType: string, relativePath: string) => void
): void {
if (!isPrimed) {
logger.info(`FileWatcher: ${watcherType} polling baseline captured`);
return;
}
for (const [relativePath, fingerprint] of nextSnapshot) {
const previous = previousSnapshot.get(relativePath);
if (previous === undefined) {
emitChange('rename', relativePath);
} else if (previous !== fingerprint) {
emitChange('change', relativePath);
}
}
for (const relativePath of previousSnapshot.keys()) {
if (!nextSnapshot.has(relativePath)) {
emitChange('rename', relativePath);
}
}
}
private async collectTeamsPollSnapshot(): Promise<Map<string, string>> {
const snapshot = new Map<string, string>();
const teamEntries = await this.safeReadDir(this.teamsPath);
for (const teamEntry of teamEntries) {
if (!teamEntry.isDirectory()) {
continue;
}
const teamName = teamEntry.name;
const teamPath = path.join(this.teamsPath, teamName);
await this.collectPolledDirectoryFiles(snapshot, teamPath, teamName, (fileName) =>
fileName.endsWith('.json')
);
await this.collectPolledDirectoryFiles(
snapshot,
path.join(teamPath, 'inboxes'),
`${teamName}/inboxes`,
(fileName) => fileName.endsWith('.json')
);
}
return snapshot;
}
private async collectTasksPollSnapshot(): Promise<Map<string, string>> {
const snapshot = new Map<string, string>();
const teamEntries = await this.safeReadDir(this.tasksPath);
for (const teamEntry of teamEntries) {
if (!teamEntry.isDirectory()) {
continue;
}
const teamName = teamEntry.name;
await this.collectPolledDirectoryFiles(
snapshot,
path.join(this.tasksPath, teamName),
teamName,
(fileName) => !fileName.startsWith('.') && fileName.endsWith('.json')
);
}
return snapshot;
}
private async collectPolledDirectoryFiles(
snapshot: Map<string, string>,
dirPath: string,
relativeRoot: string,
shouldInclude: (fileName: string) => boolean
): Promise<void> {
const entries = await this.safeReadDir(dirPath);
for (const entry of entries) {
if (!entry.isFile() || !shouldInclude(entry.name)) {
continue;
}
await this.addPolledFile(
snapshot,
path.join(dirPath, entry.name),
`${relativeRoot}/${entry.name}`
);
}
}
private async addPolledFile(
snapshot: Map<string, string>,
absolutePath: string,
relativePath: string
): Promise<void> {
try {
const stats = await fsp.stat(absolutePath);
if (!stats.isFile()) {
return;
}
snapshot.set(
relativePath,
`${stats.dev}:${stats.ino}:${stats.mtimeMs}:${stats.ctimeMs}:${stats.size}`
);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
logger.debug(`FileWatcher: Unable to stat polled file ${absolutePath}`, error);
}
}
}
private async safeReadDir(dirPath: string): Promise<fs.Dirent[]> {
try {
return await fsp.readdir(dirPath, { withFileTypes: true });
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
logger.debug(`FileWatcher: Unable to read polled directory ${dirPath}`, error);
}
return [];
}
}
// ===========================================================================
// SSH Polling Mode
// ===========================================================================

View file

@ -0,0 +1,290 @@
import { OPENCODE_TASK_LOG_ATTRIBUTION_FILE } from '@shared/constants/opencodeTaskLogAttribution';
import { watch } from 'chokidar';
import * as fsp from 'fs/promises';
import * as path from 'path';
import type { FSWatcher } from 'chokidar';
import type { Dirent } from 'fs';
export type TeamTaskWatchKind = 'teams' | 'tasks';
export type TeamTaskWatchEventType = 'add' | 'change' | 'unlink' | 'addDir' | 'unlinkDir';
export interface TeamTaskWatchRegistryOptions {
kind: TeamTaskWatchKind;
rootPath: string;
onChange: (eventType: TeamTaskWatchEventType, relativePath: string) => void;
onError: (error: unknown) => void;
}
const RECONCILE_INTERVAL_MS = 30_000;
const TEAM_ROOT_FILES = new Set([
'config.json',
'processes.json',
'sentMessages.json',
'team.meta.json',
'members.meta.json',
OPENCODE_TASK_LOG_ATTRIBUTION_FILE,
]);
export class TeamTaskWatchRegistry {
private watcher: FSWatcher | null = null;
private reconcileTimer: NodeJS.Timeout | null = null;
private targets = new Set<string>();
private targetKey = '';
private initialTargetsCaptured = false;
private closed = false;
private generation = 0;
private reconcileInProgress = false;
private reconcileAgain = false;
constructor(private readonly options: TeamTaskWatchRegistryOptions) {}
async start(): Promise<void> {
if (this.closed) {
return;
}
await this.reconcileTargets();
if (this.closed || this.reconcileTimer) {
return;
}
this.reconcileTimer = setInterval(() => {
void this.reconcileTargets();
}, RECONCILE_INTERVAL_MS);
this.reconcileTimer.unref();
}
async close(): Promise<void> {
this.closed = true;
this.generation += 1;
if (this.reconcileTimer) {
clearInterval(this.reconcileTimer);
this.reconcileTimer = null;
}
const watcher = this.watcher;
this.watcher = null;
this.targets.clear();
this.targetKey = '';
if (watcher) {
await watcher.close().catch(() => undefined);
}
}
private async reconcileTargets(): Promise<void> {
if (this.closed) {
return;
}
if (this.reconcileInProgress) {
this.reconcileAgain = true;
return;
}
this.reconcileInProgress = true;
try {
const targets = await this.collectTargets();
const nextKey = targets.join('\n');
if (nextKey !== this.targetKey) {
const addedTargets = targets.filter((target) => !this.targets.has(target));
await this.rebuildWatcher(targets, nextKey, addedTargets);
}
} catch (error) {
if (!this.closed) {
this.options.onError(error);
}
} finally {
this.reconcileInProgress = false;
}
if (this.reconcileAgain && !this.closed) {
this.reconcileAgain = false;
await this.reconcileTargets();
}
}
private async rebuildWatcher(
targets: string[],
nextKey: string,
addedTargets: string[]
): Promise<void> {
const generation = this.generation + 1;
this.generation = generation;
const previousWatcher = this.watcher;
this.watcher = null;
if (previousWatcher) {
await previousWatcher.close().catch(() => undefined);
}
if (this.closed || generation !== this.generation) {
return;
}
const nextWatcher = watch(targets, {
ignoreInitial: true,
ignorePermissionErrors: true,
followSymlinks: false,
depth: 0,
});
this.watcher = nextWatcher;
this.targets = new Set(targets);
this.targetKey = nextKey;
const shouldEmitExistingFiles = this.initialTargetsCaptured;
this.initialTargetsCaptured = true;
const handleEvent = (eventType: TeamTaskWatchEventType, changedPath?: string): void => {
if (this.closed || generation !== this.generation || !changedPath) {
return;
}
const relativePath = this.toRelativePath(changedPath);
if (!relativePath) {
return;
}
if (this.shouldReconcile(eventType, relativePath)) {
void this.reconcileTargets();
}
if (!this.shouldEmit(eventType, relativePath)) {
return;
}
this.options.onChange(eventType, relativePath);
};
nextWatcher.on('add', (changedPath) => handleEvent('add', changedPath));
nextWatcher.on('change', (changedPath) => handleEvent('change', changedPath));
nextWatcher.on('unlink', (changedPath) => handleEvent('unlink', changedPath));
nextWatcher.on('addDir', (changedPath) => handleEvent('addDir', changedPath));
nextWatcher.on('unlinkDir', (changedPath) => handleEvent('unlinkDir', changedPath));
nextWatcher.on('error', (error) => {
if (!this.closed && generation === this.generation) {
this.options.onError(error);
}
});
if (shouldEmitExistingFiles) {
await this.emitExistingFilesForNewTargets(addedTargets, generation);
}
}
private async emitExistingFilesForNewTargets(
targets: string[],
generation: number
): Promise<void> {
const normalizedRoot = path.normalize(this.options.rootPath);
for (const targetPath of targets) {
if (this.closed || generation !== this.generation) {
return;
}
if (path.normalize(targetPath) === normalizedRoot) {
continue;
}
const entries = await this.readDirectory(targetPath);
for (const entry of entries) {
if (this.closed || generation !== this.generation) {
return;
}
if (!entry.isFile()) {
continue;
}
const relativePath = this.toRelativePath(path.join(targetPath, entry.name));
if (relativePath && this.shouldEmit('add', relativePath)) {
this.options.onChange('add', relativePath);
}
}
}
}
private async collectTargets(): Promise<string[]> {
const targets = new Set<string>([path.normalize(this.options.rootPath)]);
const rootEntries = await this.readDirectory(this.options.rootPath);
for (const entry of rootEntries) {
if (!entry.isDirectory()) {
continue;
}
const teamPath = path.join(this.options.rootPath, entry.name);
targets.add(path.normalize(teamPath));
if (this.options.kind === 'teams') {
const inboxPath = path.join(teamPath, 'inboxes');
if (await this.isDirectory(inboxPath)) {
targets.add(path.normalize(inboxPath));
}
}
}
return [...targets].sort((left, right) => left.localeCompare(right));
}
private async readDirectory(dirPath: string): Promise<Dirent[]> {
try {
return await fsp.readdir(dirPath, { withFileTypes: true });
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return [];
}
throw error;
}
}
private async isDirectory(dirPath: string): Promise<boolean> {
try {
return (await fsp.stat(dirPath)).isDirectory();
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return false;
}
throw error;
}
}
private toRelativePath(changedPath: string): string | null {
const absolutePath = path.isAbsolute(changedPath)
? changedPath
: path.join(this.options.rootPath, changedPath);
const relativePath = path.relative(this.options.rootPath, absolutePath);
if (!relativePath || relativePath.startsWith('..') || path.isAbsolute(relativePath)) {
return null;
}
return relativePath.replace(/\\/g, '/');
}
private shouldReconcile(eventType: TeamTaskWatchEventType, relativePath: string): boolean {
if (eventType !== 'addDir' && eventType !== 'unlinkDir') {
return false;
}
const parts = relativePath.split('/').filter(Boolean);
if (parts.length === 1) {
return true;
}
return this.options.kind === 'teams' && parts.length === 2 && parts[1] === 'inboxes';
}
private shouldEmit(eventType: TeamTaskWatchEventType, relativePath: string): boolean {
if (eventType === 'addDir' || eventType === 'unlinkDir') {
return false;
}
const parts = relativePath.split('/').filter(Boolean);
if (this.options.kind === 'tasks') {
return parts.length === 2 && !parts[1].startsWith('.') && parts[1].endsWith('.json');
}
if (parts.length === 2) {
return TEAM_ROOT_FILES.has(parts[1]);
}
return parts.length === 3 && parts[1] === 'inboxes' && parts[2].endsWith('.json');
}
}

View file

@ -1,10 +1,67 @@
import { EventEmitter } from 'events';
import type * as FsType from 'fs';
import * as os from 'os';
import * as path from 'path';
import { Readable } from 'stream';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type * as FsType from 'fs';
type MockChokidarWatcher = {
targets: string[];
options: unknown;
handlers: Map<string, Array<(...args: unknown[]) => void>>;
on: (event: string, handler: (...args: unknown[]) => void) => MockChokidarWatcher;
close: ReturnType<typeof vi.fn>;
emit: (event: string, ...args: unknown[]) => void;
};
const chokidarMock = vi.hoisted(() => {
const instances: MockChokidarWatcher[] = [];
const createWatchImplementation = () => (targets: string | string[], options: unknown) => {
const watcher = {
targets: (Array.isArray(targets) ? targets : [targets]).map((target) => String(target)),
options,
handlers: new Map<string, Array<(...args: unknown[]) => void>>(),
close: vi.fn().mockResolvedValue(undefined),
emit(event: string, ...args: unknown[]) {
for (const handler of watcher.handlers.get(event) ?? []) {
handler(...args);
}
},
} as MockChokidarWatcher;
watcher.on = vi.fn((event: string, handler: (...args: unknown[]) => void) => {
const handlers = watcher.handlers.get(event) ?? [];
handlers.push(handler);
watcher.handlers.set(event, handlers);
return watcher;
});
instances.push(watcher);
return watcher;
};
const watch = vi.fn(createWatchImplementation());
return {
instances,
watch,
createWatcher(targets: string | string[], options: unknown): MockChokidarWatcher {
return createWatchImplementation()(targets, options);
},
reset() {
instances.length = 0;
watch.mockReset();
watch.mockImplementation(createWatchImplementation());
},
};
});
vi.mock('chokidar', () => ({
watch: chokidarMock.watch,
}));
vi.mock('@shared/utils/logger', () => ({
createLogger: () => ({
debug: vi.fn(),
@ -63,6 +120,7 @@ import * as fsp from 'fs/promises';
import { errorDetector } from '../../../../src/main/services/error/ErrorDetector';
import { DataCache } from '../../../../src/main/services/infrastructure/DataCache';
import { FileWatcher } from '../../../../src/main/services/infrastructure/FileWatcher';
import { setClaudeBasePathOverride } from '../../../../src/main/utils/pathDecoder';
import { OPENCODE_TASK_LOG_ATTRIBUTION_FILE } from '../../../../src/shared/constants/opencodeTaskLogAttribution';
function createFakeWatcher(): FsType.FSWatcher {
@ -73,12 +131,37 @@ function createFakeWatcher(): FsType.FSWatcher {
return emitter as unknown as FsType.FSWatcher;
}
function getChokidarWatcherForRoot(rootPath: string): MockChokidarWatcher {
const normalizedRoot = path.normalize(rootPath);
const watcher = [...chokidarMock.instances]
.reverse()
.find((instance) => instance.targets.includes(normalizedRoot));
if (!watcher) {
throw new Error(`Missing chokidar watcher for ${normalizedRoot}`);
}
return watcher;
}
function expectChokidarOptions(watcher: MockChokidarWatcher): void {
expect(watcher.options).toEqual({
ignoreInitial: true,
ignorePermissionErrors: true,
followSymlinks: false,
depth: 0,
});
}
/** Make existsSync delegate to the real implementation (needed for tests with real temp files) */
function useRealExistsSync() {
const realFn = (fs as unknown as { __realExistsSync: typeof fs.existsSync }).__realExistsSync;
vi.mocked(fs.existsSync).mockImplementation((p) => realFn(p));
}
function useRealAccess() {
const realFn = (fsp as unknown as { __realAccess: typeof fsp.access }).__realAccess;
vi.mocked(fsp.access).mockImplementation((p, mode) => realFn(p, mode));
}
function createMockNotificationManager() {
return {
addError: vi.fn().mockResolvedValue(null),
@ -103,9 +186,12 @@ function jsonlLine(uuid: string, text: string): string {
describe('FileWatcher', () => {
beforeEach(() => {
vi.useFakeTimers();
chokidarMock.reset();
});
afterEach(() => {
setClaudeBasePathOverride(null);
vi.unstubAllEnvs();
vi.useRealTimers();
vi.restoreAllMocks();
});
@ -170,6 +256,853 @@ describe('FileWatcher', () => {
watcher.stop();
});
it('falls back to teams polling when the chokidar registry hits the file descriptor limit', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-emfile-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.writeFileSync(path.join(teamsDir, 'base-1', 'inboxes', 'user.json'), '[]', 'utf8');
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchersByPath = new Map<string, FsType.FSWatcher>([
[projectsDir, createFakeWatcher()],
[todosDir, createFakeWatcher()],
]);
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation((targetPath) => {
const watcherForPath = watchersByPath.get(String(targetPath));
if (!watcherForPath) {
throw new Error(`Unexpected watch path: ${String(targetPath)}`);
}
return watcherForPath;
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2));
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit(
'error',
Object.assign(new Error('too many open files'), {
code: 'EMFILE',
path: path.join(teamsDir, 'base-1', 'inboxes'),
syscall: 'scandir',
})
);
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
teamsPollingPrimed: boolean;
};
expect(watcherAny.teamsPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
await vi.waitFor(() => expect(watcherAny.teamsPollingPrimed).toBe(true));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
await vi.advanceTimersByTimeAsync(2000);
expect(watchMock).toHaveBeenCalledTimes(2);
expect(teamsWatcher.close).toHaveBeenCalled();
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it.each(['ENOSPC', 'ERR_FS_WATCHER_LIMIT', 'ERR_FEATURE_UNAVAILABLE_ON_PLATFORM'])(
'falls back to tasks polling when chokidar reports %s',
async (code) => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-task-limit-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const tasksWatcher = getChokidarWatcherForRoot(tasksDir);
tasksWatcher.emit('error', Object.assign(new Error(code), { code }));
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
tasksPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.tasksPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
expect(tasksWatcher.close).toHaveBeenCalled();
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
}
);
it('falls back to polling when chokidar throws a known error during initial teams start', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-sync-limit-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
chokidarMock.watch.mockImplementation((targets, options) => {
const targetList = Array.isArray(targets) ? targets : [targets];
if (targetList.includes(path.normalize(teamsDir))) {
throw Object.assign(new Error('watch limit'), { code: 'EMFILE' });
}
return chokidarMock.createWatcher(targets, options);
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => {
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
});
expect(chokidarMock.watch).toHaveBeenCalledTimes(2);
expect(getChokidarWatcherForRoot(tasksDir)).toBeTruthy();
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('retries chokidar registry after a non-limit error without enabling polling', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-nonlimit-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit('error', Object.assign(new Error('permission denied'), { code: 'EACCES' }));
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).toBeNull();
expect(watcherAny.retryTimer).not.toBeNull();
expect(teamsWatcher.close).toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(2000);
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(3));
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('does not allow the legacy env var to force teams and tasks polling', async () => {
vi.stubEnv('AGENT_TEAMS_FILEWATCHER_TEAM_TASK_POLLING', '1');
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-chokidar-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2));
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const watcherAny = watcher as unknown as {
teamsPollingTimer: NodeJS.Timeout | null;
tasksPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).toBeNull();
expect(watcherAny.tasksPollingTimer).toBeNull();
expect(watchMock).not.toHaveBeenCalledWith(teamsDir, expect.anything(), expect.anything());
expect(watchMock).not.toHaveBeenCalledWith(tasksDir, expect.anything(), expect.anything());
expectChokidarOptions(getChokidarWatcherForRoot(teamsDir));
expectChokidarOptions(getChokidarWatcherForRoot(tasksDir));
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits team and task changes from the chokidar registry with stable relative paths', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-events-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'inboxes', 'user.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'sentMessages.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'processes.json'));
const tasksWatcher = getChokidarWatcherForRoot(tasksDir);
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'task-1.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([
{ type: 'config', teamName: 'base-1', detail: 'config.json' },
{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' },
{ type: 'inbox', teamName: 'base-1', detail: 'sentMessages.json' },
{ type: 'process', teamName: 'base-1', detail: 'processes.json' },
{ type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' },
]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits unlink events from the chokidar registry with the same relative path contract', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-unlink-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
getChokidarWatcherForRoot(teamsDir).emit(
'unlink',
path.join(teamsDir, 'base-1', 'config.json')
);
getChokidarWatcherForRoot(tasksDir).emit(
'unlink',
path.join(tasksDir, 'base-1', 'task-1.json')
);
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([
{ type: 'config', teamName: 'base-1', detail: 'config.json' },
{ type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' },
]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('normalizes relative chokidar paths and ignores paths outside the watched root', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-paths-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit('change', 'base-1/config.json');
teamsWatcher.emit('change', path.join(tempDir, 'outside', 'base-2', 'config.json'));
const tasksWatcher = getChokidarWatcherForRoot(tasksDir);
tasksWatcher.emit('change', 'base-1/task-1.json');
tasksWatcher.emit('change', path.join(tempDir, 'outside', 'base-2', 'task-2.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([
{ type: 'config', teamName: 'base-1', detail: 'config.json' },
{ type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' },
]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('does not emit existing files from the initial chokidar registry baseline', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-baseline-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
fs.writeFileSync(path.join(teamsDir, 'base-1', 'config.json'), '{}', 'utf8');
fs.writeFileSync(path.join(teamsDir, 'base-1', 'inboxes', 'user.json'), '[]', 'utf8');
fs.writeFileSync(path.join(tasksDir, 'base-1', 'task-1.json'), '{}', 'utf8');
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits existing files once when reconciliation discovers new team and task directories', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-new-files-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
fs.mkdirSync(path.join(teamsDir, 'base-2', 'inboxes'), { recursive: true });
fs.writeFileSync(path.join(teamsDir, 'base-2', 'config.json'), '{}', 'utf8');
fs.writeFileSync(path.join(teamsDir, 'base-2', 'inboxes', 'user.json'), '[]', 'utf8');
fs.mkdirSync(path.join(tasksDir, 'base-2'), { recursive: true });
fs.writeFileSync(path.join(tasksDir, 'base-2', 'task-2.json'), '{}', 'utf8');
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() => {
expect(events).toHaveLength(3);
expect(events).toEqual(
expect.arrayContaining([
{ type: 'config', teamName: 'base-2', detail: 'config.json' },
{ type: 'inbox', teamName: 'base-2', detail: 'inboxes/user.json' },
{ type: 'task', teamName: 'base-2', detail: 'task-2.json', taskId: 'task-2' },
])
);
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits existing inbox files once when reconciliation discovers a new inbox directory', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-new-inbox-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const inboxDir = path.join(teamsDir, 'base-1', 'inboxes');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain(path.normalize(inboxDir));
fs.mkdirSync(inboxDir, { recursive: true });
fs.writeFileSync(path.join(inboxDir, 'user.json'), '[]', 'utf8');
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() => {
expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(inboxDir));
expect(events).toEqual([
{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' },
]);
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('reconciles new team directories immediately after an addDir event', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-adddir-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const addedTeamDir = path.join(teamsDir, 'base-2');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
fs.mkdirSync(addedTeamDir, { recursive: true });
fs.writeFileSync(path.join(addedTeamDir, 'config.json'), '{}', 'utf8');
teamsWatcher.emit('addDir', addedTeamDir);
await vi.waitFor(() => {
expect(chokidarMock.watch).toHaveBeenCalledTimes(3);
expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedTeamDir));
expect(events).toEqual([{ type: 'config', teamName: 'base-2', detail: 'config.json' }]);
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('ignores events from an old chokidar generation after registry rebuild', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-generation-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const oldTeamsWatcher = getChokidarWatcherForRoot(teamsDir);
fs.mkdirSync(path.join(teamsDir, 'base-2'), { recursive: true });
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(3));
const newTeamsWatcher = getChokidarWatcherForRoot(teamsDir);
expect(newTeamsWatcher).not.toBe(oldTeamsWatcher);
oldTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
newTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([{ type: 'config', teamName: 'base-1', detail: 'config.json' }]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('ignores stale chokidar events after stop closes the registry', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-stale-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
const tasksWatcher = getChokidarWatcherForRoot(tasksDir);
watcher.stop();
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'task-1.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
expect(teamsWatcher.close).toHaveBeenCalled();
expect(tasksWatcher.close).toHaveBeenCalled();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('creates fresh chokidar registries after stop and start', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-restart-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const oldTeamsWatcher = getChokidarWatcherForRoot(teamsDir);
watcher.stop();
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(4));
const newTeamsWatcher = getChokidarWatcherForRoot(teamsDir);
expect(newTeamsWatcher).not.toBe(oldTeamsWatcher);
oldTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
newTeamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([{ type: 'config', teamName: 'base-1', detail: 'config.json' }]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('filters irrelevant team and task registry events before they reach FileWatcher handlers', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-filter-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'members', 'member.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', '.opencode-runtime', 'state.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'runtime', 'runtime.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'notes.json'));
teamsWatcher.emit('addDir', path.join(teamsDir, 'base-1', 'members'));
const tasksWatcher = getChokidarWatcherForRoot(tasksDir);
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', '.lock'));
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', '.highwatermark'));
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'notes.txt'));
tasksWatcher.emit('change', path.join(tasksDir, 'base-1', 'nested', 'task.json'));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('keeps the teams registry shallow and excludes irrelevant runtime directories', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-scope-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const teamDir = path.join(teamsDir, 'base-1');
const inboxDir = path.join(teamDir, 'inboxes');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(inboxDir, { recursive: true });
fs.mkdirSync(path.join(teamDir, 'members'), { recursive: true });
fs.mkdirSync(path.join(teamDir, '.opencode-runtime'), { recursive: true });
fs.mkdirSync(path.join(teamDir, 'runtime'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const targets = getChokidarWatcherForRoot(teamsDir).targets;
expect(targets).toContain(path.normalize(teamsDir));
expect(targets).toContain(path.normalize(teamDir));
expect(targets).toContain(path.normalize(inboxDir));
expect(targets).not.toContain(path.normalize(path.join(teamDir, 'members')));
expect(targets).not.toContain(path.normalize(path.join(teamDir, '.opencode-runtime')));
expect(targets).not.toContain(path.normalize(path.join(teamDir, 'runtime')));
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('reconciles new and removed team inbox directories without content polling', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-reconcile-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const addedTeamDir = path.join(teamsDir, 'base-2');
const addedInboxDir = path.join(addedTeamDir, 'inboxes');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain(path.normalize(addedTeamDir));
fs.mkdirSync(addedInboxDir, { recursive: true });
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() =>
expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedInboxDir))
);
fs.rmSync(addedTeamDir, { recursive: true, force: true });
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() =>
expect(getChokidarWatcherForRoot(teamsDir).targets).not.toContain(
path.normalize(addedTeamDir)
)
);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits team inbox changes from the teams polling fallback', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-poll-'));
setClaudeBasePathOverride(tempDir);
const teamsDir = path.join(tempDir, 'teams');
const inboxDir = path.join(teamsDir, 'base-1', 'inboxes');
fs.mkdirSync(inboxDir, { recursive: true });
const inboxPath = path.join(inboxDir, 'user.json');
fs.writeFileSync(inboxPath, '[]', 'utf8');
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(
dataCache,
path.join(tempDir, 'projects'),
path.join(tempDir, 'todos')
);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
const watcherAny = watcher as unknown as {
isWatching: boolean;
pollTeamsForChanges: () => Promise<void>;
};
watcherAny.isWatching = true;
await watcherAny.pollTeamsForChanges();
expect(events).toEqual([]);
fs.writeFileSync(inboxPath, '[{"messageId":"m1"}]', 'utf8');
await watcherAny.pollTeamsForChanges();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits task changes from the tasks polling fallback', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-task-poll-'));
setClaudeBasePathOverride(tempDir);
const taskDir = path.join(tempDir, 'tasks', 'base-1');
fs.mkdirSync(taskDir, { recursive: true });
const taskPath = path.join(taskDir, 'task-1.json');
fs.writeFileSync(taskPath, '{"status":"queued"}', 'utf8');
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(
dataCache,
path.join(tempDir, 'projects'),
path.join(tempDir, 'todos')
);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
const watcherAny = watcher as unknown as {
isWatching: boolean;
pollTasksForChanges: () => Promise<void>;
};
watcherAny.isWatching = true;
await watcherAny.pollTasksForChanges();
expect(events).toEqual([]);
fs.writeFileSync(taskPath, '{"status":"running"}', 'utf8');
await watcherAny.pollTasksForChanges();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([
{ type: 'task', teamName: 'base-1', detail: 'task-1.json', taskId: 'task-1' },
]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('emits log-source-change when OpenCode task-log attribution manifest changes', () => {
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, '/tmp/projects', '/tmp/todos');