diff --git a/api/routers/sources.py b/api/routers/sources.py index 3544cf5..22c8624 100644 --- a/api/routers/sources.py +++ b/api/routers/sources.py @@ -181,7 +181,7 @@ async def get_sources( if not notebook: raise HTTPException(status_code=404, detail="Notebook not found") - # Query sources for specific notebook - include command field + # Query sources for specific notebook - include command field with FETCH query = f""" SELECT id, asset, created, title, updated, topics, command, (SELECT VALUE count() FROM source_insight WHERE source = $parent.id GROUP ALL)[0].count OR 0 AS insights_count, @@ -189,6 +189,7 @@ async def get_sources( FROM (select value in from reference where out=$notebook_id) {order_clause} LIMIT $limit START $offset + FETCH command """ result = await repo_query( query, @@ -199,7 +200,7 @@ async def get_sources( }, ) else: - # Query all sources - include command field + # Query all sources - include command field with FETCH query = f""" SELECT id, asset, created, title, updated, topics, command, (SELECT VALUE count() FROM source_insight WHERE source = $parent.id GROUP ALL)[0].count OR 0 AS insights_count, @@ -207,93 +208,38 @@ async def get_sources( FROM source {order_clause} LIMIT $limit START $offset + FETCH command """ result = await repo_query(query, {"limit": limit, "offset": offset}) - # Extract command IDs for batch status fetching - command_ids = [] - command_to_source = {} - - for row in result: - command = row.get("command") - if command: - command_str = str(command) - command_ids.append(command_str) - command_to_source[command_str] = row["id"] - - # Batch fetch command statuses - command_statuses = {} - if command_ids: - try: - # Get status for all commands in batch (if the library supports it) - # If not, we'll fall back to individual calls, but limit concurrent requests - import asyncio - - from surreal_commands import get_command_status - - async def get_status_safe(command_id: str): - try: - status = await get_command_status(command_id) - return (command_id, status) - except Exception as e: - logger.warning( - f"Failed to get status for command {command_id}: {e}" - ) - return (command_id, None) - - # Limit concurrent requests to avoid overwhelming the command service - semaphore = asyncio.Semaphore(10) - - async def get_status_with_limit(command_id: str): - async with semaphore: - return await get_status_safe(command_id) - - # Fetch statuses concurrently but with limit - status_tasks = [get_status_with_limit(cmd_id) for cmd_id in command_ids] - status_results = await asyncio.gather( - *status_tasks, return_exceptions=True - ) - - # Process results - for result_item in status_results: - if isinstance(result_item, Exception): - continue - if isinstance(result_item, tuple) and len(result_item) == 2: - cmd_id, status = result_item - command_statuses[cmd_id] = status - - except Exception as e: - logger.warning(f"Failed to batch fetch command statuses: {e}") - # Convert result to response model + # Command data is already fetched via FETCH command clause response_list = [] for row in result: command = row.get("command") - command_id = str(command) if command else None + command_id = None status = None processing_info = None - # Get status information if command exists - if command_id and command_id in command_statuses: - status_obj = command_statuses[command_id] - if status_obj: - status = status_obj.status - # Extract execution metadata from nested result structure - result_data: dict[str, Any] | None = getattr( - status_obj, "result", None - ) - execution_metadata: dict[str, Any] = ( - result_data.get("execution_metadata", {}) - if isinstance(result_data, dict) - else {} - ) - processing_info = { - "started_at": execution_metadata.get("started_at"), - "completed_at": execution_metadata.get("completed_at"), - "error": getattr(status_obj, "error_message", None), - } - elif command_id: - # Command exists but status couldn't be fetched + # Extract status from fetched command object (already resolved by FETCH) + if command and isinstance(command, dict): + command_id = str(command.get("id")) if command.get("id") else None + status = command.get("status") + # Extract execution metadata from nested result structure + result_data = command.get("result") + execution_metadata = ( + result_data.get("execution_metadata", {}) + if isinstance(result_data, dict) + else {} + ) + processing_info = { + "started_at": execution_metadata.get("started_at"), + "completed_at": execution_metadata.get("completed_at"), + "error": command.get("error_message"), + } + elif command: + # Command exists but FETCH failed to resolve it (broken reference) + command_id = str(command) status = "unknown" response_list.append( @@ -310,11 +256,11 @@ async def get_sources( if row.get("asset") else None, embedded=row.get("embedded", False), - embedded_chunks=0, # Removed from query - not needed in list view + embedded_chunks=0, # Not needed in list view insights_count=row.get("insights_count", 0), created=str(row["created"]), updated=str(row["updated"]), - # Status fields + # Status fields from fetched command command_id=command_id, status=status, processing_info=processing_info, diff --git a/open_notebook/database/async_migrate.py b/open_notebook/database/async_migrate.py index 5128a2a..5121c53 100644 --- a/open_notebook/database/async_migrate.py +++ b/open_notebook/database/async_migrate.py @@ -105,6 +105,7 @@ class AsyncMigrationManager: AsyncMigration.from_file("open_notebook/database/migrations/7.surrealql"), AsyncMigration.from_file("open_notebook/database/migrations/8.surrealql"), AsyncMigration.from_file("open_notebook/database/migrations/9.surrealql"), + AsyncMigration.from_file("open_notebook/database/migrations/10.surrealql"), ] self.down_migrations = [ AsyncMigration.from_file( @@ -134,6 +135,9 @@ class AsyncMigrationManager: AsyncMigration.from_file( "open_notebook/database/migrations/9_down.surrealql" ), + AsyncMigration.from_file( + "open_notebook/database/migrations/10_down.surrealql" + ), ] self.runner = AsyncMigrationRunner( up_migrations=self.up_migrations, diff --git a/open_notebook/database/migrations/10.surrealql b/open_notebook/database/migrations/10.surrealql new file mode 100644 index 0000000..f1844c5 --- /dev/null +++ b/open_notebook/database/migrations/10.surrealql @@ -0,0 +1,6 @@ +-- Migration 10: Add indexes for source_insight and source_embedding source field +-- These indexes significantly improve performance of source listing queries +-- that count insights and check embedding existence per source + +DEFINE INDEX IF NOT EXISTS idx_source_insight_source ON source_insight FIELDS source CONCURRENTLY; +DEFINE INDEX IF NOT EXISTS idx_source_embedding_source ON source_embedding FIELDS source CONCURRENTLY; diff --git a/open_notebook/database/migrations/10_down.surrealql b/open_notebook/database/migrations/10_down.surrealql new file mode 100644 index 0000000..e23b979 --- /dev/null +++ b/open_notebook/database/migrations/10_down.surrealql @@ -0,0 +1,4 @@ +-- Rollback Migration 10: Remove source field indexes + +REMOVE INDEX IF EXISTS idx_source_insight_source ON TABLE source_insight; +REMOVE INDEX IF EXISTS idx_source_embedding_source ON TABLE source_embedding;