From 5d84ab07688dafcd36a1f2ad39d961ff9992f0a2 Mon Sep 17 00:00:00 2001 From: Luis Novo Date: Wed, 18 Feb 2026 11:39:47 -0300 Subject: [PATCH] fix: embedding batch sizing and 413 error classification (1.7.4) - Add batching to generate_embeddings() (50 texts per batch with per-batch retry) to prevent 413 Payload Too Large errors on large documents - Add 413 error classification rule for user-friendly error messages - Fix misleading "Created 0 embedded chunks" log in process_source_command by removing premature get_embedded_chunks() call (embedding is fire-and-forget) Closes #594 --- CHANGELOG.md | 8 ++ CLAUDE.md | 2 +- commands/CLAUDE.md | 4 +- commands/embedding_commands.py | 4 +- commands/source_commands.py | 11 +-- open_notebook/domain/notebook.py | 2 +- open_notebook/utils/CLAUDE.md | 2 +- open_notebook/utils/embedding.py | 71 ++++++++++----- open_notebook/utils/error_classifier.py | 6 ++ pyproject.toml | 2 +- tests/test_embedding.py | 113 ++++++++++++++++++++++++ uv.lock | 2 +- 12 files changed, 190 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2efc4b6..cbe2ced 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.7.4] - 2026-02-18 + +### Fixed +- Embedding large documents (3MB+) fails with 413 Payload Too Large (#594) +- `generate_embeddings()` now batches texts in groups of 50 with per-batch retry, preventing provider payload limits from being exceeded +- 413 errors now classified with user-friendly message in error classifier +- Misleading "Created 0 embedded chunks" log in `process_source_command` — embedding is fire-and-forget, so the count was always 0; now logs "embedding submitted" instead + ## [1.7.3] - 2026-02-17 ### Added diff --git a/CLAUDE.md b/CLAUDE.md index 8a053da..a8a1f18 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -218,4 +218,4 @@ See dedicated CLAUDE.md files for detailed guidance: --- -**Last Updated**: February 2026 | **Project Version**: 1.7.2 +**Last Updated**: February 2026 | **Project Version**: 1.7.4 diff --git a/commands/CLAUDE.md b/commands/CLAUDE.md index 676a70a..de5ec45 100644 --- a/commands/CLAUDE.md +++ b/commands/CLAUDE.md @@ -8,7 +8,7 @@ - **`embed_note_command`**: Embeds a single note using unified embedding pipeline with content-type aware processing. Uses MARKDOWN content type detection. Retry: 5 attempts, exponential jitter 1-60s. - **`embed_insight_command`**: Embeds a single source insight. Uses MARKDOWN content type. Retry: 5 attempts, exponential jitter 1-60s. -- **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks. Uses single Esperanto API call. Retry: 5 attempts, exponential jitter 1-60s. +- **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks (batches of 50 with per-batch retry). Retry: 5 attempts, exponential jitter 1-60s. - **`create_insight_command`**: Creates a source insight with automatic retry on transaction conflicts. Creates the DB record, then submits `embed_insight` command (fire-and-forget). Retry: 5 attempts, exponential jitter 1-60s. Used by `Source.add_insight()`. - **`rebuild_embeddings_command`**: Submits individual embed_* commands for all sources/notes/insights. Returns immediately; actual embedding happens async. No retry (coordinator only). @@ -27,7 +27,7 @@ - **Retry configuration**: Uses `stop_on: [ValueError]` (blocklist approach) - retries all exceptions EXCEPT ValueError. This is more resilient than allowlist as new exception types auto-retry. - **Fire-and-forget embedding**: Domain models submit embed_* commands via `submit_command()` without waiting. Commands process asynchronously. - **Content-type aware chunking**: `embed_source_command` uses `chunk_text()` with automatic content type detection (HTML, Markdown, plain text) for optimal text splitting. Default: 1500 char chunks with 225 char overlap. -- **Batch embedding**: `embed_source_command` uses `generate_embeddings()` for single API call efficiency instead of per-chunk calls. +- **Batch embedding**: `embed_source_command` uses `generate_embeddings()` which automatically batches texts (default 50) with per-batch retry to avoid exceeding provider payload limits. - **Mean pooling for large content**: `embed_note_command` and `embed_insight_command` use `generate_embedding()` which handles content larger than chunk size via mean pooling. - **Model dumping**: Recursive `full_model_dump()` utility converts Pydantic models → dicts for DB/API responses. - **Logging**: Uses `loguru.logger` throughout; logs execution start/end and key metrics (processing time, counts). diff --git a/commands/embedding_commands.py b/commands/embedding_commands.py index 89c03c6..f6ec70d 100644 --- a/commands/embedding_commands.py +++ b/commands/embedding_commands.py @@ -328,7 +328,7 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu 2. DELETE existing source_embedding records for this source 3. Detect content type from file path or content 4. Chunk text using appropriate splitter - 5. Generate embeddings for all chunks in a single API call + 5. Generate embeddings for all chunks in batches 6. Bulk INSERT source_embedding records Retry Strategy: @@ -377,7 +377,7 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu if total_chunks == 0: raise ValueError("No chunks created after splitting text") - # 5. Generate embeddings for all chunks in single API call + # 5. Generate embeddings for all chunks in batches cmd_id = get_command_id(input_data) logger.debug(f"Generating embeddings for {total_chunks} chunks") embeddings = await generate_embeddings(chunks, command_id=cmd_id) diff --git a/commands/source_commands.py b/commands/source_commands.py index 1c6ef19..0edac24 100644 --- a/commands/source_commands.py +++ b/commands/source_commands.py @@ -115,24 +115,25 @@ async def process_source_command( processed_source = result["source"] # 4. Gather processing results (notebook associations handled by source_graph) - embedded_chunks = ( - await processed_source.get_embedded_chunks() if input_data.embed else 0 - ) + # Note: embedding is fire-and-forget (async job), so we can't query the + # count here — it hasn't completed yet. The embed_source_command logs + # the actual count when it finishes. insights_list = await processed_source.get_insights() insights_created = len(insights_list) processing_time = time.time() - start_time + embed_status = "submitted" if input_data.embed else "skipped" logger.info( f"Successfully processed source: {processed_source.id} in {processing_time:.2f}s" ) logger.info( - f"Created {insights_created} insights and {embedded_chunks} embedded chunks" + f"Created {insights_created} insights, embedding {embed_status}" ) return SourceProcessingOutput( success=True, source_id=str(processed_source.id), - embedded_chunks=embedded_chunks, + embedded_chunks=0, insights_created=insights_created, processing_time=processing_time, ) diff --git a/open_notebook/domain/notebook.py b/open_notebook/domain/notebook.py index 656255f..3c4ea22 100644 --- a/open_notebook/domain/notebook.py +++ b/open_notebook/domain/notebook.py @@ -416,7 +416,7 @@ class Source(ObjectModel): pool exhaustion when processing large documents. The embed_source command: 1. Detects content type from file path 2. Chunks text using content-type aware splitter - 3. Generates all embeddings in a single API call + 3. Generates all embeddings in batches 4. Bulk inserts source_embedding records Returns: diff --git a/open_notebook/utils/CLAUDE.md b/open_notebook/utils/CLAUDE.md index f22f648..ae71ff7 100644 --- a/open_notebook/utils/CLAUDE.md +++ b/open_notebook/utils/CLAUDE.md @@ -78,7 +78,7 @@ Note: Changes require restart of the application. ### embedding.py - **mean_pool_embeddings(embeddings)**: Combine multiple embeddings via normalized mean pooling -- **generate_embeddings(texts)**: Batch embedding via single Esperanto API call +- **generate_embeddings(texts)**: Batch embedding with automatic batching (default 50 texts per batch) and per-batch retry - **generate_embedding(text, content_type, file_path)**: Unified embedding with automatic chunking + mean pooling **Key behavior**: diff --git a/open_notebook/utils/embedding.py b/open_notebook/utils/embedding.py index 7d8a93c..7c746f1 100644 --- a/open_notebook/utils/embedding.py +++ b/open_notebook/utils/embedding.py @@ -3,13 +3,14 @@ Unified embedding utilities for Open Notebook. Provides centralized embedding generation with support for: - Single text embedding (with automatic chunking and mean pooling for large texts) -- Batch text embedding (multiple texts in a single API call) +- Batch text embedding (multiple texts with automatic batching) - Mean pooling for combining multiple embeddings into one All embedding operations in the application should use these functions to ensure consistent behavior and proper handling of large content. """ +import asyncio from typing import TYPE_CHECKING, List, Optional import numpy as np @@ -17,6 +18,10 @@ from loguru import logger from .chunking import CHUNK_SIZE, ContentType, chunk_text +EMBEDDING_BATCH_SIZE = 50 +EMBEDDING_MAX_RETRIES = 3 +EMBEDDING_RETRY_DELAY = 2 # seconds + # Lazy import to avoid circular dependency: # utils -> embedding -> models -> key_provider -> provider_config -> utils if TYPE_CHECKING: @@ -83,10 +88,11 @@ async def generate_embeddings( texts: List[str], command_id: Optional[str] = None ) -> List[List[float]]: """ - Generate embeddings for multiple texts in a single API call. + Generate embeddings for multiple texts with automatic batching and retry. - This is more efficient than calling generate_embedding() multiple times - when you have multiple texts to embed (e.g., source chunks). + Texts are split into batches of EMBEDDING_BATCH_SIZE to avoid exceeding + provider payload limits. Each batch is retried up to EMBEDDING_MAX_RETRIES + times on transient failures. Args: texts: List of text strings to embed @@ -121,23 +127,42 @@ async def generate_embeddings( f"total={sum(text_sizes)} chars)" ) - try: - # Single API call for all texts - embeddings = await embedding_model.aembed(texts) - logger.debug(f"Generated {len(embeddings)} embeddings") - return embeddings - except Exception as e: - # Log at debug level - the calling command will log at appropriate level - # based on whether retries are exhausted - cmd_context = f" (command: {command_id})" if command_id else "" - logger.debug( - f"Embedding API error using model '{model_name}' " - f"for {len(texts)} texts (sizes: {min(text_sizes)}-{max(text_sizes)} chars)" - f"{cmd_context}: {e}" - ) - raise RuntimeError( - f"Failed to generate embeddings using model '{model_name}': {e}" - ) from e + all_embeddings: List[List[float]] = [] + total_batches = (len(texts) + EMBEDDING_BATCH_SIZE - 1) // EMBEDDING_BATCH_SIZE + + for batch_idx in range(total_batches): + start = batch_idx * EMBEDDING_BATCH_SIZE + end = start + EMBEDDING_BATCH_SIZE + batch = texts[start:end] + + for attempt in range(1, EMBEDDING_MAX_RETRIES + 1): + try: + batch_embeddings = await embedding_model.aembed(batch) + all_embeddings.extend(batch_embeddings) + break + except Exception as e: + cmd_context = f" (command: {command_id})" if command_id else "" + if attempt < EMBEDDING_MAX_RETRIES: + logger.debug( + f"Embedding batch {batch_idx + 1}/{total_batches} " + f"attempt {attempt}/{EMBEDDING_MAX_RETRIES} failed " + f"using model '{model_name}'{cmd_context}: {e}. Retrying..." + ) + await asyncio.sleep(EMBEDDING_RETRY_DELAY) + else: + logger.debug( + f"Embedding batch {batch_idx + 1}/{total_batches} " + f"failed after {EMBEDDING_MAX_RETRIES} attempts " + f"using model '{model_name}'{cmd_context}: {e}" + ) + raise RuntimeError( + f"Failed to generate embeddings using model '{model_name}' " + f"(batch {batch_idx + 1}/{total_batches}, " + f"{len(batch)} texts): {e}" + ) from e + + logger.debug(f"Generated {len(all_embeddings)} embeddings in {total_batches} batch(es)") + return all_embeddings async def generate_embedding( @@ -154,7 +179,7 @@ async def generate_embedding( For long text (> CHUNK_SIZE): - Chunks the text using appropriate splitter for content type - - Embeds all chunks in a single API call + - Embeds all chunks in batches - Combines embeddings via mean pooling Args: @@ -197,7 +222,7 @@ async def generate_embedding( logger.debug(f"Embedding {len(chunks)} chunks and mean pooling") - # Embed all chunks in single API call + # Embed all chunks in batches embeddings = await generate_embeddings(chunks, command_id=command_id) # Mean pool to get single embedding diff --git a/open_notebook/utils/error_classifier.py b/open_notebook/utils/error_classifier.py index 8d1e319..b7ac4f7 100644 --- a/open_notebook/utils/error_classifier.py +++ b/open_notebook/utils/error_classifier.py @@ -54,6 +54,12 @@ _CLASSIFICATION_RULES: list[tuple[list[str], type[OpenNotebookError], str | None ExternalServiceError, "Content too large for the selected model. Try using a smaller selection or a model with a larger context window.", ), + # Payload too large errors + ( + ["413", "payload too large", "request entity too large"], + ExternalServiceError, + "The request payload is too large for the AI provider. Try reducing the content size or using a different model.", + ), # Provider availability errors ( ["500", "502", "503", "service unavailable", "overloaded", "internal server error"], diff --git a/pyproject.toml b/pyproject.toml index f0ec289..c781f9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "open-notebook" -version = "1.7.3" +version = "1.7.4" description = "An open source implementation of a research assistant, inspired by Google Notebook LM" authors = [ {name = "Luis Novo", email = "lfnovo@gmail.com"} diff --git a/tests/test_embedding.py b/tests/test_embedding.py index 61cb6e0..61da171 100644 --- a/tests/test_embedding.py +++ b/tests/test_embedding.py @@ -229,5 +229,118 @@ class TestGenerateEmbedding: assert len(result) == 3 + @pytest.mark.asyncio + async def test_batching(self): + """Test that large input is split into batches of EMBEDDING_BATCH_SIZE.""" + from unittest.mock import AsyncMock, MagicMock, call, patch + + from open_notebook.utils.embedding import EMBEDDING_BATCH_SIZE + + num_texts = 120 + texts = [f"text_{i}" for i in range(num_texts)] + + mock_model = MagicMock() + mock_model.model_name = "test-model" + + def make_embeddings(batch): + return [[float(i)] * 3 for i in range(len(batch))] + + mock_model.aembed = AsyncMock(side_effect=lambda batch: make_embeddings(batch)) + + with patch( + "open_notebook.ai.models.model_manager.get_embedding_model", + new_callable=AsyncMock, + return_value=mock_model, + ): + result = await generate_embeddings(texts) + + assert len(result) == num_texts + # 120 texts / 50 batch size = 3 batches (50, 50, 20) + assert mock_model.aembed.call_count == 3 + assert len(mock_model.aembed.call_args_list[0][0][0]) == EMBEDDING_BATCH_SIZE + assert len(mock_model.aembed.call_args_list[1][0][0]) == EMBEDDING_BATCH_SIZE + assert len(mock_model.aembed.call_args_list[2][0][0]) == 20 + + @pytest.mark.asyncio + async def test_batch_retry_on_transient_failure(self): + """Test that a transient failure is retried and succeeds.""" + from unittest.mock import AsyncMock, MagicMock, patch + + texts = ["text_a", "text_b"] + mock_model = MagicMock() + mock_model.model_name = "test-model" + + # Fail once, then succeed + mock_model.aembed = AsyncMock( + side_effect=[ + RuntimeError("transient error"), + [[0.1, 0.2], [0.3, 0.4]], + ] + ) + + with ( + patch( + "open_notebook.ai.models.model_manager.get_embedding_model", + new_callable=AsyncMock, + return_value=mock_model, + ), + patch("open_notebook.utils.embedding.EMBEDDING_RETRY_DELAY", 0), + ): + result = await generate_embeddings(texts) + assert result == [[0.1, 0.2], [0.3, 0.4]] + assert mock_model.aembed.call_count == 2 + + @pytest.mark.asyncio + async def test_batch_retry_exhaustion(self): + """Test that RuntimeError is raised after all retries are exhausted.""" + from unittest.mock import AsyncMock, MagicMock, patch + + from open_notebook.utils.embedding import EMBEDDING_MAX_RETRIES + + texts = ["text_a"] + mock_model = MagicMock() + mock_model.model_name = "test-model" + mock_model.aembed = AsyncMock(side_effect=RuntimeError("persistent error")) + + with ( + patch( + "open_notebook.ai.models.model_manager.get_embedding_model", + new_callable=AsyncMock, + return_value=mock_model, + ), + patch("open_notebook.utils.embedding.EMBEDDING_RETRY_DELAY", 0), + ): + with pytest.raises(RuntimeError, match="Failed to generate embeddings"): + await generate_embeddings(texts) + assert mock_model.aembed.call_count == EMBEDDING_MAX_RETRIES + + +# ============================================================================ +# TEST SUITE 4: Error Classification for 413 +# ============================================================================ + + +class TestErrorClassifier413: + """Test that 413 payload-too-large errors are classified correctly.""" + + def test_413_status_code(self): + from open_notebook.exceptions import ExternalServiceError + from open_notebook.utils.error_classifier import classify_error + + exc = Exception("HTTP 413: Payload Too Large") + exc_class, message = classify_error(exc) + assert exc_class is ExternalServiceError + assert "payload is too large" in message + + def test_request_entity_too_large(self): + from open_notebook.exceptions import ExternalServiceError + from open_notebook.utils.error_classifier import classify_error + + exc = Exception("Request Entity Too Large") + exc_class, message = classify_error(exc) + assert exc_class is ExternalServiceError + assert "payload is too large" in message + + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/uv.lock b/uv.lock index 78b844f..8035304 100644 --- a/uv.lock +++ b/uv.lock @@ -2095,7 +2095,7 @@ wheels = [ [[package]] name = "open-notebook" -version = "1.7.3" +version = "1.7.4" source = { editable = "." } dependencies = [ { name = "ai-prompter" },