diff --git a/server-cloudflare/models/openai.ts b/server-cloudflare/models/openai.ts index afd7831..b447e40 100644 --- a/server-cloudflare/models/openai.ts +++ b/server-cloudflare/models/openai.ts @@ -4,6 +4,8 @@ import { createOpusPacketizer } from "../src/opus"; import { getFirstMessagePrompt, getSystemPrompt } from "../src/prompt"; const AUDIO_OUTPUT_SAMPLE_RATE = 24_000; +const INPUT_SILENCE_DURATION_MS = 1000; +const INPUT_LEVEL_THRESHOLD = 900; interface OpenAIChatMessage { role: "system" | "user" | "assistant"; @@ -119,6 +121,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { private isGenerating = false; private opusPromise: Promise<Awaited<ReturnType<typeof createOpusPacketizer>>> | null = null; private hasStartedConversation = false; + private sawUserSpeech = false; + private vadTimer: ReturnType<typeof setTimeout> | null = null; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); @@ -131,6 +135,25 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { this.audioBuffer = next; } + private calculateAudioLevel(audio: Uint8Array): number { + if (audio.byteLength < 2) { + return 0; + } + + const samples = new Int16Array( + audio.buffer, + audio.byteOffset, + Math.floor(audio.byteLength / 2), + ); + + let sum = 0; + for (let i = 0; i < samples.length; i += 1) { + sum += Math.abs(samples[i] ?? 
0); + } + + return Math.round(sum / samples.length); + } + private async loadSessionState(): Promise { const stored = await this.ctx.storage.get("session_state"); return stored || { history: [] }; @@ -144,6 +167,13 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { this.audioBuffer = new Uint8Array(0); } + private clearVadTimer() { + if (this.vadTimer) { + clearTimeout(this.vadTimer); + this.vadTimer = null; + } + } + private getOpusPacketizer(websocket: WebSocket) { if (!this.opusPromise) { this.opusPromise = createOpusPacketizer((packet) => websocket.send(packet)); @@ -157,6 +187,34 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { websocket.close(1011, "startup_failed"); } + private scheduleAutoCommit(websocket: WebSocket) { + if (this.isGenerating || !this.sawUserSpeech) { + return; + } + + this.clearVadTimer(); + this.vadTimer = setTimeout(() => { + void this.ctx.blockConcurrencyWhile(async () => { + if (this.isGenerating || !this.sawUserSpeech || this.audioBuffer.byteLength === 0) { + return; + } + + console.log("[cloudflare][vad] silence detected, auto-committing turn"); + this.isGenerating = true; + this.sawUserSpeech = false; + + try { + await this.handleTurn(websocket); + } catch (error) { + console.error(`[cloudflare][turn] ${errorMessage(error)}`); + websocket.send(createServerMessage("RESPONSE.ERROR")); + } finally { + this.isGenerating = false; + } + }); + }, INPUT_SILENCE_DURATION_MS); + } + private async streamAssistantReply(websocket: WebSocket, reply: string) { const opus = await this.getOpusPacketizer(websocket); opus.reset(); @@ -199,6 +257,7 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { } websocket.send(createServerMessage("AUDIO.COMMITTED")); + this.clearVadTimer(); const transcript = await transcribePcm(this.env, pcm); if (!transcript) { @@ -259,7 +318,21 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { server.addEventListener("message", (event) => { void this.ctx.blockConcurrencyWhile(async 
() => { if (typeof event.data !== "string") { - this.appendAudio(new Uint8Array(event.data as ArrayBuffer)); + const chunk = new Uint8Array(event.data as ArrayBuffer); + this.appendAudio(chunk); + + if (!this.isGenerating) { + const level = this.calculateAudioLevel(chunk); + if (level >= INPUT_LEVEL_THRESHOLD) { + if (!this.sawUserSpeech) { + console.log(`[cloudflare][vad] speech started (level=${level})`); + } + this.sawUserSpeech = true; + this.scheduleAutoCommit(server); + } else if (this.sawUserSpeech) { + this.scheduleAutoCommit(server); + } + } return; } @@ -277,6 +350,7 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { return; } this.isGenerating = true; + this.sawUserSpeech = false; try { await this.handleTurn(server); } catch (error) { @@ -290,6 +364,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { if (message.msg === "INTERRUPT") { this.isGenerating = false; + this.sawUserSpeech = false; + this.clearVadTimer(); this.resetBufferedAudio(); server.send(createServerMessage("RESPONSE.COMPLETE", { volume_control: 100 })); return; @@ -304,6 +380,8 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { server.addEventListener("close", () => { this.isGenerating = false; + this.sawUserSpeech = false; + this.clearVadTimer(); this.resetBufferedAudio(); if (this.opusPromise) { void this.opusPromise.then((opus) => opus.close()).catch(() => {});