feat: support web audio 16kHz PCM input and remove ffmpeg dependency
This commit is contained in:
parent
cd160caaa1
commit
2a27d2030a
4 changed files with 104 additions and 324 deletions
|
|
@ -6,7 +6,6 @@ import logging
|
|||
import traceback
|
||||
from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, State
|
||||
from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory
|
||||
from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState
|
||||
from whisperlivekit.silero_vad_iterator import FixedVADIterator
|
||||
from whisperlivekit.results_formater import format_output
|
||||
# Set up logging once
|
||||
|
|
@ -49,10 +48,7 @@ class AudioProcessor:
|
|||
self.bytes_per_sample = 2
|
||||
self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
|
||||
self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
|
||||
self.last_ffmpeg_activity = time()
|
||||
self.ffmpeg_health_check_interval = 5
|
||||
self.ffmpeg_max_idle_time = 10
|
||||
self.is_pcm_input = self.args.pcm_input
|
||||
self.is_pcm_input = True
|
||||
self.debug = False
|
||||
|
||||
# State management
|
||||
|
|
@ -79,18 +75,6 @@ class AudioProcessor:
|
|||
else:
|
||||
self.vac = None
|
||||
|
||||
self.ffmpeg_manager = FFmpegManager(
|
||||
sample_rate=self.sample_rate,
|
||||
channels=self.channels
|
||||
)
|
||||
|
||||
async def handle_ffmpeg_error(error_type: str):
|
||||
logger.error(f"FFmpeg error: {error_type}")
|
||||
self._ffmpeg_error = error_type
|
||||
|
||||
self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error
|
||||
self._ffmpeg_error = None
|
||||
|
||||
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
||||
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
||||
self.translation_queue = asyncio.Queue() if self.args.target_language else None
|
||||
|
|
@ -98,7 +82,6 @@ class AudioProcessor:
|
|||
|
||||
self.transcription_task = None
|
||||
self.diarization_task = None
|
||||
self.ffmpeg_reader_task = None
|
||||
self.watchdog_task = None
|
||||
self.all_tasks_for_cleanup = []
|
||||
|
||||
|
|
@ -172,67 +155,6 @@ class AudioProcessor:
|
|||
self.end_buffer = self.end_attributed_speaker = 0
|
||||
self.beg_loop = time()
|
||||
|
||||
async def ffmpeg_stdout_reader(self):
|
||||
"""Read audio data from FFmpeg stdout and process it."""
|
||||
beg = time()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Check if FFmpeg is running
|
||||
state = await self.ffmpeg_manager.get_state()
|
||||
if state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, cannot read data")
|
||||
break
|
||||
elif state == FFmpegState.STOPPED:
|
||||
logger.info("FFmpeg is stopped")
|
||||
break
|
||||
elif state != FFmpegState.RUNNING:
|
||||
logger.warning(f"FFmpeg is in {state} state, waiting...")
|
||||
await asyncio.sleep(0.5)
|
||||
continue
|
||||
|
||||
current_time = time()
|
||||
elapsed_time = math.floor((current_time - beg) * 10) / 10
|
||||
buffer_size = max(int(32000 * elapsed_time), 4096)
|
||||
beg = current_time
|
||||
|
||||
chunk = await self.ffmpeg_manager.read_data(buffer_size)
|
||||
|
||||
if not chunk:
|
||||
if self.is_stopping:
|
||||
logger.info("FFmpeg stdout closed, stopping.")
|
||||
break
|
||||
else:
|
||||
# No data available, but not stopping - FFmpeg might be restarting
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
self.pcm_buffer.extend(chunk)
|
||||
await self.handle_pcm_data()
|
||||
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
# Try to recover by waiting a bit
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Check if we should exit
|
||||
if self.is_stopping:
|
||||
break
|
||||
|
||||
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
|
||||
if self.args.transcription and self.transcription_queue:
|
||||
await self.transcription_queue.put(SENTINEL)
|
||||
logger.debug("Sentinel put into transcription_queue.")
|
||||
if self.args.diarization and self.diarization_queue:
|
||||
await self.diarization_queue.put(SENTINEL)
|
||||
logger.debug("Sentinel put into diarization_queue.")
|
||||
if self.args.target_language and self.translation_queue:
|
||||
await self.translation_queue.put(SENTINEL)
|
||||
|
||||
|
||||
async def transcription_processor(self):
|
||||
"""Process audio chunks for transcription."""
|
||||
cumulative_pcm_duration_stream_time = 0.0
|
||||
|
|
@ -312,6 +234,14 @@ class AudioProcessor:
|
|||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
|
||||
self.transcription_queue.task_done()
|
||||
|
||||
if self.is_stopping:
|
||||
logger.info("Transcription processor finishing due to stopping flag.")
|
||||
if self.diarization_queue:
|
||||
await self.diarization_queue.put(SENTINEL)
|
||||
if self.translation_queue:
|
||||
await self.translation_queue.put(SENTINEL)
|
||||
|
||||
logger.info("Transcription processor task finished.")
|
||||
|
||||
|
||||
|
|
@ -407,16 +337,7 @@ class AudioProcessor:
|
|||
"""Format processing results for output."""
|
||||
while True:
|
||||
try:
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED and self._ffmpeg_error:
|
||||
yield FrontData(
|
||||
status="error",
|
||||
error=f"FFmpeg error: {self._ffmpeg_error}"
|
||||
)
|
||||
self._ffmpeg_error = None
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
# Get current state
|
||||
state = await self.get_current_state()
|
||||
|
||||
# Add dummy tokens if needed
|
||||
|
|
@ -491,16 +412,6 @@ class AudioProcessor:
|
|||
self.all_tasks_for_cleanup = []
|
||||
processing_tasks_for_watchdog = []
|
||||
|
||||
success = await self.ffmpeg_manager.start()
|
||||
if not success:
|
||||
logger.error("Failed to start FFmpeg manager")
|
||||
async def error_generator():
|
||||
yield FrontData(
|
||||
status="error",
|
||||
error="FFmpeg failed to start. Please check that FFmpeg is installed."
|
||||
)
|
||||
return error_generator()
|
||||
|
||||
if self.args.transcription and self.online:
|
||||
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
||||
self.all_tasks_for_cleanup.append(self.transcription_task)
|
||||
|
|
@ -516,10 +427,6 @@ class AudioProcessor:
|
|||
self.all_tasks_for_cleanup.append(self.translation_task)
|
||||
processing_tasks_for_watchdog.append(self.translation_task)
|
||||
|
||||
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
|
||||
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
|
||||
processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
|
||||
|
||||
# Monitor overall system health
|
||||
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
|
||||
self.all_tasks_for_cleanup.append(self.watchdog_task)
|
||||
|
|
@ -540,15 +447,6 @@ class AudioProcessor:
|
|||
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
|
||||
else:
|
||||
logger.info(f"{task_name} completed normally.")
|
||||
|
||||
# Check FFmpeg status through the manager
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, notifying results formatter")
|
||||
# FFmpeg manager will handle its own recovery
|
||||
elif ffmpeg_state == FFmpegState.STOPPED and not self.is_stopping:
|
||||
logger.warning("FFmpeg unexpectedly stopped, attempting restart")
|
||||
await self.ffmpeg_manager.restart()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Watchdog task cancelled.")
|
||||
|
|
@ -568,8 +466,6 @@ class AudioProcessor:
|
|||
if created_tasks:
|
||||
await asyncio.gather(*created_tasks, return_exceptions=True)
|
||||
logger.info("All processing tasks cancelled or finished.")
|
||||
await self.ffmpeg_manager.stop()
|
||||
logger.info("FFmpeg manager stopped.")
|
||||
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
||||
self.diarization.close()
|
||||
logger.info("AudioProcessor cleanup complete.")
|
||||
|
|
@ -584,8 +480,10 @@ class AudioProcessor:
|
|||
if not message:
|
||||
logger.info("Empty audio message received, initiating stop sequence.")
|
||||
self.is_stopping = True
|
||||
# Signal FFmpeg manager to stop accepting data
|
||||
await self.ffmpeg_manager.stop()
|
||||
|
||||
if self.transcription_queue:
|
||||
await self.transcription_queue.put(SENTINEL)
|
||||
|
||||
return
|
||||
|
||||
if self.is_stopping:
|
||||
|
|
@ -595,14 +493,6 @@ class AudioProcessor:
|
|||
if self.is_pcm_input:
|
||||
self.pcm_buffer.extend(message)
|
||||
await self.handle_pcm_data()
|
||||
else:
|
||||
success = await self.ffmpeg_manager.write_data(message)
|
||||
if not success:
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, cannot process audio")
|
||||
else:
|
||||
logger.warning("Failed to write audio data to FFmpeg")
|
||||
|
||||
async def handle_pcm_data(self):
|
||||
# Process when enough data
|
||||
|
|
|
|||
|
|
@ -1,193 +0,0 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import Optional, Callable
|
||||
import contextlib
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
ERROR_INSTALL_INSTRUCTIONS = """
|
||||
FFmpeg is not installed or not found in your system's PATH.
|
||||
Please install FFmpeg to enable audio processing.
|
||||
|
||||
Installation instructions:
|
||||
|
||||
# Ubuntu/Debian:
|
||||
sudo apt update && sudo apt install ffmpeg
|
||||
|
||||
# macOS (using Homebrew):
|
||||
brew install ffmpeg
|
||||
|
||||
# Windows:
|
||||
# 1. Download the latest static build from https://ffmpeg.org/download.html
|
||||
# 2. Extract the archive (e.g., to C:\\FFmpeg).
|
||||
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
|
||||
|
||||
After installation, please restart the application.
|
||||
"""
|
||||
|
||||
class FFmpegState(Enum):
|
||||
STOPPED = "stopped"
|
||||
STARTING = "starting"
|
||||
RUNNING = "running"
|
||||
RESTARTING = "restarting"
|
||||
FAILED = "failed"
|
||||
|
||||
class FFmpegManager:
|
||||
def __init__(self, sample_rate: int = 16000, channels: int = 1):
|
||||
self.sample_rate = sample_rate
|
||||
self.channels = channels
|
||||
|
||||
self.process: Optional[asyncio.subprocess.Process] = None
|
||||
self._stderr_task: Optional[asyncio.Task] = None
|
||||
|
||||
self.on_error_callback: Optional[Callable[[str], None]] = None
|
||||
|
||||
self.state = FFmpegState.STOPPED
|
||||
self._state_lock = asyncio.Lock()
|
||||
|
||||
async def start(self) -> bool:
|
||||
async with self._state_lock:
|
||||
if self.state != FFmpegState.STOPPED:
|
||||
logger.warning(f"FFmpeg already running in state: {self.state}")
|
||||
return False
|
||||
self.state = FFmpegState.STARTING
|
||||
|
||||
try:
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
"-i", "pipe:0",
|
||||
"-f", "s16le",
|
||||
"-acodec", "pcm_s16le",
|
||||
"-ac", str(self.channels),
|
||||
"-ar", str(self.sample_rate),
|
||||
"pipe:1"
|
||||
]
|
||||
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
self._stderr_task = asyncio.create_task(self._drain_stderr())
|
||||
|
||||
async with self._state_lock:
|
||||
self.state = FFmpegState.RUNNING
|
||||
|
||||
logger.info("FFmpeg started.")
|
||||
return True
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error(ERROR_INSTALL_INSTRUCTIONS)
|
||||
async with self._state_lock:
|
||||
self.state = FFmpegState.FAILED
|
||||
if self.on_error_callback:
|
||||
await self.on_error_callback("ffmpeg_not_found")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting FFmpeg: {e}")
|
||||
async with self._state_lock:
|
||||
self.state = FFmpegState.FAILED
|
||||
if self.on_error_callback:
|
||||
await self.on_error_callback("start_failed")
|
||||
return False
|
||||
|
||||
async def stop(self):
|
||||
async with self._state_lock:
|
||||
if self.state == FFmpegState.STOPPED:
|
||||
return
|
||||
self.state = FFmpegState.STOPPED
|
||||
|
||||
if self.process:
|
||||
if self.process.stdin and not self.process.stdin.is_closing():
|
||||
self.process.stdin.close()
|
||||
await self.process.stdin.wait_closed()
|
||||
await self.process.wait()
|
||||
self.process = None
|
||||
|
||||
if self._stderr_task:
|
||||
self._stderr_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self._stderr_task
|
||||
|
||||
logger.info("FFmpeg stopped.")
|
||||
|
||||
async def write_data(self, data: bytes) -> bool:
|
||||
async with self._state_lock:
|
||||
if self.state != FFmpegState.RUNNING:
|
||||
logger.warning(f"Cannot write, FFmpeg state: {self.state}")
|
||||
return False
|
||||
|
||||
try:
|
||||
self.process.stdin.write(data)
|
||||
await self.process.stdin.drain()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing to FFmpeg: {e}")
|
||||
if self.on_error_callback:
|
||||
await self.on_error_callback("write_error")
|
||||
return False
|
||||
|
||||
async def read_data(self, size: int) -> Optional[bytes]:
|
||||
async with self._state_lock:
|
||||
if self.state != FFmpegState.RUNNING:
|
||||
logger.warning(f"Cannot read, FFmpeg state: {self.state}")
|
||||
return None
|
||||
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
self.process.stdout.read(size),
|
||||
timeout=20.0
|
||||
)
|
||||
return data
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg read timeout.")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading from FFmpeg: {e}")
|
||||
if self.on_error_callback:
|
||||
await self.on_error_callback("read_error")
|
||||
return None
|
||||
|
||||
async def get_state(self) -> FFmpegState:
|
||||
async with self._state_lock:
|
||||
return self.state
|
||||
|
||||
async def restart(self) -> bool:
|
||||
async with self._state_lock:
|
||||
if self.state == FFmpegState.RESTARTING:
|
||||
logger.warning("Restart already in progress.")
|
||||
return False
|
||||
self.state = FFmpegState.RESTARTING
|
||||
|
||||
logger.info("Restarting FFmpeg...")
|
||||
|
||||
try:
|
||||
await self.stop()
|
||||
await asyncio.sleep(1) # short delay before restarting
|
||||
return await self.start()
|
||||
except Exception as e:
|
||||
logger.error(f"Error during FFmpeg restart: {e}")
|
||||
async with self._state_lock:
|
||||
self.state = FFmpegState.FAILED
|
||||
if self.on_error_callback:
|
||||
await self.on_error_callback("restart_failed")
|
||||
return False
|
||||
|
||||
async def _drain_stderr(self):
|
||||
try:
|
||||
while True:
|
||||
line = await self.process.stderr.readline()
|
||||
if not line:
|
||||
break
|
||||
logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}")
|
||||
except asyncio.CancelledError:
|
||||
logger.info("FFmpeg stderr drain task cancelled.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error draining FFmpeg stderr: {e}")
|
||||
|
|
@ -12,6 +12,8 @@ let timerInterval = null;
|
|||
let audioContext = null;
|
||||
let analyser = null;
|
||||
let microphone = null;
|
||||
let scriptProcessor = null;
|
||||
let recorderWorker = null;
|
||||
let waveCanvas = document.getElementById("waveCanvas");
|
||||
let waveCtx = waveCanvas.getContext("2d");
|
||||
let animationFrame = null;
|
||||
|
|
@ -457,13 +459,31 @@ async function startRecording() {
|
|||
microphone = audioContext.createMediaStreamSource(stream);
|
||||
microphone.connect(analyser);
|
||||
|
||||
recorder = new MediaRecorder(stream, { mimeType: "audio/webm" });
|
||||
recorder.ondataavailable = (e) => {
|
||||
scriptProcessor = audioContext.createScriptProcessor(4096, 1, 1);
|
||||
microphone.connect(scriptProcessor);
|
||||
scriptProcessor.connect(audioContext.destination);
|
||||
|
||||
recorderWorker = new Worker("/web/recorder_worker.js");
|
||||
recorderWorker.postMessage({
|
||||
command: "init",
|
||||
config: {
|
||||
sampleRate: audioContext.sampleRate,
|
||||
},
|
||||
});
|
||||
|
||||
recorderWorker.onmessage = (e) => {
|
||||
if (websocket && websocket.readyState === WebSocket.OPEN) {
|
||||
websocket.send(e.data);
|
||||
websocket.send(e.data.buffer);
|
||||
}
|
||||
};
|
||||
recorder.start(chunkDuration);
|
||||
|
||||
scriptProcessor.onaudioprocess = (e) => {
|
||||
const inputData = e.inputBuffer.getChannelData(0);
|
||||
recorderWorker.postMessage({
|
||||
command: "record",
|
||||
buffer: inputData.buffer,
|
||||
}, [inputData.buffer]);
|
||||
};
|
||||
|
||||
startTime = Date.now();
|
||||
timerInterval = setInterval(updateTimer, 1000);
|
||||
|
|
@ -501,9 +521,14 @@ async function stopRecording() {
|
|||
statusText.textContent = "Recording stopped. Processing final audio...";
|
||||
}
|
||||
|
||||
if (recorder) {
|
||||
recorder.stop();
|
||||
recorder = null;
|
||||
if (recorderWorker) {
|
||||
recorderWorker.terminate();
|
||||
recorderWorker = null;
|
||||
}
|
||||
|
||||
if (scriptProcessor) {
|
||||
scriptProcessor.disconnect();
|
||||
scriptProcessor = null;
|
||||
}
|
||||
|
||||
if (microphone) {
|
||||
|
|
|
|||
58
whisperlivekit/web/recorder_worker.js
Normal file
58
whisperlivekit/web/recorder_worker.js
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
let sampleRate = 48000;
|
||||
let targetSampleRate = 16000;
|
||||
|
||||
self.onmessage = function (e) {
|
||||
switch (e.data.command) {
|
||||
case 'init':
|
||||
init(e.data.config);
|
||||
break;
|
||||
case 'record':
|
||||
record(e.data.buffer);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
function init(config) {
|
||||
sampleRate = config.sampleRate;
|
||||
targetSampleRate = config.targetSampleRate || 16000;
|
||||
}
|
||||
|
||||
function record(inputBuffer) {
|
||||
const buffer = new Float32Array(inputBuffer);
|
||||
const resampledBuffer = resample(buffer, sampleRate, targetSampleRate);
|
||||
const pcmBuffer = toPCM(resampledBuffer);
|
||||
self.postMessage({ buffer: pcmBuffer }, [pcmBuffer]);
|
||||
}
|
||||
|
||||
function resample(buffer, from, to) {
|
||||
if (from === to) {
|
||||
return buffer;
|
||||
}
|
||||
const ratio = from / to;
|
||||
const newLength = Math.round(buffer.length / ratio);
|
||||
const result = new Float32Array(newLength);
|
||||
let offsetResult = 0;
|
||||
let offsetBuffer = 0;
|
||||
while (offsetResult < result.length) {
|
||||
const nextOffsetBuffer = Math.round((offsetResult + 1) * ratio);
|
||||
let accum = 0, count = 0;
|
||||
for (let i = offsetBuffer; i < nextOffsetBuffer && i < buffer.length; i++) {
|
||||
accum += buffer[i];
|
||||
count++;
|
||||
}
|
||||
result[offsetResult] = accum / count;
|
||||
offsetResult++;
|
||||
offsetBuffer = nextOffsetBuffer;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function toPCM(input) {
|
||||
const buffer = new ArrayBuffer(input.length * 2);
|
||||
const view = new DataView(buffer);
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
const s = Math.max(-1, Math.min(1, input[i]));
|
||||
view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
Loading…
Reference in a new issue