The pipeline is the core runtime of every Voxray session. It is a linear chain of Processors that transform frames as they flow left to right (downstream) or right to left (upstream). Each processor does exactly one job, hands the frame to the next processor when done, and never blocks the goroutine beyond its own work.

What Is a Processor?

A Processor is the fundamental unit of the pipeline. It receives a Frame, inspects or transforms it, optionally emits one or more new frames to its neighbour, and returns. Every processor is linked to a next (downstream) and a prev (upstream) neighbour by the Pipeline when it is added. Processors are stateful — they own their own buffers, service clients, and mutexes — but they are never shared across sessions.

The Processor Interface

The complete interface definition from pkg/processors/processor.go:
// Direction is the frame flow direction.
type Direction int

const (
    Downstream Direction = 1
    Upstream   Direction = 2
)

// Processor processes frames and can be linked into a pipeline.
type Processor interface {
    ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error
    SetNext(p Processor)
    SetPrev(p Processor)
    Setup(ctx context.Context) error
    Cleanup(ctx context.Context) error
    Name() string
}
ProcessFrame is the hot path — it is called once per frame. Setup and Cleanup handle one-time initialisation (open connections, allocate buffers) and teardown (flush, close connections) respectively. Name returns a human-readable label used in logs and metrics.

Direction: Downstream and Upstream

Frames have a direction that controls which neighbour they are forwarded to.
| Direction | Value | Flow | When it is used |
| --- | --- | --- | --- |
| Downstream | 1 | Left → Right (first processor → Sink) | Normal data flow: audio in, transcription, LLM tokens, TTS audio out |
| Upstream | 2 | Right → Left (Sink → first processor) | Error propagation, VAD parameter updates, barge-in signals originating from the Sink side |
When a processor calls PushDownstream, the frame travels toward the Sink. When it calls PushUpstream, the frame travels back toward the source. For example, an ErrorFrame is created inside STTProcessor after a failed API call and pushed upstream, so that an upstream, transport-facing processor can log it or forward it to the client without polluting the downstream audio path.
Most processors only handle Downstream frames and silently forward anything arriving with dir == Upstream to b.prev. Check the direction early in ProcessFrame and branch accordingly — see the STTProcessor source for the canonical pattern.
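To make the two directions concrete, here is a minimal sketch of a three-node chain in which the middle node hands an error frame to its upstream neighbour. The `node` type, the `chain` helper, and the string frames are hypothetical stand-ins, not the Voxray types; they only mirror the next/prev forwarding rule described above.

```go
package main

import "fmt"

type Direction int

const (
	Downstream Direction = 1
	Upstream   Direction = 2
)

// node is a hypothetical stand-in for a processor: it records every frame it
// sees and forwards the frame in its current direction.
type node struct {
	name       string
	next, prev *node
	seen       []string
}

func (n *node) process(frame string, dir Direction) {
	n.seen = append(n.seen, frame)
	if dir == Downstream && n.next != nil {
		n.next.process(frame, dir)
	}
	if dir == Upstream && n.prev != nil {
		n.prev.process(frame, dir)
	}
}

// chain links nodes left to right and returns them in order.
func chain(names ...string) []*node {
	nodes := make([]*node, len(names))
	for i, name := range names {
		nodes[i] = &node{name: name}
		if i > 0 {
			nodes[i-1].next = nodes[i]
			nodes[i].prev = nodes[i-1]
		}
	}
	return nodes
}

func main() {
	nodes := chain("Transport", "STT", "Sink")
	// STT hands an error to its upstream neighbour; the Sink never sees it.
	nodes[1].prev.process("ErrorFrame", Upstream)
	for _, n := range nodes {
		fmt.Println(n.name, n.seen)
	}
}
```

Only the node to the left of STT records the frame, which is the property the text relies on: upstream traffic stays out of the downstream audio path.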

BaseProcessor: The Embed Pattern

BaseProcessor is a concrete struct you embed in every custom processor. It provides the doubly-linked chain (next / prev), default no-op Setup / Cleanup, and forwarding helpers.
// BaseProcessor provides next/prev linking and default forward behavior.
type BaseProcessor struct {
    name string
    next Processor
    prev Processor
}

// ProcessFrame forwards the frame to next (downstream) or prev (upstream). Override it in the embedding struct.
func (b *BaseProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir Direction) error {
    if dir == Downstream && b.next != nil {
        return b.next.ProcessFrame(ctx, f, dir)
    }
    if dir == Upstream && b.prev != nil {
        return b.prev.ProcessFrame(ctx, f, dir)
    }
    return nil
}

// PushDownstream forwards f to the next processor.
func (b *BaseProcessor) PushDownstream(ctx context.Context, f frames.Frame) error {
    return b.ProcessFrame(ctx, f, Downstream)
}

// PushUpstream forwards f to the previous processor.
func (b *BaseProcessor) PushUpstream(ctx context.Context, f frames.Frame) error {
    return b.ProcessFrame(ctx, f, Upstream)
}
The default ProcessFrame is a pure passthrough — every frame is forwarded in its current direction. Override ProcessFrame in your embedding struct to intercept specific frame types; call b.PushDownstream or b.PushUpstream to pass frames along.
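One detail of the embed pattern is worth spelling out: Go has no virtual dispatch, so a helper defined on the base struct always calls the base ProcessFrame, never the override. The sketch below uses reduced stand-in types (`Base`, `Trim`, `Collect`, string frames) that are assumptions for illustration, not the real Voxray definitions.

```go
package main

import (
	"fmt"
	"strings"
)

type Direction int

const (
	Downstream Direction = 1
	Upstream   Direction = 2
)

// Processor is a reduced stand-in for the real interface (frames are strings).
type Processor interface {
	ProcessFrame(f string, dir Direction) error
}

// Base mimics BaseProcessor: linking plus passthrough forwarding.
type Base struct {
	next, prev Processor
}

func (b *Base) ProcessFrame(f string, dir Direction) error {
	if dir == Downstream && b.next != nil {
		return b.next.ProcessFrame(f, dir)
	}
	if dir == Upstream && b.prev != nil {
		return b.prev.ProcessFrame(f, dir)
	}
	return nil
}

func (b *Base) PushDownstream(f string) error { return b.ProcessFrame(f, Downstream) }

// Trim overrides ProcessFrame; everything else is promoted from *Base.
type Trim struct{ *Base }

func (t *Trim) ProcessFrame(f string, dir Direction) error {
	if dir != Downstream {
		return t.Base.ProcessFrame(f, dir) // default passthrough
	}
	// PushDownstream is defined on *Base, so it calls Base.ProcessFrame
	// and does not re-enter this override.
	return t.PushDownstream(strings.TrimSpace(f))
}

// Collect is a terminal processor that records what reaches it.
type Collect struct{ got []string }

func (c *Collect) ProcessFrame(f string, _ Direction) error {
	c.got = append(c.got, f)
	return nil
}

func main() {
	sink := &Collect{}
	trim := &Trim{Base: &Base{next: sink}}
	_ = trim.ProcessFrame("  hello  ", Downstream)
	fmt.Printf("%q\n", sink.got) // prints ["hello"]
}
```

Because the helper resolves to the base method, calling it from inside an override forwards the frame to the neighbour instead of recursing.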

Pipeline Construction

Processors are registered with pipeline.Add or the batch helper pipeline.Link. The Pipeline stitches the doubly-linked chain automatically:
p := pipeline.New()
p.Link(
    voice.NewTurnProcessor("Turn", vadDetector, turnAnalyzer, 16000, 1, false),
    voice.NewSTTProcessor("STT", sttService, 16000, 1),
    voice.NewLLMProcessorWithSystemPrompt("LLM", llmService, systemPrompt),
    voice.NewTTSProcessor("TTS", ttsService, 24000),
    processors.NewSink("Sink", transport.Output()),
)
p.Setup(ctx)
p.Start(ctx, nil) // pushes StartFrame
Pipeline.Push(ctx, frame) calls ProcessFrame on the first processor with dir = Downstream. Pipeline.PushUpstream calls ProcessFrame on the last processor with dir = Upstream — used by nested pipelines such as ParallelPipeline.
Pipeline.Cleanup calls processors in reverse order so each processor can safely reference the state of the processor downstream from it (e.g. the Sink can be closed before the TTS processor tries to push a final frame).
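A minimal sketch of the lifecycle ordering, using hypothetical `stage` and `pipeline` stand-ins rather than the real types. It assumes Setup runs in link order; the text above only states that Cleanup runs in reverse.

```go
package main

import "fmt"

// stage is a hypothetical processor stand-in that logs its lifecycle calls.
type stage struct {
	name string
	log  *[]string
}

func (s stage) Setup()   { *s.log = append(*s.log, "setup:"+s.name) }
func (s stage) Cleanup() { *s.log = append(*s.log, "cleanup:"+s.name) }

// pipeline holds stages in link order.
type pipeline struct{ stages []stage }

func (p *pipeline) Link(s ...stage) { p.stages = append(p.stages, s...) }

// Setup runs first to last (an assumption for this sketch).
func (p *pipeline) Setup() {
	for _, s := range p.stages {
		s.Setup()
	}
}

// Cleanup runs last to first, mirroring the reverse order described above.
func (p *pipeline) Cleanup() {
	for i := len(p.stages) - 1; i >= 0; i-- {
		p.stages[i].Cleanup()
	}
}

// run links the named stages, runs the full lifecycle, and returns the log.
func run(names ...string) []string {
	var log []string
	p := &pipeline{}
	for _, n := range names {
		p.Link(stage{name: n, log: &log})
	}
	p.Setup()
	p.Cleanup()
	return log
}

func main() {
	for _, entry := range run("STT", "TTS", "Sink") {
		fmt.Println(entry)
	}
}
```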

Built-in Voice Processors

TurnProcessor

Package: pkg/processors/voice
Purpose: VAD-based turn detection. Buffers raw audio chunks, runs a Voice Activity Detector on each chunk, and emits one AudioRawFrame containing the full turn only when end-of-speech is confirmed.
Key fields:
  • VAD vad.Detector — per-chunk speech probability classifier
  • Analyzer turn.Analyzer — silence-threshold state machine (AppendAudio → turn.Complete)
  • SampleRate, Channels — governs buffer sizing (pre-allocated to maxDurationSecs × sampleRate × 2 × channels bytes to avoid GC pressure)
  • userTurnController — emits UserStartedSpeakingFrame / UserStoppedSpeakingFrame / UserIdleFrame based on VAD transitions and configurable stop/idle timeouts
Receives: AudioRawFrame (raw 16-bit PCM from the client transport)
Emits:
  • AudioRawFrame — one per complete turn (concatenated audio), forwarded downstream to STTProcessor
  • UserStartedSpeakingFrame — when VAD transitions to speech
  • UserStoppedSpeakingFrame — after silence threshold is exceeded
  • UserIdleFrame — when the user has been silent beyond the idle timeout
Async mode: When useAsync = true, end-of-turn detection runs via Analyzer.AnalyzeEndOfTurnAsync — a non-blocking channel poll per audio chunk. This avoids stalling the audio receive loop while the analyzer consults a model or timer.
AudioRawFrame → VAD.IsSpeech → Analyzer.AppendAudio
    ├─ turn.Pending → buffer and wait
    └─ turn.Complete → PushDownstream(concatenated AudioRawFrame)
Update VAD parameters at runtime by pushing a VADParamsUpdateFrame upstream from anywhere in the pipeline. TurnProcessor intercepts it in both directions and calls Analyzer.UpdateParams.
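The buffer sizing formula and the pending/complete flow can be sketched as follows. This is a simplified illustration, not the TurnProcessor implementation: the `turnBuffer` type, the 30-second maximum, and the "N consecutive silent chunks" rule are all assumptions standing in for the real VAD and analyzer.

```go
package main

import "fmt"

// turnBufferBytes applies the sizing formula from the field list:
// maxDurationSecs x sampleRate x 2 bytes per sample x channels.
func turnBufferBytes(maxDurationSecs, sampleRate, channels int) int {
	return maxDurationSecs * sampleRate * 2 * channels
}

// turnBuffer collects speech chunks and reports completion after
// silenceLimit consecutive non-speech chunks (a simplified analyzer).
type turnBuffer struct {
	buf          []byte
	silence      int
	silenceLimit int
	speaking     bool
}

func newTurnBuffer(capBytes, silenceLimit int) *turnBuffer {
	return &turnBuffer{buf: make([]byte, 0, capBytes), silenceLimit: silenceLimit}
}

// append returns the full turn once end-of-speech is confirmed, else nil.
func (t *turnBuffer) append(chunk []byte, isSpeech bool) []byte {
	if isSpeech {
		t.speaking = true
		t.silence = 0
		t.buf = append(t.buf, chunk...)
		return nil
	}
	if !t.speaking {
		return nil // leading silence is ignored
	}
	t.silence++
	if t.silence < t.silenceLimit {
		return nil // turn still pending
	}
	turn := append([]byte(nil), t.buf...) // copy so the buffer can be reused
	t.buf = t.buf[:0]
	t.speaking = false
	t.silence = 0
	return turn
}

func main() {
	capBytes := turnBufferBytes(30, 16000, 1)
	fmt.Println(capBytes) // 960000
	tb := newTurnBuffer(capBytes, 2)
	chunk := make([]byte, 640) // 20 ms of 16 kHz mono 16-bit PCM
	for _, speech := range []bool{false, true, true, false, false} {
		if turn := tb.append(chunk, speech); turn != nil {
			fmt.Println("turn bytes:", len(turn)) // turn bytes: 1280
		}
	}
}
```

Allocating the full capacity once means appends during a turn never reallocate, which is the point of pre-sizing the buffer.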

STTProcessor

Package: pkg/processors/voice
Purpose: Transcribes audio to text. Buffers incoming AudioRawFrame bytes until a minimum threshold is reached (default 500 ms at 16 kHz mono = 16,000 bytes), then calls STTService.Transcribe and emits TranscriptionFrame for each non-empty result.
Key fields:
  • STT services.STTService — provider-agnostic transcription interface
  • SampleRate, Channels — used to compute MinBufferBytes
  • MinBufferBytes — minimum byte count before a transcription call is made (configurable via NewSTTProcessorWithBuffer; default derives from MinSTTBufferMs = 500)
Receives: AudioRawFrame
Emits:
  • TranscriptionFrame — one per non-empty transcript segment; carries Text, Finalized, Language
  • ErrorFrame (upstream) — on STT API failure; non-fatal, pipeline continues
Why the minimum buffer? Sending 20 ms chunks to cloud STT APIs consistently returns empty responses. The 500 ms floor gives the provider enough signal to produce a reliable transcript before the LLM is invoked.
AudioRawFrame → accumulate in buf
    ├─ len(buf) < MinBufferBytes → wait for more audio
    └─ len(buf) >= MinBufferBytes → STTService.Transcribe → []TranscriptionFrame
           └─ for each non-empty transcript → PushDownstream(TranscriptionFrame)
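The 16,000-byte default follows directly from the duration floor: 500 ms × 16,000 samples/s × 2 bytes × 1 channel ÷ 1000. The sketch below works that arithmetic and shows the accumulate-then-flush decision. The `sttBuffer` type and `minBufferBytes` helper are hypothetical names, and resetting the buffer after each flush is an assumption about behaviour the text does not spell out.

```go
package main

import "fmt"

// minBufferBytes converts a duration floor into bytes of 16-bit PCM.
func minBufferBytes(ms, sampleRate, channels int) int {
	return ms * sampleRate * channels * 2 / 1000
}

// sttBuffer accumulates audio until the floor is reached.
type sttBuffer struct {
	buf   []byte
	floor int
}

// add appends chunk and returns the buffered audio once it is long enough
// to transcribe; otherwise it returns nil and keeps waiting.
func (s *sttBuffer) add(chunk []byte) []byte {
	s.buf = append(s.buf, chunk...)
	if len(s.buf) < s.floor {
		return nil
	}
	out := s.buf
	s.buf = nil // assumption: start a fresh buffer after each call
	return out
}

func main() {
	floor := minBufferBytes(500, 16000, 1)
	fmt.Println(floor) // 16000
	b := &sttBuffer{floor: floor}
	calls := 0
	for i := 0; i < 50; i++ { // fifty 20 ms chunks = 1 s of audio
		if audio := b.add(make([]byte, 640)); audio != nil {
			calls++ // stand-in for STTService.Transcribe
		}
	}
	fmt.Println(calls) // 2
}
```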

LLMProcessor

Package: pkg/processors/voice
Purpose: Runs a language model on the accumulated conversation history and streams response tokens downstream. Maintains an internal message list (msgs []map[string]any) that grows across turns, giving the LLM full conversation context.
Key fields:
  • LLM services.LLMService — streaming chat interface; Chat(ctx, messages, onToken)
  • SystemPrompt string — prepended as a {"role": "system", ...} message when non-empty
  • msgs []map[string]any — append-only conversation history; guarded by sync.Mutex
  • OnContextUpdate OnContextUpdate — optional callback fired after every context mutation (used by IVR for mode-switching)
Receives:
  • TranscriptionFrame — appends {"role": "user", "content": text} then runs the LLM
  • LLMRunFrame — runs the LLM on the current context without adding a new user message (useful for injected system events)
  • LLMMessagesUpdateFrame — replaces the entire context; optionally runs the LLM immediately
Emits:
  • LLMTextFrame — one per streamed token, forwarded to TTSProcessor as they arrive
  • TTSSpeakFrame (empty text) — pushed at end-of-response to signal TTSProcessor to flush its sentence buffer
TranscriptionFrame → append user message → LLMService.Chat(messages, onToken)
    └─ onToken callback → PushDownstream(LLMTextFrame)  [for each token]
    └─ after Chat returns → PushDownstream(TTSSpeakFrame(""))  [flush signal]
The LLM processor appends the full assistant response to msgs only after Chat returns, once fullContent is assembled. This means the conversation history is always consistent: user message is appended before the call; assistant message is appended after.
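The append ordering can be sketched as below. `chatFunc`, `history`, and `runTurn` are hypothetical stand-ins, and skipping the assistant append when the chat call fails is an assumption; the text does not describe the error path.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// chatFunc is a hypothetical stand-in for LLMService.Chat: it streams tokens
// through onToken and returns when the response is complete.
type chatFunc func(messages []map[string]any, onToken func(string)) error

type history struct {
	mu   sync.Mutex
	msgs []map[string]any
}

func (h *history) append(role, content string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.msgs = append(h.msgs, map[string]any{"role": role, "content": content})
}

// snapshot copies the slice so chat never reads it during a concurrent append.
func (h *history) snapshot() []map[string]any {
	h.mu.Lock()
	defer h.mu.Unlock()
	return append([]map[string]any(nil), h.msgs...)
}

// runTurn appends the user message, streams the reply through emit, and
// records the assistant message only after chat returns.
func runTurn(h *history, chat chatFunc, userText string, emit func(string)) error {
	h.append("user", userText)
	var full strings.Builder
	err := chat(h.snapshot(), func(tok string) {
		full.WriteString(tok)
		emit(tok) // stand-in for PushDownstream(LLMTextFrame)
	})
	if err != nil {
		return err // assumption: no assistant message on failure
	}
	h.append("assistant", full.String())
	emit("") // stand-in for the empty TTSSpeakFrame flush signal
	return nil
}

// echoChat is a fake model that streams two fixed tokens.
func echoChat(_ []map[string]any, onToken func(string)) error {
	onToken("Hi ")
	onToken("there.")
	return nil
}

func main() {
	h := &history{}
	_ = runTurn(h, echoChat, "hello", func(string) {})
	for _, m := range h.snapshot() {
		fmt.Println(m["role"], m["content"])
	}
}
```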

TTSProcessor

Package: pkg/processors/voice
Purpose: Synthesises text to speech. Batches streamed LLMTextFrame tokens until a sentence boundary (., !, ?, \n, ) is encountered or the buffer exceeds 120 runes, then calls TTSService.Speak and emits TTSAudioRawFrame. A minimum batch of 30 runes prevents single-token flushes that cause choppy playback.
Key fields:
  • TTS services.TTSService — Speak(ctx, text, sampleRate) ([]TTSAudioRawFrame, error)
  • SampleRate int — output sample rate (default 24,000 Hz)
  • MaxBatchRunes int — maximum runes before flushing without a sentence end (default 120)
  • buf strings.Builder — accumulates token text between flushes
  • botSpeaking bool — tracks whether BotStartedSpeakingFrame has been emitted this turn; reset on UserStartedSpeakingFrame or StartFrame
Receives:
  • LLMTextFrame / TextFrame — appended to buf; tryFlush decides whether to synthesise
  • TTSSpeakFrame — explicit flush trigger (sent by LLMProcessor at end-of-response); any remaining buffer is spoken immediately, then the frame’s own text (if non-empty) is spoken
  • UserStartedSpeakingFrame — barge-in: buf is cleared, botSpeaking reset; frame forwarded downstream
Emits:
  • BotStartedSpeakingFrame — emitted before the first TTSAudioRawFrame of each bot response turn
  • TTSAudioRawFrame — one or more audio frames per Speak call
  • BotStoppedSpeakingFrame — emitted after the last audio frame of each synthesised segment
  • ErrorFrame (upstream) — on TTS API failure; non-fatal
LLMTextFrame → buf.WriteString(token)
    └─ tryFlush:
         ├─ runeCount < 30 → wait
         ├─ sentence boundary && runeCount >= 30 → flush
         └─ runeCount >= 120 → flush
              └─ TTSService.Speak(text) → []TTSAudioRawFrame
                   ├─ PushDownstream(BotStartedSpeakingFrame)  [first segment only]
                   ├─ PushDownstream(TTSAudioRawFrame)          [each audio frame]
                   └─ PushDownstream(BotStoppedSpeakingFrame)  [last audio frame]
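The flush decision in the diagram can be sketched like this. The `batcher` type is hypothetical, and checking only the last character of the buffer for a boundary is a simplification; the real tryFlush may scan differently.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

const (
	minBatchRunes = 30  // below this, never flush
	maxBatchRunes = 120 // at or above this, flush regardless of boundary
)

// batcher accumulates streamed tokens between flushes.
type batcher struct{ buf strings.Builder }

// endsSentence checks a subset of the boundary characters named in the text.
func endsSentence(s string) bool {
	return strings.HasSuffix(s, ".") || strings.HasSuffix(s, "!") ||
		strings.HasSuffix(s, "?") || strings.HasSuffix(s, "\n")
}

// add appends a token and returns text to synthesise, or "" to keep waiting.
func (b *batcher) add(token string) string {
	b.buf.WriteString(token)
	text := b.buf.String()
	n := utf8.RuneCountInString(text)
	if n < minBatchRunes {
		return ""
	}
	if endsSentence(text) || n >= maxBatchRunes {
		b.buf.Reset()
		return text
	}
	return ""
}

// flush returns whatever is buffered, as an end-of-response flush would.
func (b *batcher) flush() string {
	text := b.buf.String()
	b.buf.Reset()
	return text
}

func main() {
	b := &batcher{}
	for _, tok := range []string{"Ok.", " Let me check", " the order status.", " One"} {
		if text := b.add(tok); text != "" {
			fmt.Printf("speak: %q\n", text)
		}
	}
	fmt.Printf("final flush: %q\n", b.flush())
}
```

Counting runes rather than bytes keeps the thresholds meaningful for non-ASCII text, where one character can span several bytes.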

InterruptionController

Package: pkg/processors/voice
Purpose: Detects barge-in (the user speaking while the bot is speaking) and cancels in-progress TTS synthesis. A configurable min_words strategy prevents accidental interruptions from short affirmations (“yeah”, “ok”).
The controller watches for UserStartedSpeakingFrame while botSpeaking is true. When barge-in is confirmed it pushes an InterruptionFrame downstream, which causes TTSProcessor to reset its state, and a CancelFrame, which aborts any pending pipeline work.
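As a rough illustration of a min_words style check, the sketch below confirms barge-in only when the bot is speaking and the user's words reach a threshold. The `shouldInterrupt` function, its parameters, and the idea that the word count comes from a transcript string are all assumptions; the source does not describe how the controller obtains or counts words.

```go
package main

import (
	"fmt"
	"strings"
)

// shouldInterrupt sketches a min_words check: barge-in is confirmed only when
// the bot is speaking and the user's transcript has at least minWords words.
func shouldInterrupt(botSpeaking bool, transcript string, minWords int) bool {
	if !botSpeaking {
		return false // nothing to interrupt
	}
	return len(strings.Fields(transcript)) >= minWords
}

func main() {
	fmt.Println(shouldInterrupt(true, "yeah", 3))                 // false
	fmt.Println(shouldInterrupt(true, "wait, that is wrong", 3))  // true
	fmt.Println(shouldInterrupt(false, "wait, that is wrong", 3)) // false
}
```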

Aggregators

Aggregators are processors that accumulate frames across multiple turns or time windows before emitting a single richer frame. They live in pkg/processors/aggregators.
| Processor | Purpose | Typical placement |
| --- | --- | --- |
| dtmf_aggregator | Accumulates InputDTMFFrame digits; flushes as TranscriptionFrame on timeout, #, or End/Cancel | Before LLMProcessor in telephony IVR pipelines |
| gated | Buffers all frames when a custom gate is closed; releases the queue when the gate opens | Flow control between async events and the pipeline |
| llmfullresponse | Aggregates LLMTextFrame tokens between LLMFullResponseStartFrame and LLMFullResponseEndFrame; fires a callback on completion or interruption | After LLMProcessor for voicemail or IVR capture |
| llmtext | Converts LLMTextFrame → AggregatedTextFrame via a configurable text aggregator (e.g. sentence boundary) | Between LLMProcessor and TTSProcessor when sentence aggregation is managed externally |
| userresponse | Buffers TranscriptionFrame segments; emits one aggregated transcript on UserStoppedSpeakingFrame | After STTProcessor when interim transcripts are enabled |
| gated_llm_context | Holds LLMContextFrame until an external notifier signals release | Before LLMProcessor when context injection is asynchronous |
| llmcontextsummarizer | Monitors message count; pushes LLMContextSummaryRequestFrame when thresholds are exceeded; applies LLMContextSummaryResultFrame to compress history | After LLMProcessor for long-running sessions |
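The buffer-then-release behaviour of a gated aggregator can be sketched as follows. The `gate` type and its methods are hypothetical and single-goroutine only; a real processor whose gate is toggled from another goroutine would need a mutex around the queue.

```go
package main

import "fmt"

// gate buffers frames while closed and releases them in order when opened.
type gate struct {
	open   bool
	queue  []string
	output func(string) // stand-in for PushDownstream
}

// push forwards the frame immediately if the gate is open, else queues it.
func (g *gate) push(f string) {
	if g.open {
		g.output(f)
		return
	}
	g.queue = append(g.queue, f)
}

// setOpen changes the gate state; opening drains the queue in arrival order.
func (g *gate) setOpen(open bool) {
	g.open = open
	if !open {
		return
	}
	for _, f := range g.queue {
		g.output(f)
	}
	g.queue = nil
}

func main() {
	var out []string
	g := &gate{output: func(f string) { out = append(out, f) }}
	g.push("a")
	g.push("b")
	fmt.Println(len(out)) // 0: the gate is still closed
	g.setOpen(true)
	g.push("c")
	fmt.Println(out) // [a b c]
}
```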

ParallelPipeline

ParallelPipeline (in pkg/pipeline) wraps multiple child Pipeline instances. When a frame is pushed into the parallel pipeline, it is cloned and delivered to every branch simultaneously. Lifecycle frames (StartFrame, CancelFrame, EndFrame) are synchronised — the parallel pipeline waits for all branches to complete before forwarding the lifecycle frame downstream. This is useful when the same audio input feeds both a voice pipeline and a recording/transcription branch.
Each branch in a ParallelPipeline must end with its own Sink or output handler. Branches do not share processors and do not exchange frames with each other.
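The clone-and-fan-out behaviour might look roughly like this. The `branch` type and `fanOut` function are hypothetical, and the sketch waits for every branch on every frame, whereas the text describes that synchronisation only for lifecycle frames.

```go
package main

import (
	"fmt"
	"sync"
)

// branch is a hypothetical child pipeline: it consumes a frame and records it.
type branch struct {
	name string
	mu   sync.Mutex
	got  [][]byte
}

func (b *branch) push(f []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.got = append(b.got, f)
}

// fanOut clones the frame for every branch, delivers the clones concurrently,
// and returns only when all branches have finished.
func fanOut(frame []byte, branches []*branch) {
	var wg sync.WaitGroup
	for _, b := range branches {
		clone := append([]byte(nil), frame...) // each branch owns its copy
		wg.Add(1)
		go func(b *branch, f []byte) {
			defer wg.Done()
			b.push(f)
		}(b, clone)
	}
	wg.Wait()
}

func main() {
	voice, rec := &branch{name: "voice"}, &branch{name: "recording"}
	frame := []byte{1, 2, 3}
	fanOut(frame, []*branch{voice, rec})
	voice.got[0][0] = 9                  // mutate one branch's copy
	fmt.Println(frame[0], rec.got[0][0]) // 1 1
}
```

Cloning before delivery is what lets branches mutate frames independently without affecting each other or the original.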

ServiceSwitcher

ServiceSwitcher (in pkg/processors) wraps an STTService, LLMService, or TTSService and allows the active provider to be swapped at runtime without stopping the pipeline. A typical use case is switching the STT language mid-call based on detected input, or swapping the LLM model when the conversation enters a specialised domain. The switcher is thread-safe and the swap takes effect on the next frame processed.
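One way to get a thread-safe swap that takes effect on the next call is to guard the active provider with a read-write lock. This is a sketch under assumptions: the `STT` interface, `fixedSTT`, and `switcher` are reduced stand-ins, and the real ServiceSwitcher may use a different mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

// STT is a reduced stand-in for an STT service interface.
type STT interface {
	Transcribe(audio []byte) string
}

// fixedSTT is a fake provider that always returns its own label.
type fixedSTT string

func (f fixedSTT) Transcribe([]byte) string { return string(f) }

// switcher satisfies STT itself and delegates to whichever provider is active.
type switcher struct {
	mu     sync.RWMutex
	active STT
}

// Swap replaces the active provider; in-flight calls finish on the old one.
func (s *switcher) Swap(next STT) {
	s.mu.Lock()
	s.active = next
	s.mu.Unlock()
}

func (s *switcher) Transcribe(audio []byte) string {
	s.mu.RLock()
	svc := s.active // read the provider under the lock, call it outside
	s.mu.RUnlock()
	return svc.Transcribe(audio)
}

func main() {
	s := &switcher{active: fixedSTT("en")}
	fmt.Println(s.Transcribe(nil)) // en
	s.Swap(fixedSTT("fr"))
	fmt.Println(s.Transcribe(nil)) // fr
}
```

Releasing the lock before calling the provider keeps a slow transcription from blocking a swap.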

Writing a Custom Processor

To create a custom processor, embed *processors.BaseProcessor, implement ProcessFrame, and optionally override Setup and Cleanup.
package myprocessors

import (
    "context"
    "strings"

    "voxray-go/pkg/frames"
    "voxray-go/pkg/processors"
)

// UppercaseTranscriptionProcessor converts all transcription text to uppercase
// before forwarding downstream — a minimal custom processor example.
type UppercaseTranscriptionProcessor struct {
    *processors.BaseProcessor
}

func NewUppercaseTranscriptionProcessor(name string) *UppercaseTranscriptionProcessor {
    if name == "" {
        name = "UppercaseTranscription"
    }
    return &UppercaseTranscriptionProcessor{
        BaseProcessor: processors.NewBaseProcessor(name),
    }
}

// Setup is called once before the first frame. Open connections, allocate
// resources, or validate config here. Return an error to abort pipeline startup.
func (p *UppercaseTranscriptionProcessor) Setup(ctx context.Context) error {
    return nil // nothing to initialise
}

// Cleanup is called once after the pipeline shuts down (reverse order).
func (p *UppercaseTranscriptionProcessor) Cleanup(ctx context.Context) error {
    return nil // nothing to release
}

// ProcessFrame intercepts TranscriptionFrame and uppercases the text.
// All other frames — and all upstream frames — are forwarded unchanged.
func (p *UppercaseTranscriptionProcessor) ProcessFrame(
    ctx context.Context,
    f frames.Frame,
    dir processors.Direction,
) error {
    // Always forward upstream frames without modification.
    if dir == processors.Upstream {
        return p.PushUpstream(ctx, f)
    }

    tf, ok := f.(*frames.TranscriptionFrame)
    if !ok {
        // Not a transcription frame — pass it through downstream unchanged.
        return p.PushDownstream(ctx, f)
    }

    // Mutate a copy so we don't modify a shared frame.
    modified := *tf
    modified.Text = strings.ToUpper(tf.Text)
    return p.PushDownstream(ctx, &modified)
}
Register the processor in your pipeline builder before LLMProcessor:
p.Link(
    turnProc,
    sttProc,
    myprocessors.NewUppercaseTranscriptionProcessor("UppercaseSTT"),
    llmProc,
    ttsProc,
    sink,
)
Processors are not goroutine-safe by default. If your processor maintains mutable state that could be accessed from multiple goroutines (e.g. a callback or a background ticker), protect it with a sync.Mutex. See LLMProcessor.msgs for the canonical pattern.

Plugin registration

If your processor is assembled from config rather than code, register a factory function with pkg/plugin.Registry. The config key under plugins then maps to your constructor, and ProcessorFromConfig will instantiate it automatically when building the pipeline.
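The general shape of a factory registry is sketched below. None of these names come from pkg/plugin: `registry`, `Factory`, `Register`, and `Build` are hypothetical, and the real Registry and ProcessorFromConfig signatures should be checked in the package source.

```go
package main

import (
	"fmt"
	"sync"
)

// Processor is reduced to Name() for this sketch.
type Processor interface{ Name() string }

// Factory builds a processor from its config block.
type Factory func(cfg map[string]any) (Processor, error)

// registry maps config keys to factories.
type registry struct {
	mu        sync.RWMutex
	factories map[string]Factory
}

func newRegistry() *registry { return &registry{factories: map[string]Factory{}} }

func (r *registry) Register(key string, f Factory) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[key] = f
}

// Build looks up the factory for key and invokes it with cfg.
func (r *registry) Build(key string, cfg map[string]any) (Processor, error) {
	r.mu.RLock()
	f, ok := r.factories[key]
	r.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("no plugin registered for %q", key)
	}
	return f(cfg)
}

// named is a trivial processor used only to demonstrate construction.
type named string

func (n named) Name() string { return string(n) }

func main() {
	reg := newRegistry()
	reg.Register("uppercase", func(cfg map[string]any) (Processor, error) {
		name, _ := cfg["name"].(string)
		if name == "" {
			name = "UppercaseTranscription"
		}
		return named(name), nil
	})
	p, err := reg.Build("uppercase", map[string]any{"name": "UppercaseSTT"})
	fmt.Println(p.Name(), err) // UppercaseSTT <nil>
}
```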