diff --git a/src/main/services/team/TeamFsWorkerClient.ts b/src/main/services/team/TeamFsWorkerClient.ts index fd98fb89..4bdd70d8 100644 --- a/src/main/services/team/TeamFsWorkerClient.ts +++ b/src/main/services/team/TeamFsWorkerClient.ts @@ -3,7 +3,7 @@ import * as path from 'node:path'; import { fileURLToPath } from 'node:url'; import { Worker } from 'node:worker_threads'; -import { getTasksBasePath, getTeamsBasePath } from '@main/utils/pathDecoder'; +import { getAppDataPath, getTasksBasePath, getTeamsBasePath } from '@main/utils/pathDecoder'; import { createLogger } from '@shared/utils/logger'; import type { TeamSummary, TeamTask } from '@shared/types'; @@ -30,6 +30,7 @@ interface ListTeamsPayload { interface GetAllTasksPayload { tasksBase: string; + projectionCacheBase: string; maxTaskBytes: number; maxTaskReadMs: number; concurrency: number; @@ -61,6 +62,7 @@ function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record { const payload: GetAllTasksPayload = { tasksBase: getTasksBasePath(), + projectionCacheBase: path.join(getAppDataPath(), 'cache', 'team-task-projections'), maxTaskBytes: options.maxTaskBytes, maxTaskReadMs: options.maxTaskReadMs ?? DEFAULT_READ_TIMEOUT_MS, concurrency: options.concurrency ?? DEFAULT_CONCURRENCY, diff --git a/src/main/workers/team-fs-worker.ts b/src/main/workers/team-fs-worker.ts index 638a3965..3ac26cb2 100644 --- a/src/main/workers/team-fs-worker.ts +++ b/src/main/workers/team-fs-worker.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto'; import * as fs from 'node:fs'; import * as path from 'node:path'; import { parentPort } from 'node:worker_threads'; @@ -27,6 +28,7 @@ interface ListTeamsPayload { interface GetAllTasksPayload { tasksBase: string; + projectionCacheBase?: string; maxTaskBytes: number; maxTaskReadMs: number; concurrency: number; @@ -96,6 +98,12 @@ interface GetAllTasksDiag { cacheMisses: number; cacheWriteSkips: number; cacheEvictions: number; + persistentCacheHits: number; + persistentCacheMisses: number; + persistentCacheLoads: number; + persistentCacheWrites: number; + persistentCacheReadFailures: number; + persistentCacheWriteFailures: number; totalMs: number; } @@ -105,6 +113,12 @@ interface TaskReadDiag { cacheHits: number; cacheMisses: number; cacheWriteSkips: number; + persistentCacheHits: number; + persistentCacheMisses: number; + persistentCacheLoads: number; + persistentCacheWrites: number; + persistentCacheReadFailures: number; + persistentCacheWriteFailures: number; } const MAX_LAUNCH_STATE_BYTES = 32 * 1024; @@ -118,6 +132,10 @@ const REVIEW_LIFECYCLE_EVENTS = new Set([ const REVIEW_RESET_STATUSES = new Set(['in_progress', 'deleted']); const TEAM_SUMMARY_CACHE_MAX_ENTRIES = 1000; const TASK_FILE_CACHE_MAX_ENTRIES = 10000; +const PERSISTENT_TASK_PROJECTION_CACHE_VERSION = 1; +const PERSISTENT_TASK_PROJECTION_CACHE_DIR = 'v1'; +const MAX_PERSISTENT_TASK_PROJECTION_CACHE_BYTES = 16 * 1024 * 1024; +const CACHEABLE_TASK_SKIP_REASONS = new Set(['task_internal', 'task_deleted']); const BOOTSTRAP_STATE_FILE = 'bootstrap-state.json'; const BOOTSTRAP_JOURNAL_FILE = 'bootstrap-journal.jsonl'; @@ -158,6 +176,11 @@ interface TaskFileCacheEntry { lastUsedAt: number; } +interface PersistentTaskProjectionCacheEntry { + fingerprint: string; + result: CachedTaskReadResult; +} + const teamSummaryCache = new Map(); const taskFileCache = new Map(); @@ -617,15 +640,15 @@ async function cacheTaskReadResultIfStable( fingerprintBeforeCacheSafe: boolean, result: CachedTaskReadResult, taskDiag: TaskReadDiag -): Promise { +): Promise { if (!fingerprintBeforeCacheSafe) { taskDiag.cacheWriteSkips++; - return; + return false; } const after = await statPathFingerprint(taskPath); if (!isCacheSafeFingerprint(after) || fingerprintToString(after) !== fingerprintBefore) { taskDiag.cacheWriteSkips++; - return; + return false; } taskFileCache.set(cacheKey, { fingerprint: fingerprintBefore, @@ -633,6 +656,7 @@ async function cacheTaskReadResultIfStable( tasksBase, lastUsedAt: nowMs(), }); + return true; } function applyCachedTaskReadResult( @@ -667,6 +691,167 @@ function pruneTaskFileCache( } } +function getPersistentTaskProjectionCachePath( + payload: GetAllTasksPayload, + teamName: string +): string | null { + const base = typeof payload.projectionCacheBase === 'string' ? payload.projectionCacheBase : ''; + if (!base.trim()) return null; + const digest = createHash('sha256') + .update(payload.tasksBase) + .update('\0') + .update(teamName) + .digest('hex'); + return path.join(base, PERSISTENT_TASK_PROJECTION_CACHE_DIR, `${digest}.json`); +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value); +} + +function isSafeTaskProjectionFileName(file: string): boolean { + return ( + file.endsWith('.json') && + !file.startsWith('.') && + !file.includes('/') && + !file.includes('\\') && + file !== '.lock' && + file !== '.highwatermark' + ); +} + +function normalizePersistentTaskReadResult( + value: unknown, + teamName: string +): CachedTaskReadResult | null { + if (!isRecord(value)) return null; + + const skipReason = value.skipReason; + if (typeof skipReason === 'string') { + return CACHEABLE_TASK_SKIP_REASONS.has(skipReason) ? { skipReason } : null; + } + + const task = value.task; + if (!isRecord(task)) return null; + if (task.teamName !== teamName) return null; + if (typeof task.id !== 'string') return null; + if (typeof task.subject !== 'string') return null; + if ( + task.status !== 'pending' && + task.status !== 'in_progress' && + task.status !== 'completed' && + task.status !== 'deleted' + ) { + return null; + } + + return { task }; +} + +function normalizePersistentTaskProjectionEntry( + value: unknown, + teamName: string +): PersistentTaskProjectionCacheEntry | null { + if (!isRecord(value) || typeof value.fingerprint !== 'string') return null; + const result = normalizePersistentTaskReadResult(value.result, teamName); + return result ? { fingerprint: value.fingerprint, result } : null; +} + +async function readPersistentTaskProjectionCache( + payload: GetAllTasksPayload, + teamName: string, + optionKey: string, + taskDiag: TaskReadDiag +): Promise | null> { + const cachePath = getPersistentTaskProjectionCachePath(payload, teamName); + if (!cachePath) return null; + + try { + const stat = await fs.promises.stat(cachePath); + if (!stat.isFile() || stat.size > MAX_PERSISTENT_TASK_PROJECTION_CACHE_BYTES) { + taskDiag.persistentCacheReadFailures++; + return null; + } + const raw = await fs.promises.readFile(cachePath, 'utf8'); + const parsed = JSON.parse(raw) as unknown; + if (!isRecord(parsed)) { + taskDiag.persistentCacheReadFailures++; + return null; + } + if ( + parsed.version !== PERSISTENT_TASK_PROJECTION_CACHE_VERSION || + parsed.tasksBase !== payload.tasksBase || + parsed.teamName !== teamName || + parsed.optionKey !== optionKey || + !isRecord(parsed.entries) + ) { + taskDiag.persistentCacheMisses++; + return null; + } + + const entries = new Map(); + for (const [file, entry] of Object.entries(parsed.entries)) { + if (!isSafeTaskProjectionFileName(file)) continue; + const normalized = normalizePersistentTaskProjectionEntry(entry, teamName); + if (normalized) { + entries.set(file, normalized); + } + } + taskDiag.persistentCacheLoads++; + return entries; + } catch (error) { + if ((error as NodeJS.ErrnoException | undefined)?.code === 'ENOENT') { + return null; + } + taskDiag.persistentCacheReadFailures++; + return null; + } +} + +function shouldWritePersistentTaskProjectionCache( + previous: ReadonlyMap | null, + next: ReadonlyMap, + taskDiag: TaskReadDiag +): boolean { + if (next.size === 0) return false; + if (!previous) return true; + if (previous.size !== next.size) return true; + return taskDiag.persistentCacheMisses > 0 || taskDiag.cacheMisses > 0; +} + +async function writePersistentTaskProjectionCache( + payload: GetAllTasksPayload, + teamName: string, + optionKey: string, + entries: ReadonlyMap, + taskDiag: TaskReadDiag +): Promise { + const cachePath = getPersistentTaskProjectionCachePath(payload, teamName); + if (!cachePath || entries.size === 0) return; + + const body = { + version: PERSISTENT_TASK_PROJECTION_CACHE_VERSION, + tasksBase: payload.tasksBase, + teamName, + optionKey, + writtenAt: nowMs(), + entries: Object.fromEntries(entries), + }; + const tmpPath = `${cachePath}.${process.pid}.${Date.now()}.${Math.random() + .toString(36) + .slice(2)}.tmp`; + + try { + await fs.promises.mkdir(path.dirname(cachePath), { recursive: true }); + await fs.promises.writeFile(tmpPath, JSON.stringify(body), 'utf8'); + await fs.promises.rename(tmpPath, cachePath); + taskDiag.persistentCacheWrites++; + } catch { + taskDiag.persistentCacheWriteFailures++; + await fs.promises.rm(tmpPath, { force: true }).catch(() => undefined); + } +} + // --------------------------------------------------------------------------- // listTeams // --------------------------------------------------------------------------- @@ -1418,6 +1603,12 @@ async function readTasksDirForTeam( cacheHits: 0, cacheMisses: 0, cacheWriteSkips: 0, + persistentCacheHits: 0, + persistentCacheMisses: 0, + persistentCacheLoads: 0, + persistentCacheWrites: 0, + persistentCacheReadFailures: 0, + persistentCacheWriteFailures: 0, }; let entries: string[]; try { @@ -1432,6 +1623,13 @@ async function readTasksDirForTeam( const tasks: unknown[] = []; const liveCacheKeys = new Set(); const optionKey = makeTaskOptionKey(payload); + const persistentCache = await readPersistentTaskProjectionCache( + payload, + teamName, + optionKey, + taskDiag + ); + const nextPersistentEntries = new Map(); for (const file of entries) { if ( !file.endsWith('.json') || @@ -1464,8 +1662,33 @@ async function readTasksDirForTeam( cached.lastUsedAt = nowMs(); taskDiag.cacheHits++; applyCachedTaskReadResult(cached.result, tasks, taskDiag); + nextPersistentEntries.set(file, { + fingerprint, + result: cloneCached(cached.result), + }); continue; } + + const persistentEntry = persistentCache?.get(file); + if (fingerprintCacheSafe && persistentEntry?.fingerprint === fingerprint) { + const result = cloneCached(persistentEntry.result); + taskFileCache.set(cacheKey, { + fingerprint, + result: cloneCached(result), + tasksBase: payload.tasksBase, + lastUsedAt: nowMs(), + }); + taskDiag.persistentCacheHits++; + applyCachedTaskReadResult(result, tasks, taskDiag); + nextPersistentEntries.set(file, { + fingerprint, + result: cloneCached(result), + }); + continue; + } + if (persistentCache) { + taskDiag.persistentCacheMisses++; + } taskDiag.cacheMisses++; const raw = await readFileUtf8WithTimeout(taskPath, payload.maxTaskReadMs); @@ -1474,29 +1697,43 @@ async function readTasksDirForTeam( if (metadata?._internal === true) { taskDiag.skipped++; bumpSkipReason(taskDiag.skipReasons, 'task_internal'); - await cacheTaskReadResultIfStable( + const result: CachedTaskReadResult = { skipReason: 'task_internal' }; + const cachedStable = await cacheTaskReadResultIfStable( cacheKey, taskPath, payload.tasksBase, fingerprint, fingerprintCacheSafe, - { skipReason: 'task_internal' }, + result, taskDiag ); + if (cachedStable) { + nextPersistentEntries.set(file, { + fingerprint, + result: cloneCached(result), + }); + } continue; } if (parsed.status === 'deleted') { taskDiag.skipped++; bumpSkipReason(taskDiag.skipReasons, 'task_deleted'); - await cacheTaskReadResultIfStable( + const result: CachedTaskReadResult = { skipReason: 'task_deleted' }; + const cachedStable = await cacheTaskReadResultIfStable( cacheKey, taskPath, payload.tasksBase, fingerprint, fingerprintCacheSafe, - { skipReason: 'task_deleted' }, + result, taskDiag ); + if (cachedStable) { + nextPersistentEntries.set(file, { + fingerprint, + result: cloneCached(result), + }); + } continue; } @@ -1598,15 +1835,22 @@ async function readTasksDirForTeam( teamName, }; tasks.push(task); - await cacheTaskReadResultIfStable( + const result: CachedTaskReadResult = { task }; + const cachedStable = await cacheTaskReadResultIfStable( cacheKey, taskPath, payload.tasksBase, fingerprint, fingerprintCacheSafe, - { task }, + result, taskDiag ); + if (cachedStable) { + nextPersistentEntries.set(file, { + fingerprint, + result: cloneCached(result), + }); + } } catch (error) { taskDiag.skipped++; const code = (error as NodeJS.ErrnoException).code; @@ -1617,6 +1861,15 @@ async function readTasksDirForTeam( } } } + if (shouldWritePersistentTaskProjectionCache(persistentCache, nextPersistentEntries, taskDiag)) { + await writePersistentTaskProjectionCache( + payload, + teamName, + optionKey, + nextPersistentEntries, + taskDiag + ); + } return { tasks, taskDiag, liveCacheKeys }; } @@ -1625,6 +1878,12 @@ function mergeTaskDiag(target: GetAllTasksDiag, source: TaskReadDiag): void { target.cacheHits += source.cacheHits; target.cacheMisses += source.cacheMisses; target.cacheWriteSkips += source.cacheWriteSkips; + target.persistentCacheHits += source.persistentCacheHits; + target.persistentCacheMisses += source.persistentCacheMisses; + target.persistentCacheLoads += source.persistentCacheLoads; + target.persistentCacheWrites += source.persistentCacheWrites; + target.persistentCacheReadFailures += source.persistentCacheReadFailures; + target.persistentCacheWriteFailures += source.persistentCacheWriteFailures; for (const [reason, count] of Object.entries(source.skipReasons)) { target.skipReasons[reason] = (target.skipReasons[reason] || 0) + count; } @@ -1647,6 +1906,12 @@ async function getAllTasks( cacheMisses: 0, cacheWriteSkips: 0, cacheEvictions: 0, + persistentCacheHits: 0, + persistentCacheMisses: 0, + persistentCacheLoads: 0, + persistentCacheWrites: 0, + persistentCacheReadFailures: 0, + persistentCacheWriteFailures: 0, totalMs: 0, }; diff --git a/test/main/services/team/TeamFsWorker.integration.test.ts b/test/main/services/team/TeamFsWorker.integration.test.ts index 03424d51..177e3728 100644 --- a/test/main/services/team/TeamFsWorker.integration.test.ts +++ b/test/main/services/team/TeamFsWorker.integration.test.ts @@ -1,9 +1,8 @@ import * as fs from 'fs/promises'; import * as os from 'os'; import * as path from 'path'; -import { Worker } from 'worker_threads'; - import { afterAll, afterEach, describe, expect, it } from 'vitest'; +import { Worker } from 'worker_threads'; import { createPersistedLaunchSummaryProjection } from '../../../../src/main/services/team/TeamLaunchSummaryProjection'; @@ -112,13 +111,15 @@ async function callListTeams( async function callGetAllTasks( worker: Worker, - tasksBase: string + tasksBase: string, + projectionCacheBase = path.join(path.dirname(tasksBase), 'projection-cache') ): Promise<{ tasks: unknown[]; diag?: Record; }> { const { result, diag } = await callWorker(worker, 'getAllTasks', { tasksBase, + projectionCacheBase, maxTaskBytes: 256 * 1024, maxTaskReadMs: 5_000, concurrency: 2, @@ -576,4 +577,118 @@ describe('team-fs-worker integration', () => { await worker.terminate(); } }); + + it('reuses persisted task projections after a worker restart', async () => { + const workerPath = await getWorkerPath(); + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-')); + const tasksBase = path.join(tempDir, 'tasks'); + const projectionCacheBase = path.join(tempDir, 'projection-cache'); + const teamName = 'persistent-task-cache-team'; + const tasksDir = path.join(tasksBase, teamName); + await fs.mkdir(tasksDir, { recursive: true }); + await fs.writeFile( + path.join(tasksDir, '1.json'), + JSON.stringify({ + id: '1', + subject: 'Persisted subject', + status: 'pending', + createdAt: '2026-05-02T12:00:00.000Z', + }), + 'utf8' + ); + + const firstWorker = createWorker(workerPath); + try { + const first = await callGetAllTasks(firstWorker, tasksBase, projectionCacheBase); + expect(first.tasks[0]).toMatchObject({ teamName, subject: 'Persisted subject' }); + expect(first.diag?.cacheMisses).toBe(1); + expect(first.diag?.persistentCacheWrites).toBe(1); + } finally { + await firstWorker.terminate(); + } + + const secondWorker = createWorker(workerPath); + try { + const second = await callGetAllTasks(secondWorker, tasksBase, projectionCacheBase); + expect(second.tasks[0]).toMatchObject({ teamName, subject: 'Persisted subject' }); + expect(second.diag?.cacheHits).toBe(0); + expect(second.diag?.cacheMisses).toBe(0); + expect(second.diag?.persistentCacheLoads).toBe(1); + expect(second.diag?.persistentCacheHits).toBe(1); + } finally { + await secondWorker.terminate(); + } + }); + + it('falls back to task JSON when persisted projections are stale or corrupt', async () => { + const workerPath = await getWorkerPath(); + tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-')); + const tasksBase = path.join(tempDir, 'tasks'); + const projectionCacheBase = path.join(tempDir, 'projection-cache'); + const teamName = 'stale-persistent-cache-team'; + const tasksDir = path.join(tasksBase, teamName); + const taskPath = path.join(tasksDir, '1.json'); + await fs.mkdir(tasksDir, { recursive: true }); + await fs.writeFile( + taskPath, + JSON.stringify({ + id: '1', + subject: 'Original subject', + status: 'pending', + createdAt: '2026-05-02T12:00:00.000Z', + }), + 'utf8' + ); + + const firstWorker = createWorker(workerPath); + try { + const first = await callGetAllTasks(firstWorker, tasksBase, projectionCacheBase); + expect(first.tasks[0]).toMatchObject({ subject: 'Original subject' }); + expect(first.diag?.persistentCacheWrites).toBe(1); + } finally { + await firstWorker.terminate(); + } + + await fs.writeFile( + taskPath, + JSON.stringify({ + id: '1', + subject: 'Changed subject with a different size', + status: 'pending', + createdAt: '2026-05-02T12:00:00.000Z', + }), + 'utf8' + ); + + const changedWorker = createWorker(workerPath); + try { + const changed = await callGetAllTasks(changedWorker, tasksBase, projectionCacheBase); + expect(changed.tasks[0]).toMatchObject({ + teamName, + subject: 'Changed subject with a different size', + }); + expect(changed.diag?.persistentCacheLoads).toBe(1); + expect(changed.diag?.persistentCacheHits).toBe(0); + expect(changed.diag?.persistentCacheMisses).toBe(1); + expect(changed.diag?.cacheMisses).toBe(1); + } finally { + await changedWorker.terminate(); + } + + const cacheFiles = await fs.readdir(path.join(projectionCacheBase, 'v1')); + await fs.writeFile(path.join(projectionCacheBase, 'v1', cacheFiles[0]), '{bad json', 'utf8'); + + const corruptWorker = createWorker(workerPath); + try { + const recovered = await callGetAllTasks(corruptWorker, tasksBase, projectionCacheBase); + expect(recovered.tasks[0]).toMatchObject({ + teamName, + subject: 'Changed subject with a different size', + }); + expect(recovered.diag?.persistentCacheReadFailures).toBe(1); + expect(recovered.diag?.cacheMisses).toBe(1); + } finally { + await corruptWorker.terminate(); + } + }); });