From d58365421ffc7f0d2f7b2f253a48c93e88a4a77c Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Sun, 25 Jan 2026 13:48:00 +0100 Subject: [PATCH] Refactor audio processor async pipeline --- whisperlivekit/audio_processor.py | 64 ++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 3d43c03..ffb4772 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -6,14 +6,16 @@ from typing import Any, AsyncGenerator, List, Optional, Union import numpy as np -from whisperlivekit.core import (TranscriptionEngine, - online_diarization_factory, online_factory, - online_translation_factory) -from whisperlivekit.metrics_collector import SessionMetrics +from whisperlivekit.core import ( + TranscriptionEngine, + online_diarization_factory, + online_factory, + online_translation_factory, +) from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState +from whisperlivekit.metrics_collector import SessionMetrics from whisperlivekit.silero_vad_iterator import FixedVADIterator, OnnxWrapper, load_jit_vad -from whisperlivekit.timed_objects import (ASRToken, ChangeSpeaker, FrontData, - Segment, Silence, State, Transcript) +from whisperlivekit.timed_objects import ChangeSpeaker, FrontData, Silence, State from whisperlivekit.tokens_alignment import TokensAlignment logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") @@ -57,6 +59,8 @@ class AudioProcessor: def __init__(self, **kwargs: Any) -> None: """Initialize the audio processor with configuration, models, and state.""" + # Extract per-session language override before passing to TranscriptionEngine + session_language = kwargs.pop('language', None) if 'transcription_engine' in kwargs and isinstance(kwargs['transcription_engine'], TranscriptionEngine): models = kwargs['transcription_engine'] @@ -126,7 +130,7 @@ class AudioProcessor: self.diarization: Optional[Any] = None if self.args.transcription: - self.transcription = online_factory(self.args, models.asr) + self.transcription = online_factory(self.args, models.asr, language=session_language) self.sep = self.transcription.asr.sep if self.args.diarization: self.diarization = online_diarization_factory(self.args, models.diarization_model) @@ -175,7 +179,7 @@ class AudioProcessor: self.metrics.n_silence_events += 1 if self.current_silence.duration is not None: self.metrics.total_silence_duration_s += self.current_silence.duration - if self.current_silence.duration > MIN_DURATION_REAL_SILENCE: + if self.current_silence.duration and self.current_silence.duration > MIN_DURATION_REAL_SILENCE: self.state.new_tokens.append(self.current_silence) # Push the completed silence as the end event (separate from the start event) await self._push_silence_event() @@ -287,6 +291,7 @@ class AudioProcessor: final_tokens = final_tokens or [] if final_tokens: logger.info(f"Finish flushed {len(final_tokens)} tokens") + self.metrics.n_tokens_produced += len(final_tokens) _buffer_transcript = self.transcription.get_buffer() async with self.lock: self.state.tokens.extend(final_tokens) @@ -307,8 +312,23 @@ class AudioProcessor: while True: try: - # item = await self.transcription_queue.get() - item = await get_all_from_queue(self.transcription_queue) + # Use a timeout so we periodically wake up and refresh the + # buffer state. Streaming backends (e.g. voxtral) may + # produce text tokens asynchronously; without a periodic + # drain, those tokens would sit unread until the next audio + # chunk arrives — causing the frontend to show nothing. + try: + item = await asyncio.wait_for( + get_all_from_queue(self.transcription_queue), + timeout=0.5, + ) + except asyncio.TimeoutError: + # No new audio — just refresh buffer for streaming backends + _buffer_transcript = self.transcription.get_buffer() + async with self.lock: + self.state.buffer_transcription = _buffer_transcript + continue + if item is SENTINEL: logger.debug("Transcription processor received sentinel. Finishing.") await self._finish_transcription() @@ -326,7 +346,7 @@ class AudioProcessor: new_tokens, current_audio_processed_upto = await asyncio.to_thread( self.transcription.start_silence ) - asr_processing_logs += f" + Silence starting" + asr_processing_logs += " + Silence starting" if item.has_ended: asr_processing_logs += f" + Silence of = {item.duration:.2f}s" cumulative_pcm_duration_stream_time += item.duration @@ -404,7 +424,7 @@ class AudioProcessor: item = await get_all_from_queue(self.diarization_queue) if item is SENTINEL: break - elif type(item) is Silence: + elif isinstance(item, Silence): if item.has_ended: self.diarization.insert_silence(item.duration) continue @@ -431,7 +451,11 @@ class AudioProcessor: if item is SENTINEL: logger.debug("Translation processor received sentinel. Finishing.") break - elif type(item) is Silence: + + new_translation = None + new_translation_buffer = None + + if isinstance(item, Silence): if item.is_starting: new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset() if item.has_ended: @@ -439,13 +463,14 @@ class AudioProcessor: continue elif isinstance(item, ChangeSpeaker): new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset() - pass else: self.translation.insert_tokens(item) new_translation, new_translation_buffer = await asyncio.to_thread(self.translation.process) - async with self.lock: - self.state.new_translation.append(new_translation) - self.state.new_translation_buffer = new_translation_buffer + + if new_translation is not None: + async with self.lock: + self.state.new_translation.append(new_translation) + self.state.new_translation_buffer = new_translation_buffer except Exception as e: logger.warning(f"Exception in translation_processor: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") @@ -465,7 +490,8 @@ class AudioProcessor: lines, buffer_diarization_text, buffer_translation_text = self.tokens_alignment.get_lines( diarization=self.args.diarization, translation=bool(self.translation), - current_silence=self.current_silence + current_silence=self.current_silence, + audio_time=self.total_pcm_samples / self.sample_rate if self.sample_rate else None, ) state = await self.get_current_state() @@ -497,7 +523,7 @@ class AudioProcessor: await asyncio.sleep(0.05) - except Exception as e: + except Exception: logger.warning(f"Exception in results_formatter. Traceback: {traceback.format_exc()}") await asyncio.sleep(0.5)