Real-time voice agent (Pipecat + LiveKit)
Production voice agent: Deepgram ASR → Claude → ElevenLabs TTS, all routed through a LiveKit WebRTC room.
The cascaded ASR → LLM → TTS pipeline is the workhorse of production voice AI. Below is a minimal Pipecat program that joins a LiveKit room and runs a fully streaming agent.
Install
```bash
pip install "pipecat-ai[livekit,deepgram,anthropic,elevenlabs,silero]"
```
Agent
```python
import asyncio
import os

# Pipecat has moved several of these modules between releases;
# adjust the import paths to match your installed version.
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport


async def main(room: str, token: str):
    # Join the LiveKit room; Silero VAD detects when the caller starts/stops speaking.
    transport = LiveKitTransport(
        url=os.environ["LIVEKIT_URL"],
        token=token,
        room_name=room,
        params=LiveKitParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    llm = AnthropicLLMService(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        model="claude-opus-4-7",
    )
    tts = ElevenLabsTTSService(
        api_key=os.environ["ELEVENLABS_API_KEY"],
        voice_id=os.environ["ELEVEN_VOICE_ID"],
    )

    context = OpenAILLMContext(
        messages=[
            {
                "role": "system",
                "content": "You are a helpful concierge. Keep replies under 2 sentences.",
            }
        ]
    )
    # Create the aggregator pair once and reuse it; calling
    # create_context_aggregator() twice builds two unrelated pairs.
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline([
        transport.input(),               # caller audio from the room
        stt,                             # speech -> text
        context_aggregator.user(),       # append user turns to the context
        llm,                             # context -> streamed reply text
        tts,                             # text -> streamed audio
        transport.output(),              # agent audio back to the room
        context_aggregator.assistant(),  # append agent turns to the context
    ])

    task = PipelineTask(pipeline)

    @transport.event_handler("on_first_participant_joined")
    async def _on_join(_transport, _participant):
        # Kick off the first LLM turn so the agent speaks first.
        await task.queue_frames([LLMMessagesFrame(context.messages)])

    @transport.event_handler("on_participant_left")
    async def _on_leave(_transport, _participant, _reason):
        # Shut the pipeline down when the caller leaves.
        await task.queue_frame(EndFrame())

    await PipelineRunner().run(task)


if __name__ == "__main__":
    asyncio.run(main(os.environ["ROOM"], os.environ["TOKEN"]))
```
Latency budget that worked at 40k locations
| Stage | Target |
|---|---|
| ASR partial → final | < 250 ms |
| LLM time to first token (TTFT) | < 350 ms |
| TTS time to first byte (TTFB) | < 250 ms |
| End-to-end | < 1.2 s |
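If the three listed stages run back to back, their targets sum to 250 + 350 + 250 = 850 ms, which leaves roughly 350 ms of the 1.2 s end-to-end budget for everything the table does not list (the table does not say how that slack is spent; VAD endpointing, network transit, and playout buffering are plausible consumers). The sketch below encodes the budget so measured per-stage latencies can be checked against it. The stage keys and helper names are hypothetical labels, not part of Pipecat or the paper.

```python
# Per-stage targets from the latency budget table, in milliseconds.
# The dictionary keys are hypothetical labels for the table's rows.
STAGE_TARGETS_MS = {
    "asr_partial_to_final": 250,
    "llm_ttft": 350,
    "tts_ttfb": 250,
}
E2E_TARGET_MS = 1200


def headroom_ms(stage_targets: dict, e2e_target: int) -> int:
    """Slack left for unlisted stages, assuming the listed stages run back to back."""
    return e2e_target - sum(stage_targets.values())


def over_budget(measured_ms: dict, stage_targets: dict) -> list:
    """Names of measured stages that miss their target (targets are strict '<')."""
    return [
        name
        for name, value in measured_ms.items()
        if name in stage_targets and value >= stage_targets[name]
    ]


if __name__ == "__main__":
    print(headroom_ms(STAGE_TARGETS_MS, E2E_TARGET_MS))  # 350
    print(over_budget({"llm_ttft": 410.0, "tts_ttfb": 180.0}, STAGE_TARGETS_MS))  # ['llm_ttft']
```

Treating a measurement equal to its target as a miss mirrors the strict "<" in the table.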
Five optimizations together cut measured end-to-end latency by 41.8% in production:
- Streaming-first TTS chunking (start synthesizing on the first sentence boundary, not on full reply).
- Concurrent intent detection and synthesis (the biggest single win).
- Prompt compression on the LLM call (cache the long system prompt; trim turn history).
- Session-state caching to avoid re-priming Claude every turn.
- Adaptive VAD endpointing: Silero sensitivity tuned to each environment's noise floor.
Source: my paper Latency Optimization in Production Voice AI Pipelines (Rodrigues, 2026).
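The streaming-first chunking idea from the first bullet can be sketched framework-agnostically: buffer the LLM's text deltas and release each sentence the moment its boundary arrives, so TTS starts on sentence one while sentence two is still being generated. This is an illustration, not the paper's implementation; the regex boundary rule (`.`, `!`, or `?` followed by whitespace) and all names are assumptions, and the rule will mis-split abbreviations such as "Dr. ".

```python
import asyncio
import re
from collections.abc import AsyncIterator

# Assumed boundary rule: sentence-ending punctuation followed by whitespace.
_SENTENCE_END = re.compile(r"[.!?]\s")


async def sentence_chunks(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Re-chunk a stream of LLM text deltas into whole sentences for TTS."""
    buffer = ""
    async for token in tokens:
        buffer += token
        # Release every complete sentence currently in the buffer.
        while (match := _SENTENCE_END.search(buffer)) is not None:
            end = match.end()
            yield buffer[:end].strip()
            buffer = buffer[end:]
    # The final sentence may have no trailing whitespace; flush it at stream end.
    if buffer.strip():
        yield buffer.strip()


async def fake_llm_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming LLM response.
    for delta in ["Sure", ", your table", " is booked. ", "Anything", " else?"]:
        await asyncio.sleep(0)  # yield control, as a network read would
        yield delta


async def demo() -> list:
    return [sentence async for sentence in sentence_chunks(fake_llm_stream())]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['Sure, your table is booked.', 'Anything else?']
```

The first sentence is handed to TTS as soon as ". " arrives, before the LLM has produced "Anything else?".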
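The concurrency bullet does not spell out its mechanism. One reading is that the reply path (LLM first sentence plus TTS first audio) no longer waits for an intent classifier, and both run as concurrent tasks, so the turn costs roughly the slower of the two rather than their sum. The sketch below illustrates only that timing effect; `detect_intent`, `synthesize_reply`, and the simulated durations are hypothetical stand-ins.

```python
import asyncio
import time


# Hypothetical stand-ins for real services; the sleeps only simulate latency.
async def detect_intent(utterance: str) -> str:
    await asyncio.sleep(0.2)  # e.g. a small intent-classifier call
    return "booking" if "table" in utterance else "other"


async def synthesize_reply(utterance: str) -> bytes:
    await asyncio.sleep(0.3)  # e.g. LLM first sentence + TTS first audio chunk
    return b"\x00" * 320  # placeholder audio bytes


async def sequential(utterance: str) -> tuple:
    # Baseline: wait for intent before starting the reply path.
    intent = await detect_intent(utterance)
    audio = await synthesize_reply(utterance)
    return intent, audio


async def concurrent(utterance: str) -> tuple:
    # Both coroutines run at once; the wait is roughly the slower of the two.
    intent, audio = await asyncio.gather(
        detect_intent(utterance), synthesize_reply(utterance)
    )
    return intent, audio


def timed(fn, utterance: str) -> float:
    """Wall-clock seconds for one run of fn(utterance)."""
    start = time.perf_counter()
    asyncio.run(fn(utterance))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {timed(sequential, 'a table for two'):.2f} s")
    print(f"concurrent: {timed(concurrent, 'a table for two'):.2f} s")
```

This only pays off when the reply does not need the intent result before it starts; if a late intent changes the plan, the in-flight reply has to be cancelled or corrected.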
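The prompt-compression bullet names two tactics: cache the long system prompt and trim turn history. A minimal sketch, assuming the request body is built for Anthropic's Messages API directly (in the Pipecat agent above, the service assembles the request itself): the `cache_control` marker on a system content block is a real Messages API field, while the helper names and the six-message window are hypothetical. Anthropic documents a minimum cacheable prompt length, so a short system prompt like the one in the agent above would not be cached.

```python
def trim_history(history: list, max_messages: int) -> list:
    """Keep only the most recent messages, opening the window on a user turn."""
    # Guard: history[-0:] would return the whole list.
    recent = history[-max_messages:] if max_messages > 0 else []
    # Drop leading assistant messages so the trimmed window starts with the user.
    while recent and recent[0]["role"] != "user":
        recent = recent[1:]
    return recent


def build_request_fields(system_prompt: str, history: list, max_messages: int = 6) -> dict:
    """Hypothetical helper returning the 'system' and 'messages' request fields."""
    return {
        # Marking the long, unchanging system prompt as cacheable lets later
        # calls reuse the cached prefix instead of reprocessing it each turn.
        "system": [
            {
                "type": "text",
                "text": system_prompt,
                "cache_control": {"type": "ephemeral"},
            }
        ],
        "messages": trim_history(history, max_messages),
    }


if __name__ == "__main__":
    history = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
        {"role": "user", "content": "book a table"},
        {"role": "assistant", "content": "for how many?"},
        {"role": "user", "content": "two"},
    ]
    print(build_request_fields("You are a helpful concierge.", history, max_messages=2)["messages"])
    # [{'role': 'user', 'content': 'two'}]
```

The window size is a trade-off: a smaller window means fewer input tokens per turn but less conversational context for the model.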