perf: improve source listing speed by 20-30x (#436)
* fix(i18n): resolve podcast dialog translation infinite loop and profile issues - Remove incorrect translation keys for user-defined episode profiles - Cache translation strings in ContentSelectionPanel to avoid repeated Proxy accesses that triggered infinite loop detection - Stabilize useEffect dependencies with dataKey pattern to prevent re-initialization on every keystroke - Replace unstable sourcesQueries prop with stable fetchingNotebookIds set - Clean up unused getSourceModes function and TranslationKeys import * chore: bump lock * chore: bump version to 1.5.1 and update CHANGELOG * fix: guard .join() call in dataKey when query data is undefined * fix(api): use FETCH command instead of async status lookups for sources list Replace N async calls to surreal-commands with SurrealDB FETCH clause to resolve command status in a single query. This eliminates the command status cascade bottleneck. * perf(db): add indexes on source field for insights and embeddings Add migration #10 that creates indexes on the `source` field of `source_insight` and `source_embedding` tables. These indexes dramatically improve the performance of source listing queries that use subqueries to count insights and check embedding existence. Performance improvement: ~8.5s -> ~0.3s for 30 sources (28x faster) * perf(db): make index concurrent * fix: add IF NOT EXISTS to index definitions for idempotency * fix: address code review feedback - Add IF EXISTS to rollback migration for safer rollbacks - Add fallback for unresolved command references (status = "unknown")
This commit is contained in:
parent
c6ec1fcddf
commit
4dc1539838
4 changed files with 41 additions and 81 deletions
|
|
@ -181,7 +181,7 @@ async def get_sources(
|
|||
if not notebook:
|
||||
raise HTTPException(status_code=404, detail="Notebook not found")
|
||||
|
||||
# Query sources for specific notebook - include command field
|
||||
# Query sources for specific notebook - include command field with FETCH
|
||||
query = f"""
|
||||
SELECT id, asset, created, title, updated, topics, command,
|
||||
(SELECT VALUE count() FROM source_insight WHERE source = $parent.id GROUP ALL)[0].count OR 0 AS insights_count,
|
||||
|
|
@ -189,6 +189,7 @@ async def get_sources(
|
|||
FROM (select value in from reference where out=$notebook_id)
|
||||
{order_clause}
|
||||
LIMIT $limit START $offset
|
||||
FETCH command
|
||||
"""
|
||||
result = await repo_query(
|
||||
query,
|
||||
|
|
@ -199,7 +200,7 @@ async def get_sources(
|
|||
},
|
||||
)
|
||||
else:
|
||||
# Query all sources - include command field
|
||||
# Query all sources - include command field with FETCH
|
||||
query = f"""
|
||||
SELECT id, asset, created, title, updated, topics, command,
|
||||
(SELECT VALUE count() FROM source_insight WHERE source = $parent.id GROUP ALL)[0].count OR 0 AS insights_count,
|
||||
|
|
@ -207,93 +208,38 @@ async def get_sources(
|
|||
FROM source
|
||||
{order_clause}
|
||||
LIMIT $limit START $offset
|
||||
FETCH command
|
||||
"""
|
||||
result = await repo_query(query, {"limit": limit, "offset": offset})
|
||||
|
||||
# Extract command IDs for batch status fetching
|
||||
command_ids = []
|
||||
command_to_source = {}
|
||||
|
||||
for row in result:
|
||||
command = row.get("command")
|
||||
if command:
|
||||
command_str = str(command)
|
||||
command_ids.append(command_str)
|
||||
command_to_source[command_str] = row["id"]
|
||||
|
||||
# Batch fetch command statuses
|
||||
command_statuses = {}
|
||||
if command_ids:
|
||||
try:
|
||||
# Get status for all commands in batch (if the library supports it)
|
||||
# If not, we'll fall back to individual calls, but limit concurrent requests
|
||||
import asyncio
|
||||
|
||||
from surreal_commands import get_command_status
|
||||
|
||||
async def get_status_safe(command_id: str):
|
||||
try:
|
||||
status = await get_command_status(command_id)
|
||||
return (command_id, status)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to get status for command {command_id}: {e}"
|
||||
)
|
||||
return (command_id, None)
|
||||
|
||||
# Limit concurrent requests to avoid overwhelming the command service
|
||||
semaphore = asyncio.Semaphore(10)
|
||||
|
||||
async def get_status_with_limit(command_id: str):
|
||||
async with semaphore:
|
||||
return await get_status_safe(command_id)
|
||||
|
||||
# Fetch statuses concurrently but with limit
|
||||
status_tasks = [get_status_with_limit(cmd_id) for cmd_id in command_ids]
|
||||
status_results = await asyncio.gather(
|
||||
*status_tasks, return_exceptions=True
|
||||
)
|
||||
|
||||
# Process results
|
||||
for result_item in status_results:
|
||||
if isinstance(result_item, Exception):
|
||||
continue
|
||||
if isinstance(result_item, tuple) and len(result_item) == 2:
|
||||
cmd_id, status = result_item
|
||||
command_statuses[cmd_id] = status
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to batch fetch command statuses: {e}")
|
||||
|
||||
# Convert result to response model
|
||||
# Command data is already fetched via FETCH command clause
|
||||
response_list = []
|
||||
for row in result:
|
||||
command = row.get("command")
|
||||
command_id = str(command) if command else None
|
||||
command_id = None
|
||||
status = None
|
||||
processing_info = None
|
||||
|
||||
# Get status information if command exists
|
||||
if command_id and command_id in command_statuses:
|
||||
status_obj = command_statuses[command_id]
|
||||
if status_obj:
|
||||
status = status_obj.status
|
||||
# Extract execution metadata from nested result structure
|
||||
result_data: dict[str, Any] | None = getattr(
|
||||
status_obj, "result", None
|
||||
)
|
||||
execution_metadata: dict[str, Any] = (
|
||||
result_data.get("execution_metadata", {})
|
||||
if isinstance(result_data, dict)
|
||||
else {}
|
||||
)
|
||||
processing_info = {
|
||||
"started_at": execution_metadata.get("started_at"),
|
||||
"completed_at": execution_metadata.get("completed_at"),
|
||||
"error": getattr(status_obj, "error_message", None),
|
||||
}
|
||||
elif command_id:
|
||||
# Command exists but status couldn't be fetched
|
||||
# Extract status from fetched command object (already resolved by FETCH)
|
||||
if command and isinstance(command, dict):
|
||||
command_id = str(command.get("id")) if command.get("id") else None
|
||||
status = command.get("status")
|
||||
# Extract execution metadata from nested result structure
|
||||
result_data = command.get("result")
|
||||
execution_metadata = (
|
||||
result_data.get("execution_metadata", {})
|
||||
if isinstance(result_data, dict)
|
||||
else {}
|
||||
)
|
||||
processing_info = {
|
||||
"started_at": execution_metadata.get("started_at"),
|
||||
"completed_at": execution_metadata.get("completed_at"),
|
||||
"error": command.get("error_message"),
|
||||
}
|
||||
elif command:
|
||||
# Command exists but FETCH failed to resolve it (broken reference)
|
||||
command_id = str(command)
|
||||
status = "unknown"
|
||||
|
||||
response_list.append(
|
||||
|
|
@ -310,11 +256,11 @@ async def get_sources(
|
|||
if row.get("asset")
|
||||
else None,
|
||||
embedded=row.get("embedded", False),
|
||||
embedded_chunks=0, # Removed from query - not needed in list view
|
||||
embedded_chunks=0, # Not needed in list view
|
||||
insights_count=row.get("insights_count", 0),
|
||||
created=str(row["created"]),
|
||||
updated=str(row["updated"]),
|
||||
# Status fields
|
||||
# Status fields from fetched command
|
||||
command_id=command_id,
|
||||
status=status,
|
||||
processing_info=processing_info,
|
||||
|
|
|
|||
|
|
@ -105,6 +105,7 @@ class AsyncMigrationManager:
|
|||
AsyncMigration.from_file("open_notebook/database/migrations/7.surrealql"),
|
||||
AsyncMigration.from_file("open_notebook/database/migrations/8.surrealql"),
|
||||
AsyncMigration.from_file("open_notebook/database/migrations/9.surrealql"),
|
||||
AsyncMigration.from_file("open_notebook/database/migrations/10.surrealql"),
|
||||
]
|
||||
self.down_migrations = [
|
||||
AsyncMigration.from_file(
|
||||
|
|
@ -134,6 +135,9 @@ class AsyncMigrationManager:
|
|||
AsyncMigration.from_file(
|
||||
"open_notebook/database/migrations/9_down.surrealql"
|
||||
),
|
||||
AsyncMigration.from_file(
|
||||
"open_notebook/database/migrations/10_down.surrealql"
|
||||
),
|
||||
]
|
||||
self.runner = AsyncMigrationRunner(
|
||||
up_migrations=self.up_migrations,
|
||||
|
|
|
|||
6
open_notebook/database/migrations/10.surrealql
Normal file
6
open_notebook/database/migrations/10.surrealql
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
-- Migration 10: Add indexes for source_insight and source_embedding source field
|
||||
-- These indexes significantly improve performance of source listing queries
|
||||
-- that count insights and check embedding existence per source
|
||||
|
||||
DEFINE INDEX IF NOT EXISTS idx_source_insight_source ON source_insight FIELDS source CONCURRENTLY;
|
||||
DEFINE INDEX IF NOT EXISTS idx_source_embedding_source ON source_embedding FIELDS source CONCURRENTLY;
|
||||
4
open_notebook/database/migrations/10_down.surrealql
Normal file
4
open_notebook/database/migrations/10_down.surrealql
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
-- Rollback Migration 10: Remove source field indexes
|
||||
|
||||
REMOVE INDEX IF EXISTS idx_source_insight_source ON TABLE source_insight;
|
||||
REMOVE INDEX IF EXISTS idx_source_embedding_source ON TABLE source_embedding;
|
||||
Loading…
Reference in a new issue