aplit audio to prevent OpenAI limit

This commit is contained in:
LUIS NOVO 2024-10-26 18:56:21 -03:00
parent 8dac6dd2ac
commit 1ecc7af0be

View file

@ -1,11 +1,16 @@
import json
import os
import re
import subprocess
import unicodedata
from math import ceil
import fitz # type: ignore
import magic
import requests # type: ignore
from langgraph.graph import END, START, StateGraph
from loguru import logger
from pydub import AudioSegment
from typing_extensions import TypedDict
from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
from youtube_transcript_api.formatters import TextFormatter # type: ignore
@ -291,6 +296,226 @@ def should_continue(data: SourceState):
return "end"
def split_audio(input_file, segment_length_minutes=15, output_prefix=None):
"""
Split an audio file into segments of specified length.
Args:
input_file (str): Path to the input audio file
segment_length_minutes (int): Length of each segment in minutes
output_dir (str): Directory to save the segments (defaults to input file's directory)
output_prefix (str): Prefix for output files (defaults to input filename)
Returns:
list: List of paths to the created segment files
"""
# Convert input file to absolute path
input_file = os.path.abspath(input_file)
output_dir = os.path.dirname(input_file)
os.makedirs(output_dir, exist_ok=True)
# Set up output prefix
if output_prefix is None:
output_prefix = os.path.splitext(os.path.basename(input_file))[0]
# Load the audio file
audio = AudioSegment.from_file(input_file)
# Calculate segment length in milliseconds
segment_length_ms = segment_length_minutes * 60 * 1000
# Calculate number of segments
total_segments = ceil(len(audio) / segment_length_ms)
# List to store output file paths
output_files = []
# Split the audio into segments
for i in range(total_segments):
# Calculate start and end times for this segment
start_time = i * segment_length_ms
end_time = min((i + 1) * segment_length_ms, len(audio))
# Extract segment
segment = audio[start_time:end_time]
# Generate output filename
# Format: prefix_001.mp3 (padding with zeros ensures correct ordering)
output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3"
output_path = os.path.join(output_dir, output_filename)
# Export segment
segment.export(output_path, format="mp3")
output_files.append(output_path)
# Optional progress indication
print(f"Exported segment {i+1}/{total_segments}: {output_filename}")
return output_files
# todo: add a speechtotext model to the config
def extract_audio(data: SourceState):
input_audio_path = data.get("file_path")
from openai import OpenAI
client = OpenAI()
audio_files = split_audio(input_audio_path)
transcriptions = []
for audio_file in audio_files:
audio_file = open(audio_file, "rb")
transcription = client.audio.transcriptions.create(
model="whisper-1", file=audio_file
)
transcriptions.append(transcription.text)
return {"content": " ".join(transcriptions)}
def get_audio_streams(input_file):
"""
Analyze video file and return information about all audio streams
"""
try:
# Get stream information in JSON format
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"a",
input_file,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFprobe failed: {result.stderr}")
data = json.loads(result.stdout)
return data.get("streams", [])
except Exception as e:
print(f"Error analyzing file: {str(e)}")
return []
def select_best_audio_stream(streams):
"""
Select the best audio stream based on various quality metrics
"""
if not streams:
return None
# Score each stream based on various factors
scored_streams = []
for stream in streams:
score = 0
# Prefer higher bit rates
bit_rate = stream.get("bit_rate")
if bit_rate:
score += int(bit_rate) / 1000000 # Convert to Mbps
# Prefer more channels (stereo over mono)
channels = stream.get("channels", 0)
score += channels * 10
# Prefer higher sample rates
sample_rate = stream.get("sample_rate", "0")
score += int(sample_rate) / 48000
scored_streams.append((score, stream))
# Return the stream with highest score
return max(scored_streams, key=lambda x: x[0])[1]
def extract_audio_from_video(input_file, output_file, stream_index):
"""
Extract the specified audio stream to MP3 format
"""
try:
cmd = [
"ffmpeg",
"-i",
input_file,
"-map",
f"0:a:{stream_index}", # Select specific audio stream
"-codec:a",
"libmp3lame", # Use MP3 codec
"-q:a",
"2", # High quality setting
"-y", # Overwrite output file if exists
output_file,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFmpeg failed: {result.stderr}")
return True
except Exception as e:
print(f"Error extracting audio: {str(e)}")
return False
def extract_best_audio_from_video(data: SourceState):
"""
Main function to extract the best audio stream from a video file
"""
input_file = data.get("file_path")
if not os.path.exists(input_file):
print(f"Input file not found: {input_file}")
return False
base_name = os.path.splitext(input_file)[0]
output_file = f"{base_name}_audio.mp3"
# Get all audio streams
streams = get_audio_streams(input_file)
if not streams:
print("No audio streams found in the file")
return False
# Select best stream
best_stream = select_best_audio_stream(streams)
if not best_stream:
print("Could not determine best audio stream")
return False
# Extract the selected stream
stream_index = streams.index(best_stream)
success = extract_audio_from_video(input_file, output_file, stream_index)
if success:
print(f"Successfully extracted audio to: {output_file}")
print("Selected stream details:")
print(f"- Channels: {best_stream.get('channels', 'unknown')}")
print(f"- Sample rate: {best_stream.get('sample_rate', 'unknown')} Hz")
print(f"- Bit rate: {best_stream.get('bit_rate', 'unknown')} bits/s")
return {"file_path": output_file, "identified_type": "audio/mp3"}
def file_type_edge(data: SourceState):
if data.get("identified_type") == "text/plain":
return "extract_txt"
elif data.get("identified_type") == "application/pdf":
return "extract_pdf"
elif data.get("identified_type").startswith("video"):
return "extract_best_audio_from_video"
elif data.get("identified_type").startswith("audio"):
return "extract_audio"
else:
return "end"
workflow = StateGraph(SourceState)
workflow.add_node("source", source_identification)
workflow.add_node("url_provider", url_provider)
@ -298,6 +523,8 @@ workflow.add_node("file_type", file_type)
workflow.add_node("extract_txt", extract_txt)
workflow.add_node("extract_pdf", extract_pdf)
workflow.add_node("extract_url", extract_url)
workflow.add_node("extract_best_audio_from_video", extract_best_audio_from_video)
workflow.add_node("extract_audio", extract_audio)
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript)
workflow.add_edge(START, "source")
@ -312,11 +539,7 @@ workflow.add_conditional_edges(
)
workflow.add_conditional_edges(
"file_type",
lambda x: x.get("identified_type"),
{
"text/plain": "extract_txt",
"application/pdf": "extract_pdf",
},
file_type_edge,
)
workflow.add_conditional_edges(
"url_provider",
@ -328,5 +551,7 @@ workflow.add_edge("file_type", END)
workflow.add_edge("extract_txt", END)
workflow.add_edge("extract_pdf", END)
workflow.add_edge("extract_url", END)
workflow.add_edge("extract_best_audio_from_video", "extract_audio")
workflow.add_edge("extract_audio", END)
workflow.add_edge("extract_youtube_transcript", END)
graph = workflow.compile()