open-notebook/commands/example_commands.py
Luis Novo d7b0fff954
Api podcast migration (#93)
Creates the API layer for Open Notebook
Creates a services API gateway for the Streamlit front-end
Migrates the SurrealDB SDK to the official one
Change all database calls to async
New podcast framework supporting multiple speaker configurations
Implement the surreal-commands library for async processing
Improve docker image and docker-compose configurations
2025-07-17 08:36:11 -03:00

149 lines
No EOL
5 KiB
Python

from surreal_commands import command
from pydantic import BaseModel
from typing import Optional, List
from loguru import logger
import asyncio
import time
# Import-time diagnostics: these messages are emitted as soon as the module
# body starts executing, i.e. before the models and command functions below
# are defined. Seeing them in the log shows that this module was imported.
logger.info("=== IMPORTING example_commands.py ===")
logger.info("Registering commands...")
class TextProcessingInput(BaseModel):
    """Input payload for the ``process_text`` example command."""

    # Text to transform or count.
    text: str
    # Operation to apply. process_text_command handles "uppercase",
    # "lowercase", "word_count" and "reverse"; any other value is reported
    # back as a failure.
    operation: str = "uppercase"
    # Optional pause, in seconds, awaited before processing (for testing async
    # behavior). None or 0 skips the pause.
    delay_seconds: Optional[int] = None
class TextProcessingOutput(BaseModel):
    """Result payload returned by the ``process_text`` example command."""

    # True when the operation completed, False when an exception was caught.
    success: bool
    # The input text, echoed back unchanged.
    original_text: str
    # Transformed text, or "Word count: N" for the word_count operation.
    # Left as None on failure.
    processed_text: Optional[str] = None
    # Number of whitespace-separated words; only set by the word_count
    # operation.
    word_count: Optional[int] = None
    # Elapsed seconds measured inside the command, including any requested
    # delay.
    processing_time: float
    # str() of the caught exception when success is False; None otherwise.
    error_message: Optional[str] = None
class DataAnalysisInput(BaseModel):
    """Input payload for the ``analyze_data`` example command."""

    # Values to summarize. An empty list makes analyze_data_command report a
    # failure ("No numbers provided for analysis").
    numbers: List[float]
    # Requested analysis flavor: "basic" or "detailed".
    # NOTE(review): the analyze_data_command in this file only logs and echoes
    # this value; it computes the same statistics regardless of it.
    analysis_type: str = "basic"
    # Optional pause, in seconds, awaited before the analysis starts.
    # None or 0 skips the pause.
    delay_seconds: Optional[int] = None
class DataAnalysisOutput(BaseModel):
    """Result payload returned by the ``analyze_data`` example command."""

    # True when the analysis completed, False when an exception was caught.
    success: bool
    # The analysis_type from the input, echoed back.
    analysis_type: str
    # Number of input values on success; 0 on failure.
    count: int
    # Sum of the input values; None on failure.
    sum: Optional[float] = None
    # Arithmetic mean (sum / count); None on failure.
    average: Optional[float] = None
    # Smallest input value; None on failure.
    min_value: Optional[float] = None
    # Largest input value; None on failure.
    max_value: Optional[float] = None
    # Elapsed seconds measured inside the command, including any requested
    # delay.
    processing_time: float
    # str() of the caught exception when success is False; None otherwise.
    error_message: Optional[str] = None
@command("process_text", app="open_notebook")
async def process_text_command(input_data: TextProcessingInput) -> TextProcessingOutput:
    """
    Example command for text processing. Tests basic command functionality
    and demonstrates different processing types.

    Supported ``input_data.operation`` values are ``"uppercase"``,
    ``"lowercase"``, ``"reverse"`` and ``"word_count"``. When
    ``input_data.delay_seconds`` is set to a non-zero value, the command first
    awaits ``asyncio.sleep`` for that many seconds.

    Args:
        input_data: Text to process, operation name and optional delay.

    Returns:
        A ``TextProcessingOutput``. Failures, including an unknown operation,
        are not raised to the caller: they are logged and reported with
        ``success=False`` and ``error_message`` set to ``str(exception)``.
    """
    # perf_counter() is monotonic, so the reported duration cannot go negative
    # or jump when the system clock is adjusted (time.time() can).
    start_time = time.perf_counter()

    try:
        logger.info(f"Processing text with operation: {input_data.operation}")

        # Simulate processing delay if specified
        if input_data.delay_seconds:
            await asyncio.sleep(input_data.delay_seconds)

        processed_text = None
        word_count = None

        if input_data.operation == "uppercase":
            processed_text = input_data.text.upper()
        elif input_data.operation == "lowercase":
            processed_text = input_data.text.lower()
        elif input_data.operation == "reverse":
            processed_text = input_data.text[::-1]
        elif input_data.operation == "word_count":
            # split() with no argument splits on runs of whitespace.
            word_count = len(input_data.text.split())
            processed_text = f"Word count: {word_count}"
        else:
            raise ValueError(f"Unknown operation: {input_data.operation}")

        processing_time = time.perf_counter() - start_time

        return TextProcessingOutput(
            success=True,
            original_text=input_data.text,
            processed_text=processed_text,
            word_count=word_count,
            processing_time=processing_time,
        )

    except Exception as e:
        # Command boundary: report the failure in the result instead of
        # raising, so the caller always receives a TextProcessingOutput.
        processing_time = time.perf_counter() - start_time
        logger.error(f"Text processing failed: {e}")
        return TextProcessingOutput(
            success=False,
            original_text=input_data.text,
            processing_time=processing_time,
            error_message=str(e),
        )
@command("analyze_data", app="open_notebook")
async def analyze_data_command(input_data: DataAnalysisInput) -> DataAnalysisOutput:
    """
    Example command for data analysis. Tests command with complex input/output
    and demonstrates error handling.

    Computes count, sum, average, minimum and maximum of
    ``input_data.numbers``. When ``input_data.delay_seconds`` is set to a
    non-zero value, the command first awaits ``asyncio.sleep`` for that many
    seconds. ``input_data.analysis_type`` is only logged and echoed back; the
    same statistics are computed for every value.

    Args:
        input_data: Numbers to analyze, analysis type and optional delay.

    Returns:
        A ``DataAnalysisOutput``. Failures, including an empty ``numbers``
        list, are not raised to the caller: they are logged and reported with
        ``success=False``, ``count=0`` and ``error_message`` set to
        ``str(exception)``.
    """
    # perf_counter() is monotonic, so the reported duration cannot go negative
    # or jump when the system clock is adjusted (time.time() can).
    start_time = time.perf_counter()

    try:
        logger.info(f"Analyzing {len(input_data.numbers)} numbers with {input_data.analysis_type} analysis")

        # Simulate processing delay if specified
        if input_data.delay_seconds:
            await asyncio.sleep(input_data.delay_seconds)

        # Reject empty input explicitly; otherwise the average below would
        # divide by zero and min()/max() would raise on an empty list.
        if not input_data.numbers:
            raise ValueError("No numbers provided for analysis")

        count = len(input_data.numbers)
        sum_value = sum(input_data.numbers)
        average = sum_value / count
        min_value = min(input_data.numbers)
        max_value = max(input_data.numbers)

        processing_time = time.perf_counter() - start_time

        return DataAnalysisOutput(
            success=True,
            analysis_type=input_data.analysis_type,
            count=count,
            sum=sum_value,
            average=average,
            min_value=min_value,
            max_value=max_value,
            processing_time=processing_time,
        )

    except Exception as e:
        # Command boundary: report the failure in the result instead of
        # raising, so the caller always receives a DataAnalysisOutput.
        processing_time = time.perf_counter() - start_time
        logger.error(f"Data analysis failed: {e}")
        return DataAnalysisOutput(
            success=False,
            analysis_type=input_data.analysis_type,
            count=0,
            processing_time=processing_time,
            error_message=str(e),
        )
# Import-time diagnostics: reaching these lines means both @command-decorated
# functions above were defined without raising.
logger.info("✅ Commands registered: process_text and analyze_data")
logger.info("=== FINISHED IMPORTING example_commands.py ===")
# Best-effort check of what the surreal_commands registry contains once this
# module has loaded. Any failure here (failed import, missing attribute, ...)
# is logged rather than raised, so the diagnostic cannot break the import of
# this module.
try:
    from surreal_commands import registry
    commands = registry.list_commands()
    logger.info(f"Registry after import: {commands}")
except Exception as e:
    logger.error(f"Error checking registry: {e}")