open-notebook/api/insights_service.py
Luis Novo d7b0fff954
Api podcast migration (#93)
Creates the API layer for Open Notebook
Creates a services API gateway for the Streamlit front-end
Migrates the SurrealDB SDK to the official one
Change all database calls to async
New podcast framework supporting multiple speaker configurations
Implement the surreal-commands library for async processing
Improve docker image and docker-compose configurations
2025-07-17 08:36:11 -03:00

82 lines
No EOL
3 KiB
Python

"""
Insights service layer using API.
"""
from typing import List, Optional
from loguru import logger
from api.client import api_client
from open_notebook.domain.notebook import Note, SourceInsight
class InsightsService:
"""Service layer for insights operations using API."""
def __init__(self):
logger.info("Using API for insights operations")
def get_source_insights(self, source_id: str) -> List[SourceInsight]:
"""Get all insights for a specific source."""
insights_data = api_client.get_source_insights(source_id)
# Convert API response to SourceInsight objects
insights = []
for insight_data in insights_data:
insight = SourceInsight(
insight_type=insight_data["insight_type"],
content=insight_data["content"],
)
insight.id = insight_data["id"]
insight.created = insight_data["created"]
insight.updated = insight_data["updated"]
insights.append(insight)
return insights
def get_insight(self, insight_id: str) -> SourceInsight:
"""Get a specific insight."""
insight_data = api_client.get_insight(insight_id)
insight = SourceInsight(
insight_type=insight_data["insight_type"],
content=insight_data["content"],
)
insight.id = insight_data["id"]
insight.created = insight_data["created"]
insight.updated = insight_data["updated"]
# Store source_id as an attribute for easy access
insight._source_id = insight_data["source_id"]
return insight
def delete_insight(self, insight_id: str) -> bool:
"""Delete a specific insight."""
api_client.delete_insight(insight_id)
return True
def save_insight_as_note(self, insight_id: str, notebook_id: Optional[str] = None) -> Note:
"""Convert an insight to a note."""
note_data = api_client.save_insight_as_note(insight_id, notebook_id)
note = Note(
title=note_data["title"],
content=note_data["content"],
note_type=note_data["note_type"],
)
note.id = note_data["id"]
note.created = note_data["created"]
note.updated = note_data["updated"]
return note
def create_source_insight(self, source_id: str, transformation_id: str, model_id: Optional[str] = None) -> SourceInsight:
"""Create a new insight for a source by running a transformation."""
insight_data = api_client.create_source_insight(source_id, transformation_id, model_id)
insight = SourceInsight(
insight_type=insight_data["insight_type"],
content=insight_data["content"],
)
insight.id = insight_data["id"]
insight.created = insight_data["created"]
insight.updated = insight_data["updated"]
insight._source_id = insight_data["source_id"]
return insight
# Global service instance
insights_service = InsightsService()