perf(team): persist task projection cache
This commit is contained in:
parent
06ebb3d2b5
commit
f5f22e50fb
3 changed files with 396 additions and 13 deletions
|
|
@ -3,7 +3,7 @@ import * as path from 'node:path';
|
||||||
import { fileURLToPath } from 'node:url';
|
import { fileURLToPath } from 'node:url';
|
||||||
import { Worker } from 'node:worker_threads';
|
import { Worker } from 'node:worker_threads';
|
||||||
|
|
||||||
import { getTasksBasePath, getTeamsBasePath } from '@main/utils/pathDecoder';
|
import { getAppDataPath, getTasksBasePath, getTeamsBasePath } from '@main/utils/pathDecoder';
|
||||||
import { createLogger } from '@shared/utils/logger';
|
import { createLogger } from '@shared/utils/logger';
|
||||||
|
|
||||||
import type { TeamSummary, TeamTask } from '@shared/types';
|
import type { TeamSummary, TeamTask } from '@shared/types';
|
||||||
|
|
@ -30,6 +30,7 @@ interface ListTeamsPayload {
|
||||||
|
|
||||||
interface GetAllTasksPayload {
|
interface GetAllTasksPayload {
|
||||||
tasksBase: string;
|
tasksBase: string;
|
||||||
|
projectionCacheBase: string;
|
||||||
maxTaskBytes: number;
|
maxTaskBytes: number;
|
||||||
maxTaskReadMs: number;
|
maxTaskReadMs: number;
|
||||||
concurrency: number;
|
concurrency: number;
|
||||||
|
|
@ -61,6 +62,7 @@ function summarizeWorkerPayload(payload: WorkerRequest['payload']): Record<strin
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
tasksBase: payload.tasksBase,
|
tasksBase: payload.tasksBase,
|
||||||
|
projectionCacheBase: payload.projectionCacheBase,
|
||||||
concurrency: payload.concurrency,
|
concurrency: payload.concurrency,
|
||||||
maxTaskReadMs: payload.maxTaskReadMs,
|
maxTaskReadMs: payload.maxTaskReadMs,
|
||||||
maxTaskBytes: payload.maxTaskBytes,
|
maxTaskBytes: payload.maxTaskBytes,
|
||||||
|
|
@ -280,6 +282,7 @@ export class TeamFsWorkerClient {
|
||||||
}): Promise<{ tasks: (TeamTask & { teamName: string })[]; diag?: WorkerDiag }> {
|
}): Promise<{ tasks: (TeamTask & { teamName: string })[]; diag?: WorkerDiag }> {
|
||||||
const payload: GetAllTasksPayload = {
|
const payload: GetAllTasksPayload = {
|
||||||
tasksBase: getTasksBasePath(),
|
tasksBase: getTasksBasePath(),
|
||||||
|
projectionCacheBase: path.join(getAppDataPath(), 'cache', 'team-task-projections'),
|
||||||
maxTaskBytes: options.maxTaskBytes,
|
maxTaskBytes: options.maxTaskBytes,
|
||||||
maxTaskReadMs: options.maxTaskReadMs ?? DEFAULT_READ_TIMEOUT_MS,
|
maxTaskReadMs: options.maxTaskReadMs ?? DEFAULT_READ_TIMEOUT_MS,
|
||||||
concurrency: options.concurrency ?? DEFAULT_CONCURRENCY,
|
concurrency: options.concurrency ?? DEFAULT_CONCURRENCY,
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { createHash } from 'node:crypto';
|
||||||
import * as fs from 'node:fs';
|
import * as fs from 'node:fs';
|
||||||
import * as path from 'node:path';
|
import * as path from 'node:path';
|
||||||
import { parentPort } from 'node:worker_threads';
|
import { parentPort } from 'node:worker_threads';
|
||||||
|
|
@ -27,6 +28,7 @@ interface ListTeamsPayload {
|
||||||
|
|
||||||
interface GetAllTasksPayload {
|
interface GetAllTasksPayload {
|
||||||
tasksBase: string;
|
tasksBase: string;
|
||||||
|
projectionCacheBase?: string;
|
||||||
maxTaskBytes: number;
|
maxTaskBytes: number;
|
||||||
maxTaskReadMs: number;
|
maxTaskReadMs: number;
|
||||||
concurrency: number;
|
concurrency: number;
|
||||||
|
|
@ -96,6 +98,12 @@ interface GetAllTasksDiag {
|
||||||
cacheMisses: number;
|
cacheMisses: number;
|
||||||
cacheWriteSkips: number;
|
cacheWriteSkips: number;
|
||||||
cacheEvictions: number;
|
cacheEvictions: number;
|
||||||
|
persistentCacheHits: number;
|
||||||
|
persistentCacheMisses: number;
|
||||||
|
persistentCacheLoads: number;
|
||||||
|
persistentCacheWrites: number;
|
||||||
|
persistentCacheReadFailures: number;
|
||||||
|
persistentCacheWriteFailures: number;
|
||||||
totalMs: number;
|
totalMs: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -105,6 +113,12 @@ interface TaskReadDiag {
|
||||||
cacheHits: number;
|
cacheHits: number;
|
||||||
cacheMisses: number;
|
cacheMisses: number;
|
||||||
cacheWriteSkips: number;
|
cacheWriteSkips: number;
|
||||||
|
persistentCacheHits: number;
|
||||||
|
persistentCacheMisses: number;
|
||||||
|
persistentCacheLoads: number;
|
||||||
|
persistentCacheWrites: number;
|
||||||
|
persistentCacheReadFailures: number;
|
||||||
|
persistentCacheWriteFailures: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
const MAX_LAUNCH_STATE_BYTES = 32 * 1024;
|
const MAX_LAUNCH_STATE_BYTES = 32 * 1024;
|
||||||
|
|
@ -118,6 +132,10 @@ const REVIEW_LIFECYCLE_EVENTS = new Set([
|
||||||
const REVIEW_RESET_STATUSES = new Set(['in_progress', 'deleted']);
|
const REVIEW_RESET_STATUSES = new Set(['in_progress', 'deleted']);
|
||||||
const TEAM_SUMMARY_CACHE_MAX_ENTRIES = 1000;
|
const TEAM_SUMMARY_CACHE_MAX_ENTRIES = 1000;
|
||||||
const TASK_FILE_CACHE_MAX_ENTRIES = 10000;
|
const TASK_FILE_CACHE_MAX_ENTRIES = 10000;
|
||||||
|
const PERSISTENT_TASK_PROJECTION_CACHE_VERSION = 1;
|
||||||
|
const PERSISTENT_TASK_PROJECTION_CACHE_DIR = 'v1';
|
||||||
|
const MAX_PERSISTENT_TASK_PROJECTION_CACHE_BYTES = 16 * 1024 * 1024;
|
||||||
|
const CACHEABLE_TASK_SKIP_REASONS = new Set(['task_internal', 'task_deleted']);
|
||||||
const BOOTSTRAP_STATE_FILE = 'bootstrap-state.json';
|
const BOOTSTRAP_STATE_FILE = 'bootstrap-state.json';
|
||||||
const BOOTSTRAP_JOURNAL_FILE = 'bootstrap-journal.jsonl';
|
const BOOTSTRAP_JOURNAL_FILE = 'bootstrap-journal.jsonl';
|
||||||
|
|
||||||
|
|
@ -158,6 +176,11 @@ interface TaskFileCacheEntry {
|
||||||
lastUsedAt: number;
|
lastUsedAt: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface PersistentTaskProjectionCacheEntry {
|
||||||
|
fingerprint: string;
|
||||||
|
result: CachedTaskReadResult;
|
||||||
|
}
|
||||||
|
|
||||||
const teamSummaryCache = new Map<string, TeamSummaryCacheEntry>();
|
const teamSummaryCache = new Map<string, TeamSummaryCacheEntry>();
|
||||||
const taskFileCache = new Map<string, TaskFileCacheEntry>();
|
const taskFileCache = new Map<string, TaskFileCacheEntry>();
|
||||||
|
|
||||||
|
|
@ -617,15 +640,15 @@ async function cacheTaskReadResultIfStable(
|
||||||
fingerprintBeforeCacheSafe: boolean,
|
fingerprintBeforeCacheSafe: boolean,
|
||||||
result: CachedTaskReadResult,
|
result: CachedTaskReadResult,
|
||||||
taskDiag: TaskReadDiag
|
taskDiag: TaskReadDiag
|
||||||
): Promise<void> {
|
): Promise<boolean> {
|
||||||
if (!fingerprintBeforeCacheSafe) {
|
if (!fingerprintBeforeCacheSafe) {
|
||||||
taskDiag.cacheWriteSkips++;
|
taskDiag.cacheWriteSkips++;
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
const after = await statPathFingerprint(taskPath);
|
const after = await statPathFingerprint(taskPath);
|
||||||
if (!isCacheSafeFingerprint(after) || fingerprintToString(after) !== fingerprintBefore) {
|
if (!isCacheSafeFingerprint(after) || fingerprintToString(after) !== fingerprintBefore) {
|
||||||
taskDiag.cacheWriteSkips++;
|
taskDiag.cacheWriteSkips++;
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
taskFileCache.set(cacheKey, {
|
taskFileCache.set(cacheKey, {
|
||||||
fingerprint: fingerprintBefore,
|
fingerprint: fingerprintBefore,
|
||||||
|
|
@ -633,6 +656,7 @@ async function cacheTaskReadResultIfStable(
|
||||||
tasksBase,
|
tasksBase,
|
||||||
lastUsedAt: nowMs(),
|
lastUsedAt: nowMs(),
|
||||||
});
|
});
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
function applyCachedTaskReadResult(
|
function applyCachedTaskReadResult(
|
||||||
|
|
@ -667,6 +691,167 @@ function pruneTaskFileCache(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getPersistentTaskProjectionCachePath(
|
||||||
|
payload: GetAllTasksPayload,
|
||||||
|
teamName: string
|
||||||
|
): string | null {
|
||||||
|
const base = typeof payload.projectionCacheBase === 'string' ? payload.projectionCacheBase : '';
|
||||||
|
if (!base.trim()) return null;
|
||||||
|
const digest = createHash('sha256')
|
||||||
|
.update(payload.tasksBase)
|
||||||
|
.update('\0')
|
||||||
|
.update(teamName)
|
||||||
|
.digest('hex');
|
||||||
|
return path.join(base, PERSISTENT_TASK_PROJECTION_CACHE_DIR, `${digest}.json`);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||||
|
return Boolean(value) && typeof value === 'object' && !Array.isArray(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isSafeTaskProjectionFileName(file: string): boolean {
|
||||||
|
return (
|
||||||
|
file.endsWith('.json') &&
|
||||||
|
!file.startsWith('.') &&
|
||||||
|
!file.includes('/') &&
|
||||||
|
!file.includes('\\') &&
|
||||||
|
file !== '.lock' &&
|
||||||
|
file !== '.highwatermark'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizePersistentTaskReadResult(
|
||||||
|
value: unknown,
|
||||||
|
teamName: string
|
||||||
|
): CachedTaskReadResult | null {
|
||||||
|
if (!isRecord(value)) return null;
|
||||||
|
|
||||||
|
const skipReason = value.skipReason;
|
||||||
|
if (typeof skipReason === 'string') {
|
||||||
|
return CACHEABLE_TASK_SKIP_REASONS.has(skipReason) ? { skipReason } : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const task = value.task;
|
||||||
|
if (!isRecord(task)) return null;
|
||||||
|
if (task.teamName !== teamName) return null;
|
||||||
|
if (typeof task.id !== 'string') return null;
|
||||||
|
if (typeof task.subject !== 'string') return null;
|
||||||
|
if (
|
||||||
|
task.status !== 'pending' &&
|
||||||
|
task.status !== 'in_progress' &&
|
||||||
|
task.status !== 'completed' &&
|
||||||
|
task.status !== 'deleted'
|
||||||
|
) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { task };
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizePersistentTaskProjectionEntry(
|
||||||
|
value: unknown,
|
||||||
|
teamName: string
|
||||||
|
): PersistentTaskProjectionCacheEntry | null {
|
||||||
|
if (!isRecord(value) || typeof value.fingerprint !== 'string') return null;
|
||||||
|
const result = normalizePersistentTaskReadResult(value.result, teamName);
|
||||||
|
return result ? { fingerprint: value.fingerprint, result } : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function readPersistentTaskProjectionCache(
|
||||||
|
payload: GetAllTasksPayload,
|
||||||
|
teamName: string,
|
||||||
|
optionKey: string,
|
||||||
|
taskDiag: TaskReadDiag
|
||||||
|
): Promise<Map<string, PersistentTaskProjectionCacheEntry> | null> {
|
||||||
|
const cachePath = getPersistentTaskProjectionCachePath(payload, teamName);
|
||||||
|
if (!cachePath) return null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const stat = await fs.promises.stat(cachePath);
|
||||||
|
if (!stat.isFile() || stat.size > MAX_PERSISTENT_TASK_PROJECTION_CACHE_BYTES) {
|
||||||
|
taskDiag.persistentCacheReadFailures++;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const raw = await fs.promises.readFile(cachePath, 'utf8');
|
||||||
|
const parsed = JSON.parse(raw) as unknown;
|
||||||
|
if (!isRecord(parsed)) {
|
||||||
|
taskDiag.persistentCacheReadFailures++;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
parsed.version !== PERSISTENT_TASK_PROJECTION_CACHE_VERSION ||
|
||||||
|
parsed.tasksBase !== payload.tasksBase ||
|
||||||
|
parsed.teamName !== teamName ||
|
||||||
|
parsed.optionKey !== optionKey ||
|
||||||
|
!isRecord(parsed.entries)
|
||||||
|
) {
|
||||||
|
taskDiag.persistentCacheMisses++;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const entries = new Map<string, PersistentTaskProjectionCacheEntry>();
|
||||||
|
for (const [file, entry] of Object.entries(parsed.entries)) {
|
||||||
|
if (!isSafeTaskProjectionFileName(file)) continue;
|
||||||
|
const normalized = normalizePersistentTaskProjectionEntry(entry, teamName);
|
||||||
|
if (normalized) {
|
||||||
|
entries.set(file, normalized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taskDiag.persistentCacheLoads++;
|
||||||
|
return entries;
|
||||||
|
} catch (error) {
|
||||||
|
if ((error as NodeJS.ErrnoException | undefined)?.code === 'ENOENT') {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
taskDiag.persistentCacheReadFailures++;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function shouldWritePersistentTaskProjectionCache(
|
||||||
|
previous: ReadonlyMap<string, PersistentTaskProjectionCacheEntry> | null,
|
||||||
|
next: ReadonlyMap<string, PersistentTaskProjectionCacheEntry>,
|
||||||
|
taskDiag: TaskReadDiag
|
||||||
|
): boolean {
|
||||||
|
if (next.size === 0) return false;
|
||||||
|
if (!previous) return true;
|
||||||
|
if (previous.size !== next.size) return true;
|
||||||
|
return taskDiag.persistentCacheMisses > 0 || taskDiag.cacheMisses > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function writePersistentTaskProjectionCache(
|
||||||
|
payload: GetAllTasksPayload,
|
||||||
|
teamName: string,
|
||||||
|
optionKey: string,
|
||||||
|
entries: ReadonlyMap<string, PersistentTaskProjectionCacheEntry>,
|
||||||
|
taskDiag: TaskReadDiag
|
||||||
|
): Promise<void> {
|
||||||
|
const cachePath = getPersistentTaskProjectionCachePath(payload, teamName);
|
||||||
|
if (!cachePath || entries.size === 0) return;
|
||||||
|
|
||||||
|
const body = {
|
||||||
|
version: PERSISTENT_TASK_PROJECTION_CACHE_VERSION,
|
||||||
|
tasksBase: payload.tasksBase,
|
||||||
|
teamName,
|
||||||
|
optionKey,
|
||||||
|
writtenAt: nowMs(),
|
||||||
|
entries: Object.fromEntries(entries),
|
||||||
|
};
|
||||||
|
const tmpPath = `${cachePath}.${process.pid}.${Date.now()}.${Math.random()
|
||||||
|
.toString(36)
|
||||||
|
.slice(2)}.tmp`;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await fs.promises.mkdir(path.dirname(cachePath), { recursive: true });
|
||||||
|
await fs.promises.writeFile(tmpPath, JSON.stringify(body), 'utf8');
|
||||||
|
await fs.promises.rename(tmpPath, cachePath);
|
||||||
|
taskDiag.persistentCacheWrites++;
|
||||||
|
} catch {
|
||||||
|
taskDiag.persistentCacheWriteFailures++;
|
||||||
|
await fs.promises.rm(tmpPath, { force: true }).catch(() => undefined);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// listTeams
|
// listTeams
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -1418,6 +1603,12 @@ async function readTasksDirForTeam(
|
||||||
cacheHits: 0,
|
cacheHits: 0,
|
||||||
cacheMisses: 0,
|
cacheMisses: 0,
|
||||||
cacheWriteSkips: 0,
|
cacheWriteSkips: 0,
|
||||||
|
persistentCacheHits: 0,
|
||||||
|
persistentCacheMisses: 0,
|
||||||
|
persistentCacheLoads: 0,
|
||||||
|
persistentCacheWrites: 0,
|
||||||
|
persistentCacheReadFailures: 0,
|
||||||
|
persistentCacheWriteFailures: 0,
|
||||||
};
|
};
|
||||||
let entries: string[];
|
let entries: string[];
|
||||||
try {
|
try {
|
||||||
|
|
@ -1432,6 +1623,13 @@ async function readTasksDirForTeam(
|
||||||
const tasks: unknown[] = [];
|
const tasks: unknown[] = [];
|
||||||
const liveCacheKeys = new Set<string>();
|
const liveCacheKeys = new Set<string>();
|
||||||
const optionKey = makeTaskOptionKey(payload);
|
const optionKey = makeTaskOptionKey(payload);
|
||||||
|
const persistentCache = await readPersistentTaskProjectionCache(
|
||||||
|
payload,
|
||||||
|
teamName,
|
||||||
|
optionKey,
|
||||||
|
taskDiag
|
||||||
|
);
|
||||||
|
const nextPersistentEntries = new Map<string, PersistentTaskProjectionCacheEntry>();
|
||||||
for (const file of entries) {
|
for (const file of entries) {
|
||||||
if (
|
if (
|
||||||
!file.endsWith('.json') ||
|
!file.endsWith('.json') ||
|
||||||
|
|
@ -1464,8 +1662,33 @@ async function readTasksDirForTeam(
|
||||||
cached.lastUsedAt = nowMs();
|
cached.lastUsedAt = nowMs();
|
||||||
taskDiag.cacheHits++;
|
taskDiag.cacheHits++;
|
||||||
applyCachedTaskReadResult(cached.result, tasks, taskDiag);
|
applyCachedTaskReadResult(cached.result, tasks, taskDiag);
|
||||||
|
nextPersistentEntries.set(file, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(cached.result),
|
||||||
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const persistentEntry = persistentCache?.get(file);
|
||||||
|
if (fingerprintCacheSafe && persistentEntry?.fingerprint === fingerprint) {
|
||||||
|
const result = cloneCached(persistentEntry.result);
|
||||||
|
taskFileCache.set(cacheKey, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(result),
|
||||||
|
tasksBase: payload.tasksBase,
|
||||||
|
lastUsedAt: nowMs(),
|
||||||
|
});
|
||||||
|
taskDiag.persistentCacheHits++;
|
||||||
|
applyCachedTaskReadResult(result, tasks, taskDiag);
|
||||||
|
nextPersistentEntries.set(file, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(result),
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (persistentCache) {
|
||||||
|
taskDiag.persistentCacheMisses++;
|
||||||
|
}
|
||||||
taskDiag.cacheMisses++;
|
taskDiag.cacheMisses++;
|
||||||
|
|
||||||
const raw = await readFileUtf8WithTimeout(taskPath, payload.maxTaskReadMs);
|
const raw = await readFileUtf8WithTimeout(taskPath, payload.maxTaskReadMs);
|
||||||
|
|
@ -1474,29 +1697,43 @@ async function readTasksDirForTeam(
|
||||||
if (metadata?._internal === true) {
|
if (metadata?._internal === true) {
|
||||||
taskDiag.skipped++;
|
taskDiag.skipped++;
|
||||||
bumpSkipReason(taskDiag.skipReasons, 'task_internal');
|
bumpSkipReason(taskDiag.skipReasons, 'task_internal');
|
||||||
await cacheTaskReadResultIfStable(
|
const result: CachedTaskReadResult = { skipReason: 'task_internal' };
|
||||||
|
const cachedStable = await cacheTaskReadResultIfStable(
|
||||||
cacheKey,
|
cacheKey,
|
||||||
taskPath,
|
taskPath,
|
||||||
payload.tasksBase,
|
payload.tasksBase,
|
||||||
fingerprint,
|
fingerprint,
|
||||||
fingerprintCacheSafe,
|
fingerprintCacheSafe,
|
||||||
{ skipReason: 'task_internal' },
|
result,
|
||||||
taskDiag
|
taskDiag
|
||||||
);
|
);
|
||||||
|
if (cachedStable) {
|
||||||
|
nextPersistentEntries.set(file, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(result),
|
||||||
|
});
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (parsed.status === 'deleted') {
|
if (parsed.status === 'deleted') {
|
||||||
taskDiag.skipped++;
|
taskDiag.skipped++;
|
||||||
bumpSkipReason(taskDiag.skipReasons, 'task_deleted');
|
bumpSkipReason(taskDiag.skipReasons, 'task_deleted');
|
||||||
await cacheTaskReadResultIfStable(
|
const result: CachedTaskReadResult = { skipReason: 'task_deleted' };
|
||||||
|
const cachedStable = await cacheTaskReadResultIfStable(
|
||||||
cacheKey,
|
cacheKey,
|
||||||
taskPath,
|
taskPath,
|
||||||
payload.tasksBase,
|
payload.tasksBase,
|
||||||
fingerprint,
|
fingerprint,
|
||||||
fingerprintCacheSafe,
|
fingerprintCacheSafe,
|
||||||
{ skipReason: 'task_deleted' },
|
result,
|
||||||
taskDiag
|
taskDiag
|
||||||
);
|
);
|
||||||
|
if (cachedStable) {
|
||||||
|
nextPersistentEntries.set(file, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(result),
|
||||||
|
});
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1598,15 +1835,22 @@ async function readTasksDirForTeam(
|
||||||
teamName,
|
teamName,
|
||||||
};
|
};
|
||||||
tasks.push(task);
|
tasks.push(task);
|
||||||
await cacheTaskReadResultIfStable(
|
const result: CachedTaskReadResult = { task };
|
||||||
|
const cachedStable = await cacheTaskReadResultIfStable(
|
||||||
cacheKey,
|
cacheKey,
|
||||||
taskPath,
|
taskPath,
|
||||||
payload.tasksBase,
|
payload.tasksBase,
|
||||||
fingerprint,
|
fingerprint,
|
||||||
fingerprintCacheSafe,
|
fingerprintCacheSafe,
|
||||||
{ task },
|
result,
|
||||||
taskDiag
|
taskDiag
|
||||||
);
|
);
|
||||||
|
if (cachedStable) {
|
||||||
|
nextPersistentEntries.set(file, {
|
||||||
|
fingerprint,
|
||||||
|
result: cloneCached(result),
|
||||||
|
});
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
taskDiag.skipped++;
|
taskDiag.skipped++;
|
||||||
const code = (error as NodeJS.ErrnoException).code;
|
const code = (error as NodeJS.ErrnoException).code;
|
||||||
|
|
@ -1617,6 +1861,15 @@ async function readTasksDirForTeam(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (shouldWritePersistentTaskProjectionCache(persistentCache, nextPersistentEntries, taskDiag)) {
|
||||||
|
await writePersistentTaskProjectionCache(
|
||||||
|
payload,
|
||||||
|
teamName,
|
||||||
|
optionKey,
|
||||||
|
nextPersistentEntries,
|
||||||
|
taskDiag
|
||||||
|
);
|
||||||
|
}
|
||||||
return { tasks, taskDiag, liveCacheKeys };
|
return { tasks, taskDiag, liveCacheKeys };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1625,6 +1878,12 @@ function mergeTaskDiag(target: GetAllTasksDiag, source: TaskReadDiag): void {
|
||||||
target.cacheHits += source.cacheHits;
|
target.cacheHits += source.cacheHits;
|
||||||
target.cacheMisses += source.cacheMisses;
|
target.cacheMisses += source.cacheMisses;
|
||||||
target.cacheWriteSkips += source.cacheWriteSkips;
|
target.cacheWriteSkips += source.cacheWriteSkips;
|
||||||
|
target.persistentCacheHits += source.persistentCacheHits;
|
||||||
|
target.persistentCacheMisses += source.persistentCacheMisses;
|
||||||
|
target.persistentCacheLoads += source.persistentCacheLoads;
|
||||||
|
target.persistentCacheWrites += source.persistentCacheWrites;
|
||||||
|
target.persistentCacheReadFailures += source.persistentCacheReadFailures;
|
||||||
|
target.persistentCacheWriteFailures += source.persistentCacheWriteFailures;
|
||||||
for (const [reason, count] of Object.entries(source.skipReasons)) {
|
for (const [reason, count] of Object.entries(source.skipReasons)) {
|
||||||
target.skipReasons[reason] = (target.skipReasons[reason] || 0) + count;
|
target.skipReasons[reason] = (target.skipReasons[reason] || 0) + count;
|
||||||
}
|
}
|
||||||
|
|
@ -1647,6 +1906,12 @@ async function getAllTasks(
|
||||||
cacheMisses: 0,
|
cacheMisses: 0,
|
||||||
cacheWriteSkips: 0,
|
cacheWriteSkips: 0,
|
||||||
cacheEvictions: 0,
|
cacheEvictions: 0,
|
||||||
|
persistentCacheHits: 0,
|
||||||
|
persistentCacheMisses: 0,
|
||||||
|
persistentCacheLoads: 0,
|
||||||
|
persistentCacheWrites: 0,
|
||||||
|
persistentCacheReadFailures: 0,
|
||||||
|
persistentCacheWriteFailures: 0,
|
||||||
totalMs: 0,
|
totalMs: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,8 @@
|
||||||
import * as fs from 'fs/promises';
|
import * as fs from 'fs/promises';
|
||||||
import * as os from 'os';
|
import * as os from 'os';
|
||||||
import * as path from 'path';
|
import * as path from 'path';
|
||||||
import { Worker } from 'worker_threads';
|
|
||||||
|
|
||||||
import { afterAll, afterEach, describe, expect, it } from 'vitest';
|
import { afterAll, afterEach, describe, expect, it } from 'vitest';
|
||||||
|
import { Worker } from 'worker_threads';
|
||||||
|
|
||||||
import { createPersistedLaunchSummaryProjection } from '../../../../src/main/services/team/TeamLaunchSummaryProjection';
|
import { createPersistedLaunchSummaryProjection } from '../../../../src/main/services/team/TeamLaunchSummaryProjection';
|
||||||
|
|
||||||
|
|
@ -112,13 +111,15 @@ async function callListTeams(
|
||||||
|
|
||||||
async function callGetAllTasks(
|
async function callGetAllTasks(
|
||||||
worker: Worker,
|
worker: Worker,
|
||||||
tasksBase: string
|
tasksBase: string,
|
||||||
|
projectionCacheBase = path.join(path.dirname(tasksBase), 'projection-cache')
|
||||||
): Promise<{
|
): Promise<{
|
||||||
tasks: unknown[];
|
tasks: unknown[];
|
||||||
diag?: Record<string, unknown>;
|
diag?: Record<string, unknown>;
|
||||||
}> {
|
}> {
|
||||||
const { result, diag } = await callWorker(worker, 'getAllTasks', {
|
const { result, diag } = await callWorker(worker, 'getAllTasks', {
|
||||||
tasksBase,
|
tasksBase,
|
||||||
|
projectionCacheBase,
|
||||||
maxTaskBytes: 256 * 1024,
|
maxTaskBytes: 256 * 1024,
|
||||||
maxTaskReadMs: 5_000,
|
maxTaskReadMs: 5_000,
|
||||||
concurrency: 2,
|
concurrency: 2,
|
||||||
|
|
@ -576,4 +577,118 @@ describe('team-fs-worker integration', () => {
|
||||||
await worker.terminate();
|
await worker.terminate();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('reuses persisted task projections after a worker restart', async () => {
|
||||||
|
const workerPath = await getWorkerPath();
|
||||||
|
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-'));
|
||||||
|
const tasksBase = path.join(tempDir, 'tasks');
|
||||||
|
const projectionCacheBase = path.join(tempDir, 'projection-cache');
|
||||||
|
const teamName = 'persistent-task-cache-team';
|
||||||
|
const tasksDir = path.join(tasksBase, teamName);
|
||||||
|
await fs.mkdir(tasksDir, { recursive: true });
|
||||||
|
await fs.writeFile(
|
||||||
|
path.join(tasksDir, '1.json'),
|
||||||
|
JSON.stringify({
|
||||||
|
id: '1',
|
||||||
|
subject: 'Persisted subject',
|
||||||
|
status: 'pending',
|
||||||
|
createdAt: '2026-05-02T12:00:00.000Z',
|
||||||
|
}),
|
||||||
|
'utf8'
|
||||||
|
);
|
||||||
|
|
||||||
|
const firstWorker = createWorker(workerPath);
|
||||||
|
try {
|
||||||
|
const first = await callGetAllTasks(firstWorker, tasksBase, projectionCacheBase);
|
||||||
|
expect(first.tasks[0]).toMatchObject({ teamName, subject: 'Persisted subject' });
|
||||||
|
expect(first.diag?.cacheMisses).toBe(1);
|
||||||
|
expect(first.diag?.persistentCacheWrites).toBe(1);
|
||||||
|
} finally {
|
||||||
|
await firstWorker.terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
const secondWorker = createWorker(workerPath);
|
||||||
|
try {
|
||||||
|
const second = await callGetAllTasks(secondWorker, tasksBase, projectionCacheBase);
|
||||||
|
expect(second.tasks[0]).toMatchObject({ teamName, subject: 'Persisted subject' });
|
||||||
|
expect(second.diag?.cacheHits).toBe(0);
|
||||||
|
expect(second.diag?.cacheMisses).toBe(0);
|
||||||
|
expect(second.diag?.persistentCacheLoads).toBe(1);
|
||||||
|
expect(second.diag?.persistentCacheHits).toBe(1);
|
||||||
|
} finally {
|
||||||
|
await secondWorker.terminate();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to task JSON when persisted projections are stale or corrupt', async () => {
|
||||||
|
const workerPath = await getWorkerPath();
|
||||||
|
tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'team-fs-worker-'));
|
||||||
|
const tasksBase = path.join(tempDir, 'tasks');
|
||||||
|
const projectionCacheBase = path.join(tempDir, 'projection-cache');
|
||||||
|
const teamName = 'stale-persistent-cache-team';
|
||||||
|
const tasksDir = path.join(tasksBase, teamName);
|
||||||
|
const taskPath = path.join(tasksDir, '1.json');
|
||||||
|
await fs.mkdir(tasksDir, { recursive: true });
|
||||||
|
await fs.writeFile(
|
||||||
|
taskPath,
|
||||||
|
JSON.stringify({
|
||||||
|
id: '1',
|
||||||
|
subject: 'Original subject',
|
||||||
|
status: 'pending',
|
||||||
|
createdAt: '2026-05-02T12:00:00.000Z',
|
||||||
|
}),
|
||||||
|
'utf8'
|
||||||
|
);
|
||||||
|
|
||||||
|
const firstWorker = createWorker(workerPath);
|
||||||
|
try {
|
||||||
|
const first = await callGetAllTasks(firstWorker, tasksBase, projectionCacheBase);
|
||||||
|
expect(first.tasks[0]).toMatchObject({ subject: 'Original subject' });
|
||||||
|
expect(first.diag?.persistentCacheWrites).toBe(1);
|
||||||
|
} finally {
|
||||||
|
await firstWorker.terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
await fs.writeFile(
|
||||||
|
taskPath,
|
||||||
|
JSON.stringify({
|
||||||
|
id: '1',
|
||||||
|
subject: 'Changed subject with a different size',
|
||||||
|
status: 'pending',
|
||||||
|
createdAt: '2026-05-02T12:00:00.000Z',
|
||||||
|
}),
|
||||||
|
'utf8'
|
||||||
|
);
|
||||||
|
|
||||||
|
const changedWorker = createWorker(workerPath);
|
||||||
|
try {
|
||||||
|
const changed = await callGetAllTasks(changedWorker, tasksBase, projectionCacheBase);
|
||||||
|
expect(changed.tasks[0]).toMatchObject({
|
||||||
|
teamName,
|
||||||
|
subject: 'Changed subject with a different size',
|
||||||
|
});
|
||||||
|
expect(changed.diag?.persistentCacheLoads).toBe(1);
|
||||||
|
expect(changed.diag?.persistentCacheHits).toBe(0);
|
||||||
|
expect(changed.diag?.persistentCacheMisses).toBe(1);
|
||||||
|
expect(changed.diag?.cacheMisses).toBe(1);
|
||||||
|
} finally {
|
||||||
|
await changedWorker.terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
const cacheFiles = await fs.readdir(path.join(projectionCacheBase, 'v1'));
|
||||||
|
await fs.writeFile(path.join(projectionCacheBase, 'v1', cacheFiles[0]), '{bad json', 'utf8');
|
||||||
|
|
||||||
|
const corruptWorker = createWorker(workerPath);
|
||||||
|
try {
|
||||||
|
const recovered = await callGetAllTasks(corruptWorker, tasksBase, projectionCacheBase);
|
||||||
|
expect(recovered.tasks[0]).toMatchObject({
|
||||||
|
teamName,
|
||||||
|
subject: 'Changed subject with a different size',
|
||||||
|
});
|
||||||
|
expect(recovered.diag?.persistentCacheReadFailures).toBe(1);
|
||||||
|
expect(recovered.diag?.cacheMisses).toBe(1);
|
||||||
|
} finally {
|
||||||
|
await corruptWorker.terminate();
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue