Backend Development

Real-time AI Processing with WebSockets

Implementing real-time AI processing capabilities using WebSockets and worker processes.

Don Wilson
Mobile Developer
May 22, 2025
9 min read

Real-time AI is what makes a product feel alive. Streamed LLM tokens, live transcription, instant moderation — all of these need a transport that beats request/response latency. WebSockets are the workhorse, paired with worker queues that keep the model busy without blocking the connection.

When to Choose WebSockets

  • Bidirectional streams: voice, collab, multi-turn agent UIs
  • Low-overhead push: no per-message headers or handshakes, far cheaper than HTTP polling
  • Persistent context: keep model state across messages without re-auth

If you only need server → client streaming, Server-Sent Events are simpler. Pick WebSockets when the client also streams.

Architecture Overview

  1. Client opens WebSocket to gateway
  2. Gateway authenticates, attaches session ID, hands off to a worker
  3. Worker streams model output back through the socket
  4. Long-running tasks publish to a queue; results stream back as they arrive

FastAPI WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.websocket("/ws/chat")
async def chat(ws: WebSocket):
    await ws.accept()
    history = []  # per-connection conversation context
    try:
        while True:
            user = await ws.receive_text()
            history.append({"role": "user", "content": user})
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=history,
                stream=True,
            )
            assistant = ""
            # Forward each token the moment it arrives
            async for chunk in stream:
                token = chunk.choices[0].delta.content or ""
                assistant += token
                await ws.send_text(token)
            # Keep the full reply so later turns have context
            history.append({"role": "assistant", "content": assistant})
            await ws.send_text("[DONE]")
    except WebSocketDisconnect:
        pass  # client closed the connection; nothing left to send
    except Exception:
        await ws.close(code=1011)  # 1011: internal server error

Worker Processes for Heavy Tasks

Pinning model inference to the WebSocket process kills throughput. Move it to dedicated workers:

  • Web tier handles auth, framing, backpressure
  • Workers consume jobs from Redis Streams or NATS
  • Results published to a per-session channel; web tier subscribes and forwards
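
A minimal sketch of both halves, assuming Redis Streams for the job queue and pub/sub for per-session results; the stream, group, and key names are illustrative, and run_inference stands in for your model client:

import asyncio
import json
import redis.asyncio as redis
from redis.exceptions import ResponseError

r = redis.Redis()

async def run_inference(job: dict) -> dict:
    # Hypothetical model call; swap in your inference client
    return {"text": f"echo: {job['prompt']}"}

async def worker(name: str):
    try:
        # Consumer group lets many workers share one stream
        await r.xgroup_create("ai:jobs", "workers", id="$", mkstream=True)
    except ResponseError:
        pass  # group already exists
    while True:
        entries = await r.xreadgroup(
            "workers", name, {"ai:jobs": ">"}, count=1, block=5000
        )
        for _stream, messages in entries:
            for msg_id, fields in messages:
                job = json.loads(fields[b"payload"])
                result = await run_inference(job)
                # Publish to the per-session channel
                await r.publish(f"session:{job['session_id']}", json.dumps(result))
                await r.xack("ai:jobs", "workers", msg_id)

async def forward(session_id: str, ws_send):
    # Web-tier side: subscribe and forward frames to the socket
    pubsub = r.pubsub()
    await pubsub.subscribe(f"session:{session_id}")
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            await ws_send(msg["data"].decode())

The producer side XADDs jobs carrying a payload field and the session ID; scaling out is just running more consumers in the same group.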

Backpressure

Slow clients can fill server buffers. Protect the system:

  • Cap per-connection outbound queue size
  • Drop or coalesce non-critical messages when the queue is full
  • Send heartbeats; close idle sockets
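
One way to enforce the cap, sketched with a per-connection asyncio.Queue that drops non-critical messages when full (the 256-message limit is an arbitrary starting point):

import asyncio

class OutboundBuffer:
    """Bounded per-connection send queue that sheds load when full."""

    def __init__(self, maxsize: int = 256):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    def push(self, message: str, critical: bool = False) -> None:
        try:
            self.queue.put_nowait(message)
        except asyncio.QueueFull:
            if critical:
                # Evict the oldest entry to make room for critical frames
                self.queue.get_nowait()
                self.queue.put_nowait(message)
            # Non-critical messages are silently dropped

    async def drain(self, send) -> None:
        # Run as a background task; forwards at the client's pace
        while True:
            await send(await self.queue.get())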

Scaling WebSockets Horizontally

Each pod holds its own long-lived connections, so use session affinity (sticky load balancing) at the edge. Share session state via Redis so any pod can resume the conversation after a reconnect.
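
A sketch of pod-agnostic resume, assuming chat history is stored as JSON under a per-session Redis key (key name and TTL are illustrative):

import json
import redis.asyncio as redis

r = redis.Redis()

async def load_history(session_id: str) -> list:
    # Any pod can pick up the session after a reconnect
    raw = await r.get(f"chat:history:{session_id}")
    return json.loads(raw) if raw else []

async def save_history(session_id: str, history: list) -> None:
    # TTL so abandoned sessions age out
    await r.set(f"chat:history:{session_id}", json.dumps(history), ex=3600)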

Token Streaming UX

Users judge speed by first-token latency, not total generation time. Optimize for it:

  • Reuse system prompts to keep KV-cache warm
  • Pre-load model weights, never on the hot path
  • Send tokens as soon as they emerge — do not buffer for punctuation
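
Measuring the metric that matters is cheap. A sketch that wraps the streaming loop from the endpoint above and logs time-to-first-token (the print call is a stand-in for your metrics client):

import time

async def stream_with_ttft(stream, send):
    start = time.monotonic()
    first = True
    async for chunk in stream:
        token = chunk.choices[0].delta.content or ""
        if first and token:
            # Time-to-first-token dominates perceived latency
            print(f"ttft={time.monotonic() - start:.3f}s")
            first = False
        await send(token)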

Voice and Audio

For real-time transcription, stream audio frames over the WebSocket and pipe them into a streaming ASR (Whisper streaming, Deepgram, AssemblyAI). Send partial transcripts, then the final, then the response.
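
The framing pattern looks like this, assuming binary audio frames from the client; transcribe_stream is a hypothetical adapter over whichever streaming ASR you use:

from fastapi import FastAPI, WebSocket

app = FastAPI()

async def transcribe_stream(frames):
    # Hypothetical adapter: feed frames to Whisper/Deepgram/AssemblyAI
    async for _frame in frames:
        yield "partial transcript"

@app.websocket("/ws/transcribe")
async def transcribe(ws: WebSocket):
    await ws.accept()

    async def frames():
        while True:
            yield await ws.receive_bytes()  # raw audio from the client

    async for partial in transcribe_stream(frames()):
        await ws.send_json({"type": "partial", "text": partial})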

Security

  • Use wss://, never ws://
  • Authenticate at the upgrade with short-lived tokens
  • Validate every inbound message — never trust the client
  • Rate-limit per session and per user
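
Authenticating at the upgrade might look like this with FastAPI and PyJWT, assuming the client passes a short-lived token as a query parameter (the secret handling here is illustrative):

import jwt  # PyJWT
from fastapi import FastAPI, WebSocket, status

app = FastAPI()
SECRET = "load-from-your-secret-store"  # illustrative

@app.websocket("/ws/chat")
async def chat(ws: WebSocket, token: str):
    # The token rides on the upgrade request's query string
    try:
        claims = jwt.decode(token, SECRET, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        # Reject before accepting the connection
        await ws.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    await ws.accept()
    ws.state.user_id = claims.get("sub")  # stash identity for rate limiting
    # ...proceed with the chat loop from the earlier example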

Observability

  • Track concurrent connections, message rate, drop rate
  • Log first-token latency and total stream time
  • Trace from socket open to model call to socket close
  • Alert on reconnect storms
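
With prometheus_client, the core instruments are a few lines (metric names are illustrative):

from prometheus_client import Counter, Gauge, Histogram

WS_OPEN = Gauge("ws_open_connections", "Currently open WebSocket connections")
WS_MSGS = Counter("ws_messages_total", "Messages by direction", ["direction"])
TTFT = Histogram("ws_first_token_seconds", "Time from request to first token")

# In the endpoint: WS_OPEN.inc() on accept, WS_OPEN.dec() on close,
# WS_MSGS.labels("out").inc() per send, TTFT.observe(...) per stream.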

Conclusion

Real-time AI is a moat — once users feel the difference, the request/response experience feels broken. WebSockets give you the transport; workers give you the throughput; observability keeps it honest. Build it once and it pays back across every feature you ship.

Tags

Backend · WebSockets · Real-time Processing
