fix: embedding batch sizing and 413 error classification (1.7.4)
- Add batching to generate_embeddings() (50 texts per batch, with per-batch retry) to prevent 413 Payload Too Large errors on large documents
- Add a 413 error classification rule for user-friendly error messages
- Fix the misleading "Created 0 embedded chunks" log in process_source_command by removing the premature get_embedded_chunks() call (embedding is fire-and-forget)

Closes #594
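The batching-with-retry approach described in this commit message can be illustrated with a minimal, self-contained sketch. It is not the project's code: `embed_in_batches`, `fake_embed`, and `batch_sizes` are hypothetical names introduced here, the provider call is replaced by a stub, and the retry delay is set to zero (the commit uses 2 seconds) so the example runs instantly.

```python
import asyncio
from typing import Awaitable, Callable, List

BATCH_SIZE = 50    # mirrors EMBEDDING_BATCH_SIZE from the commit
MAX_RETRIES = 3    # mirrors EMBEDDING_MAX_RETRIES from the commit
RETRY_DELAY = 0.0  # assumption: zero for the demo; the commit uses 2 seconds

EmbedFn = Callable[[List[str]], Awaitable[List[List[float]]]]


async def embed_in_batches(texts: List[str], embed: EmbedFn) -> List[List[float]]:
    """Embed texts in fixed-size batches, retrying each batch independently."""
    results: List[List[float]] = []
    for start in range(0, len(texts), BATCH_SIZE):
        batch = texts[start:start + BATCH_SIZE]
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                results.extend(await embed(batch))
                break  # this batch succeeded; move on to the next one
            except Exception as exc:
                if attempt == MAX_RETRIES:
                    raise RuntimeError(
                        f"batch starting at {start} failed "
                        f"after {MAX_RETRIES} attempts"
                    ) from exc
                await asyncio.sleep(RETRY_DELAY)
    return results


batch_sizes: List[int] = []  # records the size of every stub call


async def fake_embed(batch: List[str]) -> List[List[float]]:
    """Hypothetical stand-in for a provider call: one vector per text."""
    batch_sizes.append(len(batch))
    return [[float(len(text))] for text in batch]


vectors = asyncio.run(
    embed_in_batches([f"text_{i}" for i in range(120)], fake_embed)
)
```

With 120 inputs the stub is called three times, with 50, 50, and 20 texts. Because each batch is retried on its own, a transient failure in one batch does not force the earlier batches to be re-sent.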
parent 924cd88494
commit 5d84ab0768
12 changed files with 190 additions and 37 deletions
|
|
@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
## [1.7.4] - 2026-02-18
|
||||
|
||||
### Fixed
|
||||
- Embedding large documents (3MB+) fails with 413 Payload Too Large (#594)
|
||||
- `generate_embeddings()` now batches texts in groups of 50 with per-batch retry, preventing provider payload limits from being exceeded
|
||||
- 413 errors now classified with user-friendly message in error classifier
|
||||
- Misleading "Created 0 embedded chunks" log in `process_source_command` — embedding is fire-and-forget, so the count was always 0; now logs "embedding submitted" instead
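The 413 classification entry above can be pictured as a keyword rule matched against the error text. The sketch below is an assumption about how such a rule table might be evaluated, not the project's `classify_error`: it assumes case-insensitive substring matching and a fallback that returns the original exception type, and `ExternalServiceError` here is a local stand-in class. Only the keywords and the message are taken from the diff.

```python
from typing import List, Optional, Tuple, Type


class ExternalServiceError(Exception):
    """Local stand-in for the project's exception of the same name."""


Rule = Tuple[List[str], Type[Exception], Optional[str]]

# Keywords and message copied from the rule added in this commit.
RULES: List[Rule] = [
    (
        ["413", "payload too large", "request entity too large"],
        ExternalServiceError,
        "The request payload is too large for the AI provider. "
        "Try reducing the content size or using a different model.",
    ),
]


def classify(exc: Exception) -> Tuple[Type[Exception], str]:
    """Return (exception class, user-facing message) for an error."""
    text = str(exc).lower()  # assumption: matching is case-insensitive
    for keywords, exc_class, message in RULES:
        if any(keyword in text for keyword in keywords):
            return exc_class, message or str(exc)
    # Assumption: unmatched errors keep their own type and message.
    return type(exc), str(exc)


matched_class, matched_message = classify(Exception("HTTP 413: Payload Too Large"))
```

One consequence of plain substring matching is that any error text containing "413" matches the rule, so rule order and keyword specificity matter in a table like this.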
|
||||
|
||||
## [1.7.3] - 2026-02-17
|
||||
|
||||
### Added
|
||||
|
|
|
|||
|
|
@ -218,4 +218,4 @@ See dedicated CLAUDE.md files for detailed guidance:
|
|||
|
||||
---
|
||||
|
||||
**Last Updated**: February 2026 | **Project Version**: 1.7.2
|
||||
**Last Updated**: February 2026 | **Project Version**: 1.7.4
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
- **`embed_note_command`**: Embeds a single note using unified embedding pipeline with content-type aware processing. Uses MARKDOWN content type detection. Retry: 5 attempts, exponential jitter 1-60s.
|
||||
- **`embed_insight_command`**: Embeds a single source insight. Uses MARKDOWN content type. Retry: 5 attempts, exponential jitter 1-60s.
|
||||
- **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks. Uses single Esperanto API call. Retry: 5 attempts, exponential jitter 1-60s.
|
||||
- **`embed_source_command`**: Embeds a source by chunking full_text with content-type aware splitters (HTML, Markdown, plain), then batch embedding all chunks (batches of 50 with per-batch retry). Retry: 5 attempts, exponential jitter 1-60s.
|
||||
- **`create_insight_command`**: Creates a source insight with automatic retry on transaction conflicts. Creates the DB record, then submits `embed_insight` command (fire-and-forget). Retry: 5 attempts, exponential jitter 1-60s. Used by `Source.add_insight()`.
|
||||
- **`rebuild_embeddings_command`**: Submits individual embed_* commands for all sources/notes/insights. Returns immediately; actual embedding happens async. No retry (coordinator only).
|
||||
|
||||
|
|
@ -27,7 +27,7 @@
|
|||
- **Retry configuration**: Uses `stop_on: [ValueError]` (blocklist approach) - retries all exceptions EXCEPT ValueError. This is more resilient than allowlist as new exception types auto-retry.
|
||||
- **Fire-and-forget embedding**: Domain models submit embed_* commands via `submit_command()` without waiting. Commands process asynchronously.
|
||||
- **Content-type aware chunking**: `embed_source_command` uses `chunk_text()` with automatic content type detection (HTML, Markdown, plain text) for optimal text splitting. Default: 1500 char chunks with 225 char overlap.
|
||||
- **Batch embedding**: `embed_source_command` uses `generate_embeddings()` for single API call efficiency instead of per-chunk calls.
|
||||
- **Batch embedding**: `embed_source_command` uses `generate_embeddings()` which automatically batches texts (default 50) with per-batch retry to avoid exceeding provider payload limits.
|
||||
- **Mean pooling for large content**: `embed_note_command` and `embed_insight_command` use `generate_embedding()` which handles content larger than chunk size via mean pooling.
|
||||
- **Model dumping**: Recursive `full_model_dump()` utility converts Pydantic models → dicts for DB/API responses.
|
||||
- **Logging**: Uses `loguru.logger` throughout; logs execution start/end and key metrics (processing time, counts).
|
||||
|
|
|
|||
|
|
@ -328,7 +328,7 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu
|
|||
2. DELETE existing source_embedding records for this source
|
||||
3. Detect content type from file path or content
|
||||
4. Chunk text using appropriate splitter
|
||||
5. Generate embeddings for all chunks in a single API call
|
||||
5. Generate embeddings for all chunks in batches
|
||||
6. Bulk INSERT source_embedding records
|
||||
|
||||
Retry Strategy:
|
||||
|
|
@ -377,7 +377,7 @@ async def embed_source_command(input_data: EmbedSourceInput) -> EmbedSourceOutpu
|
|||
if total_chunks == 0:
|
||||
raise ValueError("No chunks created after splitting text")
|
||||
|
||||
# 5. Generate embeddings for all chunks in single API call
|
||||
# 5. Generate embeddings for all chunks in batches
|
||||
cmd_id = get_command_id(input_data)
|
||||
logger.debug(f"Generating embeddings for {total_chunks} chunks")
|
||||
embeddings = await generate_embeddings(chunks, command_id=cmd_id)
|
||||
|
|
|
|||
|
|
@ -115,24 +115,25 @@ async def process_source_command(
|
|||
processed_source = result["source"]
|
||||
|
||||
# 4. Gather processing results (notebook associations handled by source_graph)
|
||||
embedded_chunks = (
|
||||
await processed_source.get_embedded_chunks() if input_data.embed else 0
|
||||
)
|
||||
# Note: embedding is fire-and-forget (async job), so we can't query the
|
||||
# count here — it hasn't completed yet. The embed_source_command logs
|
||||
# the actual count when it finishes.
|
||||
insights_list = await processed_source.get_insights()
|
||||
insights_created = len(insights_list)
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
embed_status = "submitted" if input_data.embed else "skipped"
|
||||
logger.info(
|
||||
f"Successfully processed source: {processed_source.id} in {processing_time:.2f}s"
|
||||
)
|
||||
logger.info(
|
||||
f"Created {insights_created} insights and {embedded_chunks} embedded chunks"
|
||||
f"Created {insights_created} insights, embedding {embed_status}"
|
||||
)
|
||||
|
||||
return SourceProcessingOutput(
|
||||
success=True,
|
||||
source_id=str(processed_source.id),
|
||||
embedded_chunks=embedded_chunks,
|
||||
embedded_chunks=0,
|
||||
insights_created=insights_created,
|
||||
processing_time=processing_time,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -416,7 +416,7 @@ class Source(ObjectModel):
|
|||
pool exhaustion when processing large documents. The embed_source command:
|
||||
1. Detects content type from file path
|
||||
2. Chunks text using content-type aware splitter
|
||||
3. Generates all embeddings in a single API call
|
||||
3. Generates all embeddings in batches
|
||||
4. Bulk inserts source_embedding records
|
||||
|
||||
Returns:
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ Note: Changes require restart of the application.
|
|||
|
||||
### embedding.py
|
||||
- **mean_pool_embeddings(embeddings)**: Combine multiple embeddings via normalized mean pooling
|
||||
- **generate_embeddings(texts)**: Batch embedding via single Esperanto API call
|
||||
- **generate_embeddings(texts)**: Batch embedding with automatic batching (default 50 texts per batch) and per-batch retry
|
||||
- **generate_embedding(text, content_type, file_path)**: Unified embedding with automatic chunking + mean pooling
|
||||
|
||||
**Key behavior**:
|
||||
|
|
|
|||
|
|
@ -3,13 +3,14 @@ Unified embedding utilities for Open Notebook.
|
|||
|
||||
Provides centralized embedding generation with support for:
|
||||
- Single text embedding (with automatic chunking and mean pooling for large texts)
|
||||
- Batch text embedding (multiple texts in a single API call)
|
||||
- Batch text embedding (multiple texts with automatic batching)
|
||||
- Mean pooling for combining multiple embeddings into one
|
||||
|
||||
All embedding operations in the application should use these functions
|
||||
to ensure consistent behavior and proper handling of large content.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
|
@ -17,6 +18,10 @@ from loguru import logger
|
|||
|
||||
from .chunking import CHUNK_SIZE, ContentType, chunk_text
|
||||
|
||||
EMBEDDING_BATCH_SIZE = 50
|
||||
EMBEDDING_MAX_RETRIES = 3
|
||||
EMBEDDING_RETRY_DELAY = 2 # seconds
|
||||
|
||||
# Lazy import to avoid circular dependency:
|
||||
# utils -> embedding -> models -> key_provider -> provider_config -> utils
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -83,10 +88,11 @@ async def generate_embeddings(
|
|||
texts: List[str], command_id: Optional[str] = None
|
||||
) -> List[List[float]]:
|
||||
"""
|
||||
Generate embeddings for multiple texts in a single API call.
|
||||
Generate embeddings for multiple texts with automatic batching and retry.
|
||||
|
||||
This is more efficient than calling generate_embedding() multiple times
|
||||
when you have multiple texts to embed (e.g., source chunks).
|
||||
Texts are split into batches of EMBEDDING_BATCH_SIZE to avoid exceeding
|
||||
provider payload limits. Each batch is retried up to EMBEDDING_MAX_RETRIES
|
||||
times on transient failures.
|
||||
|
||||
Args:
|
||||
texts: List of text strings to embed
|
||||
|
|
@ -121,23 +127,42 @@ async def generate_embeddings(
|
|||
f"total={sum(text_sizes)} chars)"
|
||||
)
|
||||
|
||||
try:
|
||||
# Single API call for all texts
|
||||
embeddings = await embedding_model.aembed(texts)
|
||||
logger.debug(f"Generated {len(embeddings)} embeddings")
|
||||
return embeddings
|
||||
except Exception as e:
|
||||
# Log at debug level - the calling command will log at appropriate level
|
||||
# based on whether retries are exhausted
|
||||
cmd_context = f" (command: {command_id})" if command_id else ""
|
||||
logger.debug(
|
||||
f"Embedding API error using model '{model_name}' "
|
||||
f"for {len(texts)} texts (sizes: {min(text_sizes)}-{max(text_sizes)} chars)"
|
||||
f"{cmd_context}: {e}"
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Failed to generate embeddings using model '{model_name}': {e}"
|
||||
) from e
|
||||
all_embeddings: List[List[float]] = []
|
||||
total_batches = (len(texts) + EMBEDDING_BATCH_SIZE - 1) // EMBEDDING_BATCH_SIZE
|
||||
|
||||
for batch_idx in range(total_batches):
|
||||
start = batch_idx * EMBEDDING_BATCH_SIZE
|
||||
end = start + EMBEDDING_BATCH_SIZE
|
||||
batch = texts[start:end]
|
||||
|
||||
for attempt in range(1, EMBEDDING_MAX_RETRIES + 1):
|
||||
try:
|
||||
batch_embeddings = await embedding_model.aembed(batch)
|
||||
all_embeddings.extend(batch_embeddings)
|
||||
break
|
||||
except Exception as e:
|
||||
cmd_context = f" (command: {command_id})" if command_id else ""
|
||||
if attempt < EMBEDDING_MAX_RETRIES:
|
||||
logger.debug(
|
||||
f"Embedding batch {batch_idx + 1}/{total_batches} "
|
||||
f"attempt {attempt}/{EMBEDDING_MAX_RETRIES} failed "
|
||||
f"using model '{model_name}'{cmd_context}: {e}. Retrying..."
|
||||
)
|
||||
await asyncio.sleep(EMBEDDING_RETRY_DELAY)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Embedding batch {batch_idx + 1}/{total_batches} "
|
||||
f"failed after {EMBEDDING_MAX_RETRIES} attempts "
|
||||
f"using model '{model_name}'{cmd_context}: {e}"
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Failed to generate embeddings using model '{model_name}' "
|
||||
f"(batch {batch_idx + 1}/{total_batches}, "
|
||||
f"{len(batch)} texts): {e}"
|
||||
) from e
|
||||
|
||||
logger.debug(f"Generated {len(all_embeddings)} embeddings in {total_batches} batch(es)")
|
||||
return all_embeddings
|
||||
|
||||
|
||||
async def generate_embedding(
|
||||
|
|
@ -154,7 +179,7 @@ async def generate_embedding(
|
|||
|
||||
For long text (> CHUNK_SIZE):
|
||||
- Chunks the text using appropriate splitter for content type
|
||||
- Embeds all chunks in a single API call
|
||||
- Embeds all chunks in batches
|
||||
- Combines embeddings via mean pooling
|
||||
|
||||
Args:
|
||||
|
|
@ -197,7 +222,7 @@ async def generate_embedding(
|
|||
|
||||
logger.debug(f"Embedding {len(chunks)} chunks and mean pooling")
|
||||
|
||||
# Embed all chunks in single API call
|
||||
# Embed all chunks in batches
|
||||
embeddings = await generate_embeddings(chunks, command_id=command_id)
|
||||
|
||||
# Mean pool to get single embedding
|
||||
|
|
|
|||
|
|
@ -54,6 +54,12 @@ _CLASSIFICATION_RULES: list[tuple[list[str], type[OpenNotebookError], str | None
|
|||
ExternalServiceError,
|
||||
"Content too large for the selected model. Try using a smaller selection or a model with a larger context window.",
|
||||
),
|
||||
# Payload too large errors
|
||||
(
|
||||
["413", "payload too large", "request entity too large"],
|
||||
ExternalServiceError,
|
||||
"The request payload is too large for the AI provider. Try reducing the content size or using a different model.",
|
||||
),
|
||||
# Provider availability errors
|
||||
(
|
||||
["500", "502", "503", "service unavailable", "overloaded", "internal server error"],
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[project]
|
||||
name = "open-notebook"
|
||||
version = "1.7.3"
|
||||
version = "1.7.4"
|
||||
description = "An open source implementation of a research assistant, inspired by Google Notebook LM"
|
||||
authors = [
|
||||
{name = "Luis Novo", email = "lfnovo@gmail.com"}
|
||||
|
|
|
|||
|
|
@ -229,5 +229,118 @@ class TestGenerateEmbedding:
|
|||
assert len(result) == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_batching(self):
|
||||
"""Test that large input is split into batches of EMBEDDING_BATCH_SIZE."""
|
||||
from unittest.mock import AsyncMock, MagicMock, call, patch
|
||||
|
||||
from open_notebook.utils.embedding import EMBEDDING_BATCH_SIZE
|
||||
|
||||
num_texts = 120
|
||||
texts = [f"text_{i}" for i in range(num_texts)]
|
||||
|
||||
mock_model = MagicMock()
|
||||
mock_model.model_name = "test-model"
|
||||
|
||||
def make_embeddings(batch):
|
||||
return [[float(i)] * 3 for i in range(len(batch))]
|
||||
|
||||
mock_model.aembed = AsyncMock(side_effect=lambda batch: make_embeddings(batch))
|
||||
|
||||
with patch(
|
||||
"open_notebook.ai.models.model_manager.get_embedding_model",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_model,
|
||||
):
|
||||
result = await generate_embeddings(texts)
|
||||
|
||||
assert len(result) == num_texts
|
||||
# ceil(120 texts / 50 per batch) = 3 batches (50, 50, 20)
|
||||
assert mock_model.aembed.call_count == 3
|
||||
assert len(mock_model.aembed.call_args_list[0][0][0]) == EMBEDDING_BATCH_SIZE
|
||||
assert len(mock_model.aembed.call_args_list[1][0][0]) == EMBEDDING_BATCH_SIZE
|
||||
assert len(mock_model.aembed.call_args_list[2][0][0]) == 20
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_batch_retry_on_transient_failure(self):
|
||||
"""Test that a transient failure is retried and succeeds."""
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
texts = ["text_a", "text_b"]
|
||||
mock_model = MagicMock()
|
||||
mock_model.model_name = "test-model"
|
||||
|
||||
# Fail once, then succeed
|
||||
mock_model.aembed = AsyncMock(
|
||||
side_effect=[
|
||||
RuntimeError("transient error"),
|
||||
[[0.1, 0.2], [0.3, 0.4]],
|
||||
]
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"open_notebook.ai.models.model_manager.get_embedding_model",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_model,
|
||||
),
|
||||
patch("open_notebook.utils.embedding.EMBEDDING_RETRY_DELAY", 0),
|
||||
):
|
||||
result = await generate_embeddings(texts)
|
||||
assert result == [[0.1, 0.2], [0.3, 0.4]]
|
||||
assert mock_model.aembed.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_batch_retry_exhaustion(self):
|
||||
"""Test that RuntimeError is raised after all retries are exhausted."""
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from open_notebook.utils.embedding import EMBEDDING_MAX_RETRIES
|
||||
|
||||
texts = ["text_a"]
|
||||
mock_model = MagicMock()
|
||||
mock_model.model_name = "test-model"
|
||||
mock_model.aembed = AsyncMock(side_effect=RuntimeError("persistent error"))
|
||||
|
||||
with (
|
||||
patch(
|
||||
"open_notebook.ai.models.model_manager.get_embedding_model",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_model,
|
||||
),
|
||||
patch("open_notebook.utils.embedding.EMBEDDING_RETRY_DELAY", 0),
|
||||
):
|
||||
with pytest.raises(RuntimeError, match="Failed to generate embeddings"):
|
||||
await generate_embeddings(texts)
|
||||
assert mock_model.aembed.call_count == EMBEDDING_MAX_RETRIES
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TEST SUITE 4: Error Classification for 413
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class TestErrorClassifier413:
|
||||
"""Test that 413 payload-too-large errors are classified correctly."""
|
||||
|
||||
def test_413_status_code(self):
|
||||
from open_notebook.exceptions import ExternalServiceError
|
||||
from open_notebook.utils.error_classifier import classify_error
|
||||
|
||||
exc = Exception("HTTP 413: Payload Too Large")
|
||||
exc_class, message = classify_error(exc)
|
||||
assert exc_class is ExternalServiceError
|
||||
assert "payload is too large" in message
|
||||
|
||||
def test_request_entity_too_large(self):
|
||||
from open_notebook.exceptions import ExternalServiceError
|
||||
from open_notebook.utils.error_classifier import classify_error
|
||||
|
||||
exc = Exception("Request Entity Too Large")
|
||||
exc_class, message = classify_error(exc)
|
||||
assert exc_class is ExternalServiceError
|
||||
assert "payload is too large" in message
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
|
|
|||
2
uv.lock
|
|
@ -2095,7 +2095,7 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "open-notebook"
|
||||
version = "1.7.3"
|
||||
version = "1.7.4"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "ai-prompter" },
|
||||
|
|
|
|||