Overview

Voxray’s pipeline is a linear chain of processors. Each processor receives a frame, does work (filtering, transforming, calling an external API), and pushes the result downstream. Processors are registered by name in a global registry and instantiated at startup from config.json, so no recompilation is required to swap or reorder processors.
The registry lives in pkg/pipeline/registry.go:
// ProcessorConstructor builds a processor from a name and optional JSON options.
type ProcessorConstructor func(name string, opts json.RawMessage) processors.Processor

func RegisterProcessor(name string, ctor ProcessorConstructor)
func ProcessorsFromConfig(cfg *config.Config) ([]processors.Processor, error)
At startup, cmd/voxray/main.go imports all processor packages via blank imports. Each package’s init() function calls pipeline.RegisterProcessor(name, ctor), which adds the constructor to the in-memory registry. When a new transport connection arrives, ProcessorsFromConfig iterates config.Plugins, looks up each name in the registry, and calls the constructor with the matching plugin_options blob — or nil if no options were provided. If a plugin name in config.Plugins is not found in the registry, ProcessorsFromConfig returns an error and the server refuses to start. This makes config mistakes loud and explicit.
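The registration and lookup flow described above can be sketched in a few lines. This is a minimal illustration, not the contents of registry.go: the map-based storage, the buildProcessors helper (a simplified stand-in for ProcessorsFromConfig), and the named type are assumptions made so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Processor is a stand-in for processors.Processor; only Name is needed here.
type Processor interface{ Name() string }

// ProcessorConstructor mirrors the signature shown above.
type ProcessorConstructor func(name string, opts json.RawMessage) Processor

// registry maps plugin names to constructors (assumed to be a plain map).
var registry = map[string]ProcessorConstructor{}

// RegisterProcessor stores ctor under name, typically from a package init().
func RegisterProcessor(name string, ctor ProcessorConstructor) {
	registry[name] = ctor
}

// buildProcessors is a hypothetical simplification of ProcessorsFromConfig:
// it resolves names in order and fails on the first unknown one.
func buildProcessors(plugins []string, options map[string]json.RawMessage) ([]Processor, error) {
	out := make([]Processor, 0, len(plugins))
	for _, name := range plugins {
		ctor, ok := registry[name]
		if !ok {
			return nil, fmt.Errorf("unknown plugin %q", name)
		}
		// A missing key yields a nil json.RawMessage, matching the nil-opts rule.
		out = append(out, ctor(name, options[name]))
	}
	return out, nil
}

// named is a trivial processor used only for this demonstration.
type named struct{ name string }

func (n named) Name() string { return n.name }

// init registers the demo processor, as a processor package would.
func init() {
	RegisterProcessor("echo", func(name string, _ json.RawMessage) Processor {
		return named{name}
	})
}

func main() {
	procs, err := buildProcessors([]string{"echo"}, nil)
	fmt.Println(len(procs), err)

	// A typo in the plugin list surfaces as an error instead of a silent skip.
	_, err = buildProcessors([]string{"echo", "ecoh"}, nil)
	fmt.Println(err)
}
```

Returning an error on the first unknown name is what makes a misspelled plugin fail fast rather than produce a pipeline with a missing stage.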

Built-in Plugins

| Plugin Name | Package | Description |
| --- | --- | --- |
| echo | pkg/processors/echo | Echoes received text frames back to the sender. Useful for testing the full transport stack without requiring any AI provider credentials. |
| logger | pkg/processors/logger | Logs every frame that passes through, including type, direction, and content summary. Essential for debugging frame routing issues. |
| frame_filter | pkg/processors/filters | Passes or blocks frames by type. Configure allowed_types to create an allowlist. Frames not in the list are dropped. |
| wake_check_filter | pkg/processors/filters | Holds the pipeline in a dormant state until a wake phrase is detected in a TranscriptionFrame. After activation, a keepalive_secs timer keeps the pipeline live; it returns to dormant on timeout. |
| stt_mute_filter | pkg/processors/filters | Mutes STT input while the bot is speaking to prevent the bot’s own TTS audio from being fed back into the speech recogniser. The always strategy mutes continuously; the first_speech strategy mutes only on the first bot utterance. |
| audio_filter | pkg/processors/filters | Applies a configurable chain of audio transforms (e.g. gain normalisation, noise reduction) to audio frames before they reach STT. |
| interruption_controller | pkg/processors/voice | Handles user barge-in. When the user starts speaking while the bot is playing audio, this processor decides whether to cancel the current TTS based on the configured strategy (min_words, keyword). |
| external_chain | pkg/processors/frameworks | Forwards the latest user message from LLMContextFrame to an HTTP endpoint (e.g. a Python LangChain or Strands sidecar) and streams the response back as LLMTextFrame. |
| rtvi | pkg/processors/frameworks | Real-Time Voice Interface protocol processor. Handles client-ready and send-text client messages; emits bot-ready and bot-output server messages. Required when connecting RTVI-compatible frontends. |

Wiring Plugins in Config

Plugins are declared under two top-level keys in config.json:
  • plugins — ordered list of processor names. Voxray builds the pipeline in this order, left to right.
  • plugin_options — a map from processor name to an arbitrary JSON object passed to that processor’s constructor.
{
  "plugins": ["stt_mute_filter", "interruption_controller", "rtvi"],
  "plugin_options": {
    "stt_mute_filter": {},
    "interruption_controller": {
      "strategy": "min_words",
      "min_words": 3
    },
    "rtvi": {}
  }
}
The order in plugins matters: frames flow through processors in declaration order. In the example above, audio arrives at stt_mute_filter first (where it may be dropped if the bot is speaking), then passes to interruption_controller (which checks whether a barge-in should be declared), and finally reaches rtvi (which serialises bot output into RTVI protocol messages for the frontend).
If a processor name appears in plugins but not in plugin_options, the constructor receives nil for opts and must apply its own defaults. All built-in processors handle nil opts gracefully.
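One way a constructor can honour the nil-opts rule is to start from defaults and overlay whatever the config supplies. The sketch below is illustrative only: interruptionOptions and decodeOptions are hypothetical names, and the default values are assumptions rather than the defaults the built-in processor actually uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// interruptionOptions is a hypothetical options struct. The field names follow
// the config example above; the default values below are assumptions.
type interruptionOptions struct {
	Strategy string `json:"strategy"`
	MinWords int    `json:"min_words"`
}

// decodeOptions starts from defaults and overlays whatever the config supplies.
// Nil or empty opts leave the defaults untouched.
func decodeOptions(opts json.RawMessage) (interruptionOptions, error) {
	cfg := interruptionOptions{Strategy: "min_words", MinWords: 3}
	if len(opts) == 0 {
		return cfg, nil
	}
	// Unmarshal only overwrites fields present in the JSON object.
	if err := json.Unmarshal(opts, &cfg); err != nil {
		return interruptionOptions{}, fmt.Errorf("decode options: %w", err)
	}
	return cfg, nil
}

func main() {
	def, _ := decodeOptions(nil)
	fmt.Printf("%+v\n", def)

	custom, _ := decodeOptions(json.RawMessage(`{"min_words": 5}`))
	fmt.Printf("%+v\n", custom)
}
```

Because json.Unmarshal leaves absent fields alone, a partial options object such as {"min_words": 5} keeps the default strategy.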

Common plugin option reference

frame_filter
"frame_filter": {
  "allowed_types": ["TextFrame", "TranscriptionFrame"]
}
Frames whose Go type name is not in allowed_types are dropped.
wake_check_filter
"wake_check_filter": {
  "wake_phrases": ["hey bot", "ok voxray"],
  "keepalive_secs": 5
}
Wake phrases are matched as case-insensitive substrings of the TranscriptionFrame text.
stt_mute_filter
"stt_mute_filter": {
  "strategies": ["always"]
}
audio_filter
"audio_filter": {
  "filters": [
    { "type": "gain", "gain": 0.9 }
  ]
}
interruption_controller
"interruption_controller": {
  "strategy": "min_words",
  "min_words": 3
}
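To make the wake_check_filter options above more concrete, here is a rough sketch of a gate that combines the case-insensitive substring match with a keepalive window. The wakeGate type and its methods are hypothetical, and extending the window on every transcription received while awake is an assumption, not confirmed behaviour.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// wakeGate is a hypothetical stand-in for the state kept by wake_check_filter.
type wakeGate struct {
	phrases   []string      // lower-cased wake phrases
	keepalive time.Duration // how long the gate stays open after activity
	awakeTill time.Time     // zero value means dormant
}

func newWakeGate(phrases []string, keepaliveSecs int) *wakeGate {
	lower := make([]string, len(phrases))
	for i, p := range phrases {
		lower[i] = strings.ToLower(p)
	}
	return &wakeGate{phrases: lower, keepalive: time.Duration(keepaliveSecs) * time.Second}
}

// Allow reports whether a transcription received at now should pass.
// A wake phrase opens the gate; any transcription while open extends it
// (extending on activity is an assumption).
func (g *wakeGate) Allow(text string, now time.Time) bool {
	awake := now.Before(g.awakeTill)
	if !awake {
		lower := strings.ToLower(text)
		for _, p := range g.phrases {
			if strings.Contains(lower, p) {
				awake = true
				break
			}
		}
	}
	if awake {
		g.awakeTill = now.Add(g.keepalive)
	}
	return awake
}

// at converts a second offset into a timestamp, keeping the demo deterministic.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	g := newWakeGate([]string{"hey bot", "ok voxray"}, 5)
	fmt.Println(g.Allow("what's the weather", at(0)))          // dormant, no wake phrase
	fmt.Println(g.Allow("Hey Bot, what's the weather", at(1))) // wake phrase opens the gate
	fmt.Println(g.Allow("and tomorrow?", at(3)))               // inside the keepalive window
	fmt.Println(g.Allow("still there?", at(20)))               // window expired
}
```

Passing the timestamp in explicitly, rather than calling time.Now inside Allow, keeps the timeout logic easy to test.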

RTVI Protocol

RTVI (Real-Time Voice Interface) is the client–server messaging layer used by Pipecat-based frontend clients. Voxray implements both the processor and the wire serializer.

Enabling RTVI

Two steps are required:
  1. The WebSocket client must connect to /ws?rtvi=1. The ?rtvi=1 query parameter tells the server to select the RTVI serializer instead of the default JSON or binary serializer.
  2. "rtvi" must appear in plugins. The RTVIProcessor handles the handshake and message routing.
{
  "transport": "websocket",
  "plugins": ["rtvi"],
  "plugin_options": {
    "rtvi": { "protocol_version": "1.2.0" }
  }
}
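The query-parameter check in step 1 might look like the following on the server side. This is a sketch only: serializerFor is a hypothetical helper, and the returned strings are illustrative labels rather than Voxray identifiers.

```go
package main

import (
	"fmt"
	"net/url"
)

// serializerFor picks a serializer label from the request URI.
// Only the exact value rtvi=1 selects the RTVI serializer.
func serializerFor(requestURI string) (string, error) {
	u, err := url.ParseRequestURI(requestURI)
	if err != nil {
		return "", fmt.Errorf("parse request URI: %w", err)
	}
	if u.Query().Get("rtvi") == "1" {
		return "rtvi", nil
	}
	return "default", nil
}

func main() {
	for _, uri := range []string{"/ws?rtvi=1", "/ws", "/ws?rtvi=0"} {
		s, err := serializerFor(uri)
		fmt.Println(uri, s, err)
	}
}
```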

Message types

| Direction | Type | Payload |
| --- | --- | --- |
| Client → Server | client-ready | { "version": "1.2.0", "about": {...} } |
| Client → Server | send-text | { "content": "user message text" } |
| Server → Client | bot-ready | { "version": "1.2.0", "about": {...} } |
| Server → Client | bot-output | { "text": "bot response text" } |
| Server → Client | user-transcription | { "text": "...", "final": true } |
| Server → Client | error | { "error": "...", "fatal": false } |
| Server → Client | bot-started-speaking | (no payload) |
| Server → Client | bot-stopped-speaking | (no payload) |
Flow:
  1. Client connects to ws://host:port/ws?rtvi=1.
  2. Voxray pushes StartFrame into the pipeline; RTVIProcessor responds with bot-ready.
  3. Client sends client-ready; RTVIProcessor records the client version.
  4. Client sends send-text; RTVIProcessor converts it to a TranscriptionFrame and pushes it downstream (to LLM → TTS if in a voice pipeline, or to any downstream processor in a plugin pipeline).
  5. LLMTextFrame and other output frames are serialized as bot-output and sent to the client.
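The message handling in the flow above can be approximated as a switch on the message type. The envelope used here (a type field plus a data payload) is an assumption made for illustration; the real RTVI wire format may carry additional fields, and session, handle, and botOutput are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is an assumed envelope: a type plus a raw payload.
type message struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data,omitempty"`
}

// session tracks what the processor learns during the handshake.
type session struct {
	clientVersion string
}

// handle decodes one client message. For send-text it returns the text that
// would be pushed downstream; for client-ready it records the client version.
func (s *session) handle(raw []byte) (string, error) {
	var m message
	if err := json.Unmarshal(raw, &m); err != nil {
		return "", fmt.Errorf("decode message: %w", err)
	}
	switch m.Type {
	case "client-ready":
		var p struct {
			Version string `json:"version"`
		}
		if err := json.Unmarshal(m.Data, &p); err != nil {
			return "", fmt.Errorf("decode client-ready: %w", err)
		}
		s.clientVersion = p.Version
		return "", nil
	case "send-text":
		var p struct {
			Content string `json:"content"`
		}
		if err := json.Unmarshal(m.Data, &p); err != nil {
			return "", fmt.Errorf("decode send-text: %w", err)
		}
		return p.Content, nil
	default:
		return "", fmt.Errorf("unsupported message type %q", m.Type)
	}
}

// botOutput encodes text as a bot-output server message.
func botOutput(text string) ([]byte, error) {
	data, err := json.Marshal(map[string]string{"text": text})
	if err != nil {
		return nil, err
	}
	return json.Marshal(message{Type: "bot-output", Data: data})
}

func main() {
	s := &session{}
	_, _ = s.handle([]byte(`{"type":"client-ready","data":{"version":"1.2.0"}}`))
	text, _ := s.handle([]byte(`{"type":"send-text","data":{"content":"hello"}}`))
	out, _ := botOutput("echo: " + text)
	fmt.Println(s.clientVersion, string(out))
}
```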

External Chain (Python Sidecar)

external_chain bridges the Go pipeline to a Python LangChain, LangGraph, or Strands service over HTTP. This is the recommended pattern when your agent logic requires Python-only libraries (e.g. custom LangChain tools, complex graph traversal, retrieval-augmented generation with Python vector stores).

How it works

When an LLMContextFrame arrives, external_chain extracts the last user message and POSTs it to the configured URL. The response is streamed back as LLMTextFrame instances, bracketed by LLMFullResponseStartFrame and LLMFullResponseEndFrame markers. Downstream processors (e.g. TTS) consume these exactly as they would from a native LLM provider.

Config

{
  "plugins": ["external_chain"],
  "plugin_options": {
    "external_chain": {
      "url": "http://localhost:8001/invoke",
      "stream": true,
      "timeout_sec": 45,
      "transcript_key": "input"
    }
  }
}
| Field | Default | Description |
| --- | --- | --- |
| url | (required) | HTTP endpoint of the sidecar |
| method | POST | HTTP method |
| stream | false | Parse response as SSE or line-delimited JSON |
| timeout_sec | 30 | Per-request timeout in seconds |
| transcript_key | "input" | JSON key for the user message in the request body |
| headers | {} | Additional HTTP headers (e.g. auth) |
Streaming response format (when stream: true): SSE lines data: {"text":"..."} or {"content":"..."}. Each chunk is emitted as an LLMTextFrame in real time, enabling TTS to begin speaking before the full response is received.
Python sidecar contract:
# FastAPI example. generate() is your own async generator that yields SSE lines.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/invoke")
async def invoke(body: dict):
    user_msg = body["input"]  # key name is set by transcript_key
    # ... run your chain ...
    return StreamingResponse(generate(user_msg), media_type="text/event-stream")
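On the Go side, the streaming format described above could be consumed roughly as follows. This is a sketch under assumptions, not the external_chain implementation: readChunks, collectText, and the chunk struct are hypothetical, and the parser simply accepts both SSE data: lines and bare JSON lines.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// chunk covers both payload shapes mentioned above.
type chunk struct {
	Text    string `json:"text"`
	Content string `json:"content"`
}

// readChunks scans a streaming body line by line and calls emit for each
// non-empty text chunk. Lines may be SSE ("data: {...}") or bare JSON.
func readChunks(body io.Reader, emit func(string)) error {
	sc := bufio.NewScanner(body)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		line = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if !strings.HasPrefix(line, "{") {
			continue // blank separators, SSE comments, or non-JSON sentinels
		}
		var c chunk
		if err := json.Unmarshal([]byte(line), &c); err != nil {
			return fmt.Errorf("decode chunk: %w", err)
		}
		switch {
		case c.Text != "":
			emit(c.Text)
		case c.Content != "":
			emit(c.Content)
		}
	}
	return sc.Err()
}

// collectText is a convenience wrapper that concatenates every chunk.
func collectText(stream string) (string, error) {
	var sb strings.Builder
	err := readChunks(strings.NewReader(stream), func(s string) { sb.WriteString(s) })
	return sb.String(), err
}

func main() {
	stream := "data: {\"text\":\"Hel\"}\n\ndata: {\"content\":\"lo\"}\n\n"
	text, err := collectText(stream)
	fmt.Println(text, err)
}
```

In a real processor the emit callback would push one text frame per chunk instead of appending to a buffer, which is what lets TTS start before the response completes.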

Writing a Custom Processor

Custom processors follow a six-step pattern: create a package, define a struct, implement the interface, register with init(), import the package, and add to config.
1. Create a Go package

Create a new directory under pkg/processors/:
mkdir -p pkg/processors/myprocessor
touch pkg/processors/myprocessor/myprocessor.go
2. Define the struct

Embed *processors.BaseProcessor, which provides the downstream push channel, name, and default no-op implementations:
package myprocessor

import (
    "context"
    "encoding/json"

    "voxray-go/pkg/frames"
    "voxray-go/pkg/pipeline"
    "voxray-go/pkg/processors"
)

type MyConfig struct {
    Prefix string `json:"prefix"`
}

type MyProcessor struct {
    *processors.BaseProcessor
    cfg MyConfig
}
3. Implement ProcessFrame

ProcessFrame is called for every frame that reaches this processor. Call p.PushDownstream to forward frames to the next processor in the chain. Frames you do not forward are effectively dropped.
func (p *MyProcessor) ProcessFrame(
    ctx context.Context,
    frame frames.Frame,
    dir processors.FrameDirection,
) error {
    // Example: annotate TextFrames, forward everything else unchanged.
    if tf, ok := frame.(*frames.TextFrame); ok {
        tf.Text = p.cfg.Prefix + tf.Text
    }
    return p.PushDownstream(ctx, frame)
}
Always call p.PushDownstream(ctx, frame) for frames you do not want to drop, including frame types your processor does not handle. Failing to forward a frame type like StartFrame or CancelFrame will break pipeline lifecycle management.
4. Register with init()

The init() function runs automatically when the package is imported. It registers your constructor under the name that config will reference:
func init() {
    pipeline.RegisterProcessor("my_processor",
        func(name string, opts json.RawMessage) processors.Processor {
            var cfg MyConfig
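            if len(opts) > 0 {
                // The constructor cannot return an error, so a decode
                // failure is ignored here; consider logging it instead.
                _ = json.Unmarshal(opts, &cfg)
            }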
            return &MyProcessor{
                BaseProcessor: processors.NewBaseProcessor(name),
                cfg:           cfg,
            }
        },
    )
}
5. Blank-import the package in main.go

Go’s init() only runs if the package is imported. Add a blank import to cmd/voxray/main.go:
import (
    // ... existing imports ...
    _ "voxray-go/pkg/processors/myprocessor"
)
This is the same pattern used by all built-in processors (e.g. pkg/processors/voice/register.go registers interruption_controller this way).
6. Add to config

Add your processor name to plugins and provide any options under plugin_options:
{
  "plugins": ["stt_mute_filter", "my_processor", "interruption_controller"],
  "plugin_options": {
    "my_processor": {
      "prefix": "[bot] "
    }
  }
}
Restart the server. Voxray will log a startup error and refuse to run if your processor name is in plugins but not in the registry — a fast feedback loop for typos or missing imports.
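To try the pattern from the steps above without the Voxray source tree, the following self-contained sketch wires a prefixing processor to a recording sink. All types here (Frame, TextFrame, StartFrame, base, prefixer, collector) are simplified stand-ins; the real interfaces have more methods, including the direction argument omitted here.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins for the frames package.
type Frame interface{}
type TextFrame struct{ Text string }
type StartFrame struct{}

// Processor is a reduced version of the real interface.
type Processor interface {
	ProcessFrame(ctx context.Context, frame Frame) error
	SetDownstream(next Processor)
}

// base mimics the part of BaseProcessor that forwards frames.
type base struct{ next Processor }

func (b *base) SetDownstream(next Processor) { b.next = next }

func (b *base) PushDownstream(ctx context.Context, f Frame) error {
	if b.next == nil {
		return nil
	}
	return b.next.ProcessFrame(ctx, f)
}

// prefixer corresponds to MyProcessor from the steps above.
type prefixer struct {
	base
	prefix string
}

func (p *prefixer) ProcessFrame(ctx context.Context, f Frame) error {
	if tf, ok := f.(*TextFrame); ok {
		tf.Text = p.prefix + tf.Text
	}
	return p.PushDownstream(ctx, f) // forward every frame, handled or not
}

// collector plays the role of the sink and records what it receives.
type collector struct {
	base
	seen []string
}

func (c *collector) ProcessFrame(_ context.Context, f Frame) error {
	switch v := f.(type) {
	case *TextFrame:
		c.seen = append(c.seen, v.Text)
	default:
		c.seen = append(c.seen, fmt.Sprintf("%T", v))
	}
	return nil
}

// run wires prefixer -> collector and pushes the given frames through.
func run(prefix string, in ...Frame) ([]string, error) {
	p, sink := &prefixer{prefix: prefix}, &collector{}
	p.SetDownstream(sink)
	for _, f := range in {
		if err := p.ProcessFrame(context.Background(), f); err != nil {
			return nil, err
		}
	}
	return sink.seen, nil
}

func main() {
	out, err := run("[bot] ", &StartFrame{}, &TextFrame{Text: "hello"})
	fmt.Println(out, err)
}
```

The StartFrame reaches the sink even though prefixer knows nothing about it, which is the forwarding behaviour step 3 asks for.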

Processor Interface Reference

// Processor is implemented by every pipeline stage.
type Processor interface {
    Name() string
    Setup(ctx context.Context) error
    Cleanup(ctx context.Context) error
    ProcessFrame(ctx context.Context, frame frames.Frame, dir FrameDirection) error
    SetDownstream(downstream Processor)
    PushDownstream(ctx context.Context, frame frames.Frame) error
}
BaseProcessor implements all methods with safe defaults. Override only ProcessFrame (and optionally Setup/Cleanup for resource lifecycle) in custom processors.

Frame direction

type FrameDirection int

const (
    FrameDirectionDownstream FrameDirection = iota // client → sink (normal)
    FrameDirectionUpstream                         // sink → client (e.g. interruption signals)
)
Most processors handle only FrameDirectionDownstream. The interruption_controller uses FrameDirectionUpstream to propagate cancellation signals back toward the TTS processor.
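The effect of the direction value can be illustrated with a toy chain in which each stage forwards a frame the way it is travelling. This is a hypothetical model: the stage type with its prev and next links is an assumption made for the example, since the Processor interface above only exposes a downstream link.

```go
package main

import "fmt"

type FrameDirection int

const (
	FrameDirectionDownstream FrameDirection = iota
	FrameDirectionUpstream
)

// stage is a hypothetical processor with links in both directions.
type stage struct {
	name       string
	prev, next *stage
}

// process records the visit in trail and forwards the frame the way it is travelling.
func (s *stage) process(dir FrameDirection, trail *[]string) {
	*trail = append(*trail, s.name)
	switch dir {
	case FrameDirectionDownstream:
		if s.next != nil {
			s.next.process(dir, trail)
		}
	case FrameDirectionUpstream:
		if s.prev != nil {
			s.prev.process(dir, trail)
		}
	}
}

// trace links the named stages in order, injects a frame at index start,
// and returns the names of the stages it visited.
func trace(names []string, start int, dir FrameDirection) []string {
	stages := make([]*stage, len(names))
	for i, n := range names {
		stages[i] = &stage{name: n}
		if i > 0 {
			stages[i].prev = stages[i-1]
			stages[i-1].next = stages[i]
		}
	}
	var trail []string
	stages[start].process(dir, &trail)
	return trail
}

func main() {
	chain := []string{"first", "second", "third"}
	fmt.Println(trace(chain, 0, FrameDirectionDownstream)) // visits all three in order
	fmt.Println(trace(chain, 1, FrameDirectionUpstream))   // travels back from second to first
}
```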

Pipeline Execution Model

Frames flow synchronously through the processor chain within a single goroutine per Push call. Processors must not block indefinitely; use ctx for cancellation. If a processor needs to spawn background work (e.g. an async HTTP call), ProcessFrame should return promptly and the background goroutine should deliver its result frame through a channel and a separate Pipeline.Push call.
The runner feeds frames into the pipeline via a buffered queue (default capacity: 256 frames). If the pipeline falls behind, the queue fills and the transport reader blocks, which applies back-pressure to the client. This is intentional and prevents unbounded memory growth under load.
Transport.Input → [buffered queue, cap=256] → Pipeline.Push → Processor₁ → … → Sink → Transport.Output
The pipeline builder automatically appends a Sink after the last processor in a plugin pipeline. The Sink writes frames to Transport.Output, so you do not need to wire it manually.
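The back-pressure behaviour described in this section follows from how a bounded Go channel works. The sketch below is a minimal model, assuming the queue is a buffered channel; enqueue and demo are hypothetical names, and the small capacity and 20 ms timeout are chosen only to keep the demonstration quick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// enqueue blocks until there is room in the queue or ctx is cancelled.
// Blocking here is what slows the transport reader when the pipeline lags.
func enqueue(ctx context.Context, queue chan<- string, frame string) error {
	select {
	case queue <- frame:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo fills a queue of the given capacity (must be at least 1) with no
// consumer, shows that the next send blocks, then frees one slot and retries.
func demo(capacity int) (accepted int, blocked bool, resumed bool) {
	queue := make(chan string, capacity)
	for i := 0; i < capacity; i++ {
		if enqueue(context.Background(), queue, fmt.Sprintf("frame-%d", i)) == nil {
			accepted++
		}
	}

	// The queue is full and nothing is reading, so this send times out.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	blocked = enqueue(ctx, queue, "overflow") != nil

	<-queue // the pipeline consumes one frame, freeing a slot
	resumed = enqueue(context.Background(), queue, "next") == nil
	return accepted, blocked, resumed
}

func main() {
	accepted, blocked, resumed := demo(2)
	fmt.Println(accepted, blocked, resumed)
}
```

Selecting on ctx.Done() alongside the send is also the usual way to honour the cancellation requirement stated above while waiting for queue space.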