diff --git a/open_notebook/graphs/content_processing/__init__.py b/open_notebook/graphs/content_processing/__init__.py index 270bb4f..5e22311 100644 --- a/open_notebook/graphs/content_processing/__init__.py +++ b/open_notebook/graphs/content_processing/__init__.py @@ -1,4 +1,5 @@ import os +from typing import Any, Dict import magic from langgraph.graph import END, START, StateGraph @@ -21,7 +22,7 @@ from open_notebook.graphs.content_processing.video import extract_best_audio_fro from open_notebook.graphs.content_processing.youtube import extract_youtube_transcript -def source_identification(state: ContentState): +async def source_identification(state: ContentState) -> Dict[str, str]: """ Identify the content source based on parameters """ @@ -37,7 +38,7 @@ def source_identification(state: ContentState): return {"source_type": doc_type} -def file_type(state: ContentState): +async def file_type(state: ContentState) -> Dict[str, Any]: """ Identify the file using python-magic """ @@ -49,7 +50,7 @@ def file_type(state: ContentState): return return_dict -def file_type_edge(data: ContentState): +async def file_type_edge(data: ContentState) -> str: assert data.get("identified_type"), "Type not identified" identified_type = data["identified_type"] @@ -69,7 +70,7 @@ def file_type_edge(data: ContentState): ) -def delete_file(data: ContentState): +async def delete_file(data: ContentState) -> Dict[str, Any]: if data.get("delete_source"): logger.debug(f"Deleting file: {data.get('file_path')}") file_path = data.get("file_path") @@ -81,9 +82,21 @@ def delete_file(data: ContentState): logger.warning(f"File not found while trying to delete: {file_path}") else: logger.debug("Not deleting file") + return {} +async def url_type_router(x: ContentState) -> str: + return x.get("identified_type", "") + + +async def source_type_router(x: ContentState) -> str: + return x.get("source_type", "") + + +# Create workflow workflow = StateGraph(ContentState) + +# Add nodes workflow.add_node("source", source_identification) workflow.add_node("url_provider", url_provider) workflow.add_node("file_type", file_type) @@ -95,10 +108,12 @@ workflow.add_node("extract_best_audio_from_video", extract_best_audio_from_video workflow.add_node("extract_audio", extract_audio) workflow.add_node("extract_youtube_transcript", extract_youtube_transcript) workflow.add_node("delete_file", delete_file) + +# Add edges workflow.add_edge(START, "source") workflow.add_conditional_edges( "source", - lambda x: x.get("source_type"), + source_type_router, { "url": "url_provider", "file": "file_type", @@ -111,7 +126,7 @@ workflow.add_conditional_edges( ) workflow.add_conditional_edges( "url_provider", - lambda x: x.get("identified_type"), + url_type_router, {"article": "extract_url", "youtube": "extract_youtube_transcript"}, ) workflow.add_edge("url_provider", END) @@ -125,4 +140,6 @@ workflow.add_edge("extract_office_content", "delete_file") workflow.add_edge("extract_best_audio_from_video", "extract_audio") workflow.add_edge("extract_audio", "delete_file") workflow.add_edge("delete_file", END) + +# Compile graph graph = workflow.compile() diff --git a/open_notebook/graphs/content_processing/audio.py b/open_notebook/graphs/content_processing/audio.py index b3d7617..6201788 100644 --- a/open_notebook/graphs/content_processing/audio.py +++ b/open_notebook/graphs/content_processing/audio.py @@ -1,4 +1,6 @@ +import asyncio import os +from functools import partial from math import ceil from loguru import logger @@ -11,90 +13,102 @@ from open_notebook.graphs.content_processing.state import ContentState # future: parallelize the transcription process -def split_audio(input_file, segment_length_minutes=15, output_prefix=None): +async def split_audio(input_file, segment_length_minutes=15, output_prefix=None): """ - Split an audio file into segments of specified length. - - Args: - input_file (str): Path to the input audio file - segment_length_minutes (int): Length of each segment in minutes - output_dir (str): Directory to save the segments (defaults to input file's directory) - output_prefix (str): Prefix for output files (defaults to input filename) - - Returns: - list: List of paths to the created segment files + Split an audio file into segments asynchronously. """ - # Convert input file to absolute path - input_file = os.path.abspath(input_file) - output_dir = os.path.dirname(input_file) - os.makedirs(output_dir, exist_ok=True) + def _split(input_file, segment_length_minutes, output_prefix): + # Convert input file to absolute path + input_file_abs = os.path.abspath(input_file) + output_dir = os.path.dirname(input_file_abs) + os.makedirs(output_dir, exist_ok=True) - # Set up output prefix - if output_prefix is None: - output_prefix = os.path.splitext(os.path.basename(input_file))[0] + # Set up output prefix + if output_prefix is None: + output_prefix = os.path.splitext(os.path.basename(input_file_abs))[0] - # Load the audio file - audio = AudioSegment.from_file(input_file) + # Load the audio file + audio = AudioSegment.from_file(input_file_abs) - # Calculate segment length in milliseconds - segment_length_ms = segment_length_minutes * 60 * 1000 + # Calculate segment length in milliseconds + segment_length_ms = segment_length_minutes * 60 * 1000 - # Calculate number of segments - total_segments = ceil(len(audio) / segment_length_ms) - logger.debug(f"Splitting file: {input_file} into {total_segments} segments") + # Calculate number of segments + total_segments = ceil(len(audio) / segment_length_ms) + logger.debug(f"Splitting file: {input_file_abs} into {total_segments} segments") - # List to store output file paths - output_files = [] + output_files = [] - # Split the audio into segments - for i in range(total_segments): - # Calculate start and end times for this segment - start_time = i * segment_length_ms - end_time = min((i + 1) * segment_length_ms, len(audio)) + # Split the audio into segments + for i in range(total_segments): + start_time = i * segment_length_ms + end_time = min((i + 1) * segment_length_ms, len(audio)) - # Extract segment - segment = audio[start_time:end_time] + # Extract segment + segment = audio[start_time:end_time] - # Generate output filename - # Format: prefix_001.mp3 (padding with zeros ensures correct ordering) - output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3" - output_path = os.path.join(output_dir, output_filename) + # Generate output filename + output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3" + output_path = os.path.join(output_dir, output_filename) - # Export segment - segment.export(output_path, format="mp3") + # Export segment + segment.export(output_path, format="mp3") + output_files.append(output_path) - output_files.append(output_path) + logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}") - # Optional progress indication - logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}") + return output_files - return output_files + # Run CPU-bound audio processing in thread pool + return await asyncio.get_event_loop().run_in_executor( + None, partial(_split, input_file, segment_length_minutes, output_prefix) + ) -def extract_audio(data: ContentState): +async def transcribe_audio_segment(audio_file, model): + """Transcribe a single audio segment asynchronously""" + + def _transcribe(audio_file, model): + return model.transcribe(audio_file) + + return await asyncio.get_event_loop().run_in_executor( + None, partial(_transcribe, audio_file, model) + ) + + +async def extract_audio(data: ContentState): SPEECH_TO_TEXT_MODEL = model_manager.speech_to_text - input_audio_path = data.get("file_path") audio_files = [] try: - audio_files = split_audio(input_audio_path) - transcriptions = [] + # Split audio into segments + audio_files = await split_audio(input_audio_path) - for audio_file in audio_files: - transcriptions.append(SPEECH_TO_TEXT_MODEL.transcribe(audio_file)) + # Transcribe all segments concurrently + transcribe_tasks = [ + transcribe_audio_segment(audio_file, SPEECH_TO_TEXT_MODEL) + for audio_file in audio_files + ] + transcriptions = await asyncio.gather(*transcribe_tasks) return {"content": " ".join(transcriptions)} except Exception as e: logger.error(f"Error transcribing audio: {str(e)}") logger.exception(e) - raise # Re-raise the exception after logging + raise finally: - for file in audio_files: - try: - os.remove(file) - except OSError as e: - logger.error(f"Error removing temporary file {file}: {str(e)}") + # Clean up temporary files + def _cleanup(files): + for file in files: + try: + os.remove(file) + except OSError as e: + logger.error(f"Error removing temporary file {file}: {str(e)}") + + await asyncio.get_event_loop().run_in_executor( + None, partial(_cleanup, audio_files) + ) diff --git a/open_notebook/graphs/content_processing/office.py b/open_notebook/graphs/content_processing/office.py index f7403a0..98f8ea5 100644 --- a/open_notebook/graphs/content_processing/office.py +++ b/open_notebook/graphs/content_processing/office.py @@ -1,3 +1,6 @@ +import asyncio +from functools import partial + from docx import Document from loguru import logger from openpyxl import load_workbook @@ -12,252 +15,284 @@ SUPPORTED_OFFICE_TYPES = [ ] -def extract_docx_content_detailed(file_path): - try: - doc = Document(file_path) - content = [] +async def extract_docx_content_detailed(file_path): + """Extract content from DOCX file""" - for paragraph in doc.paragraphs: - if not paragraph.text.strip(): - continue + def _extract(): + try: + doc = Document(file_path) + content = [] - style = paragraph.style.name if paragraph.style else "Normal" - text = paragraph.text.strip() + for paragraph in doc.paragraphs: + if not paragraph.text.strip(): + continue - # Get paragraph formatting - p_format = paragraph.paragraph_format - indent = p_format.left_indent or 0 + style = paragraph.style.name if paragraph.style else "Normal" + text = paragraph.text.strip() - # Convert indent to spaces (1 level = 4 spaces) - indent_level = 0 - if hasattr(indent, "pt"): - indent_level = int(indent.pt / 72) # 72 points = 1 inch - indent_spaces = " " * (indent_level * 4) + # Get paragraph formatting + p_format = paragraph.paragraph_format + indent = p_format.left_indent or 0 - # Handle different types of formatting - if "Heading" in style: - level = style[-1] if style[-1].isdigit() else "1" - heading_marks = "#" * int(level) - content.append(f"\n{heading_marks} {text}\n") + # Convert indent to spaces (1 level = 4 spaces) + indent_level = 0 + if hasattr(indent, "pt"): + indent_level = int(indent.pt / 72) # 72 points = 1 inch + indent_spaces = " " * (indent_level * 4) - # Handle bullet points - elif ( - paragraph.style - and hasattr(paragraph.style, "name") - and paragraph.style.name.startswith("List") - ): - # Numbered list - if ( - hasattr(paragraph._p, "pPr") - and paragraph._p.pPr is not None - and hasattr(paragraph._p.pPr, "numPr") - and paragraph._p.pPr.numPr is not None + # Handle different types of formatting + if "Heading" in style: + level = style[-1] if style[-1].isdigit() else "1" + heading_marks = "#" * int(level) + content.append(f"\n{heading_marks} {text}\n") + + # Handle bullet points + elif ( + paragraph.style + and hasattr(paragraph.style, "name") + and paragraph.style.name.startswith("List") ): - # Try to get the actual number - try: - if ( - hasattr(paragraph._p.pPr.numPr, "numId") - and paragraph._p.pPr.numPr.numId is not None - and hasattr(paragraph._p.pPr.numPr.numId, "val") - ): - number = paragraph._p.pPr.numPr.numId.val - content.append(f"{indent_spaces}{number}. {text}") - else: + # Numbered list + if ( + hasattr(paragraph._p, "pPr") + and paragraph._p.pPr is not None + and hasattr(paragraph._p.pPr, "numPr") + and paragraph._p.pPr.numPr is not None + ): + # Try to get the actual number + try: + if ( + hasattr(paragraph._p.pPr.numPr, "numId") + and paragraph._p.pPr.numPr.numId is not None + and hasattr(paragraph._p.pPr.numPr.numId, "val") + ): + number = paragraph._p.pPr.numPr.numId.val + content.append(f"{indent_spaces}{number}. {text}") + else: + content.append(f"{indent_spaces}1. {text}") + except Exception: content.append(f"{indent_spaces}1. {text}") - except Exception: - content.append(f"{indent_spaces}1. {text}") - # Bullet list - else: - content.append(f"{indent_spaces}* {text}") - - else: - # Handle text formatting - formatted_text = [] - for run in paragraph.runs: - if run.bold: - formatted_text.append(f"**{run.text}**") - elif run.italic: - formatted_text.append(f"*{run.text}*") + # Bullet list else: - formatted_text.append(run.text) + content.append(f"{indent_spaces}* {text}") - content.append(f"{indent_spaces}{''.join(formatted_text)}") + else: + # Handle text formatting + formatted_text = [] + for run in paragraph.runs: + if run.bold: + formatted_text.append(f"**{run.text}**") + elif run.italic: + formatted_text.append(f"*{run.text}*") + else: + formatted_text.append(run.text) - return "\n\n".join(content) + content.append(f"{indent_spaces}{''.join(formatted_text)}") - except Exception as e: - logger.error(f"Failed to extract DOCX content: {e}") - return None + return "\n\n".join(content) + + except Exception as e: + logger.error(f"Failed to extract DOCX content: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, _extract) -# Example of usage with metadata -def get_docx_info(file_path): - try: - doc = Document(file_path) +async def get_docx_info(file_path): + """Get DOCX metadata and content""" - # Extract core properties if available - core_props = { - "author": doc.core_properties.author, - "created": doc.core_properties.created, - "modified": doc.core_properties.modified, - "title": doc.core_properties.title, - "subject": doc.core_properties.subject, - "keywords": doc.core_properties.keywords, - "category": doc.core_properties.category, - "comments": doc.core_properties.comments, - } + def _get_info(): + try: + doc = Document(file_path) - # Get document content - content = extract_docx_content_detailed(file_path) + # Extract core properties if available + core_props = { + "author": doc.core_properties.author, + "created": doc.core_properties.created, + "modified": doc.core_properties.modified, + "title": doc.core_properties.title, + "subject": doc.core_properties.subject, + "keywords": doc.core_properties.keywords, + "category": doc.core_properties.category, + "comments": doc.core_properties.comments, + } - # Get document statistics - stats = { - "paragraph_count": len(doc.paragraphs), - "word_count": sum( - len(p.text.split()) for p in doc.paragraphs if p.text.strip() - ), - "character_count": sum( - len(p.text) for p in doc.paragraphs if p.text.strip() - ), - } + # Get document content + content = extract_docx_content_detailed(file_path) - return {"metadata": core_props, "content": content, "statistics": stats} + # Get document statistics + stats = { + "paragraph_count": len(doc.paragraphs), + "word_count": sum( + len(p.text.split()) for p in doc.paragraphs if p.text.strip() + ), + "character_count": sum( + len(p.text) for p in doc.paragraphs if p.text.strip() + ), + } - except Exception as e: - logger.error(f"Failed to get DOCX info: {e}") - return None + return {"metadata": core_props, "content": content, "statistics": stats} + + except Exception as e: + logger.error(f"Failed to get DOCX info: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, _get_info) -def extract_pptx_content(file_path): - try: - prs = Presentation(file_path) - content = [] +async def extract_pptx_content(file_path): + """Extract content from PPTX file""" - for slide_number, slide in enumerate(prs.slides, 1): - content.append(f"\n# Slide {slide_number}\n") + def _extract(): + try: + prs = Presentation(file_path) + content = [] - # Extract title - if slide.shapes.title: - content.append(f"## {slide.shapes.title.text}\n") + for slide_number, slide in enumerate(prs.slides, 1): + content.append(f"\n# Slide {slide_number}\n") - # Extract text from all shapes - for shape in slide.shapes: - if hasattr(shape, "text") and shape.text.strip(): - if shape != slide.shapes.title: # Skip title as it's already added - content.append(shape.text.strip()) + # Extract title + if slide.shapes.title: + content.append(f"## {slide.shapes.title.text}\n") - return "\n\n".join(content) + # Extract text from all shapes + for shape in slide.shapes: + if hasattr(shape, "text") and shape.text.strip(): + if ( + shape != slide.shapes.title + ): # Skip title as it's already added + content.append(shape.text.strip()) - except Exception as e: - logger.error(f"Failed to extract PPTX content: {e}") - return None + return "\n\n".join(content) + + except Exception as e: + logger.error(f"Failed to extract PPTX content: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, _extract) -def extract_xlsx_content(file_path, max_rows=1000, max_cols=100): - try: - wb = load_workbook(file_path, data_only=True) - content = [] +async def extract_xlsx_content(file_path, max_rows=10000, max_cols=100): + """Extract content from XLSX file""" - for sheet in wb.sheetnames: - ws = wb[sheet] - content.append(f"\n# Sheet: {sheet}\n") + def _extract(): + try: + wb = load_workbook(file_path, data_only=True) + content = [] - # Get the maximum row and column with data - max_row = min(ws.max_row, max_rows) - max_col = min(ws.max_column, max_cols) + for sheet in wb.sheetnames: + ws = wb[sheet] + content.append(f"\n# Sheet: {sheet}\n") - # Create markdown table header - headers = [] - for col in range(1, max_col + 1): - cell_value = ws.cell(row=1, column=col).value - headers.append(str(cell_value) if cell_value is not None else "") + # Get the maximum row and column with data + max_row = min(ws.max_row, max_rows) + max_col = min(ws.max_column, max_cols) - content.append("| " + " | ".join(headers) + " |") - content.append("| " + " | ".join(["---"] * len(headers)) + " |") - - # Add table content - for row in range(2, max_row + 1): - row_data = [] + # Create markdown table header + headers = [] for col in range(1, max_col + 1): - cell_value = ws.cell(row=row, column=col).value - row_data.append(str(cell_value) if cell_value is not None else "") - content.append("| " + " | ".join(row_data) + " |") + cell_value = ws.cell(row=1, column=col).value + headers.append(str(cell_value) if cell_value is not None else "") - return "\n".join(content) + content.append("| " + " | ".join(headers) + " |") + content.append("| " + " | ".join(["---"] * len(headers)) + " |") - except Exception as e: - logger.error(f"Failed to extract XLSX content: {e}") - return None + # Add table content + for row in range(2, max_row + 1): + row_data = [] + for col in range(1, max_col + 1): + cell_value = ws.cell(row=row, column=col).value + row_data.append( + str(cell_value) if cell_value is not None else "" + ) + content.append("| " + " | ".join(row_data) + " |") + + return "\n".join(content) + + except Exception as e: + logger.error(f"Failed to extract XLSX content: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, partial(_extract)) -def get_pptx_info(file_path): - try: - prs = Presentation(file_path) +async def get_pptx_info(file_path): + """Get PPTX metadata and content""" - # Extract basic properties - props = { - "slide_count": len(prs.slides), - "title": "", # PowerPoint doesn't have built-in metadata like Word - } + def _get_info(): + try: + prs = Presentation(file_path) - # Get document content - content = extract_pptx_content(file_path) + # Extract basic properties + props = { + "slide_count": len(prs.slides), + "title": "", # PowerPoint doesn't have built-in metadata like Word + } - # Get presentation statistics - stats = { - "slide_count": len(prs.slides), - "shape_count": sum(len(slide.shapes) for slide in prs.slides), - "text_frame_count": sum( - sum(1 for shape in slide.shapes if hasattr(shape, "text")) - for slide in prs.slides - ), - } + # Get document content + content = extract_pptx_content(file_path) - return {"metadata": props, "content": content, "statistics": stats} + # Get presentation statistics + stats = { + "slide_count": len(prs.slides), + "shape_count": sum(len(slide.shapes) for slide in prs.slides), + "text_frame_count": sum( + sum(1 for shape in slide.shapes if hasattr(shape, "text")) + for slide in prs.slides + ), + } - except Exception as e: - logger.error(f"Failed to get PPTX info: {e}") - return None + return {"metadata": props, "content": content, "statistics": stats} + + except Exception as e: + logger.error(f"Failed to get PPTX info: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, _get_info) -def get_xlsx_info(file_path): - try: - wb = load_workbook(file_path, data_only=True) +async def get_xlsx_info(file_path): + """Get XLSX metadata and content""" - # Extract basic properties - props = { - "sheet_count": len(wb.sheetnames), - "sheets": wb.sheetnames, - "title": wb.properties.title, - "creator": wb.properties.creator, - "created": wb.properties.created, - "modified": wb.properties.modified, - } + def _get_info(): + try: + wb = load_workbook(file_path, data_only=True) - # Get document content - content = extract_xlsx_content(file_path) + # Extract basic properties + props = { + "sheet_count": len(wb.sheetnames), + "sheets": wb.sheetnames, + "title": wb.properties.title, + "creator": wb.properties.creator, + "created": wb.properties.created, + "modified": wb.properties.modified, + } - # Get workbook statistics - stats = { - "sheet_count": len(wb.sheetnames), - "total_rows": sum(sheet.max_row for sheet in wb.worksheets), - "total_columns": sum(sheet.max_column for sheet in wb.worksheets), - } + # Get document content + content = extract_xlsx_content(file_path) - return {"metadata": props, "content": content, "statistics": stats} + # Get workbook statistics + stats = { + "sheet_count": len(wb.sheetnames), + "total_rows": sum(sheet.max_row for sheet in wb.worksheets), + "total_columns": sum(sheet.max_column for sheet in wb.worksheets), + } - except Exception as e: - logger.error(f"Failed to get XLSX info: {e}") - return None + return {"metadata": props, "content": content, "statistics": stats} + + except Exception as e: + logger.error(f"Failed to get XLSX info: {e}") + return None + + return await asyncio.get_event_loop().run_in_executor(None, _get_info) -def extract_office_content(state: ContentState): +async def extract_office_content(state: ContentState): """Universal function to extract content from Office files""" assert state.get("file_path"), "No file path provided" assert ( state.get("identified_type") in SUPPORTED_OFFICE_TYPES ), "Unsupported File Type" - file_path = state["file_path"] doc_type = state["identified_type"] @@ -266,24 +301,23 @@ def extract_office_content(state: ContentState): == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" ): logger.debug("Extracting content from DOCX file") - content = extract_docx_content_detailed(file_path) - info = get_docx_info(file_path) + content = await extract_docx_content_detailed(file_path) + info = await get_docx_info(file_path) elif ( doc_type == "application/vnd.openxmlformats-officedocument.presentationml.presentation" ): logger.debug("Extracting content from PPTX file") - content = extract_pptx_content(file_path) - info = get_pptx_info(file_path) + content = await extract_pptx_content(file_path) + info = await get_pptx_info(file_path) elif ( doc_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ): logger.debug("Extracting content from XLSX file") - content = extract_xlsx_content(file_path) - info = get_xlsx_info(file_path) + content = await extract_xlsx_content(file_path) + info = await get_xlsx_info(file_path) else: raise Exception(f"Unsupported file format: {doc_type}") del info["content"] - return {"content": content, "metadata": info} diff --git a/open_notebook/graphs/content_processing/pdf.py b/open_notebook/graphs/content_processing/pdf.py index 610ee58..a4d4209 100644 --- a/open_notebook/graphs/content_processing/pdf.py +++ b/open_notebook/graphs/content_processing/pdf.py @@ -1,3 +1,4 @@ +import asyncio import re import unicodedata @@ -114,7 +115,7 @@ def clean_pdf_text(text): return text.strip() -def _extract_text_from_pdf(pdf_path): +async def _extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) try: text = "" @@ -127,20 +128,39 @@ def _extract_text_from_pdf(pdf_path): doc.close() -def extract_pdf(state: ContentState): +async def _extract_text_from_pdf(pdf_path): + """Extract text from PDF asynchronously""" + + def _extract(): + doc = fitz.open(pdf_path) + try: + text = "" + logger.debug(f"Found {len(doc)} pages in PDF") + for page in doc: + text += page.get_text() + return clean_pdf_text(text) + finally: + doc.close() + + # Run CPU-bound PDF processing in a thread pool + return await asyncio.get_event_loop().run_in_executor(None, _extract) + + +async def extract_pdf(state: ContentState): """ - Parse the text file and print its content. + Parse the PDF file and extract its content asynchronously. """ return_dict = {} assert state.get("file_path"), "No file path provided" assert state.get("identified_type") in SUPPORTED_FITZ_TYPES, "Unsupported File Type" + if ( state.get("file_path") is not None and state.get("identified_type") in SUPPORTED_FITZ_TYPES ): file_path = state.get("file_path") try: - text = _extract_text_from_pdf(file_path) + text = await _extract_text_from_pdf(file_path) return_dict["content"] = text except FileNotFoundError: raise FileNotFoundError(f"File not found at {file_path}") diff --git a/open_notebook/graphs/content_processing/text.py b/open_notebook/graphs/content_processing/text.py index b81ca6c..85c11aa 100644 --- a/open_notebook/graphs/content_processing/text.py +++ b/open_notebook/graphs/content_processing/text.py @@ -1,11 +1,13 @@ +import asyncio + from loguru import logger from open_notebook.graphs.content_processing.state import ContentState -def extract_txt(state: ContentState): +async def extract_txt(state: ContentState): """ - Parse the text file and print its content. + Parse the text file and extract its content asynchronously. """ return_dict = {} if ( @@ -14,12 +16,22 @@ def extract_txt(state: ContentState): ): logger.debug(f"Extracting text from {state.get('file_path')}") file_path = state.get("file_path") + if file_path is not None: try: - with open(file_path, "r", encoding="utf-8") as file: - content = file.read() - logger.debug(f"Extracted: {content[:100]}") - return_dict["content"] = content + + def _read_file(): + with open(file_path, "r", encoding="utf-8") as file: + return file.read() + + # Run file I/O in thread pool + content = await asyncio.get_event_loop().run_in_executor( + None, _read_file + ) + + logger.debug(f"Extracted: {content[:100]}") + return_dict["content"] = content + except FileNotFoundError: raise FileNotFoundError(f"File not found at {file_path}") except Exception as e: diff --git a/open_notebook/graphs/content_processing/url.py b/open_notebook/graphs/content_processing/url.py index c06efbc..66bb62e 100644 --- a/open_notebook/graphs/content_processing/url.py +++ b/open_notebook/graphs/content_processing/url.py @@ -1,7 +1,7 @@ import re from urllib.parse import urlparse -import requests # type: ignore +import aiohttp from bs4 import BeautifulSoup, Comment from loguru import logger @@ -29,7 +29,7 @@ def url_provider(state: ContentState): return return_dict -def extract_url_bs4(url: str): +async def extract_url_bs4(url: str): """ Get the title and content of a URL using bs4 """ @@ -42,9 +42,10 @@ def extract_url_bs4(url: str): if url.startswith("") or url.startswith("") else None, } - except requests.exceptions.RequestException as e: + except aiohttp.ClientError as e: logger.error(f"Failed to fetch URL {url}: {e}") return None except Exception as e: @@ -151,38 +152,38 @@ def extract_url_bs4(url: str): return None -def extract_url_jina(url: str): +async def extract_url_jina(url: str): """ Get the content of a URL using Jina """ - response = requests.get(f"https://r.jina.ai/{url}") - text = response.text - if text.startswith("Title:") and "\n" in text: - title_end = text.index("\n") - title = text[6:title_end].strip() - content = text[title_end + 1 :].strip() - logger.debug( - f"Processed url: {url}, found title: {title}, content: {content[:100]}..." - ) - return {"title": title, "content": content} - else: - content = text - logger.debug( - f"Processed url: {url}, does not have Title prefix, returning full content: {content[:100]}..." - ) - return {"content": text} + async with aiohttp.ClientSession() as session: + async with session.get(f"https://r.jina.ai/{url}") as response: + text = await response.text() + if text.startswith("Title:") and "\n" in text: + title_end = text.index("\n") + title = text[6:title_end].strip() + content = text[title_end + 1 :].strip() + logger.debug( + f"Processed url: {url}, found title: {title}, content: {content[:100]}..." + ) + return {"title": title, "content": content} + else: + logger.debug( + f"Processed url: {url}, does not have Title prefix, returning full content: {text[:100]}..." + ) + return {"content": text} -def extract_url(state: ContentState): +async def extract_url(state: ContentState): assert state.get("url"), "No URL provided" url = state["url"] try: - result = extract_url_bs4(url) + result = await extract_url_bs4(url) if not result or not result.get("content"): logger.debug( f"BS4 extraction failed for url {url}, falling back to Jina extractor" ) - result = extract_url_jina(url) + result = await extract_url_jina(url) return result except Exception as e: logger.error(f"URL extraction failed for URL: {url}") diff --git a/open_notebook/graphs/content_processing/video.py b/open_notebook/graphs/content_processing/video.py index c48e540..9fc5018 100644 --- a/open_notebook/graphs/content_processing/video.py +++ b/open_notebook/graphs/content_processing/video.py @@ -1,114 +1,141 @@ +import asyncio import json import os import subprocess +from functools import partial from loguru import logger from open_notebook.graphs.content_processing.state import ContentState -def extract_audio_from_video(input_file, output_file, stream_index): +async def extract_audio_from_video(input_file, output_file, stream_index): """ - Extract the specified audio stream to MP3 format + Extract the specified audio stream to MP3 format asynchronously """ - try: - cmd = [ - "ffmpeg", - "-i", - input_file, - "-map", - f"0:a:{stream_index}", # Select specific audio stream - "-codec:a", - "libmp3lame", # Use MP3 codec - "-q:a", - "2", # High quality setting - "-y", # Overwrite output file if exists - output_file, - ] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode != 0: - raise Exception(f"FFmpeg failed: {result.stderr}") + def _extract(input_file, output_file, stream_index): + try: + cmd = [ + "ffmpeg", + "-i", + input_file, + "-map", + f"0:a:{stream_index}", # Select specific audio stream + "-codec:a", + "libmp3lame", # Use MP3 codec + "-q:a", + "2", # High quality setting + "-y", # Overwrite output file if exists + output_file, + ] - return True + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise Exception(f"FFmpeg failed: {result.stderr}") - except Exception as e: - print(f"Error extracting audio: {str(e)}") - return False + return True + + except Exception as e: + logger.error(f"Error extracting audio: {str(e)}") + return False + + return await asyncio.get_event_loop().run_in_executor( + None, partial(_extract, input_file, output_file, stream_index) + ) -def get_audio_streams(input_file): +async def get_audio_streams(input_file): """ - Analyze video file and return information about all audio streams + Analyze video file and return information about all audio streams asynchronously """ - logger.debug(f"Analyzing video file {input_file} for audio streams") - try: - # Get stream information in JSON format - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_streams", - "-select_streams", - "a", - input_file, - ] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode != 0: - raise Exception(f"FFprobe failed: {result.stderr}") + def _analyze(input_file): + logger.debug(f"Analyzing video file {input_file} for audio streams") + try: + cmd = [ + "ffprobe", + "-v", + "quiet", + "-print_format", + "json", + "-show_streams", + "-select_streams", + "a", + input_file, + ] - data = json.loads(result.stdout) - return data.get("streams", []) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise Exception(f"FFprobe failed: {result.stderr}") - except Exception as e: - print(f"Error analyzing file: {str(e)}") - return [] + data = json.loads(result.stdout) + return data.get("streams", []) + + except Exception as e: + logger.error(f"Error analyzing file: {str(e)}") + return [] + + return await asyncio.get_event_loop().run_in_executor( + None, partial(_analyze, input_file) + ) -def select_best_audio_stream(streams): +async def select_best_audio_stream(streams): """ Select the best audio stream based on various quality metrics """ - if not streams: - logger.debug("No audio streams found") - return None - else: - logger.debug(f"Found {len(streams)} audio streams") - # Score each stream based on various factors - scored_streams = [] - for stream in streams: - score = 0 + def _select(streams): + if not streams: + logger.debug("No audio streams found") + return None + else: + logger.debug(f"Found {len(streams)} audio streams") - # Prefer higher bit rates - bit_rate = stream.get("bit_rate") - if bit_rate: - score += int(int(bit_rate) / 1000000) # Convert to Mbps and ensure int + # Score each stream based on various factors + scored_streams = [] + for stream in streams: + score = 0 - # Prefer more channels (stereo over mono) - channels = stream.get("channels", 0) - score += channels * 10 + # Prefer higher bit rates + bit_rate = stream.get("bit_rate") + if bit_rate: + score += int(int(bit_rate) / 1000000) # Convert to Mbps and ensure int - # Prefer higher sample rates - sample_rate = stream.get("sample_rate", "0") - score += int(int(sample_rate) / 48000) + # Prefer more channels (stereo over mono) + channels = stream.get("channels", 0) + score += channels * 10 - scored_streams.append((score, stream)) + # Prefer higher sample rates + sample_rate = stream.get("sample_rate", "0") + score += int(int(sample_rate) / 48000) - # Return the stream with highest score - return max(scored_streams, key=lambda x: x[0])[1] + scored_streams.append((score, stream)) + + # Return the stream with highest score + return max(scored_streams, key=lambda x: x[0])[1] + + return await asyncio.get_event_loop().run_in_executor( + None, partial(_select, streams) + ) -def extract_best_audio_from_video(data: ContentState): +async def extract_best_audio_from_video(data: ContentState): """ - Main function to extract the best audio stream from a video file + Main function to extract the best audio stream from a video file asynchronously """ input_file = data.get("file_path") assert input_file is not None, "Input file path must be provided" - if not os.path.exists(input_file): + + def _check_file(path): + return os.path.exists(path) + + file_exists = await asyncio.get_event_loop().run_in_executor( + None, partial(_check_file, input_file) + ) + + if not file_exists: logger.critical(f"Input file not found: {input_file}") return False @@ -116,20 +143,20 @@ def extract_best_audio_from_video(data: ContentState): output_file = f"{base_name}_audio.mp3" # Get all audio streams - streams = get_audio_streams(input_file) + streams = await get_audio_streams(input_file) if not streams: logger.debug("No audio streams found in the file") return False # Select best stream - best_stream = select_best_audio_stream(streams) + best_stream = await select_best_audio_stream(streams) if not best_stream: logger.error("Could not determine best audio stream") return False # Extract the selected stream stream_index = streams.index(best_stream) - success = extract_audio_from_video(input_file, output_file, stream_index) + success = await extract_audio_from_video(input_file, output_file, stream_index) if success: logger.debug(f"Successfully extracted audio to: {output_file}") diff --git a/open_notebook/graphs/content_processing/youtube.py b/open_notebook/graphs/content_processing/youtube.py index 1e85192..e39f4ba 100644 --- a/open_notebook/graphs/content_processing/youtube.py +++ b/open_notebook/graphs/content_processing/youtube.py @@ -1,7 +1,7 @@ import re import ssl -import requests +import aiohttp from bs4 import BeautifulSoup from loguru import logger from youtube_transcript_api import YouTubeTranscriptApi # type: ignore @@ -14,11 +14,15 @@ from open_notebook.graphs.content_processing.state import ContentState ssl._create_default_https_context = ssl._create_unverified_context -def get_video_title(video_id): +async def get_video_title(video_id): try: url = f"https://www.youtube.com/watch?v={video_id}" - response = requests.get(url) - soup = BeautifulSoup(response.text, "html.parser") + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + html = await response.text() + + # BeautifulSoup doesn't support async operations + soup = BeautifulSoup(html, "html.parser") # YouTube stores title in a meta tag title = soup.find("meta", property="og:title")["content"] @@ -63,7 +67,7 @@ def _extract_youtube_id(url): return match.group(1) if match else None -def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]): +async def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]): try: transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) @@ -129,7 +133,7 @@ def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]): return None -def extract_youtube_transcript(state: ContentState): +async def extract_youtube_transcript(state: ContentState): """ Parse the text file and print its content. """ @@ -139,12 +143,12 @@ def extract_youtube_transcript(state: ContentState): ) video_id = _extract_youtube_id(state.get("url")) - transcript = get_best_transcript(video_id, languages) + transcript = await get_best_transcript(video_id, languages) logger.debug(f"Found transcript: {transcript}") formatter = TextFormatter() try: - title = get_video_title(video_id) + title = await get_video_title(video_id) except Exception as e: logger.critical(f"Failed to get video title for video_id: {video_id}") logger.exception(e) diff --git a/open_notebook/graphs/source.py b/open_notebook/graphs/source.py index 39813ab..14129e6 100644 --- a/open_notebook/graphs/source.py +++ b/open_notebook/graphs/source.py @@ -16,8 +16,6 @@ from open_notebook.graphs.content_processing import graph as content_graph from open_notebook.graphs.multipattern import graph as transform_graph from open_notebook.utils import surreal_clean -# todo: we can make this more efficient - class SourceState(TypedDict): content_state: ContentState @@ -32,20 +30,24 @@ class TransformationState(TypedDict): transformation: dict -def content_process(state: SourceState): +async def content_process(state: SourceState) -> dict: content_state = state["content_state"] logger.debug("Content processing started for new content") - return {"content_state": content_graph.invoke(content_state)} + processed_state = await content_graph.ainvoke(content_state) + return {"content_state": processed_state} -def run_patterns(input_text, patterns): - output = transform_graph.invoke(dict(content_stack=[input_text], patterns=patterns)) +async def run_patterns(input_text: str, patterns: List[dict]) -> str: + output = await transform_graph.ainvoke( + dict(content_stack=[input_text], patterns=patterns) + ) return output["output"] -def save_source(state: SourceState): +def save_source(state: SourceState) -> dict: logger.debug("Saving source") content_state = state["content_state"] + source = Source( asset=Asset( url=content_state.get("url"), file_path=content_state.get("file_path") @@ -61,9 +63,10 @@ def save_source(state: SourceState): return {"source": source} -def trigger_transformations(state: SourceState, config: RunnableConfig): +def trigger_transformations(state: SourceState, config: RunnableConfig) -> List[Send]: if len(state["transformations"]) == 0: return [] + transformations = Transformation.get_all() to_apply = [ t @@ -71,6 +74,7 @@ def trigger_transformations(state: SourceState, config: RunnableConfig): if t["name"] in state["transformations"] ] logger.debug(f"Applying transformations {to_apply}") + return [ Send( "transform_content", @@ -83,24 +87,34 @@ def trigger_transformations(state: SourceState, config: RunnableConfig): ] -def transform_content(state: TransformationState): +async def transform_content(state: TransformationState) -> dict: source = state["source"] content = source.full_text transformation = state["transformation"] + logger.debug(f"Applying transformation {transformation['name']}") - result = run_patterns(content, patterns=transformation["patterns"]) + result = await run_patterns(content, patterns=transformation["patterns"]) + source.add_insight(transformation["name"], surreal_clean(result)) + return {"transformations": [{"name": transformation["name"], "content": result}]} +# Create and compile the workflow workflow = StateGraph(SourceState) + +# Add nodes workflow.add_node("content_process", content_process) workflow.add_node("save_source", save_source) workflow.add_node("transform_content", transform_content) + +# Define the graph edges workflow.add_edge(START, "content_process") workflow.add_edge("content_process", "save_source") workflow.add_conditional_edges( "save_source", trigger_transformations, ["transform_content"] ) workflow.add_edge("transform_content", END) + +# Compile the graph source_graph = workflow.compile() diff --git a/pages/stream_app/source.py b/pages/stream_app/source.py index 6114bd7..550553a 100644 --- a/pages/stream_app/source.py +++ b/pages/stream_app/source.py @@ -1,3 +1,4 @@ +import asyncio import os from pathlib import Path @@ -71,12 +72,14 @@ def add_source(notebook_id): f.write(source_file.getbuffer()) st.write("Processing content..") - source_graph.invoke( - { - "content_state": req, - "notebook_id": notebook_id, - "transformations": apply_transformations, - } + asyncio.run( + source_graph.ainvoke( + { + "content_state": req, + "notebook_id": notebook_id, + "transformations": apply_transformations, + } + ) ) except UnsupportedTypeException as e: st.warning(