fix(team): scope log source tracking

This commit is contained in:
777genius 2026-05-03 10:25:38 +03:00
parent 2fde1ad7fc
commit 1339629da2
8 changed files with 990 additions and 85 deletions

View file

@ -8,16 +8,27 @@ import {
computeTaskChangePresenceProjectFingerprint,
normalizeTaskChangePresenceFilePath,
} from './taskChangePresenceUtils';
import {
BOARD_TASK_CHANGE_FRESHNESS_DIRNAME,
BOARD_TASK_CHANGES_DIRNAME,
BOARD_TASK_LOG_FRESHNESS_DIRNAME,
BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX,
MAX_PENDING_UNKNOWN_ROOT_REFRESH_ATTEMPTS,
MAX_PENDING_UNKNOWN_ROOT_SESSIONS,
PENDING_UNKNOWN_ROOT_SESSION_TTL_MS,
classifyLogSourceWatcherEvent,
getRelativeLogSourceParts,
isAgentTranscriptFileName,
normalizeLogSourceSessionId,
} from './teamLogSourceWatchScope';
import type { TeamMemberLogsFinder } from './TeamMemberLogsFinder';
import type { TeamLogSourceLiveContext, TeamMemberLogsFinder } from './TeamMemberLogsFinder';
import type { TeamChangeEvent } from '@shared/types';
import type { FSWatcher } from 'chokidar';
const logger = createLogger('Service:TeamLogSourceTracker');
const BOARD_TASK_LOG_FRESHNESS_DIRNAME = '.board-task-log-freshness';
const BOARD_TASK_CHANGE_FRESHNESS_DIRNAME = '.board-task-change-freshness';
const BOARD_TASK_CHANGES_DIRNAME = '.board-task-changes';
const BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX = '.json';
const CONTEXT_REFRESH_DEBOUNCE_MS = 300;
const PENDING_CONTEXT_REFRESH_RETRY_MS = 1_000;
interface TeamLogSourceSnapshot {
projectFingerprint: string | null;
@ -33,7 +44,11 @@ export type TeamLogSourceTrackingConsumer =
interface TrackingState {
watcher: FSWatcher | null;
projectDir: string | null;
activeContext: TeamLogSourceLiveContext | null;
scopedSessionIds: Set<string>;
pendingUnknownSessionIds: Map<string, PendingUnknownSessionCandidate>;
refreshTimer: ReturnType<typeof setTimeout> | null;
contextRefreshTimer: ReturnType<typeof setTimeout> | null;
initializePromise: Promise<TeamLogSourceSnapshot> | null;
initializeVersion: number | null;
recomputePromise: Promise<TeamLogSourceSnapshot> | null;
@ -43,6 +58,12 @@ interface TrackingState {
lifecycleVersion: number;
}
interface PendingUnknownSessionCandidate {
sessionId: string;
expiresAt: number;
refreshAttempts: number;
}
type DecodedFreshnessTaskId =
| { kind: 'task-id'; taskId: string }
| { kind: 'opaque-safe-segment' }
@ -52,14 +73,30 @@ function isOpaqueSafeTaskIdSegment(segment: string): boolean {
return /^task-id-[0-9a-f]{32}$/.test(segment);
}
export function shouldIgnoreLogSourceWatcherPath(projectDir: string, watchedPath: string): boolean {
const relativePath = path.relative(projectDir, watchedPath);
if (!relativePath || relativePath.startsWith('..') || path.isAbsolute(relativePath)) {
export function shouldIgnoreLogSourceWatcherPath(
projectDir: string,
watchedPath: string,
_scope?: { scopedSessionIds?: ReadonlySet<string> }
): boolean {
const parts = getRelativeLogSourceParts(projectDir, watchedPath);
if (!parts) {
return false;
}
const parts = relativePath.split(/[/\\]+/).filter(Boolean);
return parts[0] === BOARD_TASK_CHANGES_DIRNAME;
const first = parts[0];
if (first === BOARD_TASK_CHANGES_DIRNAME) return true;
if (parts.includes('tool-results')) return true;
if (parts.includes('memory')) return true;
if (first === BOARD_TASK_LOG_FRESHNESS_DIRNAME) return false;
if (first === BOARD_TASK_CHANGE_FRESHNESS_DIRNAME) return false;
if (parts.length >= 2 && parts[1] === 'subagents') {
if (parts.length === 2) return false;
if (parts.length === 3) return !isAgentTranscriptFileName(parts[2]);
return true;
}
return false;
}
export class TeamLogSourceTracker {
@ -149,7 +186,11 @@ export class TeamLogSourceTracker {
const created: TrackingState = {
watcher: null,
projectDir: null,
activeContext: null,
scopedSessionIds: new Set(),
pendingUnknownSessionIds: new Map(),
refreshTimer: null,
contextRefreshTimer: null,
initializePromise: null,
initializeVersion: null,
recomputePromise: null,
@ -205,6 +246,10 @@ export class TeamLogSourceTracker {
clearTimeout(state.refreshTimer);
state.refreshTimer = null;
}
if (state.contextRefreshTimer) {
clearTimeout(state.contextRefreshTimer);
state.contextRefreshTimer = null;
}
if (state.watcher) {
await state.watcher.close().catch(() => undefined);
@ -212,6 +257,9 @@ export class TeamLogSourceTracker {
}
state.projectDir = null;
state.activeContext = null;
state.scopedSessionIds.clear();
state.pendingUnknownSessionIds.clear();
state.snapshot = { projectFingerprint: null, logSourceGeneration: null };
return { ...state.snapshot };
}
@ -231,24 +279,27 @@ export class TeamLogSourceTracker {
): Promise<TeamLogSourceSnapshot> {
const state = this.getOrCreateState(teamName);
const previousGeneration = state.snapshot.logSourceGeneration;
const context = await this.logsFinder.getLogSourceWatchContext(teamName, {
const context = await this.logsFinder.getLiveLogSourceWatchContext(teamName, {
forceRefresh: true,
});
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return this.getOrCreateState(teamName).snapshot;
}
if (!context) {
state.activeContext = null;
state.scopedSessionIds.clear();
state.snapshot = { projectFingerprint: null, logSourceGeneration: null };
await this.rebuildWatcher(teamName, null, expectedVersion);
return state.snapshot;
}
state.activeContext = context;
const snapshot = await this.computeSnapshot(context);
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return this.getOrCreateState(teamName).snapshot;
}
state.snapshot = snapshot;
await this.rebuildWatcher(teamName, context.projectDir, expectedVersion);
await this.rebuildWatcher(teamName, context, expectedVersion);
if (
this.isTrackingCurrent(teamName, expectedVersion) &&
state.snapshot.logSourceGeneration &&
@ -261,7 +312,7 @@ export class TeamLogSourceTracker {
private async rebuildWatcher(
teamName: string,
projectDir: string | null,
context: TeamLogSourceLiveContext | null,
expectedVersion: number
): Promise<void> {
const state = this.stateByTeam.get(teamName);
@ -272,17 +323,15 @@ export class TeamLogSourceTracker {
) {
return;
}
if (state.projectDir === projectDir && state.watcher) {
return;
}
if (state.watcher) {
await state.watcher.close().catch(() => undefined);
state.watcher = null;
}
state.projectDir = projectDir;
if (!projectDir) {
state.projectDir = context?.projectDir ?? null;
state.scopedSessionIds.clear();
if (!context?.projectDir) {
return;
}
@ -291,26 +340,56 @@ export class TeamLogSourceTracker {
return;
}
state.watcher = watch(projectDir, {
await this.ensureLogSourceFreshnessDirs(context.projectDir).catch((error) => {
logger.debug(`Failed to ensure log-source freshness dirs for ${teamName}: ${String(error)}`);
});
const { targets, scopedSessionIds } = await this.buildScopedWatchTargets(
context.projectDir,
context.watchSessionIds,
this.getPendingUnknownSessionIds(state)
);
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return;
}
state.scopedSessionIds = scopedSessionIds;
state.watcher = watch(targets, {
ignoreInitial: true,
ignorePermissionErrors: true,
followSymlinks: false,
depth: 3,
ignored: (watchedPath) => shouldIgnoreLogSourceWatcherPath(projectDir, watchedPath),
depth: 0,
ignored: (watchedPath) =>
shouldIgnoreLogSourceWatcherPath(context.projectDir, watchedPath, { scopedSessionIds }),
awaitWriteFinish: {
stabilityThreshold: 250,
pollInterval: 50,
},
});
const scheduleRecompute = (changedPath?: string): void => {
const handleWatcherEvent = (
eventName: 'add' | 'change' | 'unlink' | 'addDir' | 'unlinkDir',
changedPath?: string
): void => {
const current = this.stateByTeam.get(teamName);
if (!current || this.getActiveConsumerCount(current) === 0 || !current.projectDir) {
if (
!changedPath ||
!current ||
this.getActiveConsumerCount(current) === 0 ||
!current.projectDir
) {
return;
}
if (
changedPath &&
(this.handleTaskFreshnessSignalChange(
const action = classifyLogSourceWatcherEvent({
projectDir: current.projectDir,
changedPath,
eventName,
scopedSessionIds: current.scopedSessionIds,
pendingUnknownSessionIds: new Set(current.pendingUnknownSessionIds.keys()),
});
if (action.kind === 'task-freshness') {
this.handleTaskFreshnessSignalChange(
teamName,
current.projectDir,
changedPath,
@ -321,29 +400,234 @@ export class TeamLogSourceTracker {
current.projectDir,
changedPath,
BOARD_TASK_CHANGE_FRESHNESS_DIRNAME
))
) {
);
return;
}
if (current.refreshTimer) {
clearTimeout(current.refreshTimer);
if (action.kind === 'context-refresh') {
this.scheduleContextRefresh(teamName, action.candidateSessionId);
return;
}
if (action.kind === 'scoped-recompute') {
this.scheduleScopedRecompute(teamName);
}
current.refreshTimer = setTimeout(() => {
current.refreshTimer = null;
void this.recompute(teamName);
}, 300);
};
state.watcher.on('add', scheduleRecompute);
state.watcher.on('change', scheduleRecompute);
state.watcher.on('unlink', scheduleRecompute);
state.watcher.on('addDir', scheduleRecompute);
state.watcher.on('unlinkDir', scheduleRecompute);
state.watcher.on('add', (changedPath) => handleWatcherEvent('add', changedPath));
state.watcher.on('change', (changedPath) => handleWatcherEvent('change', changedPath));
state.watcher.on('unlink', (changedPath) => handleWatcherEvent('unlink', changedPath));
state.watcher.on('addDir', (changedPath) => handleWatcherEvent('addDir', changedPath));
state.watcher.on('unlinkDir', (changedPath) => handleWatcherEvent('unlinkDir', changedPath));
state.watcher.on('error', (error) => {
logger.warn(`Log-source watcher error for ${teamName}: ${String(error)}`);
});
}
private async ensureLogSourceFreshnessDirs(projectDir: string): Promise<void> {
await Promise.all([
fs.mkdir(path.join(projectDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME), { recursive: true }),
fs.mkdir(path.join(projectDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME), { recursive: true }),
]);
}
private async buildScopedWatchTargets(
projectDir: string,
confirmedSessionIds: readonly string[],
pendingRootSessionIds: readonly string[]
): Promise<{ targets: string[]; scopedSessionIds: Set<string> }> {
const targets = new Set<string>();
const scopedSessionIds = new Set<string>();
targets.add(projectDir);
targets.add(path.join(projectDir, BOARD_TASK_LOG_FRESHNESS_DIRNAME));
targets.add(path.join(projectDir, BOARD_TASK_CHANGE_FRESHNESS_DIRNAME));
for (const rawSessionId of confirmedSessionIds) {
const sessionId = normalizeLogSourceSessionId(rawSessionId);
if (!sessionId) {
continue;
}
scopedSessionIds.add(sessionId);
const rootTranscript = path.join(projectDir, `${sessionId}.jsonl`);
const sessionDir = path.join(projectDir, sessionId);
const subagentsDir = path.join(sessionDir, 'subagents');
if (await this.isFile(rootTranscript)) targets.add(rootTranscript);
if (await this.isDirectory(sessionDir)) targets.add(sessionDir);
if (await this.isDirectory(subagentsDir)) targets.add(subagentsDir);
}
for (const rawSessionId of pendingRootSessionIds) {
const sessionId = normalizeLogSourceSessionId(rawSessionId);
if (!sessionId || scopedSessionIds.has(sessionId)) {
continue;
}
const rootTranscript = path.join(projectDir, `${sessionId}.jsonl`);
if (await this.isFile(rootTranscript)) targets.add(rootTranscript);
}
return { targets: [...targets], scopedSessionIds };
}
private async isFile(targetPath: string): Promise<boolean> {
try {
return (await fs.stat(targetPath)).isFile();
} catch {
return false;
}
}
private async isDirectory(targetPath: string): Promise<boolean> {
try {
return (await fs.stat(targetPath)).isDirectory();
} catch {
return false;
}
}
private getPendingUnknownSessionIds(state: TrackingState): string[] {
this.prunePendingUnknownSessions(state);
return [...state.pendingUnknownSessionIds.keys()];
}
private rememberPendingUnknownSession(
state: TrackingState,
rawSessionId: string | undefined
): void {
const sessionId = normalizeLogSourceSessionId(rawSessionId);
if (!sessionId || state.scopedSessionIds.has(sessionId)) {
return;
}
const now = Date.now();
state.pendingUnknownSessionIds.set(sessionId, {
sessionId,
expiresAt: now + PENDING_UNKNOWN_ROOT_SESSION_TTL_MS,
refreshAttempts: state.pendingUnknownSessionIds.get(sessionId)?.refreshAttempts ?? 0,
});
while (state.pendingUnknownSessionIds.size > MAX_PENDING_UNKNOWN_ROOT_SESSIONS) {
const oldest = [...state.pendingUnknownSessionIds.values()].sort(
(left, right) => left.expiresAt - right.expiresAt
)[0];
if (!oldest) break;
state.pendingUnknownSessionIds.delete(oldest.sessionId);
}
}
private prunePendingUnknownSessions(state: TrackingState): void {
const now = Date.now();
for (const [sessionId, candidate] of state.pendingUnknownSessionIds.entries()) {
if (
candidate.expiresAt <= now ||
candidate.refreshAttempts >= MAX_PENDING_UNKNOWN_ROOT_REFRESH_ATTEMPTS
) {
state.pendingUnknownSessionIds.delete(sessionId);
}
}
}
private markPendingRefreshAttempt(state: TrackingState): void {
for (const candidate of state.pendingUnknownSessionIds.values()) {
candidate.refreshAttempts += 1;
}
this.prunePendingUnknownSessions(state);
}
private removeConfirmedPendingSessions(
state: TrackingState,
confirmedSessionIds: readonly string[]
): void {
for (const rawSessionId of confirmedSessionIds) {
const sessionId = normalizeLogSourceSessionId(rawSessionId);
if (sessionId) {
state.pendingUnknownSessionIds.delete(sessionId);
}
}
}
private scheduleScopedRecompute(teamName: string): void {
const current = this.stateByTeam.get(teamName);
if (!current || this.getActiveConsumerCount(current) === 0) {
return;
}
if (current.refreshTimer) {
clearTimeout(current.refreshTimer);
}
current.refreshTimer = setTimeout(() => {
current.refreshTimer = null;
void this.recompute(teamName);
}, 300);
}
private scheduleContextRefresh(
teamName: string,
candidateSessionId?: string,
delayMs: number = CONTEXT_REFRESH_DEBOUNCE_MS
): void {
const state = this.stateByTeam.get(teamName);
if (!state || this.getActiveConsumerCount(state) === 0) {
return;
}
this.rememberPendingUnknownSession(state, candidateSessionId);
if (state.contextRefreshTimer) {
return;
}
state.contextRefreshTimer = setTimeout(() => {
const current = this.stateByTeam.get(teamName);
if (!current) return;
current.contextRefreshTimer = null;
if (this.getActiveConsumerCount(current) === 0) return;
void this.refreshContextAndWatcher(teamName, current.lifecycleVersion);
}, delayMs);
}
private async refreshContextAndWatcher(teamName: string, expectedVersion: number): Promise<void> {
const state = this.stateByTeam.get(teamName);
if (!state || !this.isTrackingCurrent(teamName, expectedVersion)) {
return;
}
this.markPendingRefreshAttempt(state);
const previousGeneration = state.snapshot.logSourceGeneration;
const context = await this.logsFinder.getLiveLogSourceWatchContext(teamName, {
forceRefresh: true,
});
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return;
}
state.activeContext = context;
if (!context) {
state.scopedSessionIds.clear();
state.snapshot = { projectFingerprint: null, logSourceGeneration: null };
await this.rebuildWatcher(teamName, null, expectedVersion);
return;
}
this.removeConfirmedPendingSessions(state, context.watchSessionIds);
state.snapshot = await this.computeSnapshot(context);
if (!this.isTrackingCurrent(teamName, expectedVersion)) {
return;
}
await this.rebuildWatcher(teamName, context, expectedVersion);
if (
state.snapshot.logSourceGeneration &&
previousGeneration !== state.snapshot.logSourceGeneration
) {
this.emitLogSourceChange(teamName);
}
if (
this.isTrackingCurrent(teamName, expectedVersion) &&
state.pendingUnknownSessionIds.size > 0
) {
this.scheduleContextRefresh(teamName, undefined, PENDING_CONTEXT_REFRESH_RETRY_MS);
}
}
private handleTaskFreshnessSignalChange(
teamName: string,
projectDir: string,
@ -440,22 +724,15 @@ export class TeamLogSourceTracker {
const recomputeVersion = state.lifecycleVersion;
const recomputePromise = (async () => {
const previousGeneration = state.snapshot.logSourceGeneration;
const context = await this.logsFinder.getLogSourceWatchContext(teamName, {
forceRefresh: true,
});
if (!this.isTrackingCurrent(teamName, recomputeVersion)) {
return this.getOrCreateState(teamName).snapshot;
}
const context = state.activeContext;
if (!context) {
state.snapshot = { projectFingerprint: null, logSourceGeneration: null };
await this.rebuildWatcher(teamName, null, recomputeVersion);
} else {
state.snapshot = await this.computeSnapshot(context);
if (!this.isTrackingCurrent(teamName, recomputeVersion)) {
return this.getOrCreateState(teamName).snapshot;
}
await this.rebuildWatcher(teamName, context.projectDir, recomputeVersion);
}
if (
@ -495,23 +772,21 @@ export class TeamLogSourceTracker {
});
}
private async computeSnapshot(context: {
projectDir: string;
projectPath?: string;
leadSessionId?: string;
sessionIds: string[];
}): Promise<TeamLogSourceSnapshot> {
private async computeSnapshot(context: TeamLogSourceLiveContext): Promise<TeamLogSourceSnapshot> {
const projectFingerprint = computeTaskChangePresenceProjectFingerprint(context.projectPath);
const parts: string[] = [];
const sessionIds =
context.watchSessionIds.length > 0 ? context.watchSessionIds : context.sessionIds;
if (context.leadSessionId) {
const leadLogPath = path.join(context.projectDir, `${context.leadSessionId}.jsonl`);
parts.push(await this.describePath('lead', leadLogPath));
}
for (const sessionId of [...context.sessionIds].sort((a, b) => a.localeCompare(b))) {
for (const rawSessionId of [...sessionIds].sort((a, b) => a.localeCompare(b))) {
const sessionId = normalizeLogSourceSessionId(rawSessionId);
if (!sessionId) {
continue;
}
const rootLogPath = path.join(context.projectDir, `${sessionId}.jsonl`);
const sessionDir = path.join(context.projectDir, sessionId);
const subagentsDir = path.join(sessionDir, 'subagents');
parts.push(await this.describePath('root', rootLogPath));
parts.push(await this.describePath('session', sessionDir));
parts.push(await this.describePath('subagents', subagentsDir));
@ -523,25 +798,19 @@ export class TeamLogSourceTracker {
}
for (const fileName of entries
.filter(
(entry) =>
entry.startsWith('agent-') &&
entry.endsWith('.jsonl') &&
!entry.startsWith('agent-acompact')
)
.filter((entry) => isAgentTranscriptFileName(entry))
.sort((a, b) => a.localeCompare(b))) {
parts.push(await this.describePath('subagent-log', path.join(subagentsDir, fileName)));
}
}
const sourceMaterial =
parts.length > 0
? parts.join('|')
: `empty:${normalizeTaskChangePresenceFilePath(context.projectDir)}`;
if (parts.length === 0) {
return { projectFingerprint, logSourceGeneration: null };
}
return {
projectFingerprint,
logSourceGeneration: createHash('sha256').update(sourceMaterial).digest('hex'),
logSourceGeneration: createHash('sha256').update(parts.join('|')).digest('hex'),
};
}

View file

@ -10,10 +10,16 @@ import {
canonicalizeAgentTeamsToolName,
lineHasAgentTeamsTaskBoundaryToolName,
} from './agentTeamsToolNames';
import {
readBootstrapLaunchSnapshot,
choosePreferredLaunchSnapshot,
} from './TeamBootstrapStateReader';
import { TeamConfigReader } from './TeamConfigReader';
import { TeamInboxReader } from './TeamInboxReader';
import { TeamLaunchStateStore } from './TeamLaunchStateStore';
import { TeamMembersMetaStore } from './TeamMembersMetaStore';
import { TeamTranscriptProjectResolver } from './TeamTranscriptProjectResolver';
import { buildTeamLogWatchSessionIds, extractRuntimeSessionIds } from './teamLogSourceWatchScope';
import type {
MemberLogSummary,
@ -107,6 +113,14 @@ export interface MemberLogFileRef {
mtimeMs: number;
}
export interface TeamLogSourceLiveContext {
projectDir: string;
projectPath?: string;
leadSessionId?: string;
sessionIds: string[];
watchSessionIds: string[];
}
async function mapLimit<T, R>(
items: readonly T[],
limit: number,
@ -151,7 +165,8 @@ export class TeamMemberLogsFinder {
{
getConfig: (teamName) => configReader.getConfigSnapshot(teamName),
}
)
),
private readonly launchStateStore: TeamLaunchStateStore = new TeamLaunchStateStore()
) {}
async findMemberLogs(
@ -262,6 +277,43 @@ export class TeamMemberLogsFinder {
};
}
async getLiveLogSourceWatchContext(
teamName: string,
options?: { forceRefresh?: boolean }
): Promise<TeamLogSourceLiveContext | null> {
const [launchSnapshot, bootstrapSnapshot] = await Promise.all([
this.launchStateStore.read(teamName).catch(() => null),
readBootstrapLaunchSnapshot(teamName).catch(() => null),
]);
const preferredSnapshot = choosePreferredLaunchSnapshot(bootstrapSnapshot, launchSnapshot);
const extraProjectPathCandidates = Object.values(preferredSnapshot?.members ?? {}).map(
(member) => member.cwd
);
const base = await this.projectResolver.getLiveBaseContext(teamName, {
forceRefresh: options?.forceRefresh,
extraProjectPathCandidates,
});
if (!base) {
return null;
}
const watchSessionIds = buildTeamLogWatchSessionIds({
configLeadSessionId: base.config.leadSessionId,
launchLeadSessionId: preferredSnapshot?.leadSessionId,
sessionHistory: base.config.sessionHistory,
launchRuntimeSessionIds: extractRuntimeSessionIds(preferredSnapshot),
});
return {
projectDir: base.projectDir,
projectPath: base.config.projectPath,
leadSessionId: base.config.leadSessionId ?? preferredSnapshot?.leadSessionId,
sessionIds: watchSessionIds,
watchSessionIds,
};
}
/**
* Returns session logs that reference the given task (TaskCreate, TaskUpdate, comments, etc.).
* When the task is in_progress and has an owner, also includes that owner's session logs so

View file

@ -185,6 +185,12 @@ export interface TeamTranscriptProjectContext {
sessionIds: string[];
}
export interface TeamTranscriptProjectLiveBaseContext {
projectDir: string;
projectId: string;
config: TeamConfig;
}
export class TeamTranscriptProjectResolver {
private readonly contextCache = new Map<
string,
@ -201,6 +207,41 @@ export class TeamTranscriptProjectResolver {
: this.configReader.getConfig(teamName);
}
async getLiveBaseContext(
teamName: string,
options?: { forceRefresh?: boolean; extraProjectPathCandidates?: readonly unknown[] }
): Promise<TeamTranscriptProjectLiveBaseContext | null> {
if (options?.forceRefresh) {
this.contextCache.delete(teamName);
}
const config = await this.readConfigForObservation(teamName);
if (!config) {
return null;
}
const projectPathCandidates = this.collectLiveProjectPathCandidates(
config,
options?.extraProjectPathCandidates ?? []
);
const resolution = await this.resolveLiveProjectDirectoryFromCandidates(projectPathCandidates);
if (!resolution) {
return null;
}
const resolvedConfig =
trimTrailingSlashes(config.projectPath ?? '') !==
trimTrailingSlashes(resolution.effectiveProjectPath)
? { ...config, projectPath: resolution.effectiveProjectPath }
: config;
return {
projectDir: resolution.projectDir,
projectId: resolution.projectId,
config: resolvedConfig,
};
}
async getContext(
teamName: string,
options?: { forceRefresh?: boolean }
@ -438,6 +479,46 @@ export class TeamTranscriptProjectResolver {
return candidates;
}
private collectLiveProjectPathCandidates(
config: TeamConfig,
extraCandidates: readonly unknown[]
): string[] {
const candidates: string[] = [];
const seen = new Set<string>();
const push = (value: unknown): void => {
const normalized = normalizeProjectPathCandidate(value);
if (!normalized || seen.has(normalized)) {
return;
}
seen.add(normalized);
candidates.push(normalized);
};
push(config.projectPath);
const history = Array.isArray(config.projectPathHistory) ? config.projectPathHistory : [];
for (let index = history.length - 1; index >= Math.max(0, history.length - 5); index -= 1) {
push(history[index]);
}
push((config.members ?? []).find((member) => isLeadMember(member))?.cwd);
const distinctMemberCwds = new Set(
(config.members ?? [])
.map((member) => normalizeProjectPathCandidate(member.cwd))
.filter((cwd): cwd is string => Boolean(cwd))
);
if (distinctMemberCwds.size === 1) {
push([...distinctMemberCwds][0]);
}
for (const candidate of extraCandidates.slice(0, 64)) {
push(candidate);
}
return candidates;
}
private buildProjectDirCandidates(projectPath: string): ProjectDirCandidate[] {
const normalizedProjectPath = trimTrailingSlashes(projectPath);
const projectId = extractBaseDir(encodePath(normalizedProjectPath));
@ -470,6 +551,32 @@ export class TeamTranscriptProjectResolver {
}));
}
private async resolveLiveProjectDirectoryFromCandidates(
candidates: readonly string[]
): Promise<{ projectDir: string; projectId: string; effectiveProjectPath: string } | null> {
let firstResolution: {
projectDir: string;
projectId: string;
effectiveProjectPath: string;
} | null = null;
for (const projectPath of candidates) {
for (const dirCandidate of this.buildProjectDirCandidates(projectPath)) {
const resolution = {
projectDir: dirCandidate.projectDir,
projectId: dirCandidate.projectId,
effectiveProjectPath: projectPath,
};
firstResolution ??= resolution;
if (await this.projectDirExists(dirCandidate.projectDir)) {
return resolution;
}
}
}
return null;
}
private async findMatchInProjectPathCandidate(
candidate: ProjectPathCandidate,
sessionIds: string[]

View file

@ -0,0 +1,178 @@
import * as path from 'path';
import type { PersistedTeamLaunchMemberState, PersistedTeamLaunchSnapshot } from '@shared/types';
export const BOARD_TASK_LOG_FRESHNESS_DIRNAME = '.board-task-log-freshness';
export const BOARD_TASK_CHANGE_FRESHNESS_DIRNAME = '.board-task-change-freshness';
export const BOARD_TASK_CHANGES_DIRNAME = '.board-task-changes';
export const BOARD_TASK_LOG_FRESHNESS_FILE_SUFFIX = '.json';
export const MAX_LOG_SOURCE_WATCH_SESSION_IDS = 24;
export const MAX_PENDING_UNKNOWN_ROOT_SESSIONS = 16;
export const PENDING_UNKNOWN_ROOT_SESSION_TTL_MS = 15_000;
export const MAX_PENDING_UNKNOWN_ROOT_REFRESH_ATTEMPTS = 4;
const SAFE_SESSION_ID_RE = /^[A-Za-z0-9_-]{1,200}$/;
const AGENT_TRANSCRIPT_RE = /^agent-(?!acompact).*\.jsonl$/;
export type WatcherEventName = 'add' | 'change' | 'unlink' | 'addDir' | 'unlinkDir';
export type LogSourceWatcherAction =
| { kind: 'ignore' }
| { kind: 'task-freshness' }
| { kind: 'scoped-recompute' }
| { kind: 'context-refresh'; candidateSessionId?: string };
export interface TeamLogWatchSessionInput {
configLeadSessionId?: unknown;
launchLeadSessionId?: unknown;
sessionHistory?: unknown[];
launchRuntimeSessionIds?: unknown[];
bootstrapRuntimeSessionIds?: unknown[];
}
export interface ClassifyLogSourceWatcherEventInput {
projectDir: string;
changedPath: string;
eventName: WatcherEventName;
scopedSessionIds: ReadonlySet<string>;
pendingUnknownSessionIds: ReadonlySet<string>;
}
export function normalizeLogSourceSessionId(value: unknown): string | null {
if (typeof value !== 'string') {
return null;
}
const trimmed = value.trim();
if (!trimmed || !SAFE_SESSION_ID_RE.test(trimmed)) {
return null;
}
return trimmed;
}
function pushSessionId(ids: string[], seen: Set<string>, value: unknown, limit: number): void {
if (ids.length >= limit) {
return;
}
const normalized = normalizeLogSourceSessionId(value);
if (!normalized || seen.has(normalized)) {
return;
}
seen.add(normalized);
ids.push(normalized);
}
function shouldIncludeRuntimeMember(member: PersistedTeamLaunchMemberState): boolean {
return Boolean(
member.runtimeSessionId &&
!member.hardFailure &&
member.launchState !== 'skipped_for_launch' &&
member.launchState !== 'failed_to_start'
);
}
export function extractRuntimeSessionIds(
snapshot: PersistedTeamLaunchSnapshot | null | undefined
): string[] {
if (!snapshot?.members) {
return [];
}
const ids: string[] = [];
const seen = new Set<string>();
for (const member of Object.values(snapshot.members)) {
if (!shouldIncludeRuntimeMember(member)) {
continue;
}
pushSessionId(ids, seen, member.runtimeSessionId, MAX_LOG_SOURCE_WATCH_SESSION_IDS);
}
return ids;
}
export function buildTeamLogWatchSessionIds(
input: TeamLogWatchSessionInput,
limit = MAX_LOG_SOURCE_WATCH_SESSION_IDS
): string[] {
const ids: string[] = [];
const seen = new Set<string>();
pushSessionId(ids, seen, input.configLeadSessionId, limit);
pushSessionId(ids, seen, input.launchLeadSessionId, limit);
for (const sessionId of input.launchRuntimeSessionIds ?? []) {
pushSessionId(ids, seen, sessionId, limit);
}
for (const sessionId of input.bootstrapRuntimeSessionIds ?? []) {
pushSessionId(ids, seen, sessionId, limit);
}
const history = Array.isArray(input.sessionHistory) ? input.sessionHistory : [];
for (let index = history.length - 1; index >= 0; index -= 1) {
pushSessionId(ids, seen, history[index], limit);
}
return ids;
}
export function isAgentTranscriptFileName(fileName: string): boolean {
return AGENT_TRANSCRIPT_RE.test(fileName);
}
export function getRelativeLogSourceParts(projectDir: string, targetPath: string): string[] | null {
const relativePath = path.relative(projectDir, targetPath);
if (!relativePath || relativePath.startsWith('..') || path.isAbsolute(relativePath)) {
return null;
}
return relativePath.split(/[/\\]+/).filter(Boolean);
}
export function classifyLogSourceWatcherEvent(
input: ClassifyLogSourceWatcherEventInput
): LogSourceWatcherAction {
const parts = getRelativeLogSourceParts(input.projectDir, input.changedPath);
if (!parts) {
return { kind: 'ignore' };
}
const first = parts[0];
if (first === BOARD_TASK_LOG_FRESHNESS_DIRNAME || first === BOARD_TASK_CHANGE_FRESHNESS_DIRNAME) {
return { kind: 'task-freshness' };
}
if (
first === BOARD_TASK_CHANGES_DIRNAME ||
parts.includes('tool-results') ||
parts.includes('memory')
) {
return { kind: 'ignore' };
}
if (parts.length === 1 && first.endsWith('.jsonl')) {
const sessionId = normalizeLogSourceSessionId(first.slice(0, -'.jsonl'.length));
if (!sessionId) {
return { kind: 'ignore' };
}
if (input.scopedSessionIds.has(sessionId)) {
return { kind: 'scoped-recompute' };
}
if (input.eventName === 'add' || input.pendingUnknownSessionIds.has(sessionId)) {
return { kind: 'context-refresh', candidateSessionId: sessionId };
}
return { kind: 'ignore' };
}
if (input.scopedSessionIds.has(first)) {
if (parts.length === 1) {
return { kind: 'scoped-recompute' };
}
if (parts[1] === 'subagents') {
if (parts.length === 2) {
return { kind: 'context-refresh' };
}
if (parts.length === 3 && isAgentTranscriptFileName(parts[2])) {
return { kind: 'scoped-recompute' };
}
}
}
return { kind: 'ignore' };
}

View file

@ -30,10 +30,11 @@ describe('TeamLogSourceTracker', () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-'));
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
@ -66,10 +67,11 @@ describe('TeamLogSourceTracker', () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-refcount-'));
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
@ -104,14 +106,109 @@ describe('TeamLogSourceTracker', () => {
expect(emitter).not.toHaveBeenCalled();
});
it('emits log-source-change for scoped root transcripts', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-scoped-root-'));
await writeFile(path.join(tempDir, 'lead-session.jsonl'), '{"seq":1}\n');
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: ['lead-session'],
watchSessionIds: ['lead-session'],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
const emitter = vi.fn<(event: TeamChangeEvent) => void>();
tracker.setEmitter(emitter);
await tracker.enableTracking('demo', 'change_presence');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
await writeFile(path.join(tempDir, 'lead-session.jsonl'), '{"seq":2}\n');
await vi.waitFor(() => {
expect(emitter).toHaveBeenCalledWith({
type: 'log-source-change',
teamName: 'demo',
});
});
await tracker.disableTracking('demo', 'change_presence');
});
it('ignores old unscoped root transcript changes', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-unscoped-root-'));
await writeFile(path.join(tempDir, 'lead-session.jsonl'), '{"seq":1}\n');
await writeFile(path.join(tempDir, 'old-session.jsonl'), '{"seq":1}\n');
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: ['lead-session'],
watchSessionIds: ['lead-session'],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
const emitter = vi.fn<(event: TeamChangeEvent) => void>();
tracker.setEmitter(emitter);
await tracker.enableTracking('demo', 'change_presence');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
await writeFile(path.join(tempDir, 'old-session.jsonl'), '{"seq":2}\n');
await new Promise((resolve) => setTimeout(resolve, 450));
expect(emitter.mock.calls.map(([event]) => event.type)).not.toContain('log-source-change');
await tracker.disableTracking('demo', 'change_presence');
});
it('emits log-source-change when a pending root transcript becomes confirmed', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-pending-root-'));
let confirmed = false;
const logsFinder = {
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: confirmed ? ['new-runtime'] : [],
watchSessionIds: confirmed ? ['new-runtime'] : [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
const emitter = vi.fn<(event: TeamChangeEvent) => void>();
tracker.setEmitter(emitter);
await tracker.enableTracking('demo', 'change_presence');
emitter.mockClear();
await new Promise((resolve) => setTimeout(resolve, 100));
confirmed = true;
await writeFile(path.join(tempDir, 'new-runtime.jsonl'), '{"seq":1}\n');
await vi.waitFor(() => {
expect(emitter).toHaveBeenCalledWith({
type: 'log-source-change',
teamName: 'demo',
});
});
await tracker.disableTracking('demo', 'change_presence');
});
it('does not reinitialize when another consumer joins an already tracked team', async () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-init-'));
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
@ -119,7 +216,7 @@ describe('TeamLogSourceTracker', () => {
await tracker.enableTracking('demo', 'tool_activity');
await tracker.enableTracking('demo', 'task_log_stream');
expect(logsFinder.getLogSourceWatchContext).toHaveBeenCalledTimes(1);
expect(logsFinder.getLiveLogSourceWatchContext).toHaveBeenCalledTimes(1);
await tracker.disableTracking('demo', 'task_log_stream');
await tracker.disableTracking('demo', 'tool_activity');
@ -127,10 +224,11 @@ describe('TeamLogSourceTracker', () => {
it('notifies log-source listeners before forwarding the external team change event', () => {
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: '/tmp/demo',
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
const events: string[] = [];
@ -154,10 +252,11 @@ describe('TeamLogSourceTracker', () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-stall-monitor-'));
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);
@ -188,10 +287,11 @@ describe('TeamLogSourceTracker', () => {
tempDir = await mkdtemp(path.join(tmpdir(), 'team-log-source-tracker-safe-task-'));
const logsFinder = {
getLogSourceWatchContext: vi.fn(async () => ({
getLiveLogSourceWatchContext: vi.fn(async () => ({
projectDir: tempDir!,
sessionIds: [],
})),
watchSessionIds: [],
})),
} as unknown as TeamMemberLogsFinder;
const tracker = new TeamLogSourceTracker(logsFinder);

View file

@ -1,6 +1,6 @@
import * as os from 'os';
import * as path from 'path';
import { afterEach, describe, expect, it } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
import * as fs from 'fs/promises';
@ -18,6 +18,82 @@ describe('TeamMemberLogsFinder', () => {
}
});
it('builds live log source context without broad transcript discovery', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-logs-live-context-'));
setClaudeBasePathOverride(tmpDir);
const teamName = 'live-context-team';
const projectPath = '/Users/test/live-context';
const projectRoot = path.join(tmpDir, 'projects', '-Users-test-live-context');
const config = {
name: teamName,
projectPath,
leadSessionId: 'lead-session',
sessionHistory: ['old-session', 'recent-session'],
members: [],
};
await fs.mkdir(projectRoot, { recursive: true });
const projectResolver = {
getLiveBaseContext: vi.fn(async () => ({
projectDir: projectRoot,
projectId: '-Users-test-live-context',
config,
})),
getContext: vi.fn(async () => {
throw new Error('broad context must not be used for live tracking');
}),
};
const launchStateStore = {
read: vi.fn(async () => ({
version: 2,
teamName,
updatedAt: '2026-05-03T12:00:00.000Z',
leadSessionId: 'lead-session',
launchPhase: 'active',
expectedMembers: ['bob'],
members: {
bob: {
name: 'bob',
launchState: 'runtime_pending_bootstrap',
agentToolAccepted: true,
runtimeAlive: false,
bootstrapConfirmed: false,
hardFailure: false,
runtimeSessionId: 'runtime-bob',
updatedAt: '2026-05-03T12:00:00.000Z',
},
},
summary: {},
teamLaunchState: 'partial_pending',
})),
};
const finder = new TeamMemberLogsFinder(
undefined,
undefined,
undefined,
projectResolver as never,
launchStateStore as never
);
const context = await finder.getLiveLogSourceWatchContext(teamName, { forceRefresh: true });
expect(projectResolver.getLiveBaseContext).toHaveBeenCalledWith(
teamName,
expect.objectContaining({ forceRefresh: true })
);
expect(projectResolver.getContext).not.toHaveBeenCalled();
expect(context?.projectDir).toBe(projectRoot);
expect(context?.watchSessionIds).toEqual([
'lead-session',
'runtime-bob',
'recent-session',
'old-session',
]);
expect(context?.sessionIds).toEqual(context?.watchSessionIds);
});
it('returns subagent logs for a member and lead session for team-lead', async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'claude-team-logs-'));
setClaudeBasePathOverride(tmpDir);

View file

@ -164,6 +164,35 @@ describe('TeamTranscriptProjectResolver', () => {
expect(getConfig).not.toHaveBeenCalled();
});
it('resolves live base context from cheap projectPath evidence without session discovery', async () => {
await setupClaudeRoot();
const teamName = 'live-base-team';
const projectPath = '/Users/test/live-base';
const projectDir = path.join(tmpDir!, 'projects', encodePath(projectPath));
await fs.mkdir(projectDir, { recursive: true });
const getConfig = vi.fn(async () => {
throw new Error('verified config read should not be used for live base context');
});
const getConfigSnapshot = vi.fn(async () => ({
name: teamName,
projectPath,
leadSessionId: 'lead-session',
members: [{ name: 'team-lead', agentType: 'team-lead' }],
}));
const resolver = new TeamTranscriptProjectResolver({
getConfig,
getConfigSnapshot,
});
const context = await resolver.getLiveBaseContext(teamName, { forceRefresh: true });
expect(context?.projectDir).toBe(projectDir);
expect(context?.config.projectPath).toBe(projectPath);
expect(getConfigSnapshot).toHaveBeenCalledWith(teamName);
expect(getConfig).not.toHaveBeenCalled();
});
it('repairs stale projectPath when exact leadSessionId exists only in the renamed project', async () => {
await setupClaudeRoot();

View file

@ -0,0 +1,94 @@
import * as path from 'path';
import { describe, expect, it } from 'vitest';
import {
buildTeamLogWatchSessionIds,
classifyLogSourceWatcherEvent,
extractRuntimeSessionIds,
normalizeLogSourceSessionId,
} from '../../../../src/main/services/team/teamLogSourceWatchScope';
describe('teamLogSourceWatchScope', () => {
it('builds a bounded confirmed session scope with lead, runtime ids, and newest history first', () => {
const ids = buildTeamLogWatchSessionIds({
configLeadSessionId: 'lead-session',
launchLeadSessionId: 'lead-session',
launchRuntimeSessionIds: ['runtime-bob'],
sessionHistory: ['old-session', 'recent-session', 'lead-session'],
});
expect(ids).toEqual(['lead-session', 'runtime-bob', 'recent-session', 'old-session']);
});
it('normalizes session ids defensively', () => {
expect(normalizeLogSourceSessionId(' valid_id-1 ')).toBe('valid_id-1');
expect(normalizeLogSourceSessionId('')).toBeNull();
expect(normalizeLogSourceSessionId('../escape')).toBeNull();
expect(normalizeLogSourceSessionId('with/slash')).toBeNull();
});
it('extracts runtime session ids from launch members that can still write early logs', () => {
const ids = extractRuntimeSessionIds({
members: {
pending: {
launchState: 'runtime_pending_bootstrap',
runtimeSessionId: 'runtime-pending',
hardFailure: false,
},
failed: {
launchState: 'failed_to_start',
runtimeSessionId: 'runtime-failed',
hardFailure: true,
},
},
} as never);
expect(ids).toEqual(['runtime-pending']);
});
it('classifies unknown, pending, scoped, and ignored watcher events', () => {
const projectDir = '/tmp/project';
const scopedSessionIds = new Set(['lead-session']);
const pendingUnknownSessionIds = new Set(['new-runtime']);
expect(
classifyLogSourceWatcherEvent({
projectDir,
changedPath: path.join(projectDir, 'old-session.jsonl'),
eventName: 'change',
scopedSessionIds,
pendingUnknownSessionIds,
})
).toEqual({ kind: 'ignore' });
expect(
classifyLogSourceWatcherEvent({
projectDir,
changedPath: path.join(projectDir, 'new-runtime.jsonl'),
eventName: 'change',
scopedSessionIds,
pendingUnknownSessionIds,
})
).toEqual({ kind: 'context-refresh', candidateSessionId: 'new-runtime' });
expect(
classifyLogSourceWatcherEvent({
projectDir,
changedPath: path.join(projectDir, 'lead-session', 'subagents', 'agent-worker.jsonl'),
eventName: 'change',
scopedSessionIds,
pendingUnknownSessionIds,
})
).toEqual({ kind: 'scoped-recompute' });
expect(
classifyLogSourceWatcherEvent({
projectDir,
changedPath: path.join(projectDir, 'lead-session', 'subagents', 'agent-acompact-x.jsonl'),
eventName: 'change',
scopedSessionIds,
pendingUnknownSessionIds,
})
).toEqual({ kind: 'ignore' });
});
});