feat: add resilient cross-platform file watching

Share watcher fallback behavior across project, todo, team, and task file monitoring. Add polling fallback coverage for watcher-limit and startup failure cases so Linux EMFILE conditions degrade instead of amplifying renderer crashes.
This commit is contained in:
777genius 2026-05-24 00:22:48 +03:00
parent b9cbdde502
commit 91b153459a
5 changed files with 2047 additions and 580 deletions

View file

@ -0,0 +1,320 @@
import { createLogger } from '@shared/utils/logger';
const logger = createLogger('Service:CrossPlatformFileChangeSource');
export type PollingChangeEventType = 'rename' | 'change';
export interface CloseableWatcher {
close: () => void | Promise<void>;
}
export interface WatcherLifecycle {
onError: (error: unknown) => void;
onClose: () => void;
isCurrent: () => boolean;
}
export interface CrossPlatformFileChangeSourceOptions {
name: string;
pollIntervalMs: number;
createWatcher?: (lifecycle: WatcherLifecycle) => Promise<CloseableWatcher> | CloseableWatcher;
collectPollSnapshot: () => Promise<Map<string, string>>;
emitPolledChange: (eventType: PollingChangeEventType, relativePath: string) => void;
isOwnerActive: () => boolean;
isWatchLimitError: (error: unknown) => boolean;
requestRetry: () => void;
onWatcherStartError?: (error: unknown) => void;
onWatcherError?: (error: unknown) => void;
onPollingError?: (error: unknown) => void;
}
export class CrossPlatformFileChangeSource {
private watcher: CloseableWatcher | null = null;
private pollingTimer: NodeJS.Timeout | null = null;
private pollingGenerationInProgress: number | null = null;
private pollingPrimed = false;
private pollSnapshot = new Map<string, string>();
private closedGeneration: number | null = null;
private rejectedGeneration: number | null = null;
private generation = 0;
private startPromise: Promise<void> | null = null;
constructor(private readonly options: CrossPlatformFileChangeSourceOptions) {}
get isActive(): boolean {
return this.watcher !== null || this.pollingTimer !== null;
}
get currentPollingTimer(): NodeJS.Timeout | null {
return this.pollingTimer;
}
get isPollingPrimed(): boolean {
return this.pollingPrimed;
}
async start(): Promise<void> {
if (this.isActive) {
return;
}
if (this.startPromise) {
await this.startPromise;
return;
}
const createWatcher = this.options.createWatcher;
if (!createWatcher) {
this.startPolling();
return;
}
const generation = this.nextGeneration();
this.closedGeneration = null;
this.rejectedGeneration = null;
const startPromise = this.startWatcher(generation, createWatcher);
this.startPromise = startPromise;
try {
await startPromise;
} finally {
if (this.startPromise === startPromise) {
this.startPromise = null;
}
}
}
private async startWatcher(
generation: number,
createWatcher: NonNullable<CrossPlatformFileChangeSourceOptions['createWatcher']>
): Promise<void> {
try {
const watcher = await createWatcher({
onError: (error) => this.handleWatcherError(error, generation),
onClose: () => this.handleWatcherClose(generation),
isCurrent: () => this.isCurrentGeneration(generation),
});
if (!this.isCurrentGeneration(generation)) {
await this.closeWatcher(watcher);
return;
}
this.watcher = watcher;
} catch (error) {
if (generation !== this.generation || !this.options.isOwnerActive()) {
return;
}
if (this.pollingTimer || this.rejectedGeneration === generation) {
return;
}
if (this.startPollingFallback(error, generation)) {
return;
}
if (this.closedGeneration === generation) {
return;
}
this.rejectedGeneration = generation;
this.options.onWatcherStartError?.(error);
this.options.requestRetry();
}
}
startPolling(): void {
if (this.pollingTimer || !this.options.isOwnerActive()) {
return;
}
const generation = this.nextGeneration();
this.startPollingForGeneration(generation);
}
private startPollingForGeneration(generation: number): void {
if (this.pollingTimer || generation !== this.generation || !this.options.isOwnerActive()) {
return;
}
const watcher = this.watcher;
this.watcher = null;
const runPoll = (): void => {
void this.pollOnce(generation);
};
this.pollingTimer = setInterval(runPoll, this.options.pollIntervalMs);
this.pollingTimer.unref();
runPoll();
if (watcher) {
void this.closeWatcher(watcher);
}
}
async pollOnce(expectedGeneration = this.generation): Promise<void> {
if (
expectedGeneration !== this.generation ||
!this.options.isOwnerActive() ||
this.pollingGenerationInProgress !== null
) {
return;
}
this.pollingGenerationInProgress = expectedGeneration;
try {
await this.pollForChanges(expectedGeneration);
} catch (error) {
if (expectedGeneration === this.generation && this.options.isOwnerActive()) {
this.options.onPollingError?.(error);
}
} finally {
if (this.pollingGenerationInProgress === expectedGeneration) {
this.pollingGenerationInProgress = null;
}
}
}
stop(): void {
this.generation += 1;
this.startPromise = null;
this.closedGeneration = null;
this.rejectedGeneration = null;
this.pollingGenerationInProgress = null;
this.pollingPrimed = false;
this.pollSnapshot.clear();
const timer = this.pollingTimer;
this.pollingTimer = null;
if (timer) {
clearInterval(timer);
}
const watcher = this.watcher;
this.watcher = null;
if (watcher) {
void this.closeWatcher(watcher);
}
}
private handleWatcherError(error: unknown, generation: number): void {
if (
generation !== this.generation ||
!this.options.isOwnerActive() ||
this.rejectedGeneration === generation
) {
return;
}
if (this.startPollingFallback(error, generation)) {
return;
}
if (this.closedGeneration === generation) {
return;
}
this.rejectedGeneration = generation;
this.options.onWatcherError?.(error);
const watcher = this.watcher;
this.watcher = null;
if (watcher) {
void this.closeWatcher(watcher);
}
if (!this.isActive) {
this.options.requestRetry();
}
}
private handleWatcherClose(generation: number): void {
if (
generation !== this.generation ||
!this.options.isOwnerActive() ||
this.closedGeneration === generation ||
this.rejectedGeneration === generation
) {
return;
}
this.closedGeneration = generation;
this.watcher = null;
if (!this.isActive) {
this.options.requestRetry();
}
}
private startPollingFallback(error: unknown, generation: number): boolean {
if (
generation !== this.generation ||
!this.options.isOwnerActive() ||
!this.options.isWatchLimitError(error)
) {
return false;
}
this.rejectedGeneration = generation;
const err = error as NodeJS.ErrnoException;
logger.warn(
`${this.options.name} watcher hit ${err.code ?? 'a platform limit'}; falling back to polling`
);
const watcher = this.watcher;
this.watcher = null;
this.startPollingForGeneration(generation);
if (watcher) {
void this.closeWatcher(watcher);
}
return true;
}
private async pollForChanges(expectedGeneration: number): Promise<void> {
const nextSnapshot = await this.options.collectPollSnapshot();
if (expectedGeneration !== this.generation || !this.options.isOwnerActive()) {
return;
}
if (!this.pollingPrimed) {
logger.info(`${this.options.name} polling baseline captured`);
this.pollSnapshot = nextSnapshot;
this.pollingPrimed = true;
return;
}
for (const [relativePath, fingerprint] of nextSnapshot) {
const previous = this.pollSnapshot.get(relativePath);
if (previous === undefined) {
this.options.emitPolledChange('rename', relativePath);
} else if (previous !== fingerprint) {
this.options.emitPolledChange('change', relativePath);
}
}
for (const relativePath of this.pollSnapshot.keys()) {
if (!nextSnapshot.has(relativePath)) {
this.options.emitPolledChange('rename', relativePath);
}
}
this.pollSnapshot = nextSnapshot;
}
private async closeWatcher(watcher: CloseableWatcher): Promise<void> {
try {
await watcher.close();
} catch (error) {
logger.debug(`${this.options.name} watcher close failed`, error);
}
}
private nextGeneration(): number {
this.generation += 1;
return this.generation;
}
private isCurrentGeneration(generation: number): boolean {
return (
generation === this.generation &&
this.options.isOwnerActive() &&
!this.pollingTimer &&
this.closedGeneration !== generation &&
this.rejectedGeneration !== generation
);
}
}

File diff suppressed because it is too large Load diff

View file

@ -22,6 +22,7 @@ const RECONCILE_INTERVAL_MS = 30_000;
// If a new team artifact should produce TeamChangeEvent, add it here too.
const TEAM_ROOT_FILES = new Set([
'config.json',
'kanban-state.json',
'processes.json',
'sentMessages.json',
'team.meta.json',
@ -62,7 +63,7 @@ export class TeamTaskWatchRegistry {
if (this.closed) {
return;
}
await this.reconcileTargets();
await this.reconcileTargets({ rethrowErrors: true });
if (this.closed || this.reconcileTimer) {
return;
}
@ -89,11 +90,11 @@ export class TeamTaskWatchRegistry {
this.targets.clear();
this.targetKey = '';
if (watcher) {
await watcher.close().catch(() => undefined);
await this.closeWatcher(watcher);
}
}
private async reconcileTargets(): Promise<void> {
private async reconcileTargets(options: { rethrowErrors?: boolean } = {}): Promise<void> {
if (this.closed) {
return;
}
@ -111,6 +112,9 @@ export class TeamTaskWatchRegistry {
await this.rebuildWatcher(targets, nextKey, addedTargets);
}
} catch (error) {
if (options.rethrowErrors) {
throw error;
}
if (!this.closed) {
this.options.onError(error);
}
@ -135,7 +139,7 @@ export class TeamTaskWatchRegistry {
const previousWatcher = this.watcher;
this.watcher = null;
if (previousWatcher) {
await previousWatcher.close().catch(() => undefined);
await this.closeWatcher(previousWatcher);
}
if (this.closed || generation !== this.generation) {
@ -264,6 +268,15 @@ export class TeamTaskWatchRegistry {
}
}
private async closeWatcher(watcher: FSWatcher): Promise<void> {
try {
await watcher.close();
} catch {
// Best-effort cleanup only. Chokidar close can fail if the underlying
// watcher is already torn down during startup or limit-error recovery.
}
}
private async isDirectory(dirPath: string): Promise<boolean> {
try {
return (await fsp.stat(dirPath)).isDirectory();

View file

@ -0,0 +1,598 @@
import { describe, expect, it, vi } from 'vitest';
import {
CrossPlatformFileChangeSource,
type WatcherLifecycle,
} from '../../../../src/main/services/infrastructure/CrossPlatformFileChangeSource';
vi.mock('@shared/utils/logger', () => ({
createLogger: () => ({
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
}),
}));
function createSource(options: {
active: () => boolean;
createWatcher?: (
lifecycle: WatcherLifecycle
) => Promise<{ close: () => void }> | { close: () => void };
collectPollSnapshot?: () => Promise<Map<string, string>>;
}) {
return new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: options.createWatcher,
collectPollSnapshot: options.collectPollSnapshot ?? vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: options.active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry: vi.fn(),
});
}
describe('CrossPlatformFileChangeSource', () => {
it('coalesces concurrent watcher starts into one watcher', async () => {
let active = true;
let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined;
const close = vi.fn();
const createWatcher = vi.fn(
() =>
new Promise<{ close: () => void }>((resolve) => {
resolveWatcher = resolve;
})
);
const source = createSource({ active: () => active, createWatcher });
const firstStart = source.start();
const secondStart = source.start();
expect(createWatcher).toHaveBeenCalledTimes(1);
resolveWatcher?.({ close });
await Promise.all([firstStart, secondStart]);
expect(source.isActive).toBe(true);
source.stop();
active = false;
});
it('ignores stale watcher close events after restart', async () => {
let active = true;
const lifecycles: WatcherLifecycle[] = [];
const createWatcher = vi.fn((lifecycle: WatcherLifecycle) => {
lifecycles.push(lifecycle);
return { close: vi.fn() };
});
const source = createSource({ active: () => active, createWatcher });
await source.start();
source.stop();
await source.start();
lifecycles[0].onClose();
expect(source.isActive).toBe(true);
source.stop();
active = false;
});
it('marks old watcher lifecycles stale after restart', async () => {
let active = true;
const lifecycles: WatcherLifecycle[] = [];
const createWatcher = vi.fn((lifecycle: WatcherLifecycle) => {
lifecycles.push(lifecycle);
return { close: vi.fn() };
});
const source = createSource({ active: () => active, createWatcher });
await source.start();
expect(lifecycles[0].isCurrent()).toBe(true);
source.stop();
expect(lifecycles[0].isCurrent()).toBe(false);
await source.start();
expect(lifecycles[0].isCurrent()).toBe(false);
expect(lifecycles[1].isCurrent()).toBe(true);
source.stop();
active = false;
});
it('does not keep a watcher that closes during startup', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const close = vi.fn();
const requestRetry = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
lifecycle.onClose();
return { close };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry,
});
await source.start();
expect(source.isActive).toBe(false);
expect(close).toHaveBeenCalled();
expect(requestRetry).toHaveBeenCalledTimes(1);
active = false;
});
it('falls back to polling when a watcher reports a limit error during startup', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const close = vi.fn();
const requestRetry = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
lifecycle.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' }));
return { close };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
});
await source.start();
expect(source.currentPollingTimer).not.toBeNull();
expect(lifecycle?.isCurrent()).toBe(false);
expect(close).toHaveBeenCalled();
expect(requestRetry).not.toHaveBeenCalled();
source.stop();
active = false;
});
it('falls back to polling when startup closes before throwing a limit error', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const requestRetry = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
lifecycle.onClose();
throw Object.assign(new Error('too many open files'), { code: 'EMFILE' });
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
});
await source.start();
expect(source.currentPollingTimer).not.toBeNull();
expect(lifecycle?.isCurrent()).toBe(false);
expect(requestRetry).toHaveBeenCalledTimes(1);
source.stop();
active = false;
});
it('does not retry when startup throws after reporting a limit error', async () => {
let active = true;
const requestRetry = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((lifecycle: WatcherLifecycle) => {
lifecycle.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' }));
throw Object.assign(new Error('startup failed after limit error'), { code: 'EMFILE' });
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
});
await source.start();
expect(source.currentPollingTimer).not.toBeNull();
expect(requestRetry).not.toHaveBeenCalled();
source.stop();
active = false;
});
it('retries without keeping a watcher that reports a non-limit error during startup', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const close = vi.fn();
const requestRetry = vi.fn();
const onWatcherError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
lifecycle.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' }));
return { close };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
onWatcherError,
});
await source.start();
expect(source.isActive).toBe(false);
expect(lifecycle?.isCurrent()).toBe(false);
expect(close).toHaveBeenCalled();
expect(onWatcherError).toHaveBeenCalledTimes(1);
expect(requestRetry).toHaveBeenCalledTimes(1);
active = false;
});
it('does not retry twice when startup throws after reporting a non-limit error', async () => {
let active = true;
const requestRetry = vi.fn();
const onWatcherError = vi.fn();
const onWatcherStartError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((lifecycle: WatcherLifecycle) => {
lifecycle.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' }));
throw Object.assign(new Error('startup failed after permission error'), { code: 'EACCES' });
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
onWatcherError,
onWatcherStartError,
});
await source.start();
expect(source.isActive).toBe(false);
expect(onWatcherError).toHaveBeenCalledTimes(1);
expect(onWatcherStartError).not.toHaveBeenCalled();
expect(requestRetry).toHaveBeenCalledTimes(1);
active = false;
});
it('invalidates startup lifecycles after a direct non-limit start failure', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const requestRetry = vi.fn();
const onWatcherError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
throw Object.assign(new Error('permission denied'), { code: 'EACCES' });
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
onWatcherError,
});
await source.start();
expect(source.isActive).toBe(false);
expect(lifecycle?.isCurrent()).toBe(false);
expect(requestRetry).toHaveBeenCalledTimes(1);
lifecycle?.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' }));
lifecycle?.onClose();
expect(source.currentPollingTimer).toBeNull();
expect(onWatcherError).not.toHaveBeenCalled();
expect(requestRetry).toHaveBeenCalledTimes(1);
active = false;
});
it('does not request retry twice when a watcher closes after an error', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const close = vi.fn();
const requestRetry = vi.fn();
const onWatcherError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
return { close };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry,
onWatcherError,
});
await source.start();
lifecycle?.onError(Object.assign(new Error('permission denied'), { code: 'EACCES' }));
lifecycle?.onClose();
expect(source.isActive).toBe(false);
expect(close).toHaveBeenCalledTimes(1);
expect(onWatcherError).toHaveBeenCalledTimes(1);
expect(requestRetry).toHaveBeenCalledTimes(1);
active = false;
});
it('falls back to polling when a close is followed by a limit error', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const requestRetry = vi.fn();
const onWatcherError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
return { close: vi.fn() };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: (error) => (error as NodeJS.ErrnoException | undefined)?.code === 'EMFILE',
requestRetry,
onWatcherError,
});
await source.start();
lifecycle?.onClose();
lifecycle?.onError(Object.assign(new Error('too many open files'), { code: 'EMFILE' }));
expect(source.currentPollingTimer).not.toBeNull();
expect(onWatcherError).not.toHaveBeenCalled();
expect(requestRetry).toHaveBeenCalledTimes(1);
source.stop();
active = false;
});
it('does not request retry when switching an active watcher to polling', async () => {
let active = true;
let lifecycle: WatcherLifecycle | undefined;
const requestRetry = vi.fn();
const close = vi.fn(() => lifecycle?.onClose());
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn((nextLifecycle: WatcherLifecycle) => {
lifecycle = nextLifecycle;
return { close };
}),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry,
});
await source.start();
source.startPolling();
expect(source.currentPollingTimer).not.toBeNull();
expect(lifecycle?.isCurrent()).toBe(false);
expect(close).toHaveBeenCalled();
expect(requestRetry).not.toHaveBeenCalled();
source.stop();
active = false;
});
it('closes a late watcher when polling starts during watcher startup', async () => {
let active = true;
let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined;
const close = vi.fn();
const lifecycles: WatcherLifecycle[] = [];
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn(
(lifecycle: WatcherLifecycle) =>
new Promise<{ close: () => void }>((resolve) => {
lifecycles.push(lifecycle);
resolveWatcher = resolve;
})
),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
});
const start = source.start();
expect(lifecycles).toHaveLength(1);
expect(lifecycles[0].isCurrent()).toBe(true);
source.startPolling();
expect(source.currentPollingTimer).not.toBeNull();
expect(lifecycles[0].isCurrent()).toBe(false);
resolveWatcher?.({ close });
await start;
expect(close).toHaveBeenCalled();
source.stop();
active = false;
});
it('closes a stale pending watcher after stop and restart', async () => {
let active = true;
const resolvers: Array<(watcher: { close: () => void }) => void> = [];
const closeOld = vi.fn();
const closeCurrent = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn(
() =>
new Promise<{ close: () => void }>((resolve) => {
resolvers.push(resolve);
})
),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
});
const firstStart = source.start();
source.stop();
const secondStart = source.start();
resolvers[1]?.({ close: closeCurrent });
await secondStart;
expect(source.isActive).toBe(true);
resolvers[0]?.({ close: closeOld });
await firstStart;
expect(closeOld).toHaveBeenCalledTimes(1);
expect(closeCurrent).not.toHaveBeenCalled();
expect(source.isActive).toBe(true);
source.stop();
active = false;
});
it('swallows synchronous watcher close failures during stale startup cleanup', async () => {
let active = true;
let resolveWatcher: ((watcher: { close: () => void }) => void) | undefined;
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
createWatcher: vi.fn(
() =>
new Promise<{ close: () => void }>((resolve) => {
resolveWatcher = resolve;
})
),
collectPollSnapshot: vi.fn().mockResolvedValue(new Map()),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
});
const start = source.start();
source.stop();
resolveWatcher?.({
close: () => {
throw new Error('close failed');
},
});
await expect(start).resolves.toBeUndefined();
expect(source.isActive).toBe(false);
active = false;
});
it('ignores stale in-flight polling snapshots after stop and restart', async () => {
let active = true;
const snapshots: Array<() => void> = [];
const emitted: string[] = [];
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
collectPollSnapshot: () =>
new Promise<Map<string, string>>((resolve) => {
snapshots.push(() => resolve(new Map([['old.json', '1']])));
}),
emitPolledChange: (_eventType, relativePath) => emitted.push(relativePath),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
});
source.startPolling();
expect(snapshots).toHaveLength(1);
source.stop();
source.startPolling();
expect(snapshots).toHaveLength(2);
snapshots[0]();
await Promise.resolve();
snapshots[1]();
await Promise.resolve();
expect(emitted).toEqual([]);
source.stop();
active = false;
});
it('ignores stale in-flight polling errors after stop', async () => {
let active = true;
const onPollingError = vi.fn();
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
collectPollSnapshot: vi.fn().mockRejectedValue(new Error('old polling failure')),
emitPolledChange: vi.fn(),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
onPollingError,
});
const poll = source.pollOnce();
source.stop();
active = false;
await poll;
expect(onPollingError).not.toHaveBeenCalled();
});
it('keeps the previous polling snapshot when a poll fails', async () => {
let active = true;
const emitted: Array<[string, string]> = [];
const onPollingError = vi.fn();
const collectPollSnapshot = vi
.fn()
.mockResolvedValueOnce(new Map([['session.jsonl', '1']]))
.mockRejectedValueOnce(new Error('transient polling failure'))
.mockResolvedValueOnce(new Map([['session.jsonl', '2']]));
const source = new CrossPlatformFileChangeSource({
name: 'test-source',
pollIntervalMs: 1000,
collectPollSnapshot,
emitPolledChange: (eventType, relativePath) => emitted.push([eventType, relativePath]),
isOwnerActive: () => active,
isWatchLimitError: () => false,
requestRetry: vi.fn(),
onPollingError,
});
await source.pollOnce();
await source.pollOnce();
await source.pollOnce();
expect(onPollingError).toHaveBeenCalledTimes(1);
expect(emitted).toEqual([['change', 'session.jsonl']]);
source.stop();
active = false;
});
});

View file

@ -168,6 +168,61 @@ function createMockNotificationManager() {
} as unknown as Parameters<FileWatcher['setNotificationManager']>[0];
}
function createFsDirent(
name: string,
type: 'file' | 'directory',
metadata: { size?: number; mtimeMs?: number } = {}
) {
return {
name,
...metadata,
isFile: () => type === 'file',
isDirectory: () => type === 'directory',
};
}
type TestChangeSourceName = 'projects' | 'todos' | 'teams' | 'tasks';
interface TestChangeSourceState {
currentPollingTimer: NodeJS.Timeout | null;
isPollingPrimed: boolean;
pollOnce: () => Promise<void>;
}
function getChangeSource(watcher: FileWatcher, name: TestChangeSourceName): TestChangeSourceState {
return (
watcher as unknown as {
changeSources: Record<TestChangeSourceName, TestChangeSourceState>;
}
).changeSources[name];
}
function getRetryTimer(watcher: FileWatcher): NodeJS.Timeout | null {
return (watcher as unknown as { retryTimer: NodeJS.Timeout | null }).retryTimer;
}
function setWatcherActive(watcher: FileWatcher): void {
(watcher as unknown as { isWatching: boolean }).isWatching = true;
}
type NativeWatchCallback = (eventType: string, filename: string) => void;
function getNativeWatchCallback(
optionsOrListener: unknown,
maybeListener: unknown
): NativeWatchCallback | undefined {
return (
typeof optionsOrListener === 'function' ? optionsOrListener : maybeListener
) as NativeWatchCallback | undefined;
}
function mockFsWatchImplementation(implementation: (...args: unknown[]) => FsType.FSWatcher): void {
(
vi.mocked(fs.watch) as unknown as {
mockImplementation: (nextImplementation: (...args: unknown[]) => FsType.FSWatcher) => void;
}
).mockImplementation(implementation);
}
/** Helper to write a valid JSONL line */
function jsonlLine(uuid: string, text: string): string {
return (
@ -256,6 +311,659 @@ describe('FileWatcher', () => {
watcher.stop();
});
it('ignores stale native watcher callbacks after stop and restart', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-native-stale-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const projectDir = path.join(projectsDir, 'encoded-project');
fs.mkdirSync(projectDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(tempDir, 'teams', 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tempDir, 'tasks', 'base-1'), { recursive: true });
fs.writeFileSync(path.join(projectDir, 'old-session.jsonl'), jsonlLine('old', 'old'), 'utf8');
fs.writeFileSync(path.join(projectDir, 'new-session.jsonl'), jsonlLine('new', 'new'), 'utf8');
fs.writeFileSync(path.join(todosDir, 'old-todo.json'), '{"items":[]}', 'utf8');
fs.writeFileSync(path.join(todosDir, 'new-todo.json'), '{"items":[]}', 'utf8');
useRealAccess();
const projectCallbacks: Array<(eventType: string, filename: string) => void> = [];
const todoCallbacks: Array<(eventType: string, filename: string) => void> = [];
mockFsWatchImplementation((targetPath, optionsOrListener, maybeListener) => {
const listener = getNativeWatchCallback(optionsOrListener, maybeListener);
if (String(targetPath) === projectsDir && listener) {
projectCallbacks.push(listener);
} else if (String(targetPath) === todosDir && listener) {
todoCallbacks.push(listener);
}
return createFakeWatcher();
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const fileEvents: unknown[] = [];
const todoEvents: unknown[] = [];
watcher.on('file-change', (event) => fileEvents.push(event));
watcher.on('todo-change', (event) => todoEvents.push(event));
watcher.start();
await vi.waitFor(() => {
expect(projectCallbacks).toHaveLength(1);
expect(todoCallbacks).toHaveLength(1);
});
watcher.stop();
watcher.start();
await vi.waitFor(() => {
expect(projectCallbacks).toHaveLength(2);
expect(todoCallbacks).toHaveLength(2);
});
projectCallbacks[0]('rename', 'encoded-project/old-session.jsonl');
todoCallbacks[0]('rename', 'old-todo.json');
projectCallbacks[1]('rename', 'encoded-project/new-session.jsonl');
todoCallbacks[1]('rename', 'new-todo.json');
await vi.advanceTimersByTimeAsync(100);
await vi.waitFor(() => {
expect(fileEvents).toEqual([
{
type: 'add',
path: path.join(projectDir, 'new-session.jsonl'),
projectId: 'encoded-project',
sessionId: 'new-session',
isSubagent: false,
},
]);
expect(todoEvents).toEqual([
{
type: 'add',
path: path.join(todosDir, 'new-todo.json'),
sessionId: 'new-todo',
isSubagent: false,
},
]);
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('falls back to projects polling on EMFILE and still emits session and subagent changes', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-project-emfile-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const projectDir = path.join(projectsDir, 'encoded-project');
const sessionPath = path.join(projectDir, 'session-1.jsonl');
fs.mkdirSync(projectDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
fs.writeFileSync(sessionPath, jsonlLine('a1', 'baseline'), 'utf8');
useRealAccess();
const projectWatcher = createFakeWatcher();
const todoWatcher = createFakeWatcher();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation((targetPath) => {
if (String(targetPath) === projectsDir) return projectWatcher;
if (String(targetPath) === todosDir) return todoWatcher;
throw new Error(`Unexpected watch path: ${String(targetPath)}`);
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2));
(projectWatcher as unknown as EventEmitter).emit(
'error',
Object.assign(new Error('too many open files'), { code: 'EMFILE' })
);
await vi.advanceTimersByTimeAsync(0);
const projectsSource = getChangeSource(watcher, 'projects');
expect(projectsSource.currentPollingTimer).not.toBeNull();
expect(getRetryTimer(watcher)).toBeNull();
expect(projectWatcher.close).toHaveBeenCalled();
await vi.waitFor(() => expect(projectsSource.isPollingPrimed).toBe(true));
fs.appendFileSync(sessionPath, jsonlLine('a2', 'changed'), 'utf8');
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
await vi.waitFor(() =>
expect(events).toContainEqual({
type: 'change',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
})
);
const subagentsDir = path.join(projectDir, 'session-1', 'subagents');
const subagentPath = path.join(subagentsDir, 'agent-worker.jsonl');
fs.mkdirSync(subagentsDir, { recursive: true });
fs.writeFileSync(subagentPath, jsonlLine('s1', 'subagent'), 'utf8');
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
await vi.waitFor(() =>
expect(events).toContainEqual({
type: 'add',
path: subagentPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: true,
})
);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('ignores stale native project callbacks after EMFILE fallback switches to polling', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-project-stale-fallback-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const projectDir = path.join(projectsDir, 'encoded-project');
fs.mkdirSync(projectDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
fs.writeFileSync(path.join(projectDir, 'stale-session.jsonl'), jsonlLine('stale', 'stale'), 'utf8');
useRealAccess();
const projectWatcher = createFakeWatcher();
const todoWatcher = createFakeWatcher();
const projectCallbacks: Array<(eventType: string, filename: string) => void> = [];
mockFsWatchImplementation((targetPath, optionsOrListener, maybeListener) => {
const listener = getNativeWatchCallback(optionsOrListener, maybeListener);
if (String(targetPath) === projectsDir) {
if (listener) {
projectCallbacks.push(listener);
}
return projectWatcher;
}
if (String(targetPath) === todosDir) {
return todoWatcher;
}
throw new Error(`Unexpected watch path: ${String(targetPath)}`);
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(projectCallbacks).toHaveLength(1));
(projectWatcher as unknown as EventEmitter).emit(
'error',
Object.assign(new Error('too many open files'), { code: 'EMFILE' })
);
await vi.advanceTimersByTimeAsync(0);
const projectsSource = getChangeSource(watcher, 'projects');
expect(projectsSource.currentPollingTimer).not.toBeNull();
await vi.waitFor(() => expect(projectsSource.isPollingPrimed).toBe(true));
projectCallbacks[0]('rename', 'encoded-project/stale-session.jsonl');
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('falls back to todos polling on EMFILE and still emits todo changes', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-todo-emfile-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const todoPath = path.join(todosDir, 'session-1.json');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
fs.writeFileSync(todoPath, '{"items":[]}', 'utf8');
useRealAccess();
const projectWatcher = createFakeWatcher();
const todoWatcher = createFakeWatcher();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation((targetPath) => {
if (String(targetPath) === projectsDir) return projectWatcher;
if (String(targetPath) === todosDir) return todoWatcher;
throw new Error(`Unexpected watch path: ${String(targetPath)}`);
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('todo-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2));
(todoWatcher as unknown as EventEmitter).emit(
'error',
Object.assign(new Error('too many open files'), { code: 'EMFILE' })
);
await vi.advanceTimersByTimeAsync(0);
const todosSource = getChangeSource(watcher, 'todos');
expect(todosSource.currentPollingTimer).not.toBeNull();
expect(getRetryTimer(watcher)).toBeNull();
expect(todoWatcher.close).toHaveBeenCalled();
await vi.waitFor(() => expect(todosSource.isPollingPrimed).toBe(true));
fs.writeFileSync(todoPath, '{"items":[{"text":"done"}]}', 'utf8');
await todosSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
await vi.waitFor(() =>
expect(events).toContainEqual({
type: 'change',
path: todoPath,
sessionId: 'session-1',
isSubagent: false,
})
);
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('uses a custom local provider for project polling snapshots', async () => {
const projectsDir = '/virtual/projects';
const todosDir = '/virtual/todos';
const sessionEntry = createFsDirent('session-1.jsonl', 'file', { size: 10, mtimeMs: 1000 });
const fsProvider = {
type: 'local' as const,
exists: vi.fn().mockResolvedValue(true),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn().mockResolvedValue({
size: 10,
mtimeMs: 1000,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
}),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
return [createFsDirent('encoded-project', 'directory')];
}
if (dirPath === path.join(projectsDir, 'encoded-project')) {
return [sessionEntry];
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
sessionEntry.size = 12;
sessionEntry.mtimeMs = 2000;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(fsProvider.readdir).toHaveBeenCalledWith(projectsDir);
expect(events).toContainEqual({
type: 'change',
path: path.join(projectsDir, 'encoded-project', 'session-1.jsonl'),
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
});
it('treats SSH not-found subagent directories as empty during project polling', async () => {
const projectsDir = '/remote/projects';
const todosDir = '/remote/todos';
const projectDir = path.join(projectsDir, 'encoded-project');
const sessionPath = path.join(projectDir, 'session-1.jsonl');
let size = 10;
let mtimeMs = 1000;
const fsProvider = {
type: 'ssh' as const,
exists: vi.fn(async (filePath: string) => filePath === sessionPath),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn(async (filePath: string) => {
if (filePath !== sessionPath) {
throw Object.assign(new Error('not found'), { code: '2' });
}
return {
size,
mtimeMs,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
};
}),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
return [createFsDirent('encoded-project', 'directory')];
}
if (dirPath === projectDir) {
return [
createFsDirent('session-1.jsonl', 'file'),
createFsDirent('session-1', 'directory'),
];
}
if (dirPath === path.join(projectDir, 'session-1', 'subagents')) {
throw Object.assign(new Error('not found'), { code: '2' });
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
size = 12;
mtimeMs = 2000;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toContainEqual({
type: 'change',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
});
it.each(['EMFILE', 'ENOENT'])(
'does not emit false project deletes when a polling stat fails with %s',
async (failureCode) => {
const projectsDir = '/virtual/projects';
const todosDir = '/virtual/todos';
const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl');
let statShouldFail = false;
let size = 10;
let mtimeMs = 1000;
const fsProvider = {
type: 'local' as const,
exists: vi.fn().mockResolvedValue(true),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn(async () => {
if (statShouldFail) {
throw Object.assign(new Error(failureCode), { code: failureCode });
}
return {
size,
mtimeMs,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
};
}),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
return [createFsDirent('encoded-project', 'directory')];
}
if (dirPath === path.join(projectsDir, 'encoded-project')) {
return [createFsDirent('session-1.jsonl', 'file')];
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
statShouldFail = true;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
statShouldFail = false;
size = 12;
mtimeMs = 2000;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toContainEqual({
type: 'change',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
}
);
it('does not emit false project deletes when the polling root is temporarily missing', async () => {
const projectsDir = '/virtual/projects';
const todosDir = '/virtual/todos';
const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl');
let rootMissing = false;
let size = 10;
let mtimeMs = 1000;
const fsProvider = {
type: 'local' as const,
exists: vi.fn().mockResolvedValue(true),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn().mockImplementation(async () => ({
size,
mtimeMs,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
})),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
if (rootMissing) {
throw Object.assign(new Error('missing root'), { code: 'ENOENT' });
}
return [createFsDirent('encoded-project', 'directory')];
}
if (dirPath === path.join(projectsDir, 'encoded-project')) {
return [createFsDirent('session-1.jsonl', 'file')];
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
rootMissing = true;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
rootMissing = false;
size = 12;
mtimeMs = 2000;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toContainEqual({
type: 'change',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
});
it('still emits project deletes when polling no longer lists a file', async () => {
const projectsDir = '/virtual/projects';
const todosDir = '/virtual/todos';
const sessionPath = path.join(projectsDir, 'encoded-project', 'session-1.jsonl');
let filePresent = true;
const fsProvider = {
type: 'local' as const,
exists: vi.fn(async (filePath: string) => filePath !== sessionPath || filePresent),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn().mockResolvedValue({
size: 10,
mtimeMs: 1000,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
}),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
return [createFsDirent('encoded-project', 'directory')];
}
if (dirPath === path.join(projectsDir, 'encoded-project')) {
return filePresent ? [createFsDirent('session-1.jsonl', 'file')] : [];
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
filePresent = false;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toContainEqual({
type: 'unlink',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
});
it('does not emit false project deletes when a listed project dir vanishes during polling', async () => {
const projectsDir = '/virtual/projects';
const todosDir = '/virtual/todos';
const projectDir = path.join(projectsDir, 'encoded-project');
const sessionPath = path.join(projectDir, 'session-1.jsonl');
let rootListsProject = true;
let projectDirMissing = false;
const fsProvider = {
type: 'local' as const,
exists: vi.fn(async (filePath: string) => filePath !== sessionPath || rootListsProject),
readFile: vi.fn().mockResolvedValue(''),
stat: vi.fn().mockResolvedValue({
size: 10,
mtimeMs: 1000,
birthtimeMs: 1000,
isFile: () => true,
isDirectory: () => false,
}),
readdir: vi.fn(async (dirPath: string) => {
if (dirPath === projectsDir) {
return rootListsProject ? [createFsDirent('encoded-project', 'directory')] : [];
}
if (dirPath === projectDir) {
if (projectDirMissing) {
throw Object.assign(new Error('missing project dir'), { code: 'ENOENT' });
}
return [createFsDirent('session-1.jsonl', 'file')];
}
return [];
}),
createReadStream: vi.fn(() => Readable.from([])),
dispose: vi.fn(),
};
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir, fsProvider);
const events: unknown[] = [];
watcher.on('file-change', (event) => events.push(event));
setWatcherActive(watcher);
const projectsSource = getChangeSource(watcher, 'projects');
await projectsSource.pollOnce();
expect(events).toEqual([]);
projectDirMissing = true;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
rootListsProject = false;
await projectsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toContainEqual({
type: 'unlink',
path: sessionPath,
projectId: 'encoded-project',
sessionId: 'session-1',
isSubagent: false,
});
watcher.stop();
});
it('falls back to teams polling when the chokidar registry hits the file descriptor limit', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-emfile-'));
setClaudeBasePathOverride(tempDir);
@ -303,14 +1011,10 @@ describe('FileWatcher', () => {
);
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
teamsPollingPrimed: boolean;
};
expect(watcherAny.teamsPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
await vi.waitFor(() => expect(watcherAny.teamsPollingPrimed).toBe(true));
const teamsSource = getChangeSource(watcher, 'teams');
expect(teamsSource.currentPollingTimer).not.toBeNull();
expect(getRetryTimer(watcher)).toBeNull();
await vi.waitFor(() => expect(teamsSource.isPollingPrimed).toBe(true));
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([]);
@ -350,12 +1054,8 @@ describe('FileWatcher', () => {
tasksWatcher.emit('error', Object.assign(new Error(code), { code }));
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
tasksPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.tasksPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
expect(getChangeSource(watcher, 'tasks').currentPollingTimer).not.toBeNull();
expect(getRetryTimer(watcher)).toBeNull();
expect(tasksWatcher.close).toHaveBeenCalled();
watcher.stop();
@ -391,12 +1091,8 @@ describe('FileWatcher', () => {
watcher.start();
await vi.waitFor(() => {
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).not.toBeNull();
expect(watcherAny.retryTimer).toBeNull();
expect(getChangeSource(watcher, 'teams').currentPollingTimer).not.toBeNull();
expect(getRetryTimer(watcher)).toBeNull();
});
expect(chokidarMock.watch).toHaveBeenCalledTimes(2);
@ -406,6 +1102,49 @@ describe('FileWatcher', () => {
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('closes a partially-created teams registry when initial start fails', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-partial-limit-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const partialWatcher = chokidarMock.createWatcher([path.normalize(teamsDir)], {});
partialWatcher.on = vi.fn(() => {
throw Object.assign(new Error('watch limit during listener registration'), {
code: 'EMFILE',
});
});
chokidarMock.instances.length = 0;
chokidarMock.watch.mockImplementation((targets, options) => {
const targetList = Array.isArray(targets) ? targets : [targets];
if (targetList.includes(path.normalize(teamsDir))) {
return partialWatcher;
}
return chokidarMock.createWatcher(targets, options);
});
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
watcher.start();
await vi.waitFor(() => {
expect(getChangeSource(watcher, 'teams').currentPollingTimer).not.toBeNull();
expect(partialWatcher.close).toHaveBeenCalled();
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('retries chokidar registry after a non-limit error without enabling polling', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-team-nonlimit-'));
setClaudeBasePathOverride(tempDir);
@ -432,12 +1171,8 @@ describe('FileWatcher', () => {
teamsWatcher.emit('error', Object.assign(new Error('permission denied'), { code: 'EACCES' }));
await vi.advanceTimersByTimeAsync(0);
const watcherAny = watcher as unknown as {
retryTimer: NodeJS.Timeout | null;
teamsPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).toBeNull();
expect(watcherAny.retryTimer).not.toBeNull();
expect(getChangeSource(watcher, 'teams').currentPollingTimer).toBeNull();
expect(getRetryTimer(watcher)).not.toBeNull();
expect(teamsWatcher.close).toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(2000);
@ -472,12 +1207,8 @@ describe('FileWatcher', () => {
await vi.waitFor(() => expect(watchMock).toHaveBeenCalledTimes(2));
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const watcherAny = watcher as unknown as {
teamsPollingTimer: NodeJS.Timeout | null;
tasksPollingTimer: NodeJS.Timeout | null;
};
expect(watcherAny.teamsPollingTimer).toBeNull();
expect(watcherAny.tasksPollingTimer).toBeNull();
expect(getChangeSource(watcher, 'teams').currentPollingTimer).toBeNull();
expect(getChangeSource(watcher, 'tasks').currentPollingTimer).toBeNull();
expect(watchMock).not.toHaveBeenCalledWith(teamsDir, expect.anything(), expect.anything());
expect(watchMock).not.toHaveBeenCalledWith(tasksDir, expect.anything(), expect.anything());
expectChokidarOptions(getChokidarWatcherForRoot(teamsDir));
@ -513,6 +1244,7 @@ describe('FileWatcher', () => {
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'config.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'kanban-state.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'inboxes', 'user.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'sentMessages.json'));
teamsWatcher.emit('change', path.join(teamsDir, 'base-1', 'processes.json'));
@ -524,6 +1256,7 @@ describe('FileWatcher', () => {
expect(events).toEqual([
{ type: 'config', teamName: 'base-1', detail: 'config.json' },
{ type: 'config', teamName: 'base-1', detail: 'kanban-state.json' },
{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' },
{ type: 'inbox', teamName: 'base-1', detail: 'sentMessages.json' },
{ type: 'process', teamName: 'base-1', detail: 'processes.json' },
@ -782,6 +1515,49 @@ describe('FileWatcher', () => {
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('keeps rebuilding the registry when the previous chokidar close throws synchronously', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-close-throw-'));
setClaudeBasePathOverride(tempDir);
const projectsDir = path.join(tempDir, 'projects');
const todosDir = path.join(tempDir, 'todos');
const teamsDir = path.join(tempDir, 'teams');
const tasksDir = path.join(tempDir, 'tasks');
const addedTeamDir = path.join(teamsDir, 'base-2');
fs.mkdirSync(projectsDir, { recursive: true });
fs.mkdirSync(todosDir, { recursive: true });
fs.mkdirSync(path.join(teamsDir, 'base-1', 'inboxes'), { recursive: true });
fs.mkdirSync(path.join(tasksDir, 'base-1'), { recursive: true });
useRealAccess();
const watchMock = vi.mocked(fs.watch);
watchMock.mockImplementation(() => createFakeWatcher());
const dataCache = new DataCache(50, 10, false);
const watcher = new FileWatcher(dataCache, projectsDir, todosDir);
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
watcher.start();
await vi.waitFor(() => expect(chokidarMock.watch).toHaveBeenCalledTimes(2));
const teamsWatcher = getChokidarWatcherForRoot(teamsDir);
teamsWatcher.close.mockImplementationOnce(() => {
throw new Error('close failed');
});
fs.mkdirSync(addedTeamDir, { recursive: true });
fs.writeFileSync(path.join(addedTeamDir, 'config.json'), '{}', 'utf8');
teamsWatcher.emit('addDir', addedTeamDir);
await vi.waitFor(() => {
expect(chokidarMock.watch).toHaveBeenCalledTimes(3);
expect(getChokidarWatcherForRoot(teamsDir).targets).toContain(path.normalize(addedTeamDir));
expect(events).toEqual([{ type: 'config', teamName: 'base-2', detail: 'config.json' }]);
});
watcher.stop();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it('ignores events from an old chokidar generation after registry rebuild', async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'filewatcher-chokidar-generation-'));
setClaudeBasePathOverride(tempDir);
@ -1048,16 +1824,13 @@ describe('FileWatcher', () => {
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
const watcherAny = watcher as unknown as {
isWatching: boolean;
pollTeamsForChanges: () => Promise<void>;
};
watcherAny.isWatching = true;
await watcherAny.pollTeamsForChanges();
setWatcherActive(watcher);
const teamsSource = getChangeSource(watcher, 'teams');
await teamsSource.pollOnce();
expect(events).toEqual([]);
fs.writeFileSync(inboxPath, '[{"messageId":"m1"}]', 'utf8');
await watcherAny.pollTeamsForChanges();
await teamsSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([{ type: 'inbox', teamName: 'base-1', detail: 'inboxes/user.json' }]);
@ -1083,16 +1856,13 @@ describe('FileWatcher', () => {
const events: unknown[] = [];
watcher.on('team-change', (event) => events.push(event));
const watcherAny = watcher as unknown as {
isWatching: boolean;
pollTasksForChanges: () => Promise<void>;
};
watcherAny.isWatching = true;
await watcherAny.pollTasksForChanges();
setWatcherActive(watcher);
const tasksSource = getChangeSource(watcher, 'tasks');
await tasksSource.pollOnce();
expect(events).toEqual([]);
fs.writeFileSync(taskPath, '{"status":"running"}', 'utf8');
await watcherAny.pollTasksForChanges();
await tasksSource.pollOnce();
await vi.advanceTimersByTimeAsync(100);
expect(events).toEqual([