diff --git a/server-cloudflare/models/llm.ts b/server-cloudflare/models/llm.ts new file mode 100644 index 0000000..1c96d7f --- /dev/null +++ b/server-cloudflare/models/llm.ts @@ -0,0 +1,54 @@ +import type { Env } from "../src/types"; +import { getFirstMessagePrompt, getSystemPrompt } from "../src/prompt"; + +export interface ChatMessage { + role: "system" | "user" | "assistant"; + content: string; +} + +export async function generateOpenAIReply( + env: Env, + transcript: string | null, + history: ChatMessage[], +): Promise { + if (!env.OPENAI_API_KEY?.trim()) { + throw new Error("OPENAI_API_KEY is missing"); + } + + const messages: ChatMessage[] = [ + { role: "system", content: getSystemPrompt(env) }, + ...history, + ]; + + if (transcript && transcript.trim().length > 0) { + messages.push({ role: "user", content: transcript }); + } else { + messages.push({ role: "user", content: getFirstMessagePrompt(env) }); + } + + const response = await fetch("https://api.openai.com/v1/chat/completions", { + method: "POST", + headers: { + Authorization: `Bearer ${env.OPENAI_API_KEY}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + model: env.ELATO_OPENAI_MODEL || "gpt-4.1-mini", + messages, + temperature: 0.7, + }), + }); + + if (!response.ok) { + throw new Error(`OpenAI request failed: ${response.status} ${await response.text()}`); + } + + const data = (await response.json()) as { + choices?: Array<{ message?: { content?: string } }>; + }; + + return ( + data.choices?.[0]?.message?.content?.trim() || + "I heard you, but I do not have a response yet." + ); +} diff --git a/server-cloudflare/models/openai.ts b/server-cloudflare/models/session.ts similarity index 74% rename from server-cloudflare/models/openai.ts rename to server-cloudflare/models/session.ts index bfc3a3d..3e6afb1 100644 --- a/server-cloudflare/models/openai.ts +++ b/server-cloudflare/models/session.ts @@ -1,16 +1,10 @@ import { DurableObject } from "cloudflare:workers"; -import { WorkersAIFluxSTT, type TranscriberSession } from "@cloudflare/voice"; import type { Env } from "../src/types"; import { createOpusPacketizer } from "../src/opus"; -import { getFirstMessagePrompt, getSystemPrompt } from "../src/prompt"; - -const AUDIO_OUTPUT_SAMPLE_RATE = 24_000; -const STT_SAMPLE_RATE = 16_000; - -interface OpenAIChatMessage { - role: "system" | "user" | "assistant"; - content: string; -} +import { createSttSession } from "./stt"; +import { generateOpenAIReply, type ChatMessage } from "./llm"; +import { synthesizeSpeech } from "./tts"; +import type { TranscriberSession } from "@cloudflare/voice"; function createAuthMessage() { return { @@ -37,80 +31,13 @@ function errorMessage(error: unknown): string { return String(error); } -async function generateOpenAIReply( - env: Env, - transcript: string | null, - history: OpenAIChatMessage[], -): Promise { - if (!env.OPENAI_API_KEY?.trim()) { - throw new Error("OPENAI_API_KEY is missing"); - } - - const messages: OpenAIChatMessage[] = [ - { role: "system", content: getSystemPrompt(env) }, - ...history, - ]; - - if (transcript && transcript.trim().length > 0) { - messages.push({ role: "user", content: transcript }); - } else { - messages.push({ role: "user", content: getFirstMessagePrompt(env) }); - } - - const response = await fetch("https://api.openai.com/v1/chat/completions", { - method: "POST", - headers: { - Authorization: `Bearer ${env.OPENAI_API_KEY}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ - model: env.ELATO_OPENAI_MODEL || "gpt-4.1-mini", - messages, - temperature: 0.7, - }), - }); - - if (!response.ok) { - throw new Error(`OpenAI request failed: ${response.status} ${await response.text()}`); - } - - const data = (await response.json()) as { - choices?: Array<{ message?: { content?: string } }>; - }; - - return ( - data.choices?.[0]?.message?.content?.trim() || - "I heard you, but I do not have a response yet." - ); -} - -async function synthesizeSpeech(env: Env, text: string): Promise { - if (!env.AI) { - throw new Error("Cloudflare AI binding is missing"); - } - - return env.AI.run( - "@cf/deepgram/aura-2-en", - { - text, - speaker: "asteria", - encoding: "linear16", - container: "none", - sample_rate: AUDIO_OUTPUT_SAMPLE_RATE, - }, - { - returnRawResponse: true, - }, - ) as Promise; -} - -export class ElatoOpenAiVoiceAgent extends DurableObject { +export class ElatoVoiceSession extends DurableObject { private isGenerating = false; private opusPromise: Promise>> | null = null; private hasStartedConversation = false; private transcriberSession: TranscriberSession | null = null; private currentWebSocket: WebSocket | null = null; - private history: OpenAIChatMessage[] = []; + private history: ChatMessage[] = []; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); @@ -141,23 +68,12 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { return; } - const transcriber = new WorkersAIFluxSTT(this.env.AI, { - sampleRate: STT_SAMPLE_RATE, - eotTimeoutMs: 1000, - }); - - this.transcriberSession = transcriber.createSession({ - onInterim: (text) => { - if (text.trim()) { - console.log(`[cloudflare][stt] interim: ${text}`); - } + this.transcriberSession = createSttSession( + this.env, + (text) => { + console.log(`[cloudflare][stt] interim: ${text}`); }, - onUtterance: (transcript) => { - const text = transcript.trim(); - if (!text) { - return; - } - + (text) => { void this.ctx.blockConcurrencyWhile(async () => { if (!this.currentWebSocket || this.isGenerating) { return; @@ -174,7 +90,7 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { } }); }, - }); + ); console.log("[cloudflare][stt] started continuous Flux session"); } @@ -262,11 +178,12 @@ export class ElatoOpenAiVoiceAgent extends DurableObject { return new Response("Expected websocket", { status: 426 }); } + this.resetSessionState(); + const pair = new WebSocketPair(); const [client, server] = Object.values(pair); server.accept(); - this.resetSessionState(); this.currentWebSocket = server; this.ensureTranscriberSession(); diff --git a/server-cloudflare/models/stt.ts b/server-cloudflare/models/stt.ts new file mode 100644 index 0000000..90f703b --- /dev/null +++ b/server-cloudflare/models/stt.ts @@ -0,0 +1,29 @@ +import { WorkersAIFluxSTT, type TranscriberSession } from "@cloudflare/voice"; +import type { Env } from "../src/types"; + +const STT_SAMPLE_RATE = 16_000; + +export function createSttSession( + env: Env, + onInterim: (text: string) => void, + onUtterance: (transcript: string) => void, +): TranscriberSession { + const transcriber = new WorkersAIFluxSTT(env.AI, { + sampleRate: STT_SAMPLE_RATE, + eotTimeoutMs: 1000, + }); + + return transcriber.createSession({ + onInterim: (text) => { + if (text.trim()) { + onInterim(text); + } + }, + onUtterance: (transcript) => { + const text = transcript.trim(); + if (text) { + onUtterance(text); + } + }, + }); +} diff --git a/server-cloudflare/models/tts.ts b/server-cloudflare/models/tts.ts new file mode 100644 index 0000000..92e928d --- /dev/null +++ b/server-cloudflare/models/tts.ts @@ -0,0 +1,23 @@ +import type { Env } from "../src/types"; + +const AUDIO_OUTPUT_SAMPLE_RATE = 24_000; + +export async function synthesizeSpeech(env: Env, text: string): Promise { + if (!env.AI) { + throw new Error("Cloudflare AI binding is missing"); + } + + return env.AI.run( + "@cf/deepgram/aura-2-en", + { + text, + speaker: "asteria", + encoding: "linear16", + container: "none", + sample_rate: AUDIO_OUTPUT_SAMPLE_RATE, + }, + { + returnRawResponse: true, + }, + ) as Promise; +} diff --git a/server-cloudflare/src/index.ts b/server-cloudflare/src/index.ts index f140328..54aab44 100644 --- a/server-cloudflare/src/index.ts +++ b/server-cloudflare/src/index.ts @@ -1,6 +1,6 @@ import type { Env } from "./types"; -export { ElatoOpenAiVoiceAgent } from "../models/openai"; +export { ElatoVoiceSession } from "../models/session"; export default { async fetch(request: Request, env: Env): Promise { @@ -13,8 +13,8 @@ export default { if (url.pathname === "/ws/esp32" || url.pathname.startsWith("/ws/esp32/")) { /* Add AUTH here */ - const stub = env.ElatoOpenAiVoiceAgent.get( - env.ElatoOpenAiVoiceAgent.newUniqueId(), + const stub = env.ElatoVoiceSession.get( + env.ElatoVoiceSession.newUniqueId(), ); return stub.fetch(request); } diff --git a/server-cloudflare/src/types.ts b/server-cloudflare/src/types.ts index b2d8b9a..01bce6c 100644 --- a/server-cloudflare/src/types.ts +++ b/server-cloudflare/src/types.ts @@ -5,5 +5,5 @@ export interface Env { ELATO_OPENAI_MODEL?: string; ELATO_OPENAI_SYSTEM_PROMPT?: string; ELATO_OPENAI_FIRST_MESSAGE?: string; - ElatoOpenAiVoiceAgent: DurableObjectNamespace; + ElatoVoiceSession: DurableObjectNamespace; } diff --git a/server-cloudflare/wrangler.toml b/server-cloudflare/wrangler.toml index fad0787..57b75eb 100644 --- a/server-cloudflare/wrangler.toml +++ b/server-cloudflare/wrangler.toml @@ -7,12 +7,12 @@ compatibility_flags = ["nodejs_compat"] binding = "AI" [[durable_objects.bindings]] -name = "ElatoOpenAiVoiceAgent" -class_name = "ElatoOpenAiVoiceAgent" +name = "ElatoVoiceSession" +class_name = "ElatoVoiceSession" [[migrations]] tag = "v1" -new_sqlite_classes = ["ElatoOpenAiVoiceAgent"] +new_sqlite_classes = ["ElatoVoiceSession"] [observability] [observability.logs]