Pipecat Integration
Pair contextweaver's async context compilation and context firewall with Pipecat so real-time voice and video agents stay budget-aware and don't stall the pipeline on a single multi-KB function result.
!!! tip "Canonical worked example"
    The Voice agent reference architecture
    is the canonical end-to-end worked example for this guide. It walks
    a five-turn customer-service call against an 18-tool catalog,
    exercises every recommendation on this page (the
    asyncio.to_thread(mgr.build_sync, …) pattern, tight per-phase
    budgets, persistent facts across the call), and ships a deterministic
    OUTPUT.md you can read without running the script. Start there if
    you want to see the pieces work together before reading the
    integration details below.
Why
Pipecat's pipeline architecture is designed for sub-100 ms turn-taking. Three things break that latency budget on long calls:
- Unbounded conversation history. Every turn is appended verbatim; by minute 10 the prompt is enormous.
- Function-result stalls. A single large response (50 orders, an embedding result, a database dump) blocks the LLM frame until it fits the model's input.
- Loading every function into the prompt. Every available function is in the system instructions on every turn.
contextweaver fixes all three asynchronously: ContextManager.build()
is awaitable so it slots into a Pipecat FrameProcessor without
serialising the pipeline.
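To make the function-result stall concrete, here is a minimal, self-contained sketch of the firewall idea: a large result is kept out of the prompt and replaced by a short preview plus a handle. `ToyFirewall`, `max_chars`, and `admit` are hypothetical names used only for illustration; this is not contextweaver's implementation, which may summarise rather than truncate.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFirewall:
    """Illustrative stand-in: keeps large tool results out of the prompt."""

    max_chars: int = 200  # hypothetical threshold, not a contextweaver default
    artifacts: dict = field(default_factory=dict)

    def admit(self, call_id: str, raw_output: str) -> str:
        """Return the text that is allowed into the prompt for this result."""
        if len(raw_output) <= self.max_chars:
            return raw_output  # small results pass through untouched
        # Store the full payload out of band; surface only a preview and a handle.
        self.artifacts[call_id] = raw_output
        preview = raw_output[: self.max_chars]
        return f"{preview}... [truncated, {len(raw_output)} chars, ref={call_id}]"


firewall = ToyFirewall(max_chars=50)
big_result = "order-" * 1000  # 6000 characters, far too large for a voice turn
prompt_text = firewall.admit("tc-1", big_result)
print(len(big_result), "->", len(prompt_text))
```

The point of the sketch is the shape of the contract: the prompt only ever sees a bounded string, while the full payload stays retrievable by its call id.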
Prerequisites
pip install contextweaver pipecat-ai openai
export OPENAI_API_KEY=sk-...
# Optional: Daily.co transport
export DAILY_API_KEY=...
Architecture
Audio Input
│
▼
[VAD] ← voice activity detection
│
▼
[STT] ← speech-to-text (TextFrame)
│
▼
[contextweaver FrameProcessor]
│ ─ ctx_mgr.ingest (user turn)
│ ─ router.route → shortlist
│ ─ await ctx_mgr.build(phase=Phase.call)
▼
[LLM] ← receives pack.prompt + shortlist
│
▼ (function call)
[Function execution]
│ ─ ctx_mgr.ingest_tool_result (firewall)
│ ─ await ctx_mgr.build(phase=Phase.answer)
▼
[TTS] ← text-to-speech
│
▼
Audio Output
You hook contextweaver in via a custom FrameProcessor that sits
between STT and the LLM. The async build() runs concurrently with
TTS / network IO so the pipeline doesn't block.
Async-aware FrameProcessor
from __future__ import annotations

from pipecat.frames.frames import (
    FunctionCallResultFrame,
    LLMMessagesFrame,
    TextFrame,
)
from pipecat.processors.frame_processor import FrameProcessor

from contextweaver.context.manager import ContextManager
from contextweaver.routing.catalog import Catalog
from contextweaver.routing.router import Router
from contextweaver.routing.tree import TreeBuilder
from contextweaver.types import ContextItem, ItemKind, Phase, SelectableItem


class ContextWeaverProcessor(FrameProcessor):
    """Pipecat frame processor that drives the contextweaver pipeline."""

    def __init__(self, catalog: Catalog, ctx_mgr: ContextManager) -> None:
        super().__init__()
        self._catalog = catalog
        self._ctx_mgr = ctx_mgr
        graph = TreeBuilder(max_children=8).build(catalog.all())
        self._router = Router(graph, items=catalog.all(), top_k=3)
        self._turn = 0

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            # 1. User said something (or your STT did).
            self._turn += 1
            user_text = frame.text
            await self._ctx_mgr.ingest_async(ContextItem(
                id=f"u{self._turn}", kind=ItemKind.user_turn, text=user_text,
            ))

            # 2. Route to the top-k functions, off the audio thread.
            routed = self._router.route(user_text)

            # 3. Build the call-phase prompt asynchronously.
            pack = await self._ctx_mgr.build(phase=Phase.call, query=user_text)

            # 4. Hand the LLM a focused frame: budgeted prompt + shortlist.
            await self.push_frame(LLMMessagesFrame(
                messages=[{"role": "user", "content": pack.prompt}],
                functions=[
                    {"name": rid, "description": self._catalog.get(rid).description}
                    for rid in routed.candidate_ids
                ],
            ), direction)

        elif isinstance(frame, FunctionCallResultFrame):
            # 5. The LLM called a function; the result flows back here.
            self._ctx_mgr.ingest_tool_result_sync(
                tool_call_id=f"tc-{self._turn}",
                raw_output=str(frame.result),
                tool_name=frame.function_name,
            )

            # 6. Build the answer-phase prompt asynchronously.
            answer_pack = await self._ctx_mgr.build(
                phase=Phase.answer, query=frame.context,
            )
            await self.push_frame(LLMMessagesFrame(
                messages=[{"role": "user", "content": answer_pack.prompt}],
            ), direction)

        else:
            # Everything else passes through untouched.
            await self.push_frame(frame, direction)
ContextManager.ingest_async() and ContextManager.build() are both
real async APIs. They don't delegate to asyncio.to_thread under the
hood; they hand-roll the pipeline so the event loop stays free for the
audio pipeline.
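If you do end up calling a blocking function from a processor (for example a synchronous build variant), the standard-library fallback is `asyncio.to_thread`. The sketch below uses only the standard library; `build_sync` here is a hypothetical stand-in that sleeps, and `heartbeat` stands in for audio frames that must keep flowing while the build runs.

```python
import asyncio
import time


def build_sync(query: str) -> str:
    """Hypothetical blocking build step; time.sleep stands in for real work."""
    time.sleep(0.05)
    return f"prompt for: {query}"


async def heartbeat(ticks: list) -> None:
    """Stand-in for audio frames that must keep flowing during the build."""
    while True:
        ticks.append(1)
        await asyncio.sleep(0.005)


async def demo() -> tuple:
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    # Offload the blocking call to a worker thread; the event loop keeps running.
    prompt = await asyncio.to_thread(build_sync, "where is my order?")
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass  # expected: we cancelled the heartbeat ourselves
    return prompt, len(ticks)


prompt, tick_count = asyncio.run(demo())
print(prompt, "| heartbeat ticks during build:", tick_count)
```

A non-zero tick count shows the loop was serviced while the blocking call ran. Calling `build_sync` directly inside the coroutine would leave the heartbeat starved for the full 50 ms.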
Wiring into a Pipecat pipeline
import asyncio

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai import OpenAILLMService

from contextweaver.context.manager import ContextManager
from contextweaver.config import ContextBudget
from contextweaver.routing.catalog import Catalog
from contextweaver.types import SelectableItem


def build_catalog() -> Catalog:
    catalog = Catalog()
    for name, desc in [
        ("check_order", "Look up the status of an order"),
        ("update_address", "Update the customer's delivery address"),
        ("schedule_callback", "Schedule a callback at a chosen time"),
        ("send_email", "Send an email to the customer"),
    ]:
        catalog.register(SelectableItem(
            id=name, kind="tool", name=name, description=desc, namespace="support",
        ))
    return catalog


async def main() -> None:
    catalog = build_catalog()
    ctx_mgr = ContextManager(
        # Tight budgets keep TTS responsive in real time.
        budget=ContextBudget(route=300, call=600, interpret=500, answer=1500),
    )

    pipeline = Pipeline([
        # ... (Daily transport, VAD, STT services) ...
        ContextWeaverProcessor(catalog, ctx_mgr),
        OpenAILLMService(model="gpt-4"),
        # ... (TTS service, transport output) ...
    ])
    await pipeline.run()


if __name__ == "__main__":
    asyncio.run(main())
Latency notes
contextweaver's pipeline is pure Python computation (no IO except into the configured stores), and the in-memory stores are O(1) per append. Typical per-stage timings on a single MacBook-class core:
| Stage | Typical |
|---|---|
| ingest_async() | < 1 ms |
| router.route(...) over 50 tools | 5 – 15 ms |
| build(phase=Phase.call) with 20 events | 10 – 30 ms |
| Firewall on a 50 KB tool result | 5 – 10 ms |
Compared to passing the full conversation + every function definition to the LLM, the wall-clock gain is usually 50 – 200 ms per turn.
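Numbers like these depend heavily on hardware and catalog size, so it is worth measuring your own stages. The following is a minimal timing harness using only the standard library; `time_stage` and `fake_route` are hypothetical helpers, and `fake_route` merely stands in for whichever call you want to measure.

```python
import statistics
import time
from typing import Callable


def time_stage(fn: Callable[[], object], repeats: int = 50) -> dict:
    """Run fn repeatedly; report median and worst-case latency in milliseconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000.0)
    return {"median_ms": statistics.median(samples), "max_ms": max(samples)}


def fake_route() -> list:
    """Hypothetical stand-in for a routing call over a 50-tool catalog."""
    tools = [f"tool_{i}" for i in range(50)]
    return sorted(tools, key=len)[:3]


report = time_stage(fake_route, repeats=20)
print(f"route: median {report['median_ms']:.3f} ms, max {report['max_ms']:.3f} ms")
```

For a voice pipeline the worst case usually matters more than the median, since a single slow turn is what the caller hears as a gap.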
Advanced patterns
- Per-session episodic memory. Persist ctx_mgr.event_log.to_dict() between calls so when the same customer calls back, the agent already knows their preferences without re-asking.
- Custom phase budgets. Tighten Phase.answer for fast responses, loosen Phase.interpret for long tool results.
- Strict / seeded modes. Lock determinism for production replays by passing a ProfileConfig with mode=Mode.seeded.
- Async firewall summarisers. If your summariser does network IO, wrap it as a Summarizer protocol implementation and feed it into ContextManager(summarizer=...); the pipeline awaits at the right boundary.
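As a sketch of the per-session memory pattern, the snippet below persists a snapshot dictionary to a JSON file keyed by customer and reads it back on the next call. It assumes the snapshot is JSON-serialisable; `save_session`, `load_session`, the file layout, and the sample snapshot are all hypothetical, and how a loaded snapshot is fed back into an event log is not covered here.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def save_session(snapshot: dict, directory: Path, customer_id: str) -> Path:
    """Write one customer's snapshot to <directory>/<customer_id>.json."""
    path = directory / f"{customer_id}.json"
    path.write_text(json.dumps(snapshot), encoding="utf-8")
    return path


def load_session(directory: Path, customer_id: str) -> Optional[dict]:
    """Return the stored snapshot, or None for a first-time caller."""
    path = directory / f"{customer_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp)
    # Hypothetical snapshot shape; a real one would come from the event log.
    snapshot = {"events": [{"id": "u1", "kind": "user_turn", "text": "I prefer email"}]}
    save_session(snapshot, store, "cust-42")
    restored = load_session(store, "cust-42")
    missing = load_session(store, "cust-unknown")

print(restored == snapshot, missing)
```

A file per customer keeps the example simple; a production deployment would more likely use whatever session store the call platform already provides.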
Troubleshooting
- TTS gap mid-turn. If pack.stats.included_count is high and build() is taking > 100 ms, drop the per-phase budget so the scoring stage processes fewer candidates, or filter event_log to the current session window.
- Function-call loop. Use exclude_ids on the next router.route() so the model doesn't re-pick the function it just used.
- Async deadlock. Always await ctx_mgr.build(...) inside Pipecat processors. Calling build_sync() on a running event loop is fine in principle (the implementation doesn't block on IO), but mixing them inside one processor is confusing. Pick one style per processor.
- Event log grows unboundedly. In-memory EventLog is intentionally append-only. Snapshot + clear at session boundaries; durable backends (SQLite, Redis) are tracked under issue #174.
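The function-call loop fix can be illustrated with a toy router. This is a simplified word-overlap ranker written for this example, not contextweaver's Router; the `route` function, its scoring, and the exact form of its `exclude_ids` parameter are assumptions made to show the idea of filtering out the function that was just used.

```python
def route(query, catalog, top_k=3, exclude_ids=frozenset()):
    """Rank tools by word overlap with the query, skipping excluded ids."""
    query_words = set(query.lower().split())
    scored = []
    for tool_id, description in catalog.items():
        if tool_id in exclude_ids:
            continue  # never re-offer a tool the caller asked us to skip
        overlap = len(query_words & set(description.lower().split()))
        scored.append((overlap, tool_id))
    # Highest overlap first; ties break alphabetically for determinism.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [tool_id for _, tool_id in scored[:top_k]]


catalog = {
    "check_order": "look up the status of an order",
    "update_address": "update the customer's delivery address",
    "send_email": "send an email to the customer",
}
query = "what is the status of my order"
first = route(query, catalog, top_k=1)
# On the follow-up turn, exclude whatever was just called.
second = route(query, catalog, top_k=1, exclude_ids=frozenset(first))
print(first, second)
```

Excluding only the most recent call is usually enough to break a loop while still letting the model return to that function later in the conversation.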
See also
- How contextweaver Fits — boundary, hook points, non-goals
- Cookbook — copy-paste recipes
- Pipecat docs · FrameProcessor reference
- Tracking issue: #79