update to deepgram

This commit is contained in:
akdeb 2026-04-17 13:19:28 +05:30
parent 71933ae5f0
commit 19cfc1610d

View file

@ -1,11 +1,11 @@
import { DurableObject } from "cloudflare:workers";
import { WorkersAIFluxSTT, type TranscriberSession } from "@cloudflare/voice";
import type { Env } from "../src/types";
import { createOpusPacketizer } from "../src/opus";
import { getFirstMessagePrompt, getSystemPrompt } from "../src/prompt";
const AUDIO_OUTPUT_SAMPLE_RATE = 24_000;
const INPUT_SILENCE_DURATION_MS = 1000;
const INPUT_LEVEL_THRESHOLD = 180;
const STT_SAMPLE_RATE = 16_000;
interface OpenAIChatMessage {
role: "system" | "user" | "assistant";
@ -41,14 +41,6 @@ function errorMessage(error: unknown): string {
return String(error);
}
async function transcribePcm(env: Env, audio: Uint8Array): Promise<string> {
const response = await env.AI.run("@cf/openai/whisper", {
audio: [...audio],
}) as { text?: string };
return response.text?.trim() || "";
}
async function generateOpenAIReply(
env: Env,
transcript: string | null,
@ -117,43 +109,16 @@ async function synthesizeSpeech(env: Env, text: string): Promise<Response> {
}
export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
private audioBuffer = new Uint8Array(0);
private isGenerating = false;
private opusPromise: Promise<Awaited<ReturnType<typeof createOpusPacketizer>>> | null = null;
private hasStartedConversation = false;
private sawUserSpeech = false;
private vadTimer: ReturnType<typeof setTimeout> | null = null;
private transcriberSession: TranscriberSession | null = null;
private currentWebSocket: WebSocket | null = null;
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
}
private appendAudio(chunk: Uint8Array) {
const next = new Uint8Array(this.audioBuffer.length + chunk.length);
next.set(this.audioBuffer, 0);
next.set(chunk, this.audioBuffer.length);
this.audioBuffer = next;
}
private calculateAudioLevel(audio: Uint8Array): number {
if (audio.byteLength < 2) {
return 0;
}
const samples = new Int16Array(
audio.buffer,
audio.byteOffset,
Math.floor(audio.byteLength / 2),
);
let sum = 0;
for (let i = 0; i < samples.length; i += 1) {
sum += Math.abs(samples[i] ?? 0);
}
return Math.round(sum / samples.length);
}
private async loadSessionState(): Promise<SessionState> {
const stored = await this.ctx.storage.get<SessionState>("session_state");
return stored || { history: [] };
@ -163,17 +128,6 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
await this.ctx.storage.put("session_state", state);
}
private resetBufferedAudio() {
this.audioBuffer = new Uint8Array(0);
}
private clearVadTimer() {
if (this.vadTimer) {
clearTimeout(this.vadTimer);
this.vadTimer = null;
}
}
private getOpusPacketizer(websocket: WebSocket) {
if (!this.opusPromise) {
this.opusPromise = createOpusPacketizer((packet) => websocket.send(packet));
@ -187,34 +141,47 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
websocket.close(1011, "startup_failed");
}
private scheduleAutoCommit(websocket: WebSocket) {
if (this.isGenerating || !this.sawUserSpeech) {
private ensureTranscriberSession(websocket: WebSocket) {
if (this.transcriberSession) {
return;
}
this.clearVadTimer();
this.vadTimer = setTimeout(() => {
void this.ctx.blockConcurrencyWhile(async () => {
if (this.isGenerating || !this.sawUserSpeech || this.audioBuffer.byteLength === 0) {
const transcriber = new WorkersAIFluxSTT(this.env.AI, {
sampleRate: STT_SAMPLE_RATE,
eotTimeoutMs: 1000,
});
this.transcriberSession = transcriber.createSession({
onInterim: (text) => {
if (text.trim()) {
console.log(`[cloudflare][stt] interim: ${text}`);
}
},
onUtterance: (transcript) => {
const text = transcript.trim();
if (!text) {
return;
}
console.log(
`[cloudflare][vad] silence detected, auto-committing turn (${this.audioBuffer.byteLength} bytes buffered)`,
);
this.isGenerating = true;
this.sawUserSpeech = false;
void this.ctx.blockConcurrencyWhile(async () => {
if (!this.currentWebSocket || this.isGenerating) {
return;
}
try {
await this.handleTurn(websocket);
} catch (error) {
console.error(`[cloudflare][turn] ${errorMessage(error)}`);
websocket.send(createServerMessage("RESPONSE.ERROR"));
} finally {
this.isGenerating = false;
}
});
}, INPUT_SILENCE_DURATION_MS);
this.isGenerating = true;
try {
await this.handleTranscriptTurn(this.currentWebSocket, text);
} catch (error) {
console.error(`[cloudflare][turn] ${errorMessage(error)}`);
this.currentWebSocket.send(createServerMessage("RESPONSE.ERROR"));
} finally {
this.isGenerating = false;
}
});
},
});
console.log("[cloudflare][stt] started continuous Flux session");
}
private async streamAssistantReply(websocket: WebSocket, reply: string) {
@ -248,25 +215,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
}
}
private async handleTurn(
websocket: WebSocket,
) {
const pcm = this.audioBuffer;
this.resetBufferedAudio();
if (pcm.byteLength === 0) {
return;
}
private async handleTranscriptTurn(websocket: WebSocket, transcript: string) {
websocket.send(createServerMessage("AUDIO.COMMITTED"));
this.clearVadTimer();
const transcript = await transcribePcm(this.env, pcm);
if (!transcript) {
console.error("[cloudflare][stt] empty transcript");
websocket.send(createServerMessage("RESPONSE.ERROR"));
return;
}
console.log(`[cloudflare][stt] transcript: ${transcript}`);
/* Add user transcript DB call here */
@ -314,25 +264,27 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
const [client, server] = Object.values(pair);
server.accept();
this.currentWebSocket = server;
this.ensureTranscriberSession(server);
server.send(JSON.stringify(createAuthMessage()));
void this.startInitialTurn(server);
server.addEventListener("message", (event) => {
void this.ctx.blockConcurrencyWhile(async () => {
if (typeof event.data !== "string") {
const chunk = new Uint8Array(event.data as ArrayBuffer);
this.appendAudio(chunk);
if (!this.isGenerating) {
const level = this.calculateAudioLevel(chunk);
if (level >= INPUT_LEVEL_THRESHOLD) {
if (!this.sawUserSpeech) {
console.log(`[cloudflare][vad] speech started (level=${level}, threshold=${INPUT_LEVEL_THRESHOLD})`);
}
this.sawUserSpeech = true;
this.scheduleAutoCommit(server);
}
if (this.isGenerating || !this.transcriberSession) {
return;
}
const chunk = event.data instanceof ArrayBuffer
? event.data
: event.data.buffer.slice(
event.data.byteOffset,
event.data.byteOffset + event.data.byteLength,
);
this.transcriberSession.feed(chunk);
return;
}
@ -345,28 +297,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
return;
}
if (message.msg === "end_of_speech") {
if (this.isGenerating) {
return;
}
this.isGenerating = true;
this.sawUserSpeech = false;
try {
await this.handleTurn(server);
} catch (error) {
console.error(`[cloudflare][turn] ${errorMessage(error)}`);
server.send(createServerMessage("RESPONSE.ERROR"));
} finally {
this.isGenerating = false;
}
return;
}
if (message.msg === "INTERRUPT") {
this.isGenerating = false;
this.sawUserSpeech = false;
this.clearVadTimer();
this.resetBufferedAudio();
server.send(createServerMessage("RESPONSE.COMPLETE", { volume_control: 100 }));
return;
}
@ -380,9 +312,9 @@ export class ElatoOpenAiVoiceAgent extends DurableObject<Env> {
server.addEventListener("close", () => {
this.isGenerating = false;
this.sawUserSpeech = false;
this.clearVadTimer();
this.resetBufferedAudio();
this.currentWebSocket = null;
this.transcriberSession?.close();
this.transcriberSession = null;
if (this.opusPromise) {
void this.opusPromise.then((opus) => opus.close()).catch(() => {});
this.opusPromise = null;