fix: async insight creation to prevent transaction conflicts (#512)

Migrate insight creation to the command system with automatic retry logic
to prevent SurrealDB transaction conflicts during batch imports.

Changes:
- Add create_insight_command with retry logic for transaction conflicts
- Add run_transformation_command for async transformation execution
- Make Source.add_insight() fire-and-forget (returns command_id)
- Update POST /sources/{id}/insights to return 202 Accepted immediately
- Frontend polls command status until complete, then refreshes
- Auto-update notebook page icon when source gains insights
- Add i18n keys for insight generation feedback

Related to #489
This commit is contained in:
Luis Novo 2026-01-31 15:51:27 -03:00 committed by GitHub
parent 303ffacfa5
commit 301dd4e20a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 429 additions and 68 deletions

View file

@ -395,6 +395,16 @@ class SourceInsightResponse(BaseModel):
updated: str
class InsightCreationResponse(BaseModel):
    """Response for async insight creation."""

    # Always "pending": this response only acknowledges that work was queued.
    status: Literal["pending"] = "pending"
    # Human-readable acknowledgement for API clients.
    message: str = "Insight generation started"
    # ID of the source the insight is being generated for.
    source_id: str
    # ID of the transformation that will be run against the source.
    transformation_id: str
    # ID of the submitted background command, usable for status tracking;
    # defaults to None when no command ID is supplied.
    command_id: Optional[str] = None
class SaveAsNoteRequest(BaseModel):
notebook_id: Optional[str] = Field(None, description="Notebook ID to add note to")

View file

@ -14,12 +14,13 @@ from fastapi import (
)
from fastapi.responses import FileResponse, Response
from loguru import logger
from surreal_commands import execute_command_sync
from surreal_commands import execute_command_sync, submit_command
from api.command_service import CommandService
from api.models import (
AssetModel,
CreateSourceInsightRequest,
InsightCreationResponse,
SourceCreate,
SourceInsightResponse,
SourceListResponse,
@ -963,44 +964,56 @@ async def get_source_insights(source_id: str):
)
@router.post(
    "/sources/{source_id}/insights",
    response_model=InsightCreationResponse,
    status_code=202,
)
async def create_source_insight(source_id: str, request: CreateSourceInsightRequest):
    """
    Start insight generation for a source by running a transformation.

    This endpoint returns immediately with a 202 Accepted status.
    The transformation runs asynchronously in the background via the job queue.
    Poll GET /sources/{source_id}/insights to see when the insight is ready,
    or track the returned ``command_id``.

    Args:
        source_id: ID of the source to generate an insight for.
        request: Body carrying the ``transformation_id`` to run.

    Returns:
        InsightCreationResponse with status "pending" and the command ID.

    Raises:
        HTTPException: 404 if the source or transformation is not found,
            500 if the background command could not be submitted.
    """
    try:
        # Validate source exists
        source = await Source.get(source_id)
        if not source:
            raise HTTPException(status_code=404, detail="Source not found")

        # Validate transformation exists
        transformation = await Transformation.get(request.transformation_id)
        if not transformation:
            raise HTTPException(status_code=404, detail="Transformation not found")

        # Submit transformation as background job (fire-and-forget)
        command_id = submit_command(
            "open_notebook",
            "run_transformation",
            {
                "source_id": source_id,
                "transformation_id": request.transformation_id,
            },
        )
        logger.info(
            f"Submitted run_transformation command {command_id} for source {source_id}"
        )

        # Return immediately with command_id for status tracking
        return InsightCreationResponse(
            status="pending",
            message="Insight generation started",
            source_id=source_id,
            transformation_id=request.transformation_id,
            command_id=str(command_id),
        )
    except HTTPException:
        # Deliberate 4xx responses must not be rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Error starting insight generation for source {source_id}: {e}")
        raise HTTPException(
            status_code=500, detail=f"Error starting insight generation: {str(e)}"
        ) from e

View file

@ -9,11 +9,13 @@
- **`embed_note_command`**: Embeds a single note using unified embedding pipeline with content-type aware processing. Uses MARKDOWN content type detection. Retry: 5 attempts, exponential jitter 1-60s.
- **`embed_insight_command`**: Embeds a single source insight. Uses MARKDOWN content type. Retry: 5 attempts, exponential jitter 1-60s.
- **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks. Uses single Esperanto API call. Retry: 5 attempts, exponential jitter 1-60s.
- **`create_insight_command`**: Creates a source insight with automatic retry on transaction conflicts. Creates the DB record, then submits `embed_insight` command (fire-and-forget). Retry: 5 attempts, exponential jitter 1-60s. Used by `Source.add_insight()`.
- **`rebuild_embeddings_command`**: Submits individual embed_* commands for all sources/notes/insights. Returns immediately; actual embedding happens async. No retry (coordinator only).
### Other Commands
- **`process_source_command`**: Ingests content through `source_graph`, creates embeddings (optional), and generates insights. Retries on transaction conflicts (exp. jitter, max 5×).
- **`process_source_command`**: Ingests content through `source_graph`, creates embeddings (optional), and generates insights. Retries on transaction conflicts (exp. jitter, max 15×, 1-120s).
- **`run_transformation_command`**: Runs a transformation on an existing source to generate an insight. Executes the transformation graph (LLM call) then creates insight via `create_insight_command`. Used by `POST /sources/{id}/insights` API endpoint. Retry: 5 attempts, exponential jitter 1-60s.
- **`generate_podcast_command`**: Creates podcasts via `podcast-creator` library using stored episode/speaker profiles.
- **`process_text_command`** (example): Test fixture for text operations (uppercase, lowercase, reverse, word_count).
- **`analyze_data_command`** (example): Test fixture for numeric aggregations.

View file

@ -47,6 +47,23 @@ class RebuildEmbeddingsOutput(CommandOutput):
# =============================================================================
class CreateInsightInput(CommandInput):
    """Input for creating a source insight with automatic retry on conflicts."""

    # ID of the source the insight belongs to; converted with
    # ensure_record_id() before being written to the database.
    source_id: str
    # Type/category label stored on the insight record.
    insight_type: str
    # Insight text stored on the record.
    content: str
class CreateInsightOutput(CommandOutput):
    """Output from insight creation command."""

    # True when the insight record was created.
    success: bool
    # String ID of the created source_insight record; None on failure.
    insight_id: Optional[str] = None
    # Wall-clock duration of the command in seconds (time.time() delta).
    processing_time: float
    # str() of the exception when success is False; None otherwise.
    error_message: Optional[str] = None
class EmbedNoteInput(CommandInput):
"""Input for embedding a single note."""
@ -412,6 +429,115 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu
)
@command(
    "create_insight",
    app="open_notebook",
    retry={
        "max_attempts": 5,
        "wait_strategy": "exponential_jitter",
        "wait_min": 1,
        "wait_max": 60,
        "retry_on": [RuntimeError, ConnectionError, TimeoutError],
        "retry_log_level": "debug",
    },
)
async def create_insight_command(
    input_data: CreateInsightInput,
) -> CreateInsightOutput:
    """
    Create a source insight with automatic retry on transaction conflicts.

    This command wraps the CREATE source_insight operation with retry logic
    to handle SurrealDB transaction conflicts that occur during batch imports
    when multiple parallel transformations try to create insights concurrently.

    Flow:
    1. CREATE source_insight record in database
    2. Submit embed_insight command (fire-and-forget) for async embedding
    3. Return the insight_id

    Retry Strategy:
    - RuntimeError, ConnectionError and TimeoutError raised while creating the
      record are re-raised so the command framework retries (up to 5 attempts,
      exponential-jitter backoff, 1-60s)
    - Any other exception is logged and reported via ``success=False``
    - A failure to submit the embedding command is logged but never re-raised:
      the record already exists at that point, so retrying the whole command
      would insert a duplicate insight

    Args:
        input_data: Source ID, insight type and content to store.

    Returns:
        CreateInsightOutput with ``success`` and, on success, ``insight_id``.
    """
    start_time = time.time()
    try:
        logger.info(
            f"Creating insight for source {input_data.source_id}: "
            f"type={input_data.insight_type}"
        )

        # 1. Create insight record in database
        result = await repo_query(
            """
            CREATE source_insight CONTENT {
                "source": $source_id,
                "insight_type": $insight_type,
                "content": $content
            };
            """,
            {
                "source_id": ensure_record_id(input_data.source_id),
                "insight_type": input_data.insight_type,
                "content": input_data.content,
            },
        )
        if not result:
            raise ValueError("Failed to create insight - no result returned")

        # Check the raw value before str(): str(None) would be the truthy "None".
        raw_insight_id = result[0].get("id")
        if not raw_insight_id:
            raise ValueError("Failed to create insight - no ID in result")
        insight_id = str(raw_insight_id)

        # 2. Submit embedding command (fire-and-forget). Isolated from the
        # outer handlers so a submission error cannot trigger a retry that
        # would re-create the insight.
        try:
            submit_command(
                "open_notebook",
                "embed_insight",
                {"insight_id": insight_id},
            )
            logger.debug(f"Submitted embed_insight command for {insight_id}")
        except Exception as embed_error:
            logger.warning(
                f"Insight {insight_id} created but embed_insight submission "
                f"failed: {embed_error}"
            )

        processing_time = time.time() - start_time
        logger.info(
            f"Successfully created insight {insight_id} for source "
            f"{input_data.source_id} in {processing_time:.2f}s"
        )
        return CreateInsightOutput(
            success=True,
            insight_id=insight_id,
            processing_time=processing_time,
        )
    except RuntimeError:
        # Treated as a transaction conflict; re-raise for the retry machinery.
        logger.debug(
            f"Transaction conflict creating insight for source "
            f"{input_data.source_id} - will retry"
        )
        raise
    except (ConnectionError, TimeoutError) as e:
        logger.debug(
            f"Network/timeout error creating insight for source "
            f"{input_data.source_id} ({type(e).__name__}: {e}) - will retry"
        )
        raise
    except Exception as e:
        processing_time = time.time() - start_time
        # logger.exception records the message and the traceback in one entry.
        logger.exception(
            f"Failed to create insight for source {input_data.source_id}: {e}"
        )
        return CreateInsightOutput(
            success=False,
            processing_time=processing_time,
            error_message=str(e),
        )
async def collect_items_for_rebuild(
mode: str,
include_sources: bool,

View file

@ -11,9 +11,10 @@ from open_notebook.domain.transformation import Transformation
try:
from open_notebook.graphs.source import source_graph
from open_notebook.graphs.transformation import graph as transform_graph
except ImportError as e:
logger.error(f"Failed to import source_graph: {e}")
raise ValueError("source_graph not available")
logger.error(f"Failed to import graphs: {e}")
raise ValueError("graphs not available")
def full_model_dump(model):
@ -151,3 +152,121 @@ async def process_source_command(
processing_time=processing_time,
error_message=str(e),
)
# =============================================================================
# RUN TRANSFORMATION COMMAND
# =============================================================================
class RunTransformationInput(CommandInput):
    """Input for running a transformation on an existing source."""

    # ID of the source to load and transform.
    source_id: str
    # ID of the transformation to load and apply to the source.
    transformation_id: str
class RunTransformationOutput(CommandOutput):
    """Output from transformation command."""

    # True when the transformation graph ran to completion.
    success: bool
    # Echo of the input source ID.
    source_id: str
    # Echo of the input transformation ID.
    transformation_id: str
    # Wall-clock duration of the command in seconds (time.time() delta).
    processing_time: float
    # str() of the exception when success is False; None otherwise.
    error_message: Optional[str] = None
@command(
    "run_transformation",
    app="open_notebook",
    retry={
        "max_attempts": 5,
        "wait_strategy": "exponential_jitter",
        "wait_min": 1,
        "wait_max": 60,
        "retry_on": [RuntimeError, ConnectionError, TimeoutError],
        "retry_log_level": "debug",
    },
)
async def run_transformation_command(
    input_data: RunTransformationInput,
) -> RunTransformationOutput:
    """
    Run a transformation on an existing source to generate an insight.

    This command runs the transformation graph which:
    1. Loads the source and transformation
    2. Calls the LLM to generate insight content
    3. Creates the insight via create_insight command (fire-and-forget)

    Use this command for UI-triggered insight generation to avoid blocking
    the HTTP request while the LLM processes.

    Retry Strategy:
    - RuntimeError, ConnectionError and TimeoutError are re-raised so the
      command framework retries (up to 5 attempts, exponential-jitter
      backoff, 1-60s)
    - Any other exception (including a missing source or transformation) is
      logged and reported via ``success=False`` without retrying

    Args:
        input_data: IDs of the source and the transformation to run.

    Returns:
        RunTransformationOutput echoing both IDs with the outcome and timing.
    """
    start_time = time.time()
    # Shared suffix so retry logs identify which job is being retried.
    job_label = (
        f"transformation {input_data.transformation_id} "
        f"on source {input_data.source_id}"
    )
    try:
        logger.info(
            f"Running transformation {input_data.transformation_id} "
            f"on source {input_data.source_id}"
        )

        # Load source
        source = await Source.get(input_data.source_id)
        if not source:
            raise ValueError(f"Source '{input_data.source_id}' not found")

        # Load transformation
        transformation = await Transformation.get(input_data.transformation_id)
        if not transformation:
            raise ValueError(
                f"Transformation '{input_data.transformation_id}' not found"
            )

        # Run transformation graph (includes LLM call + insight creation)
        await transform_graph.ainvoke(
            input=dict(source=source, transformation=transformation)
        )

        processing_time = time.time() - start_time
        logger.info(
            f"Successfully ran transformation {input_data.transformation_id} "
            f"on source {input_data.source_id} in {processing_time:.2f}s"
        )
        return RunTransformationOutput(
            success=True,
            source_id=input_data.source_id,
            transformation_id=input_data.transformation_id,
            processing_time=processing_time,
        )
    except RuntimeError:
        # Treated as a transaction conflict; re-raise for the retry machinery.
        logger.debug(f"Transaction conflict running {job_label} - will retry")
        raise
    except (ConnectionError, TimeoutError) as e:
        logger.debug(
            f"Network/timeout error running {job_label} "
            f"({type(e).__name__}: {e}) - will retry"
        )
        raise
    except Exception as e:
        processing_time = time.time() - start_time
        # logger.exception records the message and the traceback in one entry.
        logger.exception(
            f"Failed to run transformation {input_data.transformation_id} "
            f"on source {input_data.source_id}: {e}"
        )
        return RunTransformationOutput(
            success=False,
            source_id=input_data.source_id,
            transformation_id=input_data.transformation_id,
            processing_time=processing_time,
            error_message=str(e),
        )

View file

@ -58,16 +58,21 @@ export default function NotebookPage() {
notes: {}
})
// Initialize default selections when sources/notes load
// Initialize and update selections when sources load or change
useEffect(() => {
if (sources && sources.length > 0) {
setContextSelections(prev => {
const newSourceSelections = { ...prev.sources }
sources.forEach(source => {
// Only set default if not already set
if (!(source.id in newSourceSelections)) {
// Default to 'insights' if has insights, otherwise 'full'
newSourceSelections[source.id] = source.insights_count > 0 ? 'insights' : 'full'
const currentMode = newSourceSelections[source.id]
const hasInsights = source.insights_count > 0
if (currentMode === undefined) {
// Initial setup - default based on insights availability
newSourceSelections[source.id] = hasInsights ? 'insights' : 'full'
} else if (currentMode === 'full' && hasInsights) {
// Source gained insights while in 'full' mode - auto-switch to 'insights'
newSourceSelections[source.id] = 'insights'
}
})
return { ...prev, sources: newSourceSelections }

View file

@ -1,6 +1,7 @@
'use client'
import { useState, useEffect, useCallback, useMemo } from 'react'
import { useQueryClient } from '@tanstack/react-query'
import { isAxiosError } from 'axios'
import ReactMarkdown from 'react-markdown'
import remarkGfm from 'remark-gfm'
@ -81,6 +82,7 @@ export function SourceDetailContent({
onClose
}: SourceDetailContentProps) {
const { t, language } = useTranslation()
const queryClient = useQueryClient()
const [source, setSource] = useState<SourceDetailResponse | null>(null)
const [insights, setInsights] = useState<SourceInsightResponse[]>([])
const [transformations, setTransformations] = useState<Transformation[]>([])
@ -154,12 +156,36 @@ export function SourceDetailContent({
try {
setCreatingInsight(true)
await insightsApi.create(sourceId, {
const response = await insightsApi.create(sourceId, {
transformation_id: selectedTransformation
})
toast.success(t.common.success)
await fetchInsights()
// Show toast for async operation
toast.success(t.sources.insightGenerationStarted)
setSelectedTransformation('')
// Poll for command completion if we have a command_id
if (response.command_id) {
// Poll in background (don't block UI)
insightsApi.waitForCommand(response.command_id, {
maxAttempts: 120, // Up to 4 minutes (120 * 2s)
intervalMs: 2000
}).then(success => {
if (success) {
void fetchInsights()
// Invalidate sources queries so notebook page refreshes with updated insights_count
queryClient.invalidateQueries({ queryKey: ['sources'] })
}
}).catch(err => {
console.error('Error waiting for insight command:', err)
})
} else {
// Fallback: refresh after delay if no command_id
setTimeout(() => {
void fetchInsights()
// Also invalidate sources queries
queryClient.invalidateQueries({ queryKey: ['sources'] })
}, 5000)
}
} catch (err) {
console.error('Failed to create insight:', err)
toast.error(t.common.error)

View file

@ -13,6 +13,21 @@ export interface CreateSourceInsightRequest {
transformation_id: string
}
/** Acknowledgement returned by the insight-creation POST; generation itself runs asynchronously. */
export interface InsightCreationResponse {
  // Only 'pending' is possible: the response confirms queuing, not completion.
  status: 'pending'
  // Human-readable acknowledgement from the API.
  message: string
  // ID of the source the insight is being generated for.
  source_id: string
  // ID of the transformation being run.
  transformation_id: string
  // ID of the background command; when present it can be polled for status.
  command_id?: string
}
/** Status payload returned by GET /commands/jobs/{id}. */
export interface CommandJobStatusResponse {
  job_id: string
  // Job state; waitForCommand treats 'completed', 'failed' and 'canceled' as terminal.
  status: string
  // Command output, when available — presumably the command's output model; confirm against the API.
  result?: Record<string, unknown>
  // Failure description, when the job reports one.
  error_message?: string
}
export const insightsApi = {
listForSource: async (sourceId: string) => {
const response = await apiClient.get<SourceInsightResponse[]>(`/sources/${sourceId}/insights`)
@ -25,7 +40,7 @@ export const insightsApi = {
},
create: async (sourceId: string, data: CreateSourceInsightRequest) => {
const response = await apiClient.post<SourceInsightResponse>(
const response = await apiClient.post<InsightCreationResponse>(
`/sources/${sourceId}/insights`,
data
)
@ -34,5 +49,46 @@ export const insightsApi = {
delete: async (insightId: string) => {
await apiClient.delete(`/insights/${insightId}`)
},
// Fetch the current status of a background command job by its ID.
getCommandStatus: async (commandId: string) => {
  const jobUrl = `/commands/jobs/${commandId}`
  const { data } = await apiClient.get<CommandJobStatusResponse>(jobUrl)
  return data
},
/**
 * Poll command status until it reaches a terminal state or polling times out.
 *
 * Returns true only if the job completed and did not report failure in its
 * result. Returns false if the job failed, was canceled, completed with
 * `result.success === false`, or did not finish within `maxAttempts` polls.
 * Errors while fetching the status are logged and polling continues.
 *
 * @param commandId - ID of the command job to poll
 * @param options.maxAttempts - maximum number of status checks (default 60)
 * @param options.intervalMs - delay between checks in milliseconds (default 2000)
 */
waitForCommand: async (
  commandId: string,
  options?: { maxAttempts?: number; intervalMs?: number }
): Promise<boolean> => {
  const maxAttempts = options?.maxAttempts ?? 60 // Default 60 attempts
  const intervalMs = options?.intervalMs ?? 2000 // Default 2 seconds

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const status = await insightsApi.getCommandStatus(commandId)
      if (status.status === 'completed') {
        // A command may finish normally yet report failure in its output
        // (success: false), so a 'completed' status alone is not enough.
        if (status.result?.success === false) {
          console.error(
            'Command completed with failure:',
            status.result?.error_message ?? status.error_message
          )
          return false
        }
        return true
      }
      if (status.status === 'failed' || status.status === 'canceled') {
        console.error('Command failed:', status.error_message)
        return false
      }
    } catch (error) {
      // Continue polling on error
      console.error('Error checking command status:', error)
    }

    // Still running (or the check failed): wait before the next poll, but
    // skip the wait once no further attempt will follow.
    if (attempt < maxAttempts - 1) {
      await new Promise(resolve => setTimeout(resolve, intervalMs))
    }
  }

  // Timeout
  console.warn('Command polling timed out')
  return false
}
}

View file

@ -357,6 +357,7 @@ export const enUS = {
viewInsight: "View Insight",
deleteInsight: "Delete Insight",
deleteInsightConfirm: "Are you sure you want to delete this insight? This action cannot be undone.",
insightGenerationStarted: "Insight generation started. It will appear shortly.",
deleteNoteConfirm: 'Are you sure you want to delete this note? This action cannot be undone.',
editNote: 'Edit note',
createNote: 'Create note',

View file

@ -357,6 +357,7 @@ export const jaJP = {
viewInsight: "インサイトを表示",
deleteInsight: "インサイトを削除",
deleteInsightConfirm: "このインサイトを削除しますか?この操作は元に戻せません。",
insightGenerationStarted: "インサイトの生成が開始されました。まもなく表示されます。",
deleteNoteConfirm: 'このノートを削除しますか?この操作は元に戻せません。',
editNote: 'ノートを編集',
createNote: 'ノートを作成',

View file

@ -357,6 +357,7 @@ export const ptBR = {
viewInsight: "Ver Insight",
deleteInsight: "Excluir Insight",
deleteInsightConfirm: "Tem certeza que deseja excluir este insight? Esta ação não pode ser desfeita.",
insightGenerationStarted: "Geração de insight iniciada. Aparecerá em breve.",
deleteNoteConfirm: "Tem certeza que deseja excluir esta nota? Esta ação não pode ser desfeita.",
editNote: "Editar nota",
createNote: "Criar nota",

View file

@ -363,6 +363,7 @@ export const zhCN = {
noNotebooksAvailable: "暂无可用笔记本",
deleteInsight: "删除见解",
deleteInsightConfirm: "确定要删除此见解吗?此操作无法撤销。",
insightGenerationStarted: "见解生成已开始,稍后将显示。",
notEmbeddedAlert: "内容未嵌入向量",
notEmbeddedDesc: "此内容尚未为了向量搜索进行嵌入。嵌入可以启用高级搜索功能并更好地发现内容。",
openOnYoutube: "在 YouTube 上打开",

View file

@ -363,6 +363,7 @@ export const zhTW = {
noNotebooksAvailable: "暫無可用筆記本",
deleteInsight: "刪除見解",
deleteInsightConfirm: "確定要刪除此見解嗎?此操作無法撤銷。",
insightGenerationStarted: "見解生成已開始,稍後將顯示。",
notEmbeddedAlert: "內容未嵌入向量",
notEmbeddedDesc: "此內容尚未為了向量搜尋進行嵌入。嵌入可以啟用進階搜尋功能並更好地發現內容。",
openOnYoutube: "在 YouTube 上開啟",

View file

@ -32,7 +32,7 @@ Two base classes support different persistence patterns: **ObjectModel** (mutabl
- `vectorize()`: Submit async embedding job (returns command_id, fire-and-forget)
- `get_status()`, `get_processing_progress()`: Track job via surreal_commands
- `get_context()`: Returns summary for LLM context
- `add_insight()`: Generate and store insights with embeddings
- `add_insight()`: Submit async insight creation via `create_insight_command` (fire-and-forget, returns command_id)
- **Note**: Standalone or linked notes
- `save()`: Submits `embed_note` command after save (fire-and-forget)
@ -80,7 +80,7 @@ Two base classes support different persistence patterns: **ObjectModel** (mutabl
- **Auto-embedding behavior**:
- `Note.save()` → auto-submits `embed_note` command
- `Source.save()` → does NOT auto-submit (must call `vectorize()` explicitly)
- `Source.add_insight()` → auto-submits `embed_insight` command
- `Source.add_insight()` → submits `create_insight_command` which handles DB insert + `embed_insight` command (all fire-and-forget)
- **Relationship strings**: Must match SurrealDB schema (reference, artifact, refers_to)
## How to Add New Model

View file

@ -454,53 +454,52 @@ class Source(ObjectModel):
logger.exception(e)
raise DatabaseOperationError(e)
async def add_insight(self, insight_type: str, content: str) -> Optional[str]:
    """
    Submit insight creation as an async command (fire-and-forget).

    Submits a create_insight command that handles database operations with
    automatic retry logic for transaction conflicts. The command also submits
    an embed_insight command for async embedding.

    This method returns immediately after submitting the command - it does NOT
    wait for the insight to be created. Use this for batch operations where
    throughput is more important than immediate confirmation.

    Args:
        insight_type: Type/category of the insight
        content: The insight content text

    Returns:
        command_id for optional tracking, or None if submission failed

    Raises:
        InvalidInputError: If insight_type or content is empty
    """
    if not insight_type or not content:
        raise InvalidInputError("Insight type and content must be provided")

    try:
        # Submit create_insight command (fire-and-forget)
        # Command handles retries internally for transaction conflicts
        command_id = submit_command(
            "open_notebook",
            "create_insight",
            {
                # Sent as a plain string; the command converts it to a record ID.
                "source_id": str(self.id),
                "insight_type": insight_type,
                "content": content,
            },
        )
        logger.info(
            f"Submitted create_insight command {command_id} for source {self.id} "
            f"(type={insight_type})"
        )
        return str(command_id)
    except Exception as e:
        # Best-effort submission: failures are logged and signalled with None
        # rather than raised, per the documented return contract.
        logger.error(f"Error submitting create_insight for source {self.id}: {e}")
        return None
def _prepare_save_data(self) -> dict:
"""Override to ensure command field is always RecordID format for database"""