fix(team): watch task log freshness from team state

This commit is contained in:
777genius 2026-05-11 15:19:45 +03:00
parent eb83ac92b2
commit 064f5b977d
7 changed files with 178 additions and 76 deletions

View file

@ -1,4 +1,5 @@
import { createLogger } from '@shared/utils/logger';
import { getTeamsBasePath } from '@main/utils/pathDecoder';
import { watch } from 'chokidar';
import { createHash } from 'crypto';
import * as fs from 'fs/promises';
@ -13,6 +14,7 @@ import {
BOARD_TASK_CHANGES_DIRNAME,
BOARD_TASK_LOG_FRESHNESS_DIRNAME,
BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX,
TEAM_TASK_LOG_FRESHNESS_DIRNAME,
classifyLogSourceWatcherEvent,
getRelativeLogSourceParts,
isAgentTranscriptFileName,
@ -86,6 +88,23 @@ function pushUniqueNormalizedPath(paths: string[], candidate: string | undefined
}
}
function getTeamTaskLogFreshnessDir(teamName: string): string {
return path.join(getTeamsBasePath(), teamName, TEAM_TASK_LOG_FRESHNESS_DIRNAME);
}
function pathsOverlap(left: string, right: string): boolean {
const normalizedLeft = path.normalize(left);
const normalizedRight = path.normalize(right);
const leftToRight = path.relative(normalizedLeft, normalizedRight);
const rightToLeft = path.relative(normalizedRight, normalizedLeft);
return (
!leftToRight ||
(!leftToRight.startsWith('..') && !path.isAbsolute(leftToRight)) ||
!rightToLeft ||
(!rightToLeft.startsWith('..') && !path.isAbsolute(rightToLeft))
);
}
export function shouldIgnoreLogSourceWatcherPath(
projectDir: string,
watchedPath: string,
@ -382,19 +401,23 @@ export class TeamLogSourceTracker {
}
const taskFreshnessRootDirs = this.getTaskFreshnessRootDirs(context);
const taskFreshnessWatchRootDirs = await this.ensureLogSourceFreshnessDirs(
const taskFreshnessDirs = await this.ensureLogSourceFreshnessDirs(
teamName,
context.projectDir,
taskFreshnessRootDirs
).catch((error) => {
logger.debug(`Failed to ensure log-source freshness dirs for ${teamName}: ${String(error)}`);
return [path.normalize(context.projectDir)];
return {
legacyRootDirs: [path.normalize(context.projectDir)],
logSignalDirs: [getTeamTaskLogFreshnessDir(teamName)],
};
});
const { targets, scopedSessionIds } = await this.buildScopedWatchTargets(
context.projectDir,
context.watchSessionIds,
this.getPendingUnknownSessionIds(state),
taskFreshnessWatchRootDirs
taskFreshnessDirs
);
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return;
@ -406,11 +429,19 @@ export class TeamLogSourceTracker {
ignorePermissionErrors: true,
followSymlinks: false,
depth: 0,
ignored: (watchedPath) =>
shouldIgnoreLogSourceWatcherPath(context.projectDir, watchedPath, {
ignored: (watchedPath) => {
if (
taskFreshnessDirs.logSignalDirs.some((logSignalDir) =>
pathsOverlap(watchedPath, logSignalDir)
)
) {
return false;
}
return shouldIgnoreLogSourceWatcherPath(context.projectDir, watchedPath, {
scopedSessionIds,
pendingRootSessionIds: new Set(this.getPendingUnknownSessionIds(state)),
}),
});
},
awaitWriteFinish: {
stabilityThreshold: 250,
pollInterval: 50,
@ -431,13 +462,13 @@ export class TeamLogSourceTracker {
return;
}
const eventTaskFreshnessRootDirs = this.getTaskFreshnessRootDirs(current.activeContext);
pushUniqueNormalizedPath(eventTaskFreshnessRootDirs, current.projectDir);
const eventTaskFreshnessDirs = this.getTaskFreshnessDirsForContext(
teamName,
current.projectDir,
eventTaskFreshnessRootDirs
);
if (
this.handleTaskFreshnessSignalChangeForRoots(
teamName,
changedPath,
eventTaskFreshnessRootDirs
)
this.handleTaskFreshnessSignalChangeForDirs(teamName, changedPath, eventTaskFreshnessDirs)
) {
return;
}
@ -485,17 +516,19 @@ export class TeamLogSourceTracker {
}
private async ensureLogSourceFreshnessDirs(
teamName: string,
transcriptProjectDir: string,
projectDirs: readonly string[]
): Promise<string[]> {
const watchRootDirs: string[] = [];
): Promise<{ legacyRootDirs: string[]; logSignalDirs: string[] }> {
const legacyRootDirs: string[] = [];
const logSignalDirs: string[] = [];
const normalizedTranscriptProjectDir = path.normalize(transcriptProjectDir);
pushUniqueNormalizedPath(watchRootDirs, normalizedTranscriptProjectDir);
const teamLogFreshnessDir = getTeamTaskLogFreshnessDir(teamName);
pushUniqueNormalizedPath(legacyRootDirs, normalizedTranscriptProjectDir);
pushUniqueNormalizedPath(logSignalDirs, teamLogFreshnessDir);
await Promise.all([
fs.mkdir(path.join(normalizedTranscriptProjectDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME), {
recursive: true,
}),
fs.mkdir(teamLogFreshnessDir, { recursive: true }),
fs.mkdir(path.join(normalizedTranscriptProjectDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME), {
recursive: true,
}),
@ -511,34 +544,35 @@ export class TeamLogSourceTracker {
if (!(await this.isDirectory(normalizedProjectDir))) {
return;
}
await Promise.all([
fs.mkdir(path.join(normalizedProjectDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME), {
recursive: true,
}),
fs.mkdir(path.join(normalizedProjectDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME), {
recursive: true,
}),
]);
pushUniqueNormalizedPath(watchRootDirs, normalizedProjectDir);
await fs.mkdir(path.join(normalizedProjectDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME), {
recursive: true,
});
pushUniqueNormalizedPath(legacyRootDirs, normalizedProjectDir);
} catch (error) {
logger.debug(`Failed to ensure task freshness dirs in ${projectDir}: ${String(error)}`);
}
})
);
return watchRootDirs;
return { legacyRootDirs, logSignalDirs };
}
private async buildScopedWatchTargets(
projectDir: string,
confirmedSessionIds: readonly string[],
pendingRootSessionIds: readonly string[],
taskFreshnessRootDirs: readonly string[] = [projectDir]
taskFreshnessDirs: {
legacyRootDirs: readonly string[];
logSignalDirs: readonly string[];
} = { legacyRootDirs: [projectDir], logSignalDirs: [] }
): Promise<{ targets: string[]; scopedSessionIds: Set<string> }> {
const targets = new Set<string>();
const scopedSessionIds = new Set<string>();
targets.add(projectDir);
for (const freshnessRootDir of taskFreshnessRootDirs) {
for (const logSignalDir of taskFreshnessDirs.logSignalDirs) {
targets.add(logSignalDir);
}
for (const freshnessRootDir of taskFreshnessDirs.legacyRootDirs) {
targets.add(path.join(freshnessRootDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME));
targets.add(path.join(freshnessRootDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME));
}
@ -843,6 +877,36 @@ export class TeamLogSourceTracker {
return false;
}
private getTaskFreshnessDirsForContext(
teamName: string,
projectDir: string,
taskFreshnessRootDirs: readonly string[]
): { legacyRootDirs: string[]; logSignalDirs: string[] } {
const legacyRootDirs = [...taskFreshnessRootDirs];
pushUniqueNormalizedPath(legacyRootDirs, projectDir);
return {
legacyRootDirs,
logSignalDirs: [getTeamTaskLogFreshnessDir(teamName)],
};
}
private handleTaskFreshnessSignalChangeForDirs(
teamName: string,
changedPath: string,
taskFreshnessDirs: { legacyRootDirs: readonly string[]; logSignalDirs: readonly string[] }
): boolean {
for (const logSignalDir of taskFreshnessDirs.logSignalDirs) {
if (this.handleTaskFreshnessSignalChange(teamName, changedPath, logSignalDir, 'log')) {
return true;
}
}
return this.handleTaskFreshnessSignalChangeForRoots(
teamName,
changedPath,
taskFreshnessDirs.legacyRootDirs
);
}
private async recompute(teamName: string): Promise<TeamLogSourceSnapshot> {
const state = this.getOrCreateState(teamName);
if (this.getActiveConsumerCount(state) === 0) {

View file

@ -1,3 +1,4 @@
import { getTeamsBasePath } from '@main/utils/pathDecoder';
import { createHash } from 'crypto';
import * as fs from 'fs/promises';
import * as path from 'path';
@ -7,6 +8,7 @@ import { BoardTaskActivityParseCache } from '../taskLogs/activity/BoardTaskActiv
import type { TaskLogFreshnessSignal } from './TeamTaskStallTypes';
const BOARD_TASK_LOG_FRESHNESS_DIRNAME = '.board-task-log-freshness';
const TEAM_TASK_LOG_FRESHNESS_DIRNAME = 'task-log-freshness';
const BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX = '.json';
const MAX_TASK_ID_ARTIFACT_SEGMENT_LENGTH = 120;
@ -43,12 +45,14 @@ function taskIdArtifactSegments(taskId: string): string[] {
return safe === legacy ? [safe] : [safe, legacy];
}
function taskSignalPathCandidates(projectDir: string, taskId: string): string[] {
return taskIdArtifactSegments(taskId).map((segment) =>
path.join(
projectDir,
BOARD_TASK_LOG_FRESHNESS_DIRNAME,
`${segment}${BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX}`
function taskSignalPathCandidates(projectDir: string, taskId: string, teamName?: string): string[] {
const dirs = [
...(teamName ? [path.join(getTeamsBasePath(), teamName, TEAM_TASK_LOG_FRESHNESS_DIRNAME)] : []),
path.join(projectDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME),
];
return dirs.flatMap((dir) =>
taskIdArtifactSegments(taskId).map((segment) =>
path.join(dir, `${segment}${BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX}`)
)
);
}
@ -62,11 +66,12 @@ export class TeamTaskLogFreshnessReader {
async readSignals(
projectDir: string,
taskIds: string[]
taskIds: string[],
options?: { teamName?: string }
): Promise<Map<string, TaskLogFreshnessSignal>> {
const uniqueTaskIds = [...new Set(taskIds)].filter((taskId) => taskId.trim().length > 0).sort();
const signalFilePathCandidates = uniqueTaskIds.map((taskId) =>
taskSignalPathCandidates(projectDir, taskId)
taskSignalPathCandidates(projectDir, taskId, options?.teamName)
);
this.cache.retainOnly(new Set(signalFilePathCandidates.flat()));

View file

@ -159,7 +159,8 @@ export class TeamTaskStallSnapshotSource {
const [freshnessByTaskId, exactRowsByFilePath, openCodeEvidence] = await Promise.all([
this.freshnessReader.readSignals(
transcriptContext.projectDir,
relevantMonitorTasks.map((task) => task.id)
relevantMonitorTasks.map((task) => task.id),
{ teamName }
),
exactReadsEnabled
? this.exactRowReader.parseFiles(relevantExactFiles)

View file

@ -3,6 +3,7 @@ import * as path from 'path';
import type { PersistedTeamLaunchMemberState, PersistedTeamLaunchSnapshot } from '@shared/types';
export const BOARD_TASK_LOG_FRESHNESS_DIRNAME = '.board-task-log-freshness';
export const TEAM_TASK_LOG_FRESHNESS_DIRNAME = 'task-log-freshness';
export const BOARD_TASK_CHANGE_FRESHNESS_DIRNAME = '.board-task-change-freshness';
export const BOARD_TASK_CHANGES_DIRNAME = '.board-task-changes';
export const BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX = '.json';

View file

@ -4,6 +4,10 @@ import { tmpdir } from 'os';
import * as path from 'path';
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from 'vitest';
import {
getTeamsBasePath,
setClaudeBasePathOverride,
} from '../../../../src/main/utils/pathDecoder';
import {
shouldIgnoreLogSourceWatcherPath,
TeamLogSourceTracker,
@ -19,6 +23,10 @@ function safeTaskIdSegment(taskId: string): string {
return `task-id-${createHash('sha256').update(taskId).digest('hex').slice(0, 32)}`;
}
function teamLogFreshnessDir(teamName = 'demo'): string {
return path.join(getTeamsBasePath(), teamName, 'task-log-freshness');
}
describe('TeamLogSourceTracker', () => {
let tempDir: string | null = null;
@ -28,6 +36,7 @@ describe('TeamLogSourceTracker', () => {
});
afterEach(async () => {
setClaudeBasePathOverride(null);
if (tempDir) {
await rm(tempDir, { recursive: true, force: true });
tempDir = null;
@ -49,6 +58,7 @@ describe('TeamLogSourceTracker', () => {
it('emits task-log-change for matching runtime freshness signals without broad log-source-change', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
@ -64,10 +74,10 @@ describe('TeamLogSourceTracker', () => {
await tracker.enableTracking('demo', 'change_presence');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 350));
const taskId = '123e4567-e89b-12d3-a456-426614174999';
const signalDir = path.join(tempDir, '.board-task-log-freshness');
const signalDir = teamLogFreshnessDir();
await mkdir(signalDir, { recursive: true });
await writeFile(path.join(signalDir, `${encodeURIComponent(taskId)}.json`), '{"ok":true}');
@ -87,6 +97,7 @@ describe('TeamLogSourceTracker', () => {
it('keeps task-log tracking alive until the last consumer unsubscribes', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-refcount-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
@ -103,12 +114,12 @@ describe('TeamLogSourceTracker', () => {
await tracker.enableTracking('demo', 'task_log_stream');
await tracker.enableTracking('demo', 'task_log_stream');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 350));
await tracker.disableTracking('demo', 'task_log_stream');
const taskId = '223e4567-e89b-12d3-a456-426614174999';
const signalDir = path.join(tempDir, '.board-task-log-freshness');
const signalDir = teamLogFreshnessDir();
await mkdir(signalDir, { recursive: true });
await writeFile(path.join(signalDir, `${encodeURIComponent(taskId)}.json`), '{"ok":true}');
@ -129,8 +140,9 @@ describe('TeamLogSourceTracker', () => {
expect(emitter).not.toHaveBeenCalled();
});
it('creates transcript freshness dirs without creating missing live cwd roots', async () => {
it('creates team log freshness dir without creating missing live cwd roots', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-missing-root-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const transcriptProjectDir = path.join(tempDir, 'transcript-project');
const missingWorkspaceDir = path.join(tempDir, 'missing-workspace');
@ -152,17 +164,12 @@ describe('TeamLogSourceTracker', () => {
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
expect((await stat(path.join(transcriptProjectDir, '.board-task-log-freshness'))).isDirectory())
.toBe(true);
expect((await stat(teamLogFreshnessDir())).isDirectory()).toBe(true);
await expect(stat(missingWorkspaceDir)).rejects.toThrow();
const taskId = 'transcript-root-task';
await writeFile(
path.join(
transcriptProjectDir,
'.board-task-log-freshness',
`${encodeURIComponent(taskId)}.json`
),
path.join(teamLogFreshnessDir(), `${encodeURIComponent(taskId)}.json`),
JSON.stringify({ taskId }),
'utf8'
);
@ -181,6 +188,7 @@ describe('TeamLogSourceTracker', () => {
it('emits log freshness kind from Windows-safe hashed task-log freshness files', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-safe-log-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
@ -199,7 +207,7 @@ describe('TeamLogSourceTracker', () => {
await new Promise((resolve) => setTimeout(resolve, 100));
const taskId = 'AUX';
const signalDir = path.join(tempDir, '.board-task-log-freshness');
const signalDir = teamLogFreshnessDir();
await mkdir(signalDir, { recursive: true });
await writeFile(
path.join(signalDir, `${safeTaskIdSegment(taskId)}.json`),
@ -219,8 +227,9 @@ describe('TeamLogSourceTracker', () => {
await tracker.disableTracking('demo', 'task_log_stream');
});
it('watches live cwd freshness roots used by Codex Native traces', async () => {
it('watches team-scoped log freshness and live cwd task-change freshness roots', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-codex-root-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const transcriptProjectDir = path.join(tempDir, 'transcripts');
const workspaceProjectDir = path.join(tempDir, 'workspace');
const memberProjectDir = path.join(tempDir, 'member-workspace');
@ -244,30 +253,15 @@ describe('TeamLogSourceTracker', () => {
await tracker.enableTracking('demo', 'task_log_stream');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 350));
const logTaskId = 'codex-task-1';
await writeFile(
path.join(
memberProjectDir,
'.board-task-log-freshness',
`${encodeURIComponent(logTaskId)}.json`
),
JSON.stringify({ taskId: logTaskId, source: 'codex-native-trace' }),
'utf8'
);
await expect(stat(path.join(memberProjectDir, '.board-task-log-freshness'))).rejects.toThrow();
await expect(stat(path.join(workspaceProjectDir, '.board-task-log-freshness'))).rejects.toThrow();
await vi.waitFor(() => {
expect(emitter).toHaveBeenCalledWith({
type: 'task-log-change',
teamName: 'demo',
taskId: logTaskId,
taskSignalKind: 'log',
});
});
emitter.mockClear();
const changeTaskId = 'codex-task-2';
await mkdir(path.join(workspaceProjectDir, '.board-task-change-freshness'), {
recursive: true,
});
await writeFile(
path.join(
workspaceProjectDir,
@ -432,6 +426,7 @@ describe('TeamLogSourceTracker', () => {
it('supports stall_monitor as an independent tracking consumer', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-stall-monitor-'));
setClaudeBasePathOverride(path.join(tempDir, '.claude'));
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
@ -450,7 +445,7 @@ describe('TeamLogSourceTracker', () => {
await new Promise((resolve) => setTimeout(resolve, 100));
const taskId = '323e4567-e89b-12d3-a456-426614174999';
const signalDir = path.join(tempDir, '.board-task-log-freshness');
const signalDir = teamLogFreshnessDir();
await mkdir(signalDir, { recursive: true });
await writeFile(path.join(signalDir, `${encodeURIComponent(taskId)}.json`), '{"ok":true}');

View file

@ -4,6 +4,10 @@ import * as os from 'os';
import * as path from 'path';
import { afterEach, describe, expect, it } from 'vitest';
import {
getTeamsBasePath,
setClaudeBasePathOverride,
} from '../../../../../src/main/utils/pathDecoder';
import { TeamTaskLogFreshnessReader } from '../../../../../src/main/services/team/stallMonitor/TeamTaskLogFreshnessReader';
const tempDirs: string[] = [];
@ -13,6 +17,7 @@ function safeTaskIdSegment(taskId: string): string {
}
afterEach(async () => {
setClaudeBasePathOverride(null);
await Promise.all(
tempDirs.splice(0).map(async (dirPath) => {
await fs.rm(dirPath, { recursive: true, force: true });
@ -20,6 +25,10 @@ afterEach(async () => {
);
});
function teamSignalDir(teamName: string): string {
return path.join(getTeamsBasePath(), teamName, 'task-log-freshness');
}
describe('TeamTaskLogFreshnessReader', () => {
it('reads valid freshness signals and normalizes transcript basename', async () => {
const projectDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-freshness-'));
@ -84,6 +93,31 @@ describe('TeamTaskLogFreshnessReader', () => {
expect(signals.get('CON')?.transcriptFileBasename).toBe('session-con.jsonl');
});
it('prefers team-scoped freshness signals', async () => {
const projectDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-freshness-'));
tempDirs.push(projectDir);
setClaudeBasePathOverride(path.join(projectDir, '.claude'));
const signalDir = teamSignalDir('demo');
await fs.mkdir(signalDir, { recursive: true });
await fs.writeFile(
path.join(signalDir, `${encodeURIComponent('task-a')}.json`),
JSON.stringify({
taskId: 'task-a',
updatedAt: '2026-04-19T12:00:00.000Z',
}),
'utf8'
);
const signals = await new TeamTaskLogFreshnessReader().readSignals(projectDir, ['task-a'], {
teamName: 'demo',
});
expect(signals.get('task-a')?.filePath).toBe(
path.join(signalDir, `${encodeURIComponent('task-a')}.json`)
);
});
it('reads hashed freshness files for very long task ids', async () => {
const projectDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-freshness-'));
tempDirs.push(projectDir);

View file

@ -155,7 +155,9 @@ describe('TeamTaskStallSnapshotSource', () => {
tasks: [...expectedWorkflowActiveTasks, ...deletedTasks],
messages: rawMessages,
});
expect(freshnessReader.readSignals).toHaveBeenCalledWith('/tmp/project', ['task-a', 'task-b']);
expect(freshnessReader.readSignals).toHaveBeenCalledWith('/tmp/project', ['task-a', 'task-b'], {
teamName: 'demo',
});
expect(exactRowReader.parseFiles).toHaveBeenCalledWith(['/tmp/project/session-a.jsonl', '/tmp/project/session-b.jsonl']);
expect(openCodeEvidenceSource.readEvidence).toHaveBeenCalledWith({
teamName: 'demo',