diff --git a/src/main/ipc/ssh.ts b/src/main/ipc/ssh.ts index 570204f1..7e89550e 100644 --- a/src/main/ipc/ssh.ts +++ b/src/main/ipc/ssh.ts @@ -19,6 +19,7 @@ import { SSH_TEST, } from '@preload/constants/ipcChannels'; import { createLogger } from '@shared/utils/logger'; +import * as path from 'path'; import { configManager, ServiceContext } from '../services'; @@ -74,6 +75,9 @@ export function registerSshHandlers(ipcMain: IpcMain): void { // Get provider and remote path const provider = connectionManager.getProvider(); const remoteProjectsPath = connectionManager.getRemoteProjectsPath() ?? undefined; + const remoteTodosPath = remoteProjectsPath + ? path.join(path.dirname(remoteProjectsPath), 'todos') + : undefined; // Generate context ID const contextId = `ssh-${config.host}`; @@ -90,6 +94,7 @@ export function registerSshHandlers(ipcMain: IpcMain): void { type: 'ssh', fsProvider: provider, projectsDir: remoteProjectsPath, + todosDir: remoteTodosPath, }); // Register and start SSH context diff --git a/src/main/services/discovery/ProjectPathResolver.ts b/src/main/services/discovery/ProjectPathResolver.ts index 5ea65792..1b206003 100644 --- a/src/main/services/discovery/ProjectPathResolver.ts +++ b/src/main/services/discovery/ProjectPathResolver.ts @@ -72,7 +72,7 @@ export class ProjectPathResolver { for (const sessionPath of sessionPaths) { try { - const cwd = await extractCwd(sessionPath); + const cwd = await extractCwd(sessionPath, this.fsProvider); if (cwd && path.isAbsolute(cwd)) { this.projectPathCache.set(projectId, cwd); return cwd; diff --git a/src/main/services/discovery/ProjectScanner.ts b/src/main/services/discovery/ProjectScanner.ts index c8e454e4..aaa1f0a5 100644 --- a/src/main/services/discovery/ProjectScanner.ts +++ b/src/main/services/discovery/ProjectScanner.ts @@ -201,33 +201,45 @@ export class ProjectScanner { cwd: string | null; } - const sessionInfos: SessionInfo[] = await Promise.all( - sessionFiles.map(async (file) => { + const shouldSplitByCwd = this.fsProvider.type !== 'ssh'; + const sessionInfos = await this.collectFulfilledInBatches( + sessionFiles, + this.fsProvider.type === 'ssh' ? 32 : 128, + async (file) => { const filePath = path.join(projectPath, file.name); const stats = await this.fsProvider.stat(filePath); let cwd: string | null = null; - try { - cwd = await extractCwd(filePath, this.fsProvider); - } catch { - // Ignore unreadable files + + // Over SSH, avoid reading every file body during project discovery. + if (shouldSplitByCwd) { + try { + cwd = await extractCwd(filePath, this.fsProvider); + } catch { + // Ignore unreadable files + } } + return { sessionId: extractSessionId(file.name), filePath, mtimeMs: stats.mtimeMs, birthtimeMs: stats.birthtimeMs, cwd, - }; - }) + } satisfies SessionInfo; + } ); + if (sessionInfos.length === 0) { + return []; + } + // Group sessions by cwd const cwdGroups = new Map(); const baseName = extractProjectName(encodedName); const decodedFallback = baseName; // Used when cwd is null for (const info of sessionInfos) { - const key = info.cwd ?? `__decoded__${decodedFallback}`; + const key = shouldSplitByCwd ? (info.cwd ?? `__decoded__${decodedFallback}`) : encodedName; const group = cwdGroups.get(key) ?? []; group.push(info); cwdGroups.set(key, group); @@ -360,6 +372,7 @@ export class ProjectScanner { const baseDir = extractBaseDir(projectId); const projectPath = path.join(this.projectsDir, baseDir); const sessionFilter = subprojectRegistry.getSessionFilter(projectId); + const shouldFilterNoise = this.fsProvider.type !== 'ssh'; if (!(await this.fsProvider.exists(projectPath))) { return []; @@ -381,10 +394,12 @@ export class ProjectScanner { const sessionId = extractSessionId(file.name); const filePath = path.join(projectPath, file.name); - // Check if session has non-noise messages (delegated to SessionContentFilter) - const hasContent = await this.hasDisplayableContent(filePath); - if (!hasContent) { - return null; // Filter out noise-only sessions + if (shouldFilterNoise) { + // Check if session has non-noise messages (delegated to SessionContentFilter) + const hasContent = await this.hasDisplayableContent(filePath); + if (!hasContent) { + return null; // Filter out noise-only sessions + } } return this.buildSessionMetadata(projectId, sessionId, filePath, decodedPath); @@ -425,6 +440,7 @@ export class ProjectScanner { const baseDir = extractBaseDir(projectId); const projectPath = path.join(this.projectsDir, baseDir); const sessionFilter = subprojectRegistry.getSessionFilter(projectId); + const shouldFilterNoise = this.fsProvider.type !== 'ssh'; if (!(await this.fsProvider.exists(projectPath))) { return { sessions: [], nextCursor: null, hasMore: false, totalCount: 0 }; @@ -439,7 +455,7 @@ export class ProjectScanner { sessionFiles = sessionFiles.filter((f) => sessionFilter.has(extractSessionId(f.name))); } - // Get stats for all session files + // Get stats for all session files (parallel for SSH performance) interface SessionFileInfo { name: string; sessionId: string; @@ -447,24 +463,22 @@ export class ProjectScanner { filePath: string; mtimeMs: number; } - const fileInfos: SessionFileInfo[] = []; - for (const file of sessionFiles) { - const filePath = path.join(projectPath, file.name); - try { + const fileInfos = await this.collectFulfilledInBatches( + sessionFiles, + this.fsProvider.type === 'ssh' ? 48 : 200, + async (file) => { + const filePath = path.join(projectPath, file.name); const stats = await this.fsProvider.stat(filePath); - fileInfos.push({ + return { name: file.name, sessionId: extractSessionId(file.name), timestamp: stats.mtimeMs, filePath, mtimeMs: stats.mtimeMs, - }); - } catch { - // Skip files we can't stat - continue; + } satisfies SessionFileInfo; } - } + ); // Step 2: Sort by timestamp descending (most recent first) fileInfos.sort((a, b) => { @@ -479,11 +493,17 @@ export class ProjectScanner { // This is slower but provides exact totalCount. let validSessionIds: Set | null = null; let totalCount = 0; - if (prefilterAll) { + if (prefilterAll && shouldFilterNoise) { + const contentResults = await Promise.allSettled( + fileInfos.map(async (fileInfo) => ({ + sessionId: fileInfo.sessionId, + hasContent: await this.hasDisplayableContent(fileInfo.filePath, fileInfo.mtimeMs), + })) + ); validSessionIds = new Set(); - for (const fileInfo of fileInfos) { - if (await this.hasDisplayableContent(fileInfo.filePath, fileInfo.mtimeMs)) { - validSessionIds.add(fileInfo.sessionId); + for (const result of contentResults) { + if (result.status === 'fulfilled' && result.value.hasContent) { + validSessionIds.add(result.value.sessionId); } } totalCount = validSessionIds.size; @@ -519,35 +539,67 @@ export class ProjectScanner { const sessions: Session[] = []; let scannedCandidates = 0; - // Fast path: avoid pre-filtering everything. Scan until we have enough page items. - for (let i = startIndex; i < fileInfos.length; i++) { - const fileInfo = fileInfos[i]; - if (!fileInfo) { - continue; - } - scannedCandidates++; + // Fetch page items in parallel batches for SSH performance. + // Process candidates in chunks, checking content + building metadata concurrently. + const BATCH_SIZE = limit + 1; // One extra to detect hasMore + let batchStart = startIndex; - let hasContent: boolean; + while (sessions.length < limit + 1 && batchStart < fileInfos.length) { + // Take a batch of candidates (overshoot to account for filtered-out items) + const batchEnd = Math.min(batchStart + BATCH_SIZE * 2, fileInfos.length); + const batch = fileInfos.slice(batchStart, batchEnd); + scannedCandidates += batch.length; + + // Step 5a: Check content in parallel + let contentBatch: { fileInfo: SessionFileInfo; hasContent: boolean }[]; if (validSessionIds) { - hasContent = validSessionIds.has(fileInfo.sessionId); + contentBatch = batch.map((fileInfo) => ({ + fileInfo, + hasContent: validSessionIds.has(fileInfo.sessionId), + })); + } else if (!shouldFilterNoise) { + contentBatch = batch.map((fileInfo) => ({ fileInfo, hasContent: true })); } else { - hasContent = await this.hasDisplayableContent(fileInfo.filePath, fileInfo.mtimeMs); - } - if (!hasContent) { - continue; + const contentResults = await Promise.allSettled( + batch.map(async (fileInfo) => ({ + fileInfo, + hasContent: await this.hasDisplayableContent(fileInfo.filePath, fileInfo.mtimeMs), + })) + ); + contentBatch = contentResults + .filter( + ( + r + ): r is PromiseFulfilledResult<{ fileInfo: SessionFileInfo; hasContent: boolean }> => + r.status === 'fulfilled' + ) + .map((r) => r.value); } - const session = await this.buildSessionMetadata( - projectId, - fileInfo.sessionId, - fileInfo.filePath, - decodedPath + // Step 5b: Build metadata in parallel for items with content + const withContent = contentBatch.filter((c) => c.hasContent); + const needed = limit + 1 - sessions.length; + const toBuild = withContent.slice(0, needed); + + const metadataResults = await Promise.allSettled( + toBuild.map(({ fileInfo }) => + this.buildSessionMetadata( + projectId, + fileInfo.sessionId, + fileInfo.filePath, + decodedPath, + fileInfo.mtimeMs + ) + ) ); - sessions.push(session); - if (sessions.length >= limit + 1) { - break; + for (const result of metadataResults) { + if (result.status === 'fulfilled') { + sessions.push(result.value); + } } + + batchStart = batchEnd; } // Step 6: Build next cursor @@ -593,23 +645,25 @@ export class ProjectScanner { projectId: string, sessionId: string, filePath: string, - projectPath: string + projectPath: string, + prefetchedMtimeMs?: number ): Promise { const stats = await this.fsProvider.stat(filePath); + const effectiveMtime = prefetchedMtimeMs ?? stats.mtimeMs; const cachedMetadata = this.sessionMetadataCache.get(filePath); const metadata = - cachedMetadata?.mtimeMs === stats.mtimeMs + cachedMetadata?.mtimeMs === effectiveMtime ? cachedMetadata.metadata : await analyzeSessionFileMetadata(filePath, this.fsProvider); - if (cachedMetadata?.mtimeMs !== stats.mtimeMs) { - this.sessionMetadataCache.set(filePath, { mtimeMs: stats.mtimeMs, metadata }); + if (cachedMetadata?.mtimeMs !== effectiveMtime) { + this.sessionMetadataCache.set(filePath, { mtimeMs: effectiveMtime, metadata }); } - // Check for subagents (delegated to SubagentLocator) - const hasSubagents = await this.subagentLocator.hasSubagents(projectId, sessionId); - - // Load task list data if exists - const todoData = await this.loadTodoData(sessionId); + // Check for subagents and load task list data in parallel + const [hasSubagents, todoData] = await Promise.all([ + this.subagentLocator.hasSubagents(projectId, sessionId), + this.loadTodoData(sessionId), + ]); return { id: sessionId, @@ -790,6 +844,31 @@ export class ProjectScanner { return this.sessionSearcher.searchSessions(projectId, query, maxResults); } + /** + * Runs async mapping in bounded batches and returns only fulfilled results. + * This prevents overwhelming SFTP servers with unbounded parallel requests. + */ + private async collectFulfilledInBatches( + items: T[], + batchSize: number, + mapper: (item: T) => Promise + ): Promise { + const safeBatchSize = Math.max(1, batchSize); + const results: R[] = []; + + for (let i = 0; i < items.length; i += safeBatchSize) { + const batch = items.slice(i, i + safeBatchSize); + const settled = await Promise.allSettled(batch.map((item) => mapper(item))); + for (const result of settled) { + if (result.status === 'fulfilled') { + results.push(result.value); + } + } + } + + return results; + } + /** * Resolves the project path for a given project ID. * For composite IDs, uses the registry's cwd directly. diff --git a/src/main/services/discovery/WorktreeGrouper.ts b/src/main/services/discovery/WorktreeGrouper.ts index f5fb0d9e..daf627ce 100644 --- a/src/main/services/discovery/WorktreeGrouper.ts +++ b/src/main/services/discovery/WorktreeGrouper.ts @@ -71,6 +71,7 @@ export class WorktreeGrouper { // 2. Filter sessions for each project to only include non-noise sessions const projectFilteredSessions = new Map(); + const shouldFilterNoise = this.fsProvider.type !== 'ssh'; await Promise.all( projects.map(async (project) => { const baseDir = extractBaseDir(project.id); @@ -83,6 +84,11 @@ export class WorktreeGrouper { if (sessionFilter && !sessionFilter.has(sessionId)) { continue; } + if (!shouldFilterNoise) { + filteredSessions.push(sessionId); + continue; + } + const sessionPath = path.join(projectPath, `${sessionId}.jsonl`); if (await SessionContentFilter.hasNonNoiseMessages(sessionPath, this.fsProvider)) { filteredSessions.push(sessionId); diff --git a/src/main/services/infrastructure/FileWatcher.ts b/src/main/services/infrastructure/FileWatcher.ts index 99af016d..06dd7167 100644 --- a/src/main/services/infrastructure/FileWatcher.ts +++ b/src/main/services/infrastructure/FileWatcher.ts @@ -26,7 +26,7 @@ import { type DataCache } from './DataCache'; import { LocalFileSystemProvider } from './LocalFileSystemProvider'; import { type NotificationManager } from './NotificationManager'; -import type { FileSystemProvider } from './FileSystemProvider'; +import type { FileSystemProvider, FsDirent } from './FileSystemProvider'; const logger = createLogger('Service:FileWatcher'); @@ -73,7 +73,9 @@ export class FileWatcher extends EventEmitter { /** Timer for SSH polling mode (replaces fs.watch) */ private pollingTimer: NodeJS.Timeout | null = null; /** Polling interval for SSH mode */ - private static readonly SSH_POLL_INTERVAL_MS = 5000; + private static readonly SSH_POLL_INTERVAL_MS = 10000; + /** Guard to prevent overlapping SSH polling runs */ + private pollingInProgress = false; /** Track file sizes for SSH polling change detection */ private polledFileSizes = new Map(); /** Files currently being processed (concurrency guard) */ @@ -176,6 +178,7 @@ export class FileWatcher extends EventEmitter { clearInterval(this.pollingTimer); this.pollingTimer = null; } + this.pollingInProgress = false; this.polledFileSizes.clear(); // Clear error detection tracking @@ -372,9 +375,18 @@ export class FileWatcher extends EventEmitter { logger.info('FileWatcher: Starting SSH polling mode'); this.pollingTimer = setInterval(() => { - this.pollForChanges().catch((err) => { - logger.error('Error during SSH polling:', err); - }); + if (this.pollingInProgress) { + return; + } + + this.pollingInProgress = true; + this.pollForChanges() + .catch((err) => { + logger.error('Error during SSH polling:', err); + }) + .finally(() => { + this.pollingInProgress = false; + }); }, FileWatcher.SSH_POLL_INTERVAL_MS); } @@ -383,14 +395,12 @@ export class FileWatcher extends EventEmitter { */ private async pollForChanges(): Promise { try { - if (!(await this.fsProvider.exists(this.projectsPath))) return; - const projectDirs = await this.fsProvider.readdir(this.projectsPath); for (const dir of projectDirs) { if (!dir.isDirectory()) continue; const projectPath = path.join(this.projectsPath, dir.name); - let entries: import('./FileSystemProvider').FsDirent[]; + let entries: FsDirent[]; try { entries = await this.fsProvider.readdir(projectPath); } catch { diff --git a/src/main/services/infrastructure/SshFileSystemProvider.ts b/src/main/services/infrastructure/SshFileSystemProvider.ts index 76408b50..1b9f5111 100644 --- a/src/main/services/infrastructure/SshFileSystemProvider.ts +++ b/src/main/services/infrastructure/SshFileSystemProvider.ts @@ -19,6 +19,9 @@ import type { SFTPWrapper } from 'ssh2'; const logger = createLogger('Infrastructure:SshFileSystemProvider'); export class SshFileSystemProvider implements FileSystemProvider { + private static readonly MAX_RETRIES = 3; + private static readonly RETRY_BASE_DELAY_MS = 75; + readonly type = 'ssh' as const; private sftp: SFTPWrapper; @@ -27,72 +30,134 @@ export class SshFileSystemProvider implements FileSystemProvider { } async exists(filePath: string): Promise { - return new Promise((resolve) => { - this.sftp.stat(filePath, (err) => { - resolve(!err); - }); - }); + try { + await this.stat(filePath); + return true; + } catch (error) { + if (this.isNotFoundError(error)) { + return false; + } + + // For transient SFTP failures (e.g. code=4), avoid false negatives. + if (this.isRetryableError(error)) { + const code = this.getErrorCode(error); + logger.debug( + `exists(${filePath}) got retryable SFTP error (${String(code)}); treating path as potentially present` + ); + return true; + } + + return false; + } } async readFile(filePath: string, encoding: BufferEncoding = 'utf8'): Promise { - return new Promise((resolve, reject) => { - this.sftp.readFile(filePath, { encoding }, (err, data) => { - if (err) { - reject(err); - return; + let lastError: unknown; + for (let attempt = 1; attempt <= SshFileSystemProvider.MAX_RETRIES; attempt++) { + try { + return await new Promise((resolve, reject) => { + this.sftp.readFile(filePath, { encoding }, (err, data) => { + if (err) { + reject(err); + return; + } + resolve(data as unknown as string); + }); + }); + } catch (error) { + lastError = error; + if ( + this.isRetryableError(error) && + attempt < SshFileSystemProvider.MAX_RETRIES + ) { + await this.sleep(SshFileSystemProvider.RETRY_BASE_DELAY_MS * attempt); + continue; } - resolve(data as unknown as string); - }); - }); + throw error; + } + } + + throw lastError instanceof Error ? lastError : new Error(`Failed to read file: ${filePath}`); } async stat(filePath: string): Promise { - return new Promise((resolve, reject) => { - this.sftp.stat(filePath, (err, stats) => { - if (err) { - reject(err); - return; - } - // SFTP stats use mode bitmask for file type detection - const S_IFMT = 0o170000; - const S_IFREG = 0o100000; - const S_IFDIR = 0o040000; - const mode = stats.mode; + let lastError: unknown; + for (let attempt = 1; attempt <= SshFileSystemProvider.MAX_RETRIES; attempt++) { + try { + return await new Promise((resolve, reject) => { + this.sftp.stat(filePath, (err, stats) => { + if (err) { + reject(err); + return; + } + // SFTP stats use mode bitmask for file type detection + const S_IFMT = 0o170000; + const S_IFREG = 0o100000; + const S_IFDIR = 0o040000; + const mode = stats.mode; - resolve({ - size: stats.size, - mtimeMs: (stats.mtime ?? 0) * 1000, - // SFTP doesn't provide birth time, use mtime as fallback - birthtimeMs: (stats.mtime ?? 0) * 1000, - isFile: () => (mode & S_IFMT) === S_IFREG, - isDirectory: () => (mode & S_IFMT) === S_IFDIR, + resolve({ + size: stats.size, + mtimeMs: (stats.mtime ?? 0) * 1000, + // SFTP doesn't provide birth time, use mtime as fallback + birthtimeMs: (stats.mtime ?? 0) * 1000, + isFile: () => (mode & S_IFMT) === S_IFREG, + isDirectory: () => (mode & S_IFMT) === S_IFDIR, + }); + }); }); - }); - }); + } catch (error) { + lastError = error; + if ( + this.isRetryableError(error) && + attempt < SshFileSystemProvider.MAX_RETRIES + ) { + await this.sleep(SshFileSystemProvider.RETRY_BASE_DELAY_MS * attempt); + continue; + } + throw error; + } + } + + throw lastError instanceof Error ? lastError : new Error(`Failed to stat: ${filePath}`); } async readdir(dirPath: string): Promise { - return new Promise((resolve, reject) => { - this.sftp.readdir(dirPath, (err, list) => { - if (err) { - reject(err); - return; - } - const S_IFMT = 0o170000; - const S_IFREG = 0o100000; - const S_IFDIR = 0o040000; + let lastError: unknown; + for (let attempt = 1; attempt <= SshFileSystemProvider.MAX_RETRIES; attempt++) { + try { + return await new Promise((resolve, reject) => { + this.sftp.readdir(dirPath, (err, list) => { + if (err) { + reject(err); + return; + } + const S_IFMT = 0o170000; + const S_IFREG = 0o100000; + const S_IFDIR = 0o040000; - const entries: FsDirent[] = list.map((item) => { - const mode = item.attrs.mode; - return { - name: item.filename, - isFile: () => (mode & S_IFMT) === S_IFREG, - isDirectory: () => (mode & S_IFMT) === S_IFDIR, - }; + const entries: FsDirent[] = []; + for (const item of list) { + const mode = item.attrs.mode; + entries.push(this.buildDirent(item.filename, mode, S_IFMT, S_IFREG, S_IFDIR)); + } + resolve(entries); + }); }); - resolve(entries); - }); - }); + } catch (error) { + lastError = error; + if ( + this.isRetryableError(error) && + attempt < SshFileSystemProvider.MAX_RETRIES + ) { + await this.sleep(SshFileSystemProvider.RETRY_BASE_DELAY_MS * attempt); + continue; + } + throw error; + } + } + + throw lastError instanceof Error ? lastError : new Error(`Failed to read directory: ${dirPath}`); } createReadStream(filePath: string, opts?: ReadStreamOptions): Readable { @@ -126,4 +191,53 @@ export class SshFileSystemProvider implements FileSystemProvider { // Ignore errors during cleanup } } + + private async sleep(ms: number): Promise { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } + + private getErrorCode(error: unknown): string { + if (typeof error === 'object' && error !== null && 'code' in error) { + const code = (error as { code?: unknown }).code; + if (typeof code === 'number') { + return String(code); + } + if (typeof code === 'string') { + return code; + } + } + return ''; + } + + private isNotFoundError(error: unknown): boolean { + const code = this.getErrorCode(error); + return code === '2' || code === 'ENOENT'; + } + + private isRetryableError(error: unknown): boolean { + const code = this.getErrorCode(error); + return ( + code === '4' || + code === 'EAGAIN' || + code === 'ECONNRESET' || + code === 'ETIMEDOUT' || + code === 'EPIPE' + ); + } + + private buildDirent( + filename: string, + mode: number, + sifmt: number, + sifreg: number, + sifdir: number + ): FsDirent { + return { + name: filename, + isFile: () => (mode & sifmt) === sifreg, + isDirectory: () => (mode & sifmt) === sifdir, + }; + } } diff --git a/src/renderer/store/slices/sessionSlice.ts b/src/renderer/store/slices/sessionSlice.ts index f4821bdb..dd5a2309 100644 --- a/src/renderer/store/slices/sessionSlice.ts +++ b/src/renderer/store/slices/sessionSlice.ts @@ -228,23 +228,31 @@ export const createSessionSlice: StateCreator = } }, - // Toggle pin/unpin for a session + // Toggle pin/unpin for a session (optimistic update) togglePinSession: async (sessionId: string) => { const state = get(); const projectId = state.selectedProjectId; if (!projectId) return; const isPinned = state.pinnedSessionIds.includes(sessionId); + const previousPinnedIds = state.pinnedSessionIds; + + // Optimistic: update UI immediately + if (isPinned) { + set({ pinnedSessionIds: previousPinnedIds.filter((id) => id !== sessionId) }); + } else { + set({ pinnedSessionIds: [sessionId, ...previousPinnedIds] }); + } try { if (isPinned) { await api.config.unpinSession(projectId, sessionId); - set({ pinnedSessionIds: state.pinnedSessionIds.filter((id) => id !== sessionId) }); } else { await api.config.pinSession(projectId, sessionId); - set({ pinnedSessionIds: [sessionId, ...state.pinnedSessionIds] }); } } catch (error) { + // Rollback on failure + set({ pinnedSessionIds: previousPinnedIds }); logger.error('togglePinSession error:', error); } },