From 2a27d2030a21f1683219706890607746a8cdf3b1 Mon Sep 17 00:00:00 2001 From: GeorgeCaoJ <851383386@qq.com> Date: Mon, 15 Sep 2025 21:29:55 +0800 Subject: [PATCH] feat: support web audio 16kHz PCM input and remove ffmpeg dependency --- whisperlivekit/audio_processor.py | 138 ++-------------- whisperlivekit/ffmpeg_manager.py | 193 ----------------------- whisperlivekit/web/live_transcription.js | 39 ++++- whisperlivekit/web/recorder_worker.js | 58 +++++++ 4 files changed, 104 insertions(+), 324 deletions(-) delete mode 100644 whisperlivekit/ffmpeg_manager.py create mode 100644 whisperlivekit/web/recorder_worker.js diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 525c4fc..df02a52 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -6,7 +6,6 @@ import logging import traceback from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, State from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory -from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState from whisperlivekit.silero_vad_iterator import FixedVADIterator from whisperlivekit.results_formater import format_output # Set up logging once @@ -49,10 +48,7 @@ class AudioProcessor: self.bytes_per_sample = 2 self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz - self.last_ffmpeg_activity = time() - self.ffmpeg_health_check_interval = 5 - self.ffmpeg_max_idle_time = 10 - self.is_pcm_input = self.args.pcm_input + self.is_pcm_input = True self.debug = False # State management @@ -79,18 +75,6 @@ class AudioProcessor: else: self.vac = None - self.ffmpeg_manager = FFmpegManager( - sample_rate=self.sample_rate, - channels=self.channels - ) - - async def handle_ffmpeg_error(error_type: str): - logger.error(f"FFmpeg error: {error_type}") - self._ffmpeg_error = error_type - - self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error - self._ffmpeg_error = None - self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.translation_queue = asyncio.Queue() if self.args.target_language else None @@ -98,7 +82,6 @@ class AudioProcessor: self.transcription_task = None self.diarization_task = None - self.ffmpeg_reader_task = None self.watchdog_task = None self.all_tasks_for_cleanup = [] @@ -172,67 +155,6 @@ class AudioProcessor: self.end_buffer = self.end_attributed_speaker = 0 self.beg_loop = time() - async def ffmpeg_stdout_reader(self): - """Read audio data from FFmpeg stdout and process it.""" - beg = time() - - while True: - try: - # Check if FFmpeg is running - state = await self.ffmpeg_manager.get_state() - if state == FFmpegState.FAILED: - logger.error("FFmpeg is in FAILED state, cannot read data") - break - elif state == FFmpegState.STOPPED: - logger.info("FFmpeg is stopped") - break - elif state != FFmpegState.RUNNING: - logger.warning(f"FFmpeg is in {state} state, waiting...") - await asyncio.sleep(0.5) - continue - - current_time = time() - elapsed_time = math.floor((current_time - beg) * 10) / 10 - buffer_size = max(int(32000 * elapsed_time), 4096) - beg = current_time - - chunk = await self.ffmpeg_manager.read_data(buffer_size) - - if not chunk: - if self.is_stopping: - logger.info("FFmpeg stdout closed, stopping.") - break - else: - # No data available, but not stopping - FFmpeg might be restarting - await asyncio.sleep(0.1) - continue - - self.pcm_buffer.extend(chunk) - await self.handle_pcm_data() - - - - except Exception as e: - logger.warning(f"Exception in ffmpeg_stdout_reader: {e}") - logger.warning(f"Traceback: {traceback.format_exc()}") - # Try to recover by waiting a bit - await asyncio.sleep(1) - - # Check if we should exit - if self.is_stopping: - break - - logger.info("FFmpeg stdout processing finished. Signaling downstream processors.") - if self.args.transcription and self.transcription_queue: - await self.transcription_queue.put(SENTINEL) - logger.debug("Sentinel put into transcription_queue.") - if self.args.diarization and self.diarization_queue: - await self.diarization_queue.put(SENTINEL) - logger.debug("Sentinel put into diarization_queue.") - if self.args.target_language and self.translation_queue: - await self.translation_queue.put(SENTINEL) - - async def transcription_processor(self): """Process audio chunks for transcription.""" cumulative_pcm_duration_stream_time = 0.0 @@ -312,6 +234,14 @@ class AudioProcessor: logger.warning(f"Traceback: {traceback.format_exc()}") if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue self.transcription_queue.task_done() + + if self.is_stopping: + logger.info("Transcription processor finishing due to stopping flag.") + if self.diarization_queue: + await self.diarization_queue.put(SENTINEL) + if self.translation_queue: + await self.translation_queue.put(SENTINEL) + logger.info("Transcription processor task finished.") @@ -407,16 +337,7 @@ class AudioProcessor: """Format processing results for output.""" while True: try: - ffmpeg_state = await self.ffmpeg_manager.get_state() - if ffmpeg_state == FFmpegState.FAILED and self._ffmpeg_error: - yield FrontData( - status="error", - error=f"FFmpeg error: {self._ffmpeg_error}" - ) - self._ffmpeg_error = None - await asyncio.sleep(1) - continue - + # Get current state state = await self.get_current_state() # Add dummy tokens if needed @@ -491,16 +412,6 @@ class AudioProcessor: self.all_tasks_for_cleanup = [] processing_tasks_for_watchdog = [] - success = await self.ffmpeg_manager.start() - if not success: - logger.error("Failed to start FFmpeg manager") - async def error_generator(): - yield FrontData( - status="error", - error="FFmpeg failed to start. Please check that FFmpeg is installed." - ) - return error_generator() - if self.args.transcription and self.online: self.transcription_task = asyncio.create_task(self.transcription_processor()) self.all_tasks_for_cleanup.append(self.transcription_task) @@ -516,10 +427,6 @@ class AudioProcessor: self.all_tasks_for_cleanup.append(self.translation_task) processing_tasks_for_watchdog.append(self.translation_task) - self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader()) - self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task) - processing_tasks_for_watchdog.append(self.ffmpeg_reader_task) - # Monitor overall system health self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog)) self.all_tasks_for_cleanup.append(self.watchdog_task) @@ -540,15 +447,6 @@ class AudioProcessor: logger.error(f"{task_name} unexpectedly completed with exception: {exc}") else: logger.info(f"{task_name} completed normally.") - - # Check FFmpeg status through the manager - ffmpeg_state = await self.ffmpeg_manager.get_state() - if ffmpeg_state == FFmpegState.FAILED: - logger.error("FFmpeg is in FAILED state, notifying results formatter") - # FFmpeg manager will handle its own recovery - elif ffmpeg_state == FFmpegState.STOPPED and not self.is_stopping: - logger.warning("FFmpeg unexpectedly stopped, attempting restart") - await self.ffmpeg_manager.restart() except asyncio.CancelledError: logger.info("Watchdog task cancelled.") @@ -568,8 +466,6 @@ class AudioProcessor: if created_tasks: await asyncio.gather(*created_tasks, return_exceptions=True) logger.info("All processing tasks cancelled or finished.") - await self.ffmpeg_manager.stop() - logger.info("FFmpeg manager stopped.") if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'): self.diarization.close() logger.info("AudioProcessor cleanup complete.") @@ -584,8 +480,10 @@ class AudioProcessor: if not message: logger.info("Empty audio message received, initiating stop sequence.") self.is_stopping = True - # Signal FFmpeg manager to stop accepting data - await self.ffmpeg_manager.stop() + + if self.transcription_queue: + await self.transcription_queue.put(SENTINEL) + return if self.is_stopping: @@ -595,14 +493,6 @@ class AudioProcessor: if self.is_pcm_input: self.pcm_buffer.extend(message) await self.handle_pcm_data() - else: - success = await self.ffmpeg_manager.write_data(message) - if not success: - ffmpeg_state = await self.ffmpeg_manager.get_state() - if ffmpeg_state == FFmpegState.FAILED: - logger.error("FFmpeg is in FAILED state, cannot process audio") - else: - logger.warning("Failed to write audio data to FFmpeg") async def handle_pcm_data(self): # Process when enough data diff --git a/whisperlivekit/ffmpeg_manager.py b/whisperlivekit/ffmpeg_manager.py deleted file mode 100644 index bf1f565..0000000 --- a/whisperlivekit/ffmpeg_manager.py +++ /dev/null @@ -1,193 +0,0 @@ -import asyncio -import logging -from enum import Enum -from typing import Optional, Callable -import contextlib - -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) - -ERROR_INSTALL_INSTRUCTIONS = """ -FFmpeg is not installed or not found in your system's PATH. -Please install FFmpeg to enable audio processing. - -Installation instructions: - -# Ubuntu/Debian: -sudo apt update && sudo apt install ffmpeg - -# macOS (using Homebrew): -brew install ffmpeg - -# Windows: -# 1. Download the latest static build from https://ffmpeg.org/download.html -# 2. Extract the archive (e.g., to C:\\FFmpeg). -# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable. - -After installation, please restart the application. -""" - -class FFmpegState(Enum): - STOPPED = "stopped" - STARTING = "starting" - RUNNING = "running" - RESTARTING = "restarting" - FAILED = "failed" - -class FFmpegManager: - def __init__(self, sample_rate: int = 16000, channels: int = 1): - self.sample_rate = sample_rate - self.channels = channels - - self.process: Optional[asyncio.subprocess.Process] = None - self._stderr_task: Optional[asyncio.Task] = None - - self.on_error_callback: Optional[Callable[[str], None]] = None - - self.state = FFmpegState.STOPPED - self._state_lock = asyncio.Lock() - - async def start(self) -> bool: - async with self._state_lock: - if self.state != FFmpegState.STOPPED: - logger.warning(f"FFmpeg already running in state: {self.state}") - return False - self.state = FFmpegState.STARTING - - try: - cmd = [ - "ffmpeg", - "-hide_banner", - "-loglevel", "error", - "-i", "pipe:0", - "-f", "s16le", - "-acodec", "pcm_s16le", - "-ac", str(self.channels), - "-ar", str(self.sample_rate), - "pipe:1" - ] - - self.process = await asyncio.create_subprocess_exec( - *cmd, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - - self._stderr_task = asyncio.create_task(self._drain_stderr()) - - async with self._state_lock: - self.state = FFmpegState.RUNNING - - logger.info("FFmpeg started.") - return True - - except FileNotFoundError: - logger.error(ERROR_INSTALL_INSTRUCTIONS) - async with self._state_lock: - self.state = FFmpegState.FAILED - if self.on_error_callback: - await self.on_error_callback("ffmpeg_not_found") - return False - - except Exception as e: - logger.error(f"Error starting FFmpeg: {e}") - async with self._state_lock: - self.state = FFmpegState.FAILED - if self.on_error_callback: - await self.on_error_callback("start_failed") - return False - - async def stop(self): - async with self._state_lock: - if self.state == FFmpegState.STOPPED: - return - self.state = FFmpegState.STOPPED - - if self.process: - if self.process.stdin and not self.process.stdin.is_closing(): - self.process.stdin.close() - await self.process.stdin.wait_closed() - await self.process.wait() - self.process = None - - if self._stderr_task: - self._stderr_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._stderr_task - - logger.info("FFmpeg stopped.") - - async def write_data(self, data: bytes) -> bool: - async with self._state_lock: - if self.state != FFmpegState.RUNNING: - logger.warning(f"Cannot write, FFmpeg state: {self.state}") - return False - - try: - self.process.stdin.write(data) - await self.process.stdin.drain() - return True - except Exception as e: - logger.error(f"Error writing to FFmpeg: {e}") - if self.on_error_callback: - await self.on_error_callback("write_error") - return False - - async def read_data(self, size: int) -> Optional[bytes]: - async with self._state_lock: - if self.state != FFmpegState.RUNNING: - logger.warning(f"Cannot read, FFmpeg state: {self.state}") - return None - - try: - data = await asyncio.wait_for( - self.process.stdout.read(size), - timeout=20.0 - ) - return data - except asyncio.TimeoutError: - logger.warning("FFmpeg read timeout.") - return None - except Exception as e: - logger.error(f"Error reading from FFmpeg: {e}") - if self.on_error_callback: - await self.on_error_callback("read_error") - return None - - async def get_state(self) -> FFmpegState: - async with self._state_lock: - return self.state - - async def restart(self) -> bool: - async with self._state_lock: - if self.state == FFmpegState.RESTARTING: - logger.warning("Restart already in progress.") - return False - self.state = FFmpegState.RESTARTING - - logger.info("Restarting FFmpeg...") - - try: - await self.stop() - await asyncio.sleep(1) # short delay before restarting - return await self.start() - except Exception as e: - logger.error(f"Error during FFmpeg restart: {e}") - async with self._state_lock: - self.state = FFmpegState.FAILED - if self.on_error_callback: - await self.on_error_callback("restart_failed") - return False - - async def _drain_stderr(self): - try: - while True: - line = await self.process.stderr.readline() - if not line: - break - logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}") - except asyncio.CancelledError: - logger.info("FFmpeg stderr drain task cancelled.") - except Exception as e: - logger.error(f"Error draining FFmpeg stderr: {e}") \ No newline at end of file diff --git a/whisperlivekit/web/live_transcription.js b/whisperlivekit/web/live_transcription.js index c51ce35..12d738c 100644 --- a/whisperlivekit/web/live_transcription.js +++ b/whisperlivekit/web/live_transcription.js @@ -12,6 +12,8 @@ let timerInterval = null; let audioContext = null; let analyser = null; let microphone = null; +let scriptProcessor = null; +let recorderWorker = null; let waveCanvas = document.getElementById("waveCanvas"); let waveCtx = waveCanvas.getContext("2d"); let animationFrame = null; @@ -457,13 +459,31 @@ async function startRecording() { microphone = audioContext.createMediaStreamSource(stream); microphone.connect(analyser); - recorder = new MediaRecorder(stream, { mimeType: "audio/webm" }); - recorder.ondataavailable = (e) => { + scriptProcessor = audioContext.createScriptProcessor(4096, 1, 1); + microphone.connect(scriptProcessor); + scriptProcessor.connect(audioContext.destination); + + recorderWorker = new Worker("/web/recorder_worker.js"); + recorderWorker.postMessage({ + command: "init", + config: { + sampleRate: audioContext.sampleRate, + }, + }); + + recorderWorker.onmessage = (e) => { if (websocket && websocket.readyState === WebSocket.OPEN) { - websocket.send(e.data); + websocket.send(e.data.buffer); } }; - recorder.start(chunkDuration); + + scriptProcessor.onaudioprocess = (e) => { + const inputData = e.inputBuffer.getChannelData(0); + recorderWorker.postMessage({ + command: "record", + buffer: inputData.buffer, + }, [inputData.buffer]); + }; startTime = Date.now(); timerInterval = setInterval(updateTimer, 1000); @@ -501,9 +521,14 @@ async function stopRecording() { statusText.textContent = "Recording stopped. Processing final audio..."; } - if (recorder) { - recorder.stop(); - recorder = null; + if (recorderWorker) { + recorderWorker.terminate(); + recorderWorker = null; + } + + if (scriptProcessor) { + scriptProcessor.disconnect(); + scriptProcessor = null; } if (microphone) { diff --git a/whisperlivekit/web/recorder_worker.js b/whisperlivekit/web/recorder_worker.js new file mode 100644 index 0000000..96b4cf1 --- /dev/null +++ b/whisperlivekit/web/recorder_worker.js @@ -0,0 +1,58 @@ +let sampleRate = 48000; +let targetSampleRate = 16000; + +self.onmessage = function (e) { + switch (e.data.command) { + case 'init': + init(e.data.config); + break; + case 'record': + record(e.data.buffer); + break; + } +}; + +function init(config) { + sampleRate = config.sampleRate; + targetSampleRate = config.targetSampleRate || 16000; +} + +function record(inputBuffer) { + const buffer = new Float32Array(inputBuffer); + const resampledBuffer = resample(buffer, sampleRate, targetSampleRate); + const pcmBuffer = toPCM(resampledBuffer); + self.postMessage({ buffer: pcmBuffer }, [pcmBuffer]); +} + +function resample(buffer, from, to) { + if (from === to) { + return buffer; + } + const ratio = from / to; + const newLength = Math.round(buffer.length / ratio); + const result = new Float32Array(newLength); + let offsetResult = 0; + let offsetBuffer = 0; + while (offsetResult < result.length) { + const nextOffsetBuffer = Math.round((offsetResult + 1) * ratio); + let accum = 0, count = 0; + for (let i = offsetBuffer; i < nextOffsetBuffer && i < buffer.length; i++) { + accum += buffer[i]; + count++; + } + result[offsetResult] = accum / count; + offsetResult++; + offsetBuffer = nextOffsetBuffer; + } + return result; +} + +function toPCM(input) { + const buffer = new ArrayBuffer(input.length * 2); + const view = new DataView(buffer); + for (let i = 0; i < input.length; i++) { + const s = Math.max(-1, Math.min(1, input[i])); + view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7FFF, true); + } + return buffer; +}