open-notebook/commands/embedding_commands.py
Luis Novo b7e656a319
Version 1 (#160)
New front-end
Launch Chat API
Manage Sources
Enable re-embedding of all contents
Sources can be added without a notebook now
Improved settings
Enable model selector on all chats
Background processing for better experience
Dark mode
Improved Notes

Improved Docs: 
- Remove all Streamlit references from documentation
- Update deployment guides with React frontend setup
- Fix Docker environment variables format (SURREAL_URL, SURREAL_PASSWORD)
- Update docker image tag from :latest to :v1-latest
- Change navigation references (Settings → Models to just Models)
- Update development setup to include frontend npm commands
- Add MIGRATION.md guide for users upgrading from Streamlit
- Update quick-start guide with correct environment variables
- Add port 5055 documentation for API access
- Update project structure to reflect frontend/ directory
- Remove outdated source-chat documentation files
2025-10-18 12:46:22 -03:00

392 lines
14 KiB
Python

import time
from typing import Dict, List, Literal, Optional
from loguru import logger
from pydantic import BaseModel
from surreal_commands import CommandInput, CommandOutput, command
from open_notebook.database.repository import ensure_record_id, repo_query
from open_notebook.domain.models import model_manager
from open_notebook.domain.notebook import Note, Source, SourceInsight
def full_model_dump(model):
if isinstance(model, BaseModel):
return model.model_dump()
elif isinstance(model, dict):
return {k: full_model_dump(v) for k, v in model.items()}
elif isinstance(model, list):
return [full_model_dump(item) for item in model]
else:
return model
class EmbedSingleItemInput(CommandInput):
item_id: str
item_type: Literal["source", "note", "insight"]
class EmbedSingleItemOutput(CommandOutput):
success: bool
item_id: str
item_type: str
chunks_created: int = 0 # For sources
processing_time: float
error_message: Optional[str] = None
class RebuildEmbeddingsInput(CommandInput):
mode: Literal["existing", "all"]
include_sources: bool = True
include_notes: bool = True
include_insights: bool = True
class RebuildEmbeddingsOutput(CommandOutput):
success: bool
total_items: int
processed_items: int
failed_items: int
sources_processed: int = 0
notes_processed: int = 0
insights_processed: int = 0
processing_time: float
error_message: Optional[str] = None
@command("embed_single_item", app="open_notebook")
async def embed_single_item_command(
input_data: EmbedSingleItemInput,
) -> EmbedSingleItemOutput:
"""
Embed a single item (source, note, or insight)
"""
start_time = time.time()
try:
logger.info(
f"Starting embedding for {input_data.item_type}: {input_data.item_id}"
)
# Check if embedding model is available
EMBEDDING_MODEL = await model_manager.get_embedding_model()
if not EMBEDDING_MODEL:
raise ValueError(
"No embedding model configured. Please configure one in the Models section."
)
chunks_created = 0
if input_data.item_type == "source":
# Get source and vectorize
source = await Source.get(input_data.item_id)
if not source:
raise ValueError(f"Source '{input_data.item_id}' not found")
await source.vectorize()
# Count chunks created
chunks_result = await repo_query(
"SELECT VALUE count() FROM source_embedding WHERE source = $source_id GROUP ALL",
{"source_id": ensure_record_id(input_data.item_id)},
)
if chunks_result and isinstance(chunks_result[0], dict):
chunks_created = chunks_result[0].get("count", 0)
elif chunks_result and isinstance(chunks_result[0], int):
chunks_created = chunks_result[0]
else:
chunks_created = 0
logger.info(f"Source vectorized: {chunks_created} chunks created")
elif input_data.item_type == "note":
# Get note and save (auto-embeds via ObjectModel.save())
note = await Note.get(input_data.item_id)
if not note:
raise ValueError(f"Note '{input_data.item_id}' not found")
await note.save()
logger.info(f"Note embedded: {input_data.item_id}")
elif input_data.item_type == "insight":
# Get insight and re-generate embedding
insight = await SourceInsight.get(input_data.item_id)
if not insight:
raise ValueError(f"Insight '{input_data.item_id}' not found")
# Generate new embedding
embedding = (await EMBEDDING_MODEL.aembed([insight.content]))[0]
# Update insight with new embedding
await repo_query(
"UPDATE $insight_id SET embedding = $embedding",
{
"insight_id": ensure_record_id(input_data.item_id),
"embedding": embedding,
},
)
logger.info(f"Insight embedded: {input_data.item_id}")
else:
raise ValueError(
f"Invalid item_type: {input_data.item_type}. Must be 'source', 'note', or 'insight'"
)
processing_time = time.time() - start_time
logger.info(
f"Successfully embedded {input_data.item_type} {input_data.item_id} in {processing_time:.2f}s"
)
return EmbedSingleItemOutput(
success=True,
item_id=input_data.item_id,
item_type=input_data.item_type,
chunks_created=chunks_created,
processing_time=processing_time,
)
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Embedding failed for {input_data.item_type} {input_data.item_id}: {e}")
logger.exception(e)
return EmbedSingleItemOutput(
success=False,
item_id=input_data.item_id,
item_type=input_data.item_type,
processing_time=processing_time,
error_message=str(e),
)
async def collect_items_for_rebuild(
mode: str,
include_sources: bool,
include_notes: bool,
include_insights: bool,
) -> Dict[str, List[str]]:
"""
Collect items to rebuild based on mode and include flags.
Returns:
Dict with keys: 'sources', 'notes', 'insights' containing lists of item IDs
"""
items: Dict[str, List[str]] = {"sources": [], "notes": [], "insights": []}
if include_sources:
if mode == "existing":
# Query sources with embeddings (via source_embedding table)
result = await repo_query(
"""
RETURN array::distinct(
SELECT VALUE source.id
FROM source_embedding
WHERE embedding != none AND array::len(embedding) > 0
)
"""
)
# RETURN returns the array directly as the result (not nested)
if result:
items["sources"] = [str(item) for item in result]
else:
items["sources"] = []
else: # mode == "all"
# Query all sources with content
result = await repo_query("SELECT id FROM source WHERE full_text != none")
items["sources"] = [str(item["id"]) for item in result] if result else []
logger.info(f"Collected {len(items['sources'])} sources for rebuild")
if include_notes:
if mode == "existing":
# Query notes with embeddings
result = await repo_query(
"SELECT id FROM note WHERE embedding != none AND array::len(embedding) > 0"
)
else: # mode == "all"
# Query all notes (with content)
result = await repo_query("SELECT id FROM note WHERE content != none")
items["notes"] = [str(item["id"]) for item in result] if result else []
logger.info(f"Collected {len(items['notes'])} notes for rebuild")
if include_insights:
if mode == "existing":
# Query insights with embeddings
result = await repo_query(
"SELECT id FROM source_insight WHERE embedding != none AND array::len(embedding) > 0"
)
else: # mode == "all"
# Query all insights
result = await repo_query("SELECT id FROM source_insight")
items["insights"] = [str(item["id"]) for item in result] if result else []
logger.info(f"Collected {len(items['insights'])} insights for rebuild")
return items
@command("rebuild_embeddings", app="open_notebook")
async def rebuild_embeddings_command(
input_data: RebuildEmbeddingsInput,
) -> RebuildEmbeddingsOutput:
"""
Rebuild embeddings for sources, notes, and/or insights
"""
start_time = time.time()
try:
logger.info("=" * 60)
logger.info(f"Starting embedding rebuild with mode={input_data.mode}")
logger.info(f"Include: sources={input_data.include_sources}, notes={input_data.include_notes}, insights={input_data.include_insights}")
logger.info("=" * 60)
# Check embedding model availability
EMBEDDING_MODEL = await model_manager.get_embedding_model()
if not EMBEDDING_MODEL:
raise ValueError(
"No embedding model configured. Please configure one in the Models section."
)
logger.info(f"Using embedding model: {EMBEDDING_MODEL}")
# Collect items to process
items = await collect_items_for_rebuild(
input_data.mode,
input_data.include_sources,
input_data.include_notes,
input_data.include_insights,
)
total_items = (
len(items["sources"]) + len(items["notes"]) + len(items["insights"])
)
logger.info(f"Total items to process: {total_items}")
if total_items == 0:
logger.warning("No items found to rebuild")
return RebuildEmbeddingsOutput(
success=True,
total_items=0,
processed_items=0,
failed_items=0,
processing_time=time.time() - start_time,
)
# Initialize counters
sources_processed = 0
notes_processed = 0
insights_processed = 0
failed_items = 0
# Process sources
logger.info(f"\nProcessing {len(items['sources'])} sources...")
for idx, source_id in enumerate(items["sources"], 1):
try:
source = await Source.get(source_id)
if not source:
logger.warning(f"Source {source_id} not found, skipping")
failed_items += 1
continue
await source.vectorize()
sources_processed += 1
if idx % 10 == 0 or idx == len(items["sources"]):
logger.info(
f" Progress: {idx}/{len(items['sources'])} sources processed"
)
except Exception as e:
logger.error(f"Failed to re-embed source {source_id}: {e}")
failed_items += 1
# Process notes
logger.info(f"\nProcessing {len(items['notes'])} notes...")
for idx, note_id in enumerate(items["notes"], 1):
try:
note = await Note.get(note_id)
if not note:
logger.warning(f"Note {note_id} not found, skipping")
failed_items += 1
continue
await note.save() # Auto-embeds via ObjectModel.save()
notes_processed += 1
if idx % 10 == 0 or idx == len(items["notes"]):
logger.info(f" Progress: {idx}/{len(items['notes'])} notes processed")
except Exception as e:
logger.error(f"Failed to re-embed note {note_id}: {e}")
failed_items += 1
# Process insights
logger.info(f"\nProcessing {len(items['insights'])} insights...")
for idx, insight_id in enumerate(items["insights"], 1):
try:
insight = await SourceInsight.get(insight_id)
if not insight:
logger.warning(f"Insight {insight_id} not found, skipping")
failed_items += 1
continue
# Re-generate embedding
embedding = (await EMBEDDING_MODEL.aembed([insight.content]))[0]
# Update insight with new embedding
await repo_query(
"UPDATE $insight_id SET embedding = $embedding",
{
"insight_id": ensure_record_id(insight_id),
"embedding": embedding,
},
)
insights_processed += 1
if idx % 10 == 0 or idx == len(items["insights"]):
logger.info(
f" Progress: {idx}/{len(items['insights'])} insights processed"
)
except Exception as e:
logger.error(f"Failed to re-embed insight {insight_id}: {e}")
failed_items += 1
processing_time = time.time() - start_time
processed_items = sources_processed + notes_processed + insights_processed
logger.info("=" * 60)
logger.info("REBUILD COMPLETE")
logger.info(f" Total processed: {processed_items}/{total_items}")
logger.info(f" Sources: {sources_processed}")
logger.info(f" Notes: {notes_processed}")
logger.info(f" Insights: {insights_processed}")
logger.info(f" Failed: {failed_items}")
logger.info(f" Time: {processing_time:.2f}s")
logger.info("=" * 60)
return RebuildEmbeddingsOutput(
success=True,
total_items=total_items,
processed_items=processed_items,
failed_items=failed_items,
sources_processed=sources_processed,
notes_processed=notes_processed,
insights_processed=insights_processed,
processing_time=processing_time,
)
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Rebuild embeddings failed: {e}")
logger.exception(e)
return RebuildEmbeddingsOutput(
success=False,
total_items=0,
processed_items=0,
failed_items=0,
processing_time=processing_time,
error_message=str(e),
)