diff --git a/api/models.py b/api/models.py index 0c96543..80261b6 100644 --- a/api/models.py +++ b/api/models.py @@ -395,6 +395,16 @@ class SourceInsightResponse(BaseModel): updated: str +class InsightCreationResponse(BaseModel): + """Response for async insight creation.""" + + status: Literal["pending"] = "pending" + message: str = "Insight generation started" + source_id: str + transformation_id: str + command_id: Optional[str] = None + + class SaveAsNoteRequest(BaseModel): notebook_id: Optional[str] = Field(None, description="Notebook ID to add note to") diff --git a/api/routers/sources.py b/api/routers/sources.py index 22e4f74..e2cd62d 100644 --- a/api/routers/sources.py +++ b/api/routers/sources.py @@ -14,12 +14,13 @@ from fastapi import ( ) from fastapi.responses import FileResponse, Response from loguru import logger -from surreal_commands import execute_command_sync +from surreal_commands import execute_command_sync, submit_command from api.command_service import CommandService from api.models import ( AssetModel, CreateSourceInsightRequest, + InsightCreationResponse, SourceCreate, SourceInsightResponse, SourceListResponse, @@ -963,44 +964,56 @@ async def get_source_insights(source_id: str): ) -@router.post("/sources/{source_id}/insights", response_model=SourceInsightResponse) +@router.post( + "/sources/{source_id}/insights", + response_model=InsightCreationResponse, + status_code=202, +) async def create_source_insight(source_id: str, request: CreateSourceInsightRequest): - """Create a new insight for a source by running a transformation.""" + """ + Start insight generation for a source by running a transformation. + + This endpoint returns immediately with a 202 Accepted status. + The transformation runs asynchronously in the background via the job queue. + Poll GET /sources/{source_id}/insights to see when the insight is ready. + """ try: - # Get source + # Validate source exists source = await Source.get(source_id) if not source: raise HTTPException(status_code=404, detail="Source not found") - # Get transformation + # Validate transformation exists transformation = await Transformation.get(request.transformation_id) if not transformation: raise HTTPException(status_code=404, detail="Transformation not found") - # Run transformation graph - from open_notebook.graphs.transformation import graph as transform_graph - - await transform_graph.ainvoke( - input=dict(source=source, transformation=transformation) # type: ignore[arg-type] + # Submit transformation as background job (fire-and-forget) + command_id = submit_command( + "open_notebook", + "run_transformation", + { + "source_id": source_id, + "transformation_id": request.transformation_id, + }, + ) + logger.info( + f"Submitted run_transformation command {command_id} for source {source_id}" ) - # Get the newly created insight (last one) - insights = await source.get_insights() - if insights: - newest = insights[-1] - return SourceInsightResponse( - id=newest.id or "", - source_id=source_id, - insight_type=newest.insight_type, - content=newest.content, - created=str(newest.created), - updated=str(newest.updated), - ) - else: - raise HTTPException(status_code=500, detail="Failed to create insight") + # Return immediately with command_id for status tracking + return InsightCreationResponse( + status="pending", + message="Insight generation started", + source_id=source_id, + transformation_id=request.transformation_id, + command_id=str(command_id), + ) except HTTPException: raise except Exception as e: - logger.error(f"Error creating insight for source {source_id}: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error creating insight: {str(e)}") + logger.error(f"Error starting insight generation for source {source_id}: {e}") + raise HTTPException( + status_code=500, detail=f"Error starting insight generation: {str(e)}" + ) diff --git a/commands/CLAUDE.md b/commands/CLAUDE.md index eb9174e..5fe6f2f 100644 --- a/commands/CLAUDE.md +++ b/commands/CLAUDE.md @@ -9,11 +9,13 @@ - **`embed_note_command`**: Embeds a single note using unified embedding pipeline with content-type aware processing. Uses MARKDOWN content type detection. Retry: 5 attempts, exponential jitter 1-60s. - **`embed_insight_command`**: Embeds a single source insight. Uses MARKDOWN content type. Retry: 5 attempts, exponential jitter 1-60s. - **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks. Uses single Esperanto API call. Retry: 5 attempts, exponential jitter 1-60s. +- **`create_insight_command`**: Creates a source insight with automatic retry on transaction conflicts. Creates the DB record, then submits `embed_insight` command (fire-and-forget). Retry: 5 attempts, exponential jitter 1-60s. Used by `Source.add_insight()`. - **`rebuild_embeddings_command`**: Submits individual embed_* commands for all sources/notes/insights. Returns immediately; actual embedding happens async. No retry (coordinator only). ### Other Commands -- **`process_source_command`**: Ingests content through `source_graph`, creates embeddings (optional), and generates insights. Retries on transaction conflicts (exp. jitter, max 5×). +- **`process_source_command`**: Ingests content through `source_graph`, creates embeddings (optional), and generates insights. Retries on transaction conflicts (exp. jitter, max 15×, 1-120s). +- **`run_transformation_command`**: Runs a transformation on an existing source to generate an insight. Executes the transformation graph (LLM call) then creates insight via `create_insight_command`. Used by `POST /sources/{id}/insights` API endpoint. Retry: 5 attempts, exponential jitter 1-60s. - **`generate_podcast_command`**: Creates podcasts via `podcast-creator` library using stored episode/speaker profiles. - **`process_text_command`** (example): Test fixture for text operations (uppercase, lowercase, reverse, word_count). - **`analyze_data_command`** (example): Test fixture for numeric aggregations. diff --git a/commands/embedding_commands.py b/commands/embedding_commands.py index f5e05ad..1087066 100644 --- a/commands/embedding_commands.py +++ b/commands/embedding_commands.py @@ -47,6 +47,23 @@ class RebuildEmbeddingsOutput(CommandOutput): # ============================================================================= +class CreateInsightInput(CommandInput): + """Input for creating a source insight with automatic retry on conflicts.""" + + source_id: str + insight_type: str + content: str + + +class CreateInsightOutput(CommandOutput): + """Output from insight creation command.""" + + success: bool + insight_id: Optional[str] = None + processing_time: float + error_message: Optional[str] = None + + class EmbedNoteInput(CommandInput): """Input for embedding a single note.""" @@ -412,6 +429,115 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu ) +@command( + "create_insight", + app="open_notebook", + retry={ + "max_attempts": 5, + "wait_strategy": "exponential_jitter", + "wait_min": 1, + "wait_max": 60, + "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "retry_log_level": "debug", + }, +) +async def create_insight_command( + input_data: CreateInsightInput, +) -> CreateInsightOutput: + """ + Create a source insight with automatic retry on transaction conflicts. + + This command wraps the CREATE source_insight operation with retry logic + to handle SurrealDB transaction conflicts that occur during batch imports + when multiple parallel transformations try to create insights concurrently. + + Flow: + 1. CREATE source_insight record in database + 2. Submit embed_insight command (fire-and-forget) for async embedding + 3. Return the insight_id + + Retry Strategy: + - Retries up to 5 times for transient failures (RuntimeError, ConnectionError, TimeoutError) + - Uses exponential-jitter backoff (1-60s) + - Does NOT retry permanent failures (ValueError, authentication errors) + """ + start_time = time.time() + + try: + logger.info( + f"Creating insight for source {input_data.source_id}: " + f"type={input_data.insight_type}" + ) + + # 1. Create insight record in database + result = await repo_query( + """ + CREATE source_insight CONTENT { + "source": $source_id, + "insight_type": $insight_type, + "content": $content + }; + """, + { + "source_id": ensure_record_id(input_data.source_id), + "insight_type": input_data.insight_type, + "content": input_data.content, + }, + ) + + if not result or len(result) == 0: + raise ValueError("Failed to create insight - no result returned") + + insight_id = str(result[0].get("id", "")) + if not insight_id: + raise ValueError("Failed to create insight - no ID in result") + + # 2. Submit embedding command (fire-and-forget) + submit_command( + "open_notebook", + "embed_insight", + {"insight_id": insight_id}, + ) + logger.debug(f"Submitted embed_insight command for {insight_id}") + + processing_time = time.time() - start_time + logger.info( + f"Successfully created insight {insight_id} for source " + f"{input_data.source_id} in {processing_time:.2f}s" + ) + + return CreateInsightOutput( + success=True, + insight_id=insight_id, + processing_time=processing_time, + ) + + except RuntimeError: + logger.debug( + f"Transaction conflict creating insight for source " + f"{input_data.source_id} - will retry" + ) + raise + except (ConnectionError, TimeoutError) as e: + logger.debug( + f"Network/timeout error creating insight for source " + f"{input_data.source_id} ({type(e).__name__}: {e}) - will retry" + ) + raise + except Exception as e: + processing_time = time.time() - start_time + logger.error( + f"Failed to create insight for source {input_data.source_id}: {e}" + ) + logger.exception(e) + + return CreateInsightOutput( + success=False, + processing_time=processing_time, + error_message=str(e), + ) + + async def collect_items_for_rebuild( mode: str, include_sources: bool, diff --git a/commands/source_commands.py b/commands/source_commands.py index 0b42c44..b5fde95 100644 --- a/commands/source_commands.py +++ b/commands/source_commands.py @@ -11,9 +11,10 @@ from open_notebook.domain.transformation import Transformation try: from open_notebook.graphs.source import source_graph + from open_notebook.graphs.transformation import graph as transform_graph except ImportError as e: - logger.error(f"Failed to import source_graph: {e}") - raise ValueError("source_graph not available") + logger.error(f"Failed to import graphs: {e}") + raise ValueError("graphs not available") def full_model_dump(model): @@ -151,3 +152,121 @@ async def process_source_command( processing_time=processing_time, error_message=str(e), ) + + +# ============================================================================= +# RUN TRANSFORMATION COMMAND +# ============================================================================= + + +class RunTransformationInput(CommandInput): + """Input for running a transformation on an existing source.""" + + source_id: str + transformation_id: str + + +class RunTransformationOutput(CommandOutput): + """Output from transformation command.""" + + success: bool + source_id: str + transformation_id: str + processing_time: float + error_message: Optional[str] = None + + +@command( + "run_transformation", + app="open_notebook", + retry={ + "max_attempts": 5, + "wait_strategy": "exponential_jitter", + "wait_min": 1, + "wait_max": 60, + "retry_on": [RuntimeError, ConnectionError, TimeoutError], + "retry_log_level": "debug", + }, +) +async def run_transformation_command( + input_data: RunTransformationInput, +) -> RunTransformationOutput: + """ + Run a transformation on an existing source to generate an insight. + + This command runs the transformation graph which: + 1. Loads the source and transformation + 2. Calls the LLM to generate insight content + 3. Creates the insight via create_insight command (fire-and-forget) + + Use this command for UI-triggered insight generation to avoid blocking + the HTTP request while the LLM processes. + + Retry Strategy: + - Retries up to 5 times for transient failures + - Uses exponential-jitter backoff (1-60s) + """ + start_time = time.time() + + try: + logger.info( + f"Running transformation {input_data.transformation_id} " + f"on source {input_data.source_id}" + ) + + # Load source + source = await Source.get(input_data.source_id) + if not source: + raise ValueError(f"Source '{input_data.source_id}' not found") + + # Load transformation + transformation = await Transformation.get(input_data.transformation_id) + if not transformation: + raise ValueError( + f"Transformation '{input_data.transformation_id}' not found" + ) + + # Run transformation graph (includes LLM call + insight creation) + await transform_graph.ainvoke( + input=dict(source=source, transformation=transformation) + ) + + processing_time = time.time() - start_time + logger.info( + f"Successfully ran transformation {input_data.transformation_id} " + f"on source {input_data.source_id} in {processing_time:.2f}s" + ) + + return RunTransformationOutput( + success=True, + source_id=input_data.source_id, + transformation_id=input_data.transformation_id, + processing_time=processing_time, + ) + + except RuntimeError: + logger.debug( + f"Transaction conflict running transformation - will retry" + ) + raise + except (ConnectionError, TimeoutError) as e: + logger.debug( + f"Network/timeout error running transformation " + f"({type(e).__name__}: {e}) - will retry" + ) + raise + except Exception as e: + processing_time = time.time() - start_time + logger.error( + f"Failed to run transformation {input_data.transformation_id} " + f"on source {input_data.source_id}: {e}" + ) + logger.exception(e) + + return RunTransformationOutput( + success=False, + source_id=input_data.source_id, + transformation_id=input_data.transformation_id, + processing_time=processing_time, + error_message=str(e), + ) diff --git a/frontend/src/app/(dashboard)/notebooks/[id]/page.tsx b/frontend/src/app/(dashboard)/notebooks/[id]/page.tsx index 33e1d21..c942bc5 100644 --- a/frontend/src/app/(dashboard)/notebooks/[id]/page.tsx +++ b/frontend/src/app/(dashboard)/notebooks/[id]/page.tsx @@ -58,16 +58,21 @@ export default function NotebookPage() { notes: {} }) - // Initialize default selections when sources/notes load + // Initialize and update selections when sources load or change useEffect(() => { if (sources && sources.length > 0) { setContextSelections(prev => { const newSourceSelections = { ...prev.sources } sources.forEach(source => { - // Only set default if not already set - if (!(source.id in newSourceSelections)) { - // Default to 'insights' if has insights, otherwise 'full' - newSourceSelections[source.id] = source.insights_count > 0 ? 'insights' : 'full' + const currentMode = newSourceSelections[source.id] + const hasInsights = source.insights_count > 0 + + if (currentMode === undefined) { + // Initial setup - default based on insights availability + newSourceSelections[source.id] = hasInsights ? 'insights' : 'full' + } else if (currentMode === 'full' && hasInsights) { + // Source gained insights while in 'full' mode - auto-switch to 'insights' + newSourceSelections[source.id] = 'insights' } }) return { ...prev, sources: newSourceSelections } diff --git a/frontend/src/components/source/SourceDetailContent.tsx b/frontend/src/components/source/SourceDetailContent.tsx index a17c33d..87ebb40 100644 --- a/frontend/src/components/source/SourceDetailContent.tsx +++ b/frontend/src/components/source/SourceDetailContent.tsx @@ -1,6 +1,7 @@ 'use client' import { useState, useEffect, useCallback, useMemo } from 'react' +import { useQueryClient } from '@tanstack/react-query' import { isAxiosError } from 'axios' import ReactMarkdown from 'react-markdown' import remarkGfm from 'remark-gfm' @@ -81,6 +82,7 @@ export function SourceDetailContent({ onClose }: SourceDetailContentProps) { const { t, language } = useTranslation() + const queryClient = useQueryClient() const [source, setSource] = useState(null) const [insights, setInsights] = useState([]) const [transformations, setTransformations] = useState([]) @@ -154,12 +156,36 @@ export function SourceDetailContent({ try { setCreatingInsight(true) - await insightsApi.create(sourceId, { + const response = await insightsApi.create(sourceId, { transformation_id: selectedTransformation }) - toast.success(t.common.success) - await fetchInsights() + // Show toast for async operation + toast.success(t.sources.insightGenerationStarted) setSelectedTransformation('') + + // Poll for command completion if we have a command_id + if (response.command_id) { + // Poll in background (don't block UI) + insightsApi.waitForCommand(response.command_id, { + maxAttempts: 120, // Up to 4 minutes (120 * 2s) + intervalMs: 2000 + }).then(success => { + if (success) { + void fetchInsights() + // Invalidate sources queries so notebook page refreshes with updated insights_count + queryClient.invalidateQueries({ queryKey: ['sources'] }) + } + }).catch(err => { + console.error('Error waiting for insight command:', err) + }) + } else { + // Fallback: refresh after delay if no command_id + setTimeout(() => { + void fetchInsights() + // Also invalidate sources queries + queryClient.invalidateQueries({ queryKey: ['sources'] }) + }, 5000) + } } catch (err) { console.error('Failed to create insight:', err) toast.error(t.common.error) diff --git a/frontend/src/lib/api/insights.ts b/frontend/src/lib/api/insights.ts index 1f6677c..ef5fd90 100644 --- a/frontend/src/lib/api/insights.ts +++ b/frontend/src/lib/api/insights.ts @@ -13,6 +13,21 @@ export interface CreateSourceInsightRequest { transformation_id: string } +export interface InsightCreationResponse { + status: 'pending' + message: string + source_id: string + transformation_id: string + command_id?: string +} + +export interface CommandJobStatusResponse { + job_id: string + status: string + result?: Record + error_message?: string +} + export const insightsApi = { listForSource: async (sourceId: string) => { const response = await apiClient.get(`/sources/${sourceId}/insights`) @@ -25,7 +40,7 @@ export const insightsApi = { }, create: async (sourceId: string, data: CreateSourceInsightRequest) => { - const response = await apiClient.post( + const response = await apiClient.post( `/sources/${sourceId}/insights`, data ) @@ -34,5 +49,46 @@ export const insightsApi = { delete: async (insightId: string) => { await apiClient.delete(`/insights/${insightId}`) + }, + + getCommandStatus: async (commandId: string) => { + const response = await apiClient.get( + `/commands/jobs/${commandId}` + ) + return response.data + }, + + /** + * Poll command status until completed or failed. + * Returns true if completed successfully, false if failed. + */ + waitForCommand: async ( + commandId: string, + options?: { maxAttempts?: number; intervalMs?: number } + ): Promise => { + const maxAttempts = options?.maxAttempts ?? 60 // Default 60 attempts + const intervalMs = options?.intervalMs ?? 2000 // Default 2 seconds + + for (let i = 0; i < maxAttempts; i++) { + try { + const status = await insightsApi.getCommandStatus(commandId) + if (status.status === 'completed') { + return true + } + if (status.status === 'failed' || status.status === 'canceled') { + console.error('Command failed:', status.error_message) + return false + } + // Still running, wait and retry + await new Promise(resolve => setTimeout(resolve, intervalMs)) + } catch (error) { + console.error('Error checking command status:', error) + // Continue polling on error + await new Promise(resolve => setTimeout(resolve, intervalMs)) + } + } + // Timeout + console.warn('Command polling timed out') + return false } } \ No newline at end of file diff --git a/frontend/src/lib/locales/en-US/index.ts b/frontend/src/lib/locales/en-US/index.ts index 477ea3a..ca0bf49 100644 --- a/frontend/src/lib/locales/en-US/index.ts +++ b/frontend/src/lib/locales/en-US/index.ts @@ -357,6 +357,7 @@ export const enUS = { viewInsight: "View Insight", deleteInsight: "Delete Insight", deleteInsightConfirm: "Are you sure you want to delete this insight? This action cannot be undone.", + insightGenerationStarted: "Insight generation started. It will appear shortly.", deleteNoteConfirm: 'Are you sure you want to delete this note? This action cannot be undone.', editNote: 'Edit note', createNote: 'Create note', diff --git a/frontend/src/lib/locales/ja-JP/index.ts b/frontend/src/lib/locales/ja-JP/index.ts index 375aae1..49e13ea 100644 --- a/frontend/src/lib/locales/ja-JP/index.ts +++ b/frontend/src/lib/locales/ja-JP/index.ts @@ -357,6 +357,7 @@ export const jaJP = { viewInsight: "インサイトを表示", deleteInsight: "インサイトを削除", deleteInsightConfirm: "このインサイトを削除しますか?この操作は元に戻せません。", + insightGenerationStarted: "インサイトの生成が開始されました。まもなく表示されます。", deleteNoteConfirm: 'このノートを削除しますか?この操作は元に戻せません。', editNote: 'ノートを編集', createNote: 'ノートを作成', diff --git a/frontend/src/lib/locales/pt-BR/index.ts b/frontend/src/lib/locales/pt-BR/index.ts index ccffae8..803581d 100644 --- a/frontend/src/lib/locales/pt-BR/index.ts +++ b/frontend/src/lib/locales/pt-BR/index.ts @@ -357,6 +357,7 @@ export const ptBR = { viewInsight: "Ver Insight", deleteInsight: "Excluir Insight", deleteInsightConfirm: "Tem certeza que deseja excluir este insight? Esta ação não pode ser desfeita.", + insightGenerationStarted: "Geração de insight iniciada. Aparecerá em breve.", deleteNoteConfirm: "Tem certeza que deseja excluir esta nota? Esta ação não pode ser desfeita.", editNote: "Editar nota", createNote: "Criar nota", diff --git a/frontend/src/lib/locales/zh-CN/index.ts b/frontend/src/lib/locales/zh-CN/index.ts index b8397c6..23979e9 100644 --- a/frontend/src/lib/locales/zh-CN/index.ts +++ b/frontend/src/lib/locales/zh-CN/index.ts @@ -363,6 +363,7 @@ export const zhCN = { noNotebooksAvailable: "暂无可用笔记本", deleteInsight: "删除见解", deleteInsightConfirm: "确定要删除此见解吗?此操作无法撤销。", + insightGenerationStarted: "见解生成已开始,稍后将显示。", notEmbeddedAlert: "内容未嵌入向量", notEmbeddedDesc: "此内容尚未为了向量搜索进行嵌入。嵌入可以启用高级搜索功能并更好地发现内容。", openOnYoutube: "在 YouTube 上打开", diff --git a/frontend/src/lib/locales/zh-TW/index.ts b/frontend/src/lib/locales/zh-TW/index.ts index fa6c5a9..9346ee4 100644 --- a/frontend/src/lib/locales/zh-TW/index.ts +++ b/frontend/src/lib/locales/zh-TW/index.ts @@ -363,6 +363,7 @@ export const zhTW = { noNotebooksAvailable: "暫無可用筆記本", deleteInsight: "刪除見解", deleteInsightConfirm: "確定要刪除此見解嗎?此操作無法撤銷。", + insightGenerationStarted: "見解生成已開始,稍後將顯示。", notEmbeddedAlert: "內容未嵌入向量", notEmbeddedDesc: "此內容尚未為了向量搜尋進行嵌入。嵌入可以啟用進階搜尋功能並更好地發現內容。", openOnYoutube: "在 YouTube 上開啟", diff --git a/open_notebook/domain/CLAUDE.md b/open_notebook/domain/CLAUDE.md index b1b11cc..3ec6289 100644 --- a/open_notebook/domain/CLAUDE.md +++ b/open_notebook/domain/CLAUDE.md @@ -32,7 +32,7 @@ Two base classes support different persistence patterns: **ObjectModel** (mutabl - `vectorize()`: Submit async embedding job (returns command_id, fire-and-forget) - `get_status()`, `get_processing_progress()`: Track job via surreal_commands - `get_context()`: Returns summary for LLM context - - `add_insight()`: Generate and store insights with embeddings + - `add_insight()`: Submit async insight creation via `create_insight_command` (fire-and-forget, returns command_id) - **Note**: Standalone or linked notes - `save()`: Submits `embed_note` command after save (fire-and-forget) @@ -80,7 +80,7 @@ Two base classes support different persistence patterns: **ObjectModel** (mutabl - **Auto-embedding behavior**: - `Note.save()` → auto-submits `embed_note` command - `Source.save()` → does NOT auto-submit (must call `vectorize()` explicitly) - - `Source.add_insight()` → auto-submits `embed_insight` command + - `Source.add_insight()` → submits `create_insight_command` which handles DB insert + `embed_insight` command (all fire-and-forget) - **Relationship strings**: Must match SurrealDB schema (reference, artifact, refers_to) ## How to Add New Model diff --git a/open_notebook/domain/notebook.py b/open_notebook/domain/notebook.py index f24802a..05dc8f2 100644 --- a/open_notebook/domain/notebook.py +++ b/open_notebook/domain/notebook.py @@ -454,53 +454,52 @@ class Source(ObjectModel): logger.exception(e) raise DatabaseOperationError(e) - async def add_insight(self, insight_type: str, content: str) -> Any: + async def add_insight(self, insight_type: str, content: str) -> Optional[str]: """ - Add an insight to this source. + Submit insight creation as an async command (fire-and-forget). - Creates the insight record without embedding, then submits an async - embed_insight command to generate the embedding in the background. + Submits a create_insight command that handles database operations with + automatic retry logic for transaction conflicts. The command also submits + an embed_insight command for async embedding. + + This method returns immediately after submitting the command - it does NOT + wait for the insight to be created. Use this for batch operations where + throughput is more important than immediate confirmation. Args: insight_type: Type/category of the insight content: The insight content text Returns: - The created insight record(s) + command_id for optional tracking, or None if submission failed + + Raises: + InvalidInputError: If insight_type or content is empty """ if not insight_type or not content: raise InvalidInputError("Insight type and content must be provided") + try: - # Create insight WITHOUT embedding (fire-and-forget embedding via command) - result = await repo_query( - """ - CREATE source_insight CONTENT { - "source": $source_id, - "insight_type": $insight_type, - "content": $content, - };""", + # Submit create_insight command (fire-and-forget) + # Command handles retries internally for transaction conflicts + command_id = submit_command( + "open_notebook", + "create_insight", { - "source_id": ensure_record_id(self.id), + "source_id": str(self.id), "insight_type": insight_type, "content": content, }, ) + logger.info( + f"Submitted create_insight command {command_id} for source {self.id} " + f"(type={insight_type})" + ) + return str(command_id) - # Submit embedding command (fire-and-forget) - if result and len(result) > 0: - insight_id = str(result[0].get("id", "")) - if insight_id: - submit_command( - "open_notebook", - "embed_insight", - {"insight_id": insight_id}, - ) - logger.debug(f"Submitted embed_insight command for {insight_id}") - - return result except Exception as e: - logger.error(f"Error adding insight to source {self.id}: {str(e)}") - raise + logger.error(f"Error submitting create_insight for source {self.id}: {e}") + return None def _prepare_save_data(self) -> dict: """Override to ensure command field is always RecordID format for database"""