diff --git a/server-fastapi/README.md b/server-fastapi/README.md new file mode 100644 index 0000000..194c2b3 --- /dev/null +++ b/server-fastapi/README.md @@ -0,0 +1,256 @@ +## ElatoAI: Realtime Voice AI Models on FastAPI + +`server-fastapi` is the simplest self-hosted Elato backend for people who want a normal Python server instead of an edge runtime. + +Use this if you want: + +- a FastAPI server you can run on your own machine or VM +- a classic `STT -> LLM -> TTS` voice pipeline +- a smaller provider surface that is easy to understand +- the same ESP32 transport shape as the rest of Elato + +If you are new to the project, read these first: + +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/README.md` +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/README.md` + +## The Simple Provider Set + +To keep onboarding straightforward, the classic FastAPI route is centered around a small set of providers. + +### LLM + +- `openai` +- `claude` +- `gemini` +- `grok` + +### STT + +- `deepgram` +- `whisper` + +### TTS + +- `elevenlabs` +- `cartesia` +- `deepgram` +- `openai` + +The code still uses the `models/llm`, `models/stt`, and `models/tts` layout, but the active registry is intentionally trimmed so the default experience stays simple. + +## Default Setup + +The default classic route is: + +- STT: `deepgram` +- LLM: `openai` +- TTS: `elevenlabs` + +That gives people one obvious path to get running before they start swapping providers. 
+ +## Project Layout + +```text +server-fastapi/ +├── bot.py +├── classic_route.py +├── esp32_transport.py +├── server.py +├── env.example +└── models/ + ├── llm/ + ├── stt/ + └── tts/ +``` + +## How The FastAPI Server Fits Into Elato + +Elato has three backend options right now (paths are relative to the repository root): + +- `server/deno` +- `server/cloudflare` +- `server-fastapi` + +A clean way to think about them is: + +- `Deno`: edge-first, mature provider integrations +- `Cloudflare`: Workers + Durable Objects + Workers AI +- `FastAPI`: normal Python server, easy to self-host, easy to reason about + +## Quick Start + +### 1. Create or activate your Python environment + +Use whatever you prefer. If you already use `uv`, that is a good default. + +### 2. Install dependencies + +This repo uses `pyproject.toml`, so install from that file rather than from a `requirements.txt` file. + +With `uv` (from the repository root): + +```bash +cd server-fastapi +uv sync +``` + +Or with plain pip in your venv: + +```bash +cd server-fastapi +pip install -e . +``` + +### 3. Create your env file + +Copy the example values from: + +- `server-fastapi/env.example` + +Minimum example for the default route: + +```env +DEEPGRAM_API_KEY=your_deepgram_api_key +OPENAI_API_KEY=your_openai_api_key +ELEVENLABS_API_KEY=your_elevenlabs_api_key + +CURRENT_VOICE_ROUTE=classic +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=openai +CLASSIC_TTS_PROVIDER=elevenlabs + +ESP32_INPUT_SAMPLE_RATE=16000 +BROWSER_INPUT_SAMPLE_RATE=16000 +AUDIO_OUTPUT_SAMPLE_RATE=24000 +PIPELINE_AUDIO_IN_SAMPLE_RATE=16000 +PIPELINE_AUDIO_OUT_SAMPLE_RATE=24000 + +ALLOWED_ORIGINS=* +HOST=0.0.0.0 +PORT=7860 +``` + +### 4. 
Run the server + +If you use `uv`: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server-fastapi +uv run server.py +``` + +If you use your activated venv directly: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server-fastapi +python server.py +``` + +### 5. Point your ESP32 at the FastAPI backend + +Update the firmware config so your hardware connects to this server instead of the Deno or Cloudflare backend. + +The ESP32 route is: + +```text +/ws/esp32 +``` + +For browser or Next.js testing, the server also exposes: + +- `/ws/browser` +- `/ws/nextjs` + +## How Provider Selection Works + +The classic route reads three env vars: + +- `CLASSIC_STT_PROVIDER` +- `CLASSIC_LLM_PROVIDER` +- `CLASSIC_TTS_PROVIDER` + +So changing providers is just an env change. + +Examples: + +### OpenAI + Deepgram + ElevenLabs + +```env +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=openai +CLASSIC_TTS_PROVIDER=elevenlabs +``` + +### Whisper + Claude + Cartesia + +```env +CLASSIC_STT_PROVIDER=whisper +CLASSIC_LLM_PROVIDER=claude +CLASSIC_TTS_PROVIDER=cartesia +``` + +### Deepgram + Gemini + OpenAI TTS + +```env +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=gemini +CLASSIC_TTS_PROVIDER=openai +``` + +## Unified Experience Across Elato + +A simple way to keep the product understandable is: + +- keep the Next.js frontend focused on character creation and device management +- keep the ESP32 firmware focused on one transport protocol +- let users choose one backend runtime: + - Deno + - Cloudflare + - FastAPI +- inside each backend, expose the same conceptual knobs: + - `STT` + - `LLM` + - `TTS` + +That means the hardware story stays stable: + +- one firmware +- one websocket-style mental model +- three server deployment choices + +The cleanest unification strategy is not “every backend supports every provider.” +It is: + +- every backend should expose the same categories +- each backend should have one recommended default stack +- advanced users can swap 
providers later + +## Recommended Defaults + +If you want a simple opinionated experience for users, keep one default combo per backend. + +Suggested defaults: + +- `Deno`: OpenAI realtime +- `Cloudflare`: Workers AI STT/TTS + OpenAI LLM +- `FastAPI`: Deepgram + OpenAI + ElevenLabs + +That gives users one obvious starting point without taking away flexibility. + +## Important Files + +If you want to change the FastAPI backend, start here (paths are relative to the repository root): + +- `server-fastapi/server.py` +- `server-fastapi/classic_route.py` +- `server-fastapi/esp32_transport.py` +- `server-fastapi/models/llm/__init__.py` +- `server-fastapi/models/stt/__init__.py` +- `server-fastapi/models/tts/__init__.py` + +## Current Notes + +- The filesystem still contains many scaffolded provider modules from the earlier broader experiment. +- The active provider registry is now intentionally much smaller. +- That means the codebase stays extensible, but the user-facing default path stays simple. 
def build_classic_route(input_processor, context: LLMContext):
    """Assemble the classic STT -> LLM -> TTS processor chain.

    Provider names are read from ``CLASSIC_STT_PROVIDER``,
    ``CLASSIC_LLM_PROVIDER`` and ``CLASSIC_TTS_PROVIDER`` (defaults:
    ``deepgram`` / ``openai`` / ``elevenlabs``). Each value is stripped and
    lower-cased, and a blank value falls back to its default, so stray
    whitespace or casing in an env file does not select a non-existent
    provider.

    Args:
        input_processor: Processor placed at the head of the route, ahead of
            the STT service.
        context: Conversation context handed to the aggregator pair.

    Returns:
        A ``(processors, assistant_aggregator)`` tuple. ``processors`` is the
        ordered list ``[input_processor, stt, user_aggregator, llm, tts]``;
        the assistant aggregator is returned separately so the caller decides
        where it sits in the pipeline.
    """

    def _provider(env_var: str, default: str) -> str:
        # `or default` covers both "unset" and "set but blank".
        return (os.getenv(env_var) or "").strip().lower() or default

    stt_provider = _provider("CLASSIC_STT_PROVIDER", "deepgram")
    llm_provider = _provider("CLASSIC_LLM_PROVIDER", "openai")
    tts_provider = _provider("CLASSIC_TTS_PROVIDER", "elevenlabs")

    logger.info(
        "Building classic route with stt={} llm={} tts={}",
        stt_provider,
        llm_provider,
        tts_provider,
    )

    stt = create_stt_service(stt_provider)
    llm = create_llm_service(
        llm_provider,
        system_instruction=LANGUAGE_LEARNING_PAL_PROMPT,
    )
    tts = create_tts_service(tts_provider)

    # The user-side aggregator carries the VAD analyzer (stop_secs=1).
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1))
        ),
    )

    processors = [
        input_processor,
        stt,
        user_aggregator,
        llm,
        tts,
    ]

    return processors, assistant_aggregator
def create_llm_service(provider_name: str, **kwargs):
    """Instantiate the LLM service registered under ``provider_name``.

    The factory is resolved through ``load_provider_factory`` against
    ``LLM_REGISTRY``; all keyword arguments are forwarded to it unchanged.
    """
    return load_provider_factory(LLM_REGISTRY, provider_name, "LLM")(**kwargs)
def create_stt_service(provider_name: str, **kwargs):
    """Instantiate the STT service registered under ``provider_name``.

    The factory is resolved through ``load_provider_factory`` against
    ``STT_REGISTRY``; all keyword arguments are forwarded to it unchanged.
    """
    return load_provider_factory(STT_REGISTRY, provider_name, "STT")(**kwargs)
b/server/fastapi/.gitignore @@ -1,4 +1,18 @@ -.venv/ -.uv-cache/ __pycache__/ -*.pyc +*.py[codz] +*.so +build/ +dist/ +*.egg-info/ +.env +.envrc +.venv +env/ +venv/ +.pytest_cache/ +.ruff_cache/ +.mypy_cache/ +.pyre/ +.pytype/ +.idea/ +.vscode/ diff --git a/server/fastapi/Dockerfile b/server/fastapi/Dockerfile new file mode 100644 index 0000000..b2206f4 --- /dev/null +++ b/server/fastapi/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PORT=7860 +ENV PATH="/app/.venv/bin:$PATH" + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + ffmpeg \ + git \ + curl \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache-dir uv + +COPY pyproject.toml uv.lock ./ +RUN uv sync --frozen --no-dev + +COPY . . + +EXPOSE 7860 + +CMD ["python", "server.py"] diff --git a/server/fastapi/README.md b/server/fastapi/README.md new file mode 100644 index 0000000..d11ee4d --- /dev/null +++ b/server/fastapi/README.md @@ -0,0 +1,340 @@ +## ElatoAI: Realtime Voice AI Models on FastAPI + +`server/fastapi` is the simplest self-hosted Elato backend for people who want a normal Python server instead of an edge runtime. + +Use this if you want: + +- a FastAPI server you can run on your own machine or VM +- a classic `STT -> LLM -> TTS` voice pipeline +- a smaller provider surface that is easy to understand +- the same ESP32 transport shape as the rest of Elato + +If you are new to the project, read these first: + +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/README.md` +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/README.md` + +## The Simple Provider Set + +To keep onboarding straightforward, the classic FastAPI route is centered around a small set of providers. 
+ +### LLM + +- `openai` +- `claude` +- `gemini` +- `grok` + +### STT + +- `deepgram` +- `whisper` + +### TTS + +- `elevenlabs` +- `cartesia` +- `deepgram` +- `openai` + +The code still uses the `models/llm`, `models/stt`, and `models/tts` layout, but the active registry is intentionally trimmed so the default experience stays simple. + +## Default Setup + +The default classic route is: + +- STT: `deepgram` +- LLM: `openai` +- TTS: `elevenlabs` + +That gives people one obvious path to get running before they start swapping providers. + +## Project Layout + +```text +server/fastapi/ +├── bot.py +├── classic_route.py +├── esp32_transport.py +├── server.py +├── env.example +└── models/ + ├── llm/ + ├── stt/ + └── tts/ +``` + +## How The FastAPI Server Fits Into Elato + +Elato has three backend options right now: + +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/deno` +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/cloudflare` +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi` + +A clean way to think about them is: + +- `Deno`: edge-first, mature provider integrations +- `Cloudflare`: Workers + Durable Objects + Workers AI +- `FastAPI`: normal Python server, easy to self-host, easy to reason about + +## Quick Start + +### 1. Create or activate your Python environment + +Use whatever you prefer. If you already use `uv`, that is a good default. + +### 2. Install dependencies + +This repo uses `pyproject.toml`, so install from that environment rather than a `requirements.txt` file. + +With `uv`: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi +uv sync +``` + +Or with plain pip in your venv: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi +pip install -e . +``` + +### 3. 
Create your env file + +Copy the example values from: + +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi/env.example` + +Minimum example for the default route: + +```env +DEEPGRAM_API_KEY=your_deepgram_api_key +OPENAI_API_KEY=your_openai_api_key +ELEVENLABS_API_KEY=your_elevenlabs_api_key + +CURRENT_VOICE_ROUTE=classic +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=openai +CLASSIC_TTS_PROVIDER=elevenlabs + +ESP32_INPUT_SAMPLE_RATE=16000 +BROWSER_INPUT_SAMPLE_RATE=16000 +AUDIO_OUTPUT_SAMPLE_RATE=24000 +PIPELINE_AUDIO_IN_SAMPLE_RATE=16000 +PIPELINE_AUDIO_OUT_SAMPLE_RATE=24000 + +ALLOWED_ORIGINS=* +HOST=0.0.0.0 +PORT=7860 +``` + +### 4. Run the server + +If you use `uv`: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi +uv run server.py +``` + +If you use your activated venv directly: + +```bash +cd /Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi +python server.py +``` + +### 5. Point your ESP32 at the FastAPI backend + +Update the firmware config so your hardware connects to this server instead of the Deno or Cloudflare backend. + +The ESP32 route is: + +```text +/ws/esp32 +``` + +For browser or Next.js testing, the server also exposes: + +- `/ws/browser` +- `/ws/nextjs` + +## How Provider Selection Works + +The classic route reads three env vars: + +- `CLASSIC_STT_PROVIDER` +- `CLASSIC_LLM_PROVIDER` +- `CLASSIC_TTS_PROVIDER` + +So changing providers is just an env change. 
+ +Pipecat handles the runtime orchestration for us: + +- STT turns incoming audio into transcripts +- the LLM receives conversation context and streams text back +- TTS turns that streamed text into audio + +In other words, Pipecat stitches the pipeline together, but Elato still needs to provide: + +- the provider selection UX +- the transport protocol for ESP32 +- the environment-variable contract for API keys +- the recommended defaults + +That is why this FastAPI backend now has a simple provider catalog and validation layer in: + +- `/Users/akashdeepdeb/Desktop/Projects/ElatoAI/server/fastapi/models/providers.py` + +This lets the app answer questions like: + +- which LLMs do we support? +- which key does `deepgram` require? +- can the server start with the currently selected stack? + +### Required API Keys By Provider + +The current simple provider map is: + +- `openai` LLM: `OPENAI_API_KEY` +- `claude` LLM: `ANTHROPIC_API_KEY` +- `gemini` LLM: `GEMINI_API_KEY` +- `grok` LLM: `XAI_API_KEY` +- `deepgram` STT: `DEEPGRAM_API_KEY` +- `whisper` STT: no external API key required +- `elevenlabs` TTS: `ELEVENLABS_API_KEY` +- `cartesia` TTS: `CARTESIA_API_KEY` +- `deepgram` TTS: `DEEPGRAM_API_KEY` +- `openai` TTS: `OPENAI_API_KEY` + +At startup, the server now validates the selected `CLASSIC_*_PROVIDER` values and fails early if the required keys are missing. 
+ +### Provider Modules + +Each supported provider now has its own module file so the layout is easy to understand (paths are relative to the repository root): + +- `server/fastapi/models/llm/openai.py` +- `server/fastapi/models/llm/anthropic.py` +- `server/fastapi/models/llm/gemini.py` +- `server/fastapi/models/llm/grok.py` +- `server/fastapi/models/stt/deepgram.py` +- `server/fastapi/models/stt/whisper.py` +- `server/fastapi/models/tts/elevenlabs.py` +- `server/fastapi/models/tts/cartesia.py` +- `server/fastapi/models/tts/deepgram.py` +- `server/fastapi/models/tts/openai.py` + +Under the hood, these modules delegate to Pipecat service implementations. We keep that wiring thin on purpose so users mostly think in terms of: + +- `STT` +- `LLM` +- `TTS` + +not internal service classes. 
+ +Examples: + +### OpenAI + Deepgram + ElevenLabs + +```env +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=openai +CLASSIC_TTS_PROVIDER=elevenlabs +``` + +### Whisper + Claude + Cartesia + +```env +CLASSIC_STT_PROVIDER=whisper +CLASSIC_LLM_PROVIDER=claude +CLASSIC_TTS_PROVIDER=cartesia +``` + +### Deepgram + Gemini + OpenAI TTS + +```env +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=gemini +CLASSIC_TTS_PROVIDER=openai +``` + +## Unified Experience Across Elato + +A simple way to keep the product understandable is: + +- keep the Next.js frontend focused on character creation and device management +- keep the ESP32 firmware focused on one transport protocol +- let users choose one backend runtime: + - Deno + - Cloudflare + - FastAPI +- inside each backend, expose the same conceptual knobs: + - `STT` + - `LLM` + - `TTS` + +That means the hardware story stays stable: + +- one firmware +- one websocket-style mental model +- three server deployment choices + +The cleanest unification strategy is not “every backend supports every provider.” +It is: + +- every backend should expose the same categories +- each backend should have one recommended default stack +- advanced users can swap providers later + +## What This Looks Like In A UI + +For Elato, the cleanest UI model is: + +1. user picks a backend runtime: + - `deno` + - `cloudflare` + - `fastapi` +2. user picks one option in each category: + - `stt` + - `llm` + - `tts` +3. UI shows which API keys are required +4. backend validates the selection before starting a session + +This FastAPI server now exposes a simple provider catalog at: + +- `/providers` + +So your Next.js frontend can eventually fetch the available providers and render a model picker without hardcoding everything in the UI. + +## Recommended Defaults + +If you want a simple opinionated experience for users, keep one default combo per backend. 
+ +Suggested defaults: + +- `Deno`: OpenAI realtime +- `Cloudflare`: Workers AI STT/TTS + OpenAI LLM +- `FastAPI`: Deepgram + OpenAI + ElevenLabs + +That gives users one obvious starting point without taking away flexibility. + +## Important Files + +If you want to change the FastAPI backend, start here (paths are relative to the repository root): + +- `server/fastapi/server.py` +- `server/fastapi/classic_route.py` +- `server/fastapi/esp32_transport.py` +- `server/fastapi/models/llm/__init__.py` +- `server/fastapi/models/stt/__init__.py` +- `server/fastapi/models/tts/__init__.py` + +## Current Notes + +- The filesystem still contains many scaffolded provider modules from the earlier broader experiment. +- The active provider registry is now intentionally much smaller. +- That means the codebase stays extensible, but the user-facing default path stays simple. 
class RealtimeInputControlProcessor(FrameProcessor):
    """Turn websocket ``instruction`` messages into Pipecat control frames.

    Two instructions are recognised on ``InputTransportMessageFrame``s whose
    message is a dict with ``type == "instruction"``:

    * ``end_of_speech`` - pushes a user-stopped-speaking frame downstream
      (``VADUserStoppedSpeakingFrame`` on the ``gem_live`` route, otherwise
      ``EmulateUserStoppedSpeakingFrame`` followed by an STT mute).
    * ``INTERRUPT`` - pushes an ``InterruptionFrame`` downstream and, on
      routes other than ``gem_live``, an STT unmute.

    A recognised instruction frame is consumed; every other frame is
    forwarded unchanged in its original direction.
    """

    def __init__(self, voice_route: str):
        super().__init__()
        self._voice_route = voice_route

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        # Extract the instruction name, if this frame carries one.
        command = None
        if isinstance(frame, InputTransportMessageFrame):
            payload = frame.message if isinstance(frame.message, dict) else {}
            if payload.get("type") == "instruction":
                command = payload.get("msg")

        downstream = FrameDirection.DOWNSTREAM
        on_gem_live = self._voice_route == "gem_live"

        if command == "end_of_speech":
            if on_gem_live:
                await self.push_frame(VADUserStoppedSpeakingFrame(), downstream)
            else:
                await self.push_frame(EmulateUserStoppedSpeakingFrame(), downstream)
                await self.push_frame(STTMuteFrame(mute=True), downstream)
        elif command == "INTERRUPT":
            await self.push_frame(InterruptionFrame(), downstream)
            if not on_gem_live:
                await self.push_frame(STTMuteFrame(mute=False), downstream)
        else:
            # Not a control instruction: pass the frame through untouched.
            await self.push_frame(frame, direction)
def create_esp32_auth_message() -> dict:
    """Build the ``auth`` control message carrying the ESP32 defaults.

    ``ESP32_DEFAULT_VOLUME`` (int, default ``100``) and
    ``ESP32_DEFAULT_PITCH_FACTOR`` (float, default ``1.0``) are read from the
    environment on every call. A variable that is set but blank is treated as
    unset, so an empty entry in an env file yields the default instead of a
    ``ValueError`` from ``int("")`` / ``float("")``.

    Returns:
        Dict with the keys ``type``, ``volume_control``, ``pitch_factor``,
        ``is_ota`` and ``is_reset``.

    Raises:
        ValueError: If a variable holds a non-blank value that is not a
            valid number.
    """
    # `or <default>` covers both "unset" and "set but blank".
    volume = os.getenv("ESP32_DEFAULT_VOLUME", "").strip() or "100"
    pitch_factor = os.getenv("ESP32_DEFAULT_PITCH_FACTOR", "").strip() or "1.0"

    return {
        "type": "auth",
        "volume_control": int(volume),
        "pitch_factor": float(pitch_factor),
        "is_ota": False,
        "is_reset": False,
    }
audio_in_sample_rate=AUDIO_IN_SAMPLE_RATE, + audio_out_sample_rate=AUDIO_OUT_SAMPLE_RATE, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"{transport_kind} client connected") + if voice_route in {"gem_live", "grok"}: + context.add_message( + { + "role": "user", + "content": "Say hello and briefly introduce yourself.", + } + ) + await task.queue_frames( + [ + LLMContextFrame(context=context) + ] + ) + else: + context.add_message( + { + "role": "developer", + "content": "Say hello and briefly introduce yourself.", + } + ) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"{transport_kind} client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=handle_sigint) + await runner.run(task) diff --git a/server/fastapi/character_prompt.py b/server/fastapi/character_prompt.py new file mode 100644 index 0000000..af09426 --- /dev/null +++ b/server/fastapi/character_prompt.py @@ -0,0 +1,47 @@ +"""Shared character prompt for the MyPhoenyx language-learning pal.""" + +LANGUAGE_LEARNING_PAL_PROMPT = """ +You are Phoenyx, a warm, playful children's language-learning pal for MyPhoenyx. + +Your job is to help children learn Spanish in a way that feels safe, fun, and encouraging. + +Personality: +- Friendly, patient, and upbeat. +- Cheerful and playful without being chaotic or overwhelming. +- Gentle, supportive, and never judgmental. +- Focused on helping children feel brave about trying new words. + +Teaching style: +- Teach simple Spanish words, short phrases, and tiny conversations. +- Use age-appropriate examples and short sentences. +- Repeat new words clearly and explain them in simple English. +- Celebrate effort, not perfection. +- Offer one question or one step at a time. 
+- When useful, give a Spanish word, then the English meaning, then a tiny example. +- Keep responses concise and easy to understand for children. +- If the child makes a mistake, gently correct it and model the right phrase. +- Use games, songs, mini-quizzes, or pretend play when helpful. + +Spanish guidance: +- Prefer beginner-friendly Spanish. +- Start with greetings, colors, numbers, animals, family, food, feelings, and classroom words. +- Encourage pronunciation with short phonetic hints when helpful. +- If the child asks for a translation, provide it simply. +- You can mix English and Spanish, but keep the Spanish amount matched to the child’s level. + +Safety and tone: +- Be appropriate for children at all times. +- Avoid scary, romantic, violent, or adult content. +- Never shame, tease, or pressure the child. +- Keep the tone bright, reassuring, and age-appropriate. + +Brand feel: +- This character is for MyPhoenyx. +- The voice should feel magical, trustworthy, and educational. +- The goal is for children to feel like they are learning with a kind friend. + +Example style: +- "Great try! 'Hola' means 'hello.' Can you say it with me?" +- "Nice work. 'Rojo' means 'red.' Want to learn another color?" 
+- "Let's practice: 'Me llamo Ana' means 'My name is Ana.'" +""".strip() diff --git a/server/fastapi/classic_route.py b/server/fastapi/classic_route.py new file mode 100644 index 0000000..b8355a4 --- /dev/null +++ b/server/fastapi/classic_route.py @@ -0,0 +1,55 @@ +"""Classic STT -> LLM -> TTS pipeline builder.""" + +from __future__ import annotations + +import os + +from character_prompt import LANGUAGE_LEARNING_PAL_PROMPT +from loguru import logger +from models.llm import create_llm_service +from models.stt import create_stt_service +from models.tts import create_tts_service +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) + + +def build_classic_route(input_processor, context: LLMContext): + stt_provider = os.getenv("CLASSIC_STT_PROVIDER", "deepgram") + llm_provider = os.getenv("CLASSIC_LLM_PROVIDER", "openai") + tts_provider = os.getenv("CLASSIC_TTS_PROVIDER", "elevenlabs") + + logger.info( + "Building classic route with stt={} llm={} tts={}", + stt_provider, + llm_provider, + tts_provider, + ) + + stt = create_stt_service(stt_provider) + llm = create_llm_service( + llm_provider, + system_instruction=LANGUAGE_LEARNING_PAL_PROMPT, + ) + tts = create_tts_service(tts_provider) + + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1)) + ), + ) + + processors = [ + input_processor, + stt, + user_aggregator, + llm, + tts, + ] + + return processors, assistant_aggregator diff --git a/server/fastapi/env.example b/server/fastapi/env.example new file mode 100644 index 0000000..4903ce4 --- /dev/null +++ b/server/fastapi/env.example @@ -0,0 +1,28 @@ +DEEPGRAM_API_KEY=your_deepgram_api_key 
+OPENAI_API_KEY=your_openai_api_key +ANTHROPIC_API_KEY=your_anthropic_api_key +GEMINI_API_KEY=your_gemini_api_key +XAI_API_KEY=your_xai_api_key +ELEVENLABS_API_KEY=your_elevenlabs_api_key +CARTESIA_API_KEY=your_cartesia_api_key +# Whisper STT in Pipecat is local/offline, so it does not require an API key. + +# Classic route providers +CURRENT_VOICE_ROUTE=classic +CLASSIC_STT_PROVIDER=deepgram +CLASSIC_LLM_PROVIDER=openai +CLASSIC_TTS_PROVIDER=elevenlabs + +# Transport and pipeline sample rates +ESP32_INPUT_SAMPLE_RATE=16000 +BROWSER_INPUT_SAMPLE_RATE=16000 +AUDIO_OUTPUT_SAMPLE_RATE=24000 +PIPELINE_AUDIO_IN_SAMPLE_RATE=16000 +PIPELINE_AUDIO_OUT_SAMPLE_RATE=24000 + +# Browser / Next.js access +ALLOWED_ORIGINS=* + +# WebSocket server settings +HOST=0.0.0.0 +PORT=7860 diff --git a/server/fastapi/esp32_transport.py b/server/fastapi/esp32_transport.py new file mode 100644 index 0000000..47bc98b --- /dev/null +++ b/server/fastapi/esp32_transport.py @@ -0,0 +1,215 @@ +"""Custom websocket transports for raw PCM input with ESP32/browser outputs.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +import av +import numpy as np +from fastapi import WebSocket +from loguru import logger + +from pipecat.frames.frames import ( + Frame, + InputAudioRawFrame, + InputTransportMessageFrame, + OutputAudioRawFrame, + OutputTransportMessageFrame, + OutputTransportMessageUrgentFrame, +) +from pipecat.serializers.base_serializer import FrameSerializer +from pipecat.transports.websocket.fastapi import ( + FastAPIWebsocketCallbacks, + FastAPIWebsocketClient, + FastAPIWebsocketInputTransport, + FastAPIWebsocketOutputTransport, + FastAPIWebsocketParams, + FastAPIWebsocketTransport, +) + + +class RawPCMFrameSerializer(FrameSerializer): + """Deserialize raw PCM and JSON control messages.""" + + def __init__(self, input_sample_rate: int, input_channels: int = 1): + super().__init__() + self._input_sample_rate = input_sample_rate + self._input_channels = 
input_channels + + async def serialize(self, frame: Frame) -> str | bytes | None: + if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): + if self.should_ignore_frame(frame): + return None + return json.dumps(frame.message) + return None + + async def deserialize(self, data: str | bytes) -> Frame | None: + if isinstance(data, bytes): + return InputAudioRawFrame( + audio=data, + sample_rate=self._input_sample_rate, + num_channels=self._input_channels, + ) + + if isinstance(data, str): + try: + message = json.loads(data) + except json.JSONDecodeError: + logger.warning("Ignoring non-JSON websocket text frame") + return None + return InputTransportMessageFrame(message=message) + + return None + + +@dataclass +class OpusEncoder: + sample_rate: int = 24000 + channels: int = 1 + bit_rate: int = 24000 + frame_duration_ms: int = 120 + + def __post_init__(self): + self._codec = av.CodecContext.create("libopus", "w") + self._codec.sample_rate = self.sample_rate + self._codec.rate = self.sample_rate + self._codec.layout = "mono" if self.channels == 1 else "stereo" + self._codec.format = "s16" + self._codec.bit_rate = self.bit_rate + self._codec.options = { + "application": "voip", + "frame_duration": str(self.frame_duration_ms), + } + self._codec.open() + self._frame_size = int(self.sample_rate * self.frame_duration_ms / 1000) + self._bytes_per_frame = self._frame_size * self.channels * 2 + self._buffer = bytearray() + + def encode(self, pcm_audio: bytes) -> list[bytes]: + packets: list[bytes] = [] + self._buffer.extend(pcm_audio) + + while len(self._buffer) >= self._bytes_per_frame: + chunk = bytes(self._buffer[: self._bytes_per_frame]) + del self._buffer[: self._bytes_per_frame] + # Packed "s16" audio is a single row of interleaved samples, so keep shape (1, n) for any channel count. + samples = np.frombuffer(chunk, dtype=np.int16).reshape(1, -1) + frame = av.AudioFrame.from_ndarray(samples, format="s16", layout=self._codec.layout.name) + frame.sample_rate = self.sample_rate + packets.extend(bytes(packet) for packet in
self._codec.encode(frame)) + + return packets + + def flush(self, pad_final_frame: bool = False) -> list[bytes]: + if not self._buffer: + return [] + + if not pad_final_frame: + self._buffer.clear() + return [] + + padded = bytes(self._buffer) + b"\x00" * (self._bytes_per_frame - len(self._buffer)) + self._buffer.clear() + return self.encode(padded) + + def reset(self): + self._buffer.clear() + + def close(self): + self._buffer.clear() + + +class RawPCMWebsocketOutputTransport(FastAPIWebsocketOutputTransport): + async def send_message( + self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame + ): + if self._client.is_closing or not self._client.is_connected: + return + payload = await self._params.serializer.serialize(frame) if self._params.serializer else None + if payload: + await self._client.send(payload) + + async def write_audio_frame(self, frame: OutputAudioRawFrame) -> bool: + if self._client.is_closing or not self._client.is_connected: + return False + + await self._client.send(frame.audio) + await self._write_audio_sleep() + return True + + +class OpusWebsocketOutputTransport(FastAPIWebsocketOutputTransport): + def __init__(self, transport, client, params, **kwargs): + super().__init__(transport, client, params, **kwargs) + self._encoder = OpusEncoder( + sample_rate=params.audio_out_sample_rate or 24000, + channels=params.audio_out_channels or 1, + bit_rate=24000, + ) + + async def send_message( + self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame + ): + if self._client.is_closing or not self._client.is_connected: + return + + message = frame.message if isinstance(frame.message, dict) else {} + msg = message.get("msg") + + if msg == "RESPONSE.CREATED": + self._encoder.reset() + elif msg == "RESPONSE.COMPLETE": + for packet in self._encoder.flush(pad_final_frame=True): + await self._client.send(packet) + elif msg == "RESPONSE.ERROR": + self._encoder.reset() + + payload = await 
self._params.serializer.serialize(frame) if self._params.serializer else None + if payload: + await self._client.send(payload) + + async def write_audio_frame(self, frame: OutputAudioRawFrame) -> bool: + if self._client.is_closing or not self._client.is_connected: + return False + + for packet in self._encoder.encode(frame.audio): + await self._client.send(packet) + + await self._write_audio_sleep() + return True + + +class BaseRawWebsocketTransport(FastAPIWebsocketTransport): + output_transport_cls = RawPCMWebsocketOutputTransport + + def __init__( + self, + websocket: WebSocket, + params: FastAPIWebsocketParams, + input_name: str | None = None, + output_name: str | None = None, + ): + super(FastAPIWebsocketTransport, self).__init__(input_name=input_name, output_name=output_name) + self._params = params + self._callbacks = FastAPIWebsocketCallbacks( + on_client_connected=self._on_client_connected, + on_client_disconnected=self._on_client_disconnected, + on_session_timeout=self._on_session_timeout, + ) + self._client = FastAPIWebsocketClient(websocket, self._callbacks) + self._input = FastAPIWebsocketInputTransport( + self, self._client, self._params, name=self._input_name + ) + self._output = self.output_transport_cls(self, self._client, self._params, name=self._output_name) + self._register_event_handler("on_client_connected") + self._register_event_handler("on_client_disconnected") + self._register_event_handler("on_session_timeout") + + +class Esp32WebsocketTransport(BaseRawWebsocketTransport): + output_transport_cls = OpusWebsocketOutputTransport + + +class BrowserWebsocketTransport(BaseRawWebsocketTransport): + output_transport_cls = RawPCMWebsocketOutputTransport diff --git a/server/fastapi/gem_live_route.py b/server/fastapi/gem_live_route.py new file mode 100644 index 0000000..fa8d726 --- /dev/null +++ b/server/fastapi/gem_live_route.py @@ -0,0 +1,44 @@ +"""Gemini Live native speech-to-speech pipeline builder.""" + +from __future__ import annotations + 
+import os + +from character_prompt import LANGUAGE_LEARNING_PAL_PROMPT +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair + + +def build_gem_live_route(input_processor, context: LLMContext): + try: + from pipecat.services.google.gemini_live import GeminiLiveLLMService + except Exception as exc: + raise RuntimeError( + "Gemini Live route requires pipecat-ai[google]. Add the google extra and redeploy." + ) from exc + + api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") + if not api_key: + raise RuntimeError("Gemini Live route requires GEMINI_API_KEY or GOOGLE_API_KEY.") + + voice = os.getenv("GEMINI_LIVE_VOICE", "Callirrhoe") + model = os.getenv("GEMINI_LIVE_MODEL", "models/gemini-2.5-flash-native-audio-preview-12-2025") + + llm = GeminiLiveLLMService( + api_key=api_key, + inference_on_context_initialization=True, + settings=GeminiLiveLLMService.Settings( + model=model, + voice=voice, + system_instruction=LANGUAGE_LEARNING_PAL_PROMPT, + ), + ) + + user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + processors = [ + input_processor, + user_aggregator, + llm, + ] + + return processors, assistant_aggregator diff --git a/server/fastapi/grok_route.py b/server/fastapi/grok_route.py new file mode 100644 index 0000000..1078a9b --- /dev/null +++ b/server/fastapi/grok_route.py @@ -0,0 +1,58 @@ +"""Grok Realtime native speech-to-speech pipeline builder.""" + +from __future__ import annotations + +import os + +from character_prompt import LANGUAGE_LEARNING_PAL_PROMPT +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair + + +def build_grok_route(input_processor, context: LLMContext): + try: + from pipecat.services.xai.realtime.events import ( + AudioConfiguration, + AudioInput, + AudioOutput, + PCMAudioFormat, + 
SessionProperties, + TurnDetection, + ) + from pipecat.services.xai.realtime.llm import GrokRealtimeLLMService + except Exception as exc: + raise RuntimeError( + "Grok route requires pipecat-ai[grok]. Add the grok extra and redeploy." + ) from exc + + api_key = os.getenv("XAI_API_KEY") + if not api_key: + raise RuntimeError("Grok route requires XAI_API_KEY.") + + voice = os.getenv("GROK_VOICE", "Ara") + + llm = GrokRealtimeLLMService( + api_key=api_key, + settings=GrokRealtimeLLMService.Settings( + session_properties=SessionProperties( + instructions=( + LANGUAGE_LEARNING_PAL_PROMPT + ), + voice=voice, + turn_detection=TurnDetection(type="server_vad"), + audio=AudioConfiguration( + input=AudioInput(format=PCMAudioFormat(rate=16000)), + output=AudioOutput(format=PCMAudioFormat(rate=24000)), + ), + ), + ), + ) + + user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context) + processors = [ + input_processor, + user_aggregator, + llm, + ] + + return processors, assistant_aggregator diff --git a/server/fastapi/models/__init__.py b/server/fastapi/models/__init__.py new file mode 100644 index 0000000..77302a2 --- /dev/null +++ b/server/fastapi/models/__init__.py @@ -0,0 +1 @@ +"""Provider registry package for classic STT -> LLM -> TTS routes.""" diff --git a/server/fastapi/models/_autodiscover.py b/server/fastapi/models/_autodiscover.py new file mode 100644 index 0000000..f936a65 --- /dev/null +++ b/server/fastapi/models/_autodiscover.py @@ -0,0 +1,5 @@ +"""Compatibility entrypoint for autodiscovered provider factories.""" + +from models._unsupported import autodiscovered_provider_factory + +__all__ = ["autodiscovered_provider_factory"] diff --git a/server/fastapi/models/_provider_loader.py b/server/fastapi/models/_provider_loader.py new file mode 100644 index 0000000..d313a4e --- /dev/null +++ b/server/fastapi/models/_provider_loader.py @@ -0,0 +1,33 @@ +"""Helpers for loading provider factories by normalized name.""" + +from __future__ import annotations 
+ +import importlib +import re +from collections.abc import Callable + + +def normalize_provider_name(name: str) -> str: + normalized = name.strip().lower() + normalized = normalized.replace("&", "and") + normalized = re.sub(r"[()/\-]+", "_", normalized) + normalized = re.sub(r"[^a-z0-9_]+", "_", normalized) + normalized = re.sub(r"_+", "_", normalized) + return normalized.strip("_") + + +def load_provider_factory( + registry: dict[str, str], + provider_name: str, + category: str, +) -> Callable: + key = normalize_provider_name(provider_name) + module_path = registry.get(key) + if not module_path: + available = ", ".join(sorted(registry)) + raise ValueError( + f"Unknown {category} provider '{provider_name}'. Available providers: {available}" + ) + + module = importlib.import_module(module_path) + return module.create_service diff --git a/server/fastapi/models/_unsupported.py b/server/fastapi/models/_unsupported.py new file mode 100644 index 0000000..65c6284 --- /dev/null +++ b/server/fastapi/models/_unsupported.py @@ -0,0 +1,186 @@ +"""Generic runtime provider loader for autodiscovered Pipecat services.""" + +from __future__ import annotations + +import importlib +import inspect +import os +import pkgutil +import re +from collections.abc import Callable + +TOKEN_CASE_MAP = { + "ai": "AI", + "aws": "AWS", + "llm": "LLM", + "openai": "OpenAI", + "stt": "STT", + "tts": "TTS", + "xai": "xAI", +} + + +def _normalize(value: str) -> str: + value = value.strip().lower().replace("&", "and") + value = re.sub(r"[()/\-]+", "_", value) + value = re.sub(r"[^a-z0-9_]+", "_", value) + value = re.sub(r"_+", "_", value) + return value.strip("_") + + +def _provider_tokens(provider_label: str) -> list[str]: + normalized = _normalize(provider_label) + aliases = { + "aws": "amazon", + "wizper": "whisper", + "speech_to_text": "stt", + "text_to_speech": "tts", + } + tokens = [token for token in normalized.split("_") if token] + expanded = [] + for token in tokens: + expanded.append(token) 
+ if token in aliases: + expanded.append(aliases[token]) + return list(dict.fromkeys(expanded)) + + +def _iter_matching_modules(provider_label: str, category: str) -> list[str]: + try: + import pipecat.services # type: ignore + except ModuleNotFoundError as exc: + raise NotImplementedError( + "Pipecat is not installed in the current Python environment, so provider " + f"'{provider_label}' cannot be wired yet." + ) from exc + + tokens = _provider_tokens(provider_label) + category_token = category.lower() + matches = [] + for module_info in pkgutil.walk_packages( + pipecat.services.__path__, pipecat.services.__name__ + "." + ): + name = module_info.name.lower() + if category_token not in name: + continue + score = 0 + for token in tokens: + if token in name: + score += 2 + if name.endswith(f".{category_token}"): + score += 4 + if score > 0: + matches.append((score, name)) + + matches.sort(key=lambda item: (-item[0], item[1])) + return [name for _, name in matches] + + +def _candidate_class_names(provider_label: str, category: str) -> list[str]: + category_suffix = f"{category.upper()}Service" + words = [part for part in _normalize(provider_label).split("_") if part] + pascal = "".join(TOKEN_CASE_MAP.get(word, word.capitalize()) for word in words) + variants = [ + f"{pascal}{category_suffix}", + ] + if "openai" in words and "responses" in words: + variants.insert(0, "OpenAIResponsesLLMService") + if "aws" in words and "polly" in words: + variants.insert(0, "PollyTTSService") + if "aws" in words and "transcribe" in words: + variants.insert(0, "TranscribeSTTService") + return list(dict.fromkeys(variants)) + + +def _candidate_env_keys(provider_label: str, category: str) -> list[str]: + """Return API-key env var names to try, derived names first, then vendor keys.""" + base = _normalize(provider_label).upper() + vendor_keys = {"GROK": ["XAI_API_KEY"], "GOOGLE_GEMINI": ["GEMINI_API_KEY", "GOOGLE_API_KEY"]} + derived = [f"{base}_{category.upper()}_API_KEY", f"{base}_API_KEY"] + return derived + vendor_keys.get(base, []) + + +def _build_settings(service_cls, kwargs: dict[str, object]): + settings_cls =
getattr(service_cls, "Settings", None) + if settings_cls is None: + return None + + settings_signature = inspect.signature(settings_cls) + settings_kwargs = {} + for key in ("system_instruction", "model", "voice", "language", "temperature"): + if key in kwargs and key in settings_signature.parameters and kwargs[key] is not None: + settings_kwargs[key] = kwargs[key] + + if not settings_kwargs: + return None + return settings_cls(**settings_kwargs) + + +def _instantiate_service(service_cls, provider_label: str, category: str, kwargs: dict[str, object]): + signature = inspect.signature(service_cls) + init_kwargs = {} + + if "api_key" in signature.parameters: + api_key = kwargs.get("api_key") + if api_key is None: + for env_key in _candidate_env_keys(provider_label, category): + if os.getenv(env_key): + api_key = os.getenv(env_key) + break + if api_key is not None: + init_kwargs["api_key"] = api_key + + settings = _build_settings(service_cls, kwargs) + if settings is not None and "settings" in signature.parameters: + init_kwargs["settings"] = settings + + for key in ("model", "voice", "base_url", "sample_rate"): + if key in kwargs and key in signature.parameters and kwargs[key] is not None: + init_kwargs[key] = kwargs[key] + + return service_cls(**init_kwargs) + + +def autodiscovered_provider_factory(provider_label: str, category: str) -> Callable: + def create_service(**_: object): + kwargs = dict(_) + module_errors = [] + + for module_name in _iter_matching_modules(provider_label, category): + try: + module = importlib.import_module(module_name) + except Exception as exc: + module_errors.append(f"{module_name}: {exc}") + continue + + for class_name in _candidate_class_names(provider_label, category): + service_cls = getattr(module, class_name, None) + if service_cls is None: + continue + try: + return _instantiate_service(service_cls, provider_label, category, kwargs) + except Exception as exc: + module_errors.append(f"{module_name}.{class_name}: {exc}") + + for 
attribute_name in dir(module): + if not attribute_name.endswith(f"{category.upper()}Service"): + continue + service_cls = getattr(module, attribute_name) + if not inspect.isclass(service_cls): + continue + try: + return _instantiate_service(service_cls, provider_label, category, kwargs) + except Exception as exc: + module_errors.append(f"{module_name}.{attribute_name}: {exc}") + + details = "; ".join(module_errors[:10]) if module_errors else "no matching Pipecat modules found" + raise NotImplementedError( + f"{category} provider '{provider_label}' could not be resolved from the installed " + f"Pipecat services. Details: {details}" + ) + + return create_service + + +# Backwards-compatible alias for earlier scaffolding. +unsupported_provider_factory = autodiscovered_provider_factory diff --git a/server/fastapi/models/llm/__init__.py b/server/fastapi/models/llm/__init__.py new file mode 100644 index 0000000..cf3f314 --- /dev/null +++ b/server/fastapi/models/llm/__init__.py @@ -0,0 +1,13 @@ +"""LLM provider registry.""" + +from __future__ import annotations + +from models._provider_loader import load_provider_factory +from models.providers import get_provider_registry + +LLM_REGISTRY = get_provider_registry("llm") + + +def create_llm_service(provider_name: str, **kwargs): + factory = load_provider_factory(LLM_REGISTRY, provider_name, "LLM") + return factory(**kwargs) diff --git a/server/fastapi/models/llm/anthropic.py b/server/fastapi/models/llm/anthropic.py new file mode 100644 index 0000000..3375132 --- /dev/null +++ b/server/fastapi/models/llm/anthropic.py @@ -0,0 +1,5 @@ +"""Anthropic Claude LLM provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Anthropic", "LLM") diff --git a/server/fastapi/models/llm/gemini.py b/server/fastapi/models/llm/gemini.py new file mode 100644 index 0000000..7082eb2 --- /dev/null +++ b/server/fastapi/models/llm/gemini.py @@ -0,0 +1,5 @@ +"""Google Gemini 
LLM provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Google Gemini", "LLM") diff --git a/server/fastapi/models/llm/grok.py b/server/fastapi/models/llm/grok.py new file mode 100644 index 0000000..655a0f4 --- /dev/null +++ b/server/fastapi/models/llm/grok.py @@ -0,0 +1,5 @@ +"""xAI Grok LLM provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Grok", "LLM") diff --git a/server/fastapi/models/llm/openai.py b/server/fastapi/models/llm/openai.py new file mode 100644 index 0000000..f757876 --- /dev/null +++ b/server/fastapi/models/llm/openai.py @@ -0,0 +1,5 @@ +"""OpenAI LLM provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("OpenAI", "LLM") diff --git a/server/fastapi/models/providers.py b/server/fastapi/models/providers.py new file mode 100644 index 0000000..6eae831 --- /dev/null +++ b/server/fastapi/models/providers.py @@ -0,0 +1,153 @@ +"""Provider metadata and validation for the simple FastAPI voice stack.""" + +from __future__ import annotations + +import os +from dataclasses import asdict, dataclass +from typing import Literal + + +ProviderCategory = Literal["llm", "stt", "tts"] + + +@dataclass(frozen=True) +class ProviderSpec: + name: str + category: ProviderCategory + module: str + env: tuple[str, ...] = () + aliases: tuple[str, ...] 
= () + description: str = "" + + +PROVIDER_SPECS: dict[ProviderCategory, dict[str, ProviderSpec]] = { + "llm": { + "openai": ProviderSpec( + name="openai", + category="llm", + module="models.llm.openai", + env=("OPENAI_API_KEY",), + description="OpenAI chat-completions compatible LLM.", + ), + "claude": ProviderSpec( + name="claude", + category="llm", + module="models.llm.anthropic", + env=("ANTHROPIC_API_KEY",), + aliases=("anthropic",), + description="Anthropic Claude via Pipecat's AnthropicLLMService.", + ), + "gemini": ProviderSpec( + name="gemini", + category="llm", + module="models.llm.gemini", + env=("GEMINI_API_KEY",), + aliases=("google_gemini",), + description="Google Gemini text model via Pipecat.", + ), + "grok": ProviderSpec( + name="grok", + category="llm", + module="models.llm.grok", + env=("XAI_API_KEY",), + description="xAI Grok via Pipecat.", + ), + }, + "stt": { + "deepgram": ProviderSpec( + name="deepgram", + category="stt", + module="models.stt.deepgram", + env=("DEEPGRAM_API_KEY",), + description="Deepgram real-time transcription service.", + ), + "whisper": ProviderSpec( + name="whisper", + category="stt", + module="models.stt.whisper", + description="Local Whisper transcription service with no external API key.", + ), + }, + "tts": { + "elevenlabs": ProviderSpec( + name="elevenlabs", + category="tts", + module="models.tts.elevenlabs", + env=("ELEVENLABS_API_KEY",), + description="ElevenLabs streaming TTS service.", + ), + "cartesia": ProviderSpec( + name="cartesia", + category="tts", + module="models.tts.cartesia", + env=("CARTESIA_API_KEY",), + description="Cartesia TTS service.", + ), + "deepgram": ProviderSpec( + name="deepgram", + category="tts", + module="models.tts.deepgram", + env=("DEEPGRAM_API_KEY",), + description="Deepgram Aura TTS service.", + ), + "openai": ProviderSpec( + name="openai", + category="tts", + module="models.tts.openai", + env=("OPENAI_API_KEY",), + description="OpenAI text-to-speech service.", + ), + }, +} + + 
+def get_provider_registry(category: ProviderCategory) -> dict[str, str]: + specs = PROVIDER_SPECS[category] + registry: dict[str, str] = {} + for key, spec in specs.items(): + registry[key] = spec.module + for alias in spec.aliases: + registry[alias] = spec.module + return registry + + +def get_provider_spec(category: ProviderCategory, provider_name: str) -> ProviderSpec: + normalized = provider_name.strip().lower() + for key, spec in PROVIDER_SPECS[category].items(): + if normalized == key or normalized in spec.aliases: + return spec + available = ", ".join(sorted(PROVIDER_SPECS[category])) + raise ValueError(f"Unknown {category} provider '{provider_name}'. Available providers: {available}") + + +def validate_provider_env(category: ProviderCategory, provider_name: str) -> None: + spec = get_provider_spec(category, provider_name) + missing = [env_key for env_key in spec.env if not os.getenv(env_key)] + if missing: + raise RuntimeError( + f"Selected {category} provider '{spec.name}' requires environment variables: " + + ", ".join(missing) + ) + + +def validate_classic_provider_stack() -> dict[str, str]: + selected = { + "stt": os.getenv("CLASSIC_STT_PROVIDER", "deepgram").strip().lower(), + "llm": os.getenv("CLASSIC_LLM_PROVIDER", "openai").strip().lower(), + "tts": os.getenv("CLASSIC_TTS_PROVIDER", "elevenlabs").strip().lower(), + } + for category, provider_name in selected.items(): + validate_provider_env(category, provider_name) # type: ignore[arg-type] + return selected + + +def get_provider_catalog() -> dict[str, list[dict[str, object]]]: + catalog: dict[str, list[dict[str, object]]] = {} + for category, providers in PROVIDER_SPECS.items(): + catalog[category] = [] + for spec in providers.values(): + payload = asdict(spec) + payload["env"] = list(spec.env) + payload["aliases"] = list(spec.aliases) + catalog[category].append(payload) + return catalog diff --git a/server/fastapi/models/stt/__init__.py b/server/fastapi/models/stt/__init__.py new file mode 100644 
index 0000000..50034ee --- /dev/null +++ b/server/fastapi/models/stt/__init__.py @@ -0,0 +1,13 @@ +"""STT provider registry.""" + +from __future__ import annotations + +from models._provider_loader import load_provider_factory +from models.providers import get_provider_registry + +STT_REGISTRY = get_provider_registry("stt") + + +def create_stt_service(provider_name: str, **kwargs): + factory = load_provider_factory(STT_REGISTRY, provider_name, "STT") + return factory(**kwargs) diff --git a/server/fastapi/models/stt/deepgram.py b/server/fastapi/models/stt/deepgram.py new file mode 100644 index 0000000..ce46a2c --- /dev/null +++ b/server/fastapi/models/stt/deepgram.py @@ -0,0 +1,5 @@ +"""Deepgram STT provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Deepgram", "STT") diff --git a/server/fastapi/models/stt/whisper.py b/server/fastapi/models/stt/whisper.py new file mode 100644 index 0000000..462f3fb --- /dev/null +++ b/server/fastapi/models/stt/whisper.py @@ -0,0 +1,5 @@ +"""Local Whisper STT provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Whisper", "STT") diff --git a/server/fastapi/models/tts/__init__.py b/server/fastapi/models/tts/__init__.py new file mode 100644 index 0000000..1a569cf --- /dev/null +++ b/server/fastapi/models/tts/__init__.py @@ -0,0 +1,13 @@ +"""TTS provider registry.""" + +from __future__ import annotations + +from models._provider_loader import load_provider_factory +from models.providers import get_provider_registry + +TTS_REGISTRY = get_provider_registry("tts") + + +def create_tts_service(provider_name: str, **kwargs): + factory = load_provider_factory(TTS_REGISTRY, provider_name, "TTS") + return factory(**kwargs) diff --git a/server/fastapi/models/tts/cartesia.py b/server/fastapi/models/tts/cartesia.py new file mode 100644 index 0000000..5c672b0 --- /dev/null +++ 
b/server/fastapi/models/tts/cartesia.py @@ -0,0 +1,5 @@ +"""Cartesia TTS provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Cartesia", "TTS") diff --git a/server/fastapi/models/tts/deepgram.py b/server/fastapi/models/tts/deepgram.py new file mode 100644 index 0000000..7e9fe2f --- /dev/null +++ b/server/fastapi/models/tts/deepgram.py @@ -0,0 +1,5 @@ +"""Deepgram TTS provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("Deepgram", "TTS") diff --git a/server/fastapi/models/tts/elevenlabs.py b/server/fastapi/models/tts/elevenlabs.py new file mode 100644 index 0000000..6147978 --- /dev/null +++ b/server/fastapi/models/tts/elevenlabs.py @@ -0,0 +1,5 @@ +"""ElevenLabs TTS provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("ElevenLabs", "TTS") diff --git a/server/fastapi/models/tts/openai.py b/server/fastapi/models/tts/openai.py new file mode 100644 index 0000000..cc2b639 --- /dev/null +++ b/server/fastapi/models/tts/openai.py @@ -0,0 +1,5 @@ +"""OpenAI TTS provider.""" + +from models._autodiscover import autodiscovered_provider_factory + +create_service = autodiscovered_provider_factory("OpenAI", "TTS") diff --git a/server/fastapi/pcc-deploy.toml b/server/fastapi/pcc-deploy.toml new file mode 100644 index 0000000..996925b --- /dev/null +++ b/server/fastapi/pcc-deploy.toml @@ -0,0 +1,7 @@ +agent_name = "pipecat-test-ws-ap-south" +region = "ap-south" +secret_set = "pipecat-test-ws-ap-south-secrets" +agent_profile = "agent-1x" + +[scaling] + min_agents = 1 diff --git a/server/fastapi/pyproject.toml b/server/fastapi/pyproject.toml new file mode 100644 index 0000000..48101c3 --- /dev/null +++ b/server/fastapi/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "pipecat-quickstart" +version = "0.1.0" +description = "Quickstart example for 
building voice AI bots with Pipecat" +requires-python = ">=3.10" +dependencies = [ + "pipecat-ai[webrtc,websocket,silero,deepgram,openai,cartesia,google,grok,runner]", + "pipecat-ai-cli", +] + +[dependency-groups] +dev = [ + "pyright>=1.1.404,<2", + "ruff>=0.12.11,<1", +] + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = ["I"] diff --git a/server/fastapi/server.py b/server/fastapi/server.py new file mode 100644 index 0000000..7c74de2 --- /dev/null +++ b/server/fastapi/server.py @@ -0,0 +1,304 @@ +"""Multi-transport Pipecat server for browser and ESP32 over WebSocket.""" + +from __future__ import annotations + +import json +import os + +import uvicorn +from fastapi import FastAPI, WebSocket +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import HTMLResponse, RedirectResponse +from loguru import logger +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +from bot import create_esp32_auth_message, run_bot_session +from esp32_transport import BrowserWebsocketTransport, Esp32WebsocketTransport, RawPCMFrameSerializer +from models.providers import get_provider_catalog, validate_classic_provider_stack + +HOST = os.getenv("HOST", "0.0.0.0") +PORT = int(os.getenv("PORT", "7860")) +BROWSER_INPUT_SAMPLE_RATE = int(os.getenv("BROWSER_INPUT_SAMPLE_RATE", "16000")) +ESP32_INPUT_SAMPLE_RATE = int(os.getenv("ESP32_INPUT_SAMPLE_RATE", "16000")) +AUDIO_OUTPUT_SAMPLE_RATE = int(os.getenv("AUDIO_OUTPUT_SAMPLE_RATE", "24000")) +ALLOWED_ORIGINS = [ + origin.strip() + for origin in os.getenv("ALLOWED_ORIGINS", "*").split(",") + if origin.strip() +] + +BROWSER_HTML = """ + +
+ + +