From 19cfc1610d779a720e06fc865cd7330c5dc5d2f5 Mon Sep 17 00:00:00 2001 From: akdeb Date: Fri, 17 Apr 2026 13:19:28 +0530 Subject: [PATCH] update to deepgram --- server-cloudflare/models/openai.ts | 180 +++++++++-------------------- 1 file changed, 56 insertions(+), 124 deletions(-) diff --git a/server-cloudflare/models/openai.ts b/server-cloudflare/models/openai.ts index 5e3173f..ba45d45 100644 --- a/server-cloudflare/models/openai.ts +++ b/server-cloudflare/models/openai.ts @@ -1,11 +1,11 @@ import { DurableObject } from "cloudflare:workers"; +import { WorkersAIFluxSTT, type TranscriberSession } from "@cloudflare/voice"; import type { Env } from "../src/types"; import { createOpusPacketizer } from "../src/opus"; import { getFirstMessagePrompt, getSystemPrompt } from "../src/prompt"; const AUDIO_OUTPUT_SAMPLE_RATE = 24_000; -const INPUT_SILENCE_DURATION_MS = 1000; -const INPUT_LEVEL_THRESHOLD = 180; +const STT_SAMPLE_RATE = 16_000; interface OpenAIChatMessage { role: "system" | "user" | "assistant"; @@ -41,14 +41,6 @@ function errorMessage(error: unknown): string { return String(error); } -async function transcribePcm(env: Env, audio: Uint8Array): Promise { - const response = await env.AI.run("@cf/openai/whisper", { - audio: [...audio], - }) as { text?: string }; - - return response.text?.trim() || ""; -} - async function generateOpenAIReply( env: Env, transcript: string | null, @@ -117,43 +109,16 @@ async function synthesizeSpeech(env: Env, text: string): Promise { } export class ElatoOpenAiVoiceAgent extends DurableObject { - private audioBuffer = new Uint8Array(0); private isGenerating = false; private opusPromise: Promise>> | null = null; private hasStartedConversation = false; - private sawUserSpeech = false; - private vadTimer: ReturnType | null = null; + private transcriberSession: TranscriberSession | null = null; + private currentWebSocket: WebSocket | null = null; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); } - private appendAudio(chunk: Uint8Array) { - const next = new Uint8Array(this.audioBuffer.length + chunk.length); - next.set(this.audioBuffer, 0); - next.set(chunk, this.audioBuffer.length); - this.audioBuffer = next; - } - - private calculateAudioLevel(audio: Uint8Array): number { - if (audio.byteLength < 2) { - return 0; - } - - const samples = new Int16Array( - audio.buffer, - audio.byteOffset, - Math.floor(audio.byteLength / 2), - ); - - let sum = 0; - for (let i = 0; i < samples.length; i += 1) { - sum += Math.abs(samples[i] ?? 0); - } - - return Math.round(sum / samples.length); - } - private async loadSessionState(): Promise { const stored = await this.ctx.storage.get("session_state"); return stored || { history: [] }; @@ -163,17 +128,6 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { await this.ctx.storage.put("session_state", state); } - private resetBufferedAudio() { - this.audioBuffer = new Uint8Array(0); - } - - private clearVadTimer() { - if (this.vadTimer) { - clearTimeout(this.vadTimer); - this.vadTimer = null; - } - } - private getOpusPacketizer(websocket: WebSocket) { if (!this.opusPromise) { this.opusPromise = createOpusPacketizer((packet) => websocket.send(packet)); @@ -187,34 +141,47 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { websocket.close(1011, "startup_failed"); } - private scheduleAutoCommit(websocket: WebSocket) { - if (this.isGenerating || !this.sawUserSpeech) { + private ensureTranscriberSession(websocket: WebSocket) { + if (this.transcriberSession) { return; } - this.clearVadTimer(); - this.vadTimer = setTimeout(() => { - void this.ctx.blockConcurrencyWhile(async () => { - if (this.isGenerating || !this.sawUserSpeech || this.audioBuffer.byteLength === 0) { + const transcriber = new WorkersAIFluxSTT(this.env.AI, { + sampleRate: STT_SAMPLE_RATE, + eotTimeoutMs: 1000, + }); + + this.transcriberSession = transcriber.createSession({ + onInterim: (text) => { + if (text.trim()) { + console.log(`[cloudflare][stt] interim: ${text}`); + } + }, + onUtterance: (transcript) => { + const text = transcript.trim(); + if (!text) { return; } - console.log( - `[cloudflare][vad] silence detected, auto-committing turn (${this.audioBuffer.byteLength} bytes buffered)`, - ); - this.isGenerating = true; - this.sawUserSpeech = false; + void this.ctx.blockConcurrencyWhile(async () => { + if (!this.currentWebSocket || this.isGenerating) { + return; + } - try { - await this.handleTurn(websocket); - } catch (error) { - console.error(`[cloudflare][turn] ${errorMessage(error)}`); - websocket.send(createServerMessage("RESPONSE.ERROR")); - } finally { - this.isGenerating = false; - } - }); - }, INPUT_SILENCE_DURATION_MS); + this.isGenerating = true; + try { + await this.handleTranscriptTurn(this.currentWebSocket, text); + } catch (error) { + console.error(`[cloudflare][turn] ${errorMessage(error)}`); + this.currentWebSocket.send(createServerMessage("RESPONSE.ERROR")); + } finally { + this.isGenerating = false; + } + }); + }, + }); + + console.log("[cloudflare][stt] started continuous Flux session"); } private async streamAssistantReply(websocket: WebSocket, reply: string) { @@ -248,25 +215,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { } } - private async handleTurn( - websocket: WebSocket, - ) { - const pcm = this.audioBuffer; - this.resetBufferedAudio(); - - if (pcm.byteLength === 0) { - return; - } - + private async handleTranscriptTurn(websocket: WebSocket, transcript: string) { websocket.send(createServerMessage("AUDIO.COMMITTED")); - this.clearVadTimer(); - - const transcript = await transcribePcm(this.env, pcm); - if (!transcript) { - console.error("[cloudflare][stt] empty transcript"); - websocket.send(createServerMessage("RESPONSE.ERROR")); - return; - } console.log(`[cloudflare][stt] transcript: ${transcript}`); /* Add user transcript DB call here */ @@ -314,25 +264,27 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { const [client, server] = Object.values(pair); server.accept(); + this.currentWebSocket = server; + this.ensureTranscriberSession(server); + server.send(JSON.stringify(createAuthMessage())); void this.startInitialTurn(server); server.addEventListener("message", (event) => { void this.ctx.blockConcurrencyWhile(async () => { if (typeof event.data !== "string") { - const chunk = new Uint8Array(event.data as ArrayBuffer); - this.appendAudio(chunk); - - if (!this.isGenerating) { - const level = this.calculateAudioLevel(chunk); - if (level >= INPUT_LEVEL_THRESHOLD) { - if (!this.sawUserSpeech) { - console.log(`[cloudflare][vad] speech started (level=${level}, threshold=${INPUT_LEVEL_THRESHOLD})`); - } - this.sawUserSpeech = true; - this.scheduleAutoCommit(server); - } + if (this.isGenerating || !this.transcriberSession) { + return; } + + const chunk = event.data instanceof ArrayBuffer + ? event.data + : event.data.buffer.slice( + event.data.byteOffset, + event.data.byteOffset + event.data.byteLength, + ); + + this.transcriberSession.feed(chunk); return; } @@ -345,28 +297,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { return; } - if (message.msg === "end_of_speech") { - if (this.isGenerating) { - return; - } - this.isGenerating = true; - this.sawUserSpeech = false; - try { - await this.handleTurn(server); - } catch (error) { - console.error(`[cloudflare][turn] ${errorMessage(error)}`); - server.send(createServerMessage("RESPONSE.ERROR")); - } finally { - this.isGenerating = false; - } - return; - } - if (message.msg === "INTERRUPT") { this.isGenerating = false; - this.sawUserSpeech = false; - this.clearVadTimer(); - this.resetBufferedAudio(); server.send(createServerMessage("RESPONSE.COMPLETE", { volume_control: 100 })); return; } @@ -380,9 +312,9 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { server.addEventListener("close", () => { this.isGenerating = false; - this.sawUserSpeech = false; - this.clearVadTimer(); - this.resetBufferedAudio(); + this.currentWebSocket = null; + this.transcriberSession?.close(); + this.transcriberSession = null; if (this.opusPromise) { void this.opusPromise.then((opus) => opus.close()).catch(() => {}); this.opusPromise = null;