"""Multi-transport Pipecat server for browser and ESP32 over WebSocket."""
from __future__ import annotations
import json
import os
import uvicorn
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
from loguru import logger
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from bot import create_esp32_auth_message, run_bot_session
from esp32_transport import BrowserWebsocketTransport, Esp32WebsocketTransport, RawPCMFrameSerializer
from models.providers import get_provider_catalog, validate_classic_provider_stack
# Network binding for the HTTP/WebSocket server; overridable via environment.
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", "7860"))

# Audio sample rates (Hz) handed to the transport parameters and serializers.
BROWSER_INPUT_SAMPLE_RATE = int(os.environ.get("BROWSER_INPUT_SAMPLE_RATE", "16000"))
ESP32_INPUT_SAMPLE_RATE = int(os.environ.get("ESP32_INPUT_SAMPLE_RATE", "16000"))
AUDIO_OUTPUT_SAMPLE_RATE = int(os.environ.get("AUDIO_OUTPUT_SAMPLE_RATE", "24000"))

# Comma-separated list of CORS origins; blank entries are discarded and
# surrounding whitespace is trimmed. Defaults to the single wildcard "*".
ALLOWED_ORIGINS = [
    entry
    for entry in map(str.strip, os.environ.get("ALLOWED_ORIGINS", "*").split(","))
    if entry
]
# Body returned verbatim by the GET /browser route as an HTML response.
# NOTE(review): the literal contains only plain text lines and no markup or
# script; confirm this is the intended page content.
BROWSER_HTML = """
Pipecat Browser WS
Pipecat Browser WebSocket Test
idle
"""
def create_browser_transport(websocket: WebSocket) -> BrowserWebsocketTransport:
    """Wrap a browser WebSocket connection in a BrowserWebsocketTransport.

    Args:
        websocket: The FastAPI WebSocket connection to wrap.

    Returns:
        A transport with audio input and output enabled, reading audio at
        BROWSER_INPUT_SAMPLE_RATE and writing at AUDIO_OUTPUT_SAMPLE_RATE,
        serialized through RawPCMFrameSerializer.
    """
    pcm_serializer = RawPCMFrameSerializer(input_sample_rate=BROWSER_INPUT_SAMPLE_RATE)
    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=BROWSER_INPUT_SAMPLE_RATE,
        audio_out_sample_rate=AUDIO_OUTPUT_SAMPLE_RATE,
        serializer=pcm_serializer,
    )
    return BrowserWebsocketTransport(websocket=websocket, params=transport_params)
def create_esp32_transport(websocket: WebSocket) -> Esp32WebsocketTransport:
    """Wrap an ESP32 WebSocket connection in an Esp32WebsocketTransport.

    Args:
        websocket: The FastAPI WebSocket connection to wrap.

    Returns:
        A transport with audio input and output enabled, reading audio at
        ESP32_INPUT_SAMPLE_RATE and writing at AUDIO_OUTPUT_SAMPLE_RATE,
        serialized through RawPCMFrameSerializer.
    """
    pcm_serializer = RawPCMFrameSerializer(input_sample_rate=ESP32_INPUT_SAMPLE_RATE)
    transport_params = FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=ESP32_INPUT_SAMPLE_RATE,
        audio_out_sample_rate=AUDIO_OUTPUT_SAMPLE_RATE,
        serializer=pcm_serializer,
    )
    return Esp32WebsocketTransport(websocket=websocket, params=transport_params)
def create_app() -> FastAPI:
    """Build and configure the FastAPI application.

    Registers CORS middleware, the HTTP routes (``/``, ``/browser``,
    ``/healthz``, ``/providers``), a startup hook that validates the provider
    configuration, and the WebSocket endpoints (``/ws/browser``,
    ``/ws/nextjs``, ``/ws/esp32``).

    Returns:
        The configured FastAPI instance.
    """
    app = FastAPI()

    # Any "*" entry means "allow every origin", even when it is mixed with
    # explicit origins (e.g. "*,https://example.com"). Credentials must be
    # disabled in that case; otherwise credentialed requests would be
    # accepted from arbitrary origins.
    allow_all_origins = "*" in ALLOWED_ORIGINS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"] if allow_all_origins else ALLOWED_ORIGINS,
        allow_credentials=not allow_all_origins,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/", include_in_schema=False)
    async def root_redirect():
        """Send visitors of the root path to the browser test page."""
        return RedirectResponse(url="/browser")

    @app.get("/browser", response_class=HTMLResponse)
    async def browser_page():
        """Serve the static browser test page."""
        return HTMLResponse(BROWSER_HTML)

    @app.get("/healthz")
    async def healthcheck():
        """Return a constant payload for liveness probes."""
        return {"ok": True}

    @app.get("/providers")
    async def providers_catalog():
        """Return the provider catalog from ``get_provider_catalog``."""
        return get_provider_catalog()

    # NOTE(review): recent FastAPI releases deprecate ``on_event`` in favor of
    # lifespan handlers; consider migrating once the minimum FastAPI version
    # for this project is confirmed.
    @app.on_event("startup")
    async def validate_provider_configuration():
        """Validate the provider stack at startup and log the selection."""
        selected = validate_classic_provider_stack()
        logger.info(
            "Classic provider stack validated: stt={} llm={} tts={}",
            selected["stt"],
            selected["llm"],
            selected["tts"],
        )

    @app.websocket("/ws/browser")
    async def browser_websocket(websocket: WebSocket):
        """Accept a browser client and run a bot session over it."""
        await websocket.accept()
        logger.info("Browser websocket connected")
        transport = create_browser_transport(websocket)
        await run_bot_session(transport, "browser", False)

    @app.websocket("/ws/nextjs")
    async def nextjs_websocket(websocket: WebSocket):
        """Accept a NextJS client; it uses the browser transport and label."""
        await websocket.accept()
        logger.info("NextJS websocket connected")
        transport = create_browser_transport(websocket)
        await run_bot_session(transport, "browser", False)

    @app.websocket("/ws/esp32")
    async def esp32_websocket(websocket: WebSocket):
        """Accept an ESP32 client, send the auth message, and run a session."""
        await websocket.accept()
        # NOTE(review): the authorization header is only reported as present
        # or absent here; this function never validates it. Confirm that
        # authentication is enforced elsewhere.
        logger.info(
            "ESP32 websocket connected: mac={} rssi={} auth={}",
            websocket.headers.get("x-device-mac", "unknown"),
            websocket.headers.get("x-wifi-rssi", "unknown"),
            "yes" if websocket.headers.get("authorization") else "no",
        )
        # The auth message is sent before the transport takes over the socket.
        await websocket.send_text(json.dumps(create_esp32_auth_message()))
        transport = create_esp32_transport(websocket)
        await run_bot_session(transport, "esp32", False)

    return app
# Module-level ASGI application, built at import time.
app = create_app()

if __name__ == "__main__":
    # Uvicorn is given the import string "server:app".
    # NOTE(review): this assumes the module is importable as `server` - confirm.
    server_options = {"host": HOST, "port": PORT, "reload": False}
    uvicorn.run("server:app", **server_options)