bbx_dsp/
graph.rs

1//! DSP graph system.
2//!
3//! This module provides [`Graph`] for managing connected DSP blocks and
4//! [`GraphBuilder`] for fluent graph construction.
5//!
6//! Blocks are connected to form a signal processing chain. The graph handles
7//! buffer allocation, execution ordering via topological sort, and modulation
8//! value collection.
9
10use std::collections::HashMap;
11
12use bbx_core::StackVec;
13
14use crate::{
15    block::{BlockCategory, BlockId, BlockType},
16    blocks::{effectors::mixer::MixerBlock, io::output::OutputBlock},
17    buffer::{AudioBuffer, Buffer},
18    channel::ChannelLayout,
19    context::DspContext,
20    parameter::Parameter,
21    sample::Sample,
22};
23
/// Upper bound on the number of inputs a single block may receive.
///
/// The bound lets per-block input lists live in fixed-capacity stack storage
/// during processing. Sixteen is enough for third-order ambisonics, which
/// carries 16 channels.
pub const MAX_BLOCK_INPUTS: usize = 16;
/// Upper bound on the number of outputs a single block may expose.
///
/// The bound lets per-block output lists live in fixed-capacity stack storage
/// during processing. Sixteen is enough for third-order ambisonics, which
/// carries 16 channels.
pub const MAX_BLOCK_OUTPUTS: usize = 16;
30
31/// Describes an audio connection between two blocks.
32///
33/// Connects a specific output port of one block to an input port of another.
34#[derive(Debug, Clone)]
35pub struct Connection {
36    /// Source block providing audio.
37    pub from: BlockId,
38    /// Output port index on the source block.
39    pub from_output: usize,
40    /// Destination block receiving audio.
41    pub to: BlockId,
42    /// Input port index on the destination block.
43    pub to_input: usize,
44}
45
46/// Snapshot of a block's metadata for visualization.
47///
48/// Contains owned data suitable for cross-thread transfer.
49#[derive(Debug, Clone)]
50pub struct BlockSnapshot {
51    /// The block's unique identifier.
52    pub id: usize,
53    /// Display name of the block type.
54    pub name: String,
55    /// Category of the block.
56    pub category: BlockCategory,
57    /// Number of input ports.
58    pub input_count: usize,
59    /// Number of output ports.
60    pub output_count: usize,
61}
62
/// Plain-data copy of an audio connection, captured for visualization.
#[derive(Clone, Debug)]
pub struct ConnectionSnapshot {
    /// ID of the block the audio comes from.
    pub from_block: usize,
    /// Output port index on the source block.
    pub from_output: usize,
    /// ID of the block the audio goes to.
    pub to_block: usize,
    /// Input port index on the destination block.
    pub to_input: usize,
}
75
/// Plain-data copy of a modulation routing, captured for visualization.
#[derive(Clone, Debug)]
pub struct ModulationConnectionSnapshot {
    /// ID of the modulator block that supplies the value.
    pub from_block: usize,
    /// ID of the block whose parameter is modulated.
    pub to_block: usize,
    /// Name of the modulated parameter on the target block.
    pub parameter_name: String,
}
86
87/// Snapshot of a graph's topology for visualization.
88///
89/// Contains all block metadata and connections at a point in time.
90/// This is an owned snapshot suitable for cross-thread transfer.
91#[derive(Debug, Clone)]
92pub struct GraphTopologySnapshot {
93    /// All blocks in the graph.
94    pub blocks: Vec<BlockSnapshot>,
95    /// All audio connections between blocks.
96    pub connections: Vec<ConnectionSnapshot>,
97    /// All modulation connections from modulators to block parameters.
98    pub modulation_connections: Vec<ModulationConnectionSnapshot>,
99}
100
101/// A directed acyclic graph of connected DSP blocks.
102///
103/// The graph manages block storage, buffer allocation, and execution ordering.
104/// Blocks are processed in topologically sorted order to ensure dependencies
105/// are satisfied.
106pub struct Graph<S: Sample> {
107    blocks: Vec<BlockType<S>>,
108    connections: Vec<Connection>,
109    execution_order: Vec<BlockId>,
110    output_block: Option<BlockId>,
111
112    // Pre-allocated buffers
113    audio_buffers: Vec<AudioBuffer<S>>,
114    modulation_values: Vec<S>,
115
116    // Buffer management
117    block_buffer_start: Vec<usize>,
118    buffer_size: usize,
119    context: DspContext,
120
121    // Pre-computed connection lookups: block_id -> [input buffer indices]
122    // Computed once in prepare_for_playback() for O(1) lookup during processing
123    block_input_buffers: Vec<Vec<usize>>,
124}
125
126impl<S: Sample> Graph<S> {
127    /// Create a `Graph` with a given sample rate, buffer size, and number of channels.
128    pub fn new(sample_rate: f64, buffer_size: usize, num_channels: usize) -> Self {
129        let context = DspContext {
130            sample_rate,
131            buffer_size,
132            num_channels,
133            current_sample: 0,
134            channel_layout: ChannelLayout::default(),
135        };
136
137        Self {
138            blocks: Vec::new(),
139            connections: Vec::new(),
140            execution_order: Vec::new(),
141            output_block: None,
142            audio_buffers: Vec::new(),
143            modulation_values: Vec::new(),
144            block_buffer_start: Vec::new(),
145            buffer_size,
146            context,
147            block_input_buffers: Vec::new(),
148        }
149    }
150
151    /// Get the underlying `DspContext` used by a `Graph`.
152    #[inline]
153    pub fn context(&self) -> &DspContext {
154        &self.context
155    }
156
157    /// Get a reference to a block by its ID.
158    #[inline]
159    pub fn get_block(&self, id: BlockId) -> Option<&BlockType<S>> {
160        self.blocks.get(id.0)
161    }
162
163    /// Get a mutable reference to a block by its ID.
164    #[inline]
165    pub fn get_block_mut(&mut self, id: BlockId) -> Option<&mut BlockType<S>> {
166        self.blocks.get_mut(id.0)
167    }
168
169    /// Add an arbitrary block to the `Graph`.
170    pub fn add_block(&mut self, block: BlockType<S>) -> BlockId {
171        let block_id = BlockId(self.blocks.len());
172
173        self.block_buffer_start.push(self.audio_buffers.len());
174        self.blocks.push(block);
175
176        let output_count = self.blocks[block_id.0].output_count();
177        for _ in 0..output_count {
178            self.audio_buffers.push(AudioBuffer::new(self.buffer_size));
179        }
180
181        block_id
182    }
183
184    /// Add an output block to the `Graph`.
185    pub fn add_output_block(&mut self) -> BlockId {
186        let block = BlockType::Output(OutputBlock::<S>::new(self.context.num_channels));
187        let block_id = self.add_block(block);
188        self.output_block = Some(block_id);
189        block_id
190    }
191
192    /// Form a `Connection` between two particular blocks.
193    pub fn connect(&mut self, from: BlockId, from_output: usize, to: BlockId, to_input: usize) {
194        self.connections.push(Connection {
195            from,
196            from_output,
197            to,
198            to_input,
199        })
200    }
201
202    /// Prepares the graph for audio processing.
203    ///
204    /// Must be called after all blocks are added and connected, but before
205    /// [`process_buffers`](Self::process_buffers). Computes execution order
206    /// and pre-allocates buffers.
207    pub fn prepare_for_playback(&mut self) {
208        self.execution_order = self.topological_sort();
209        self.modulation_values.resize(self.blocks.len(), S::ZERO);
210
211        // Pre-compute input buffer indices for each block (O(1) lookup during processing)
212        self.block_input_buffers = vec![Vec::new(); self.blocks.len()];
213        for conn in &self.connections {
214            let buffer_idx = self.get_buffer_index(conn.from, conn.from_output);
215            self.block_input_buffers[conn.to.0].push(buffer_idx);
216        }
217
218        #[cfg(debug_assertions)]
219        self.validate_buffer_indices();
220    }
221
222    /// Validates that input and output buffer indices never overlap for any block.
223    ///
224    /// This invariant is critical for the safety of `process_block_unsafe()`.
225    #[cfg(debug_assertions)]
226    fn validate_buffer_indices(&self) {
227        for block_id in 0..self.blocks.len() {
228            let input_indices = &self.block_input_buffers[block_id];
229
230            // Compute output indices for this block
231            let output_count = self.blocks[block_id].output_count();
232            let output_start = self.block_buffer_start[block_id];
233
234            for output_idx in 0..output_count {
235                let buffer_idx = output_start + output_idx;
236
237                // Check that no input index matches this output index
238                debug_assert!(
239                    !input_indices.contains(&buffer_idx),
240                    "Block {block_id} has overlapping input/output buffer index {buffer_idx}. \
241                     This would cause undefined behavior in process_block_unsafe()."
242                );
243            }
244        }
245    }
246
247    fn topological_sort(&self) -> Vec<BlockId> {
248        let mut in_degree = vec![0; self.blocks.len()];
249        let mut adjacency_list: HashMap<BlockId, Vec<BlockId>> = HashMap::new();
250
251        for connection in &self.connections {
252            adjacency_list.entry(connection.from).or_default().push(connection.to);
253            in_degree[connection.to.0] += 1;
254        }
255
256        // Kahn's algorithm
257        let mut queue = Vec::new();
258        let mut result = Vec::new();
259
260        for (i, &degree) in in_degree.iter().enumerate() {
261            if degree == 0 {
262                queue.push(BlockId(i));
263            }
264        }
265
266        while let Some(block) = queue.pop() {
267            result.push(block);
268            if let Some(neighbors) = adjacency_list.get(&block) {
269                for &neighbor in neighbors {
270                    in_degree[neighbor.0] -= 1;
271                    if in_degree[neighbor.0] == 0 {
272                        queue.push(neighbor);
273                    }
274                }
275            }
276        }
277
278        result
279    }
280
281    /// Process one buffer's worth of audio through all blocks.
282    ///
283    /// Executes blocks in topologically sorted order, copying final output
284    /// to the provided buffers (one per channel).
285    #[inline]
286    pub fn process_buffers(&mut self, output_buffers: &mut [&mut [S]]) {
287        for buffer in &mut self.audio_buffers {
288            buffer.zeroize();
289        }
290
291        for i in 0..self.execution_order.len() {
292            let block_id = self.execution_order[i];
293            self.process_block_unsafe(block_id);
294            self.collect_modulation_values(block_id);
295        }
296
297        self.copy_to_output_buffer(output_buffers);
298    }
299
300    #[inline]
301    fn process_block_unsafe(&mut self, block_id: BlockId) {
302        // Use pre-computed input buffer indices (O(1) lookup instead of O(n) scan)
303        let input_indices = &self.block_input_buffers[block_id.0];
304
305        // Build output indices using stack allocation (no heap allocation)
306        let mut output_indices: StackVec<usize, MAX_BLOCK_OUTPUTS> = StackVec::new();
307        let output_count = self.blocks[block_id.0].output_count();
308        debug_assert!(
309            output_count <= MAX_BLOCK_OUTPUTS,
310            "Block output count {output_count} exceeds MAX_BLOCK_OUTPUTS {MAX_BLOCK_OUTPUTS}"
311        );
312        for output_index in 0..output_count {
313            let buffer_index = self.get_buffer_index(block_id, output_index);
314            output_indices.push_unchecked(buffer_index);
315        }
316
317        // SAFETY: Our buffer indexing guarantees that:
318        // 1. Input indices come from other blocks' outputs.
319        // 2. Output indices are unique to this block.
320        // 3. Therefore, input_indices and output_indices NEVER overlap.
321        // 4. All indices are valid (within the bounds of self.audio_buffers).
322        unsafe {
323            let buffers_ptr = self.audio_buffers.as_mut_ptr();
324
325            // Build input slices using stack allocation (no heap allocation)
326            let mut input_slices: StackVec<&[S], MAX_BLOCK_INPUTS> = StackVec::new();
327            let input_count = input_indices.len();
328            debug_assert!(
329                input_count <= MAX_BLOCK_INPUTS,
330                "Block input count {input_count} exceeds MAX_BLOCK_INPUTS {MAX_BLOCK_INPUTS}"
331            );
332            for &index in input_indices {
333                let buffer_ptr = buffers_ptr.add(index);
334                let slice = std::slice::from_raw_parts((*buffer_ptr).as_ptr(), (*buffer_ptr).len());
335                // SAFETY: We verified input_indices.len() <= MAX_BLOCK_INPUTS via debug_assert
336                input_slices.push_unchecked(slice);
337            }
338
339            // Build output slices using stack allocation (no heap allocation)
340            let mut output_slices: StackVec<&mut [S], MAX_BLOCK_OUTPUTS> = StackVec::new();
341            for &index in output_indices.as_slice() {
342                let buffer_ptr = buffers_ptr.add(index);
343                let slice = std::slice::from_raw_parts_mut((*buffer_ptr).as_mut_ptr(), (*buffer_ptr).len());
344                // SAFETY: output_indices.len() <= MAX_BLOCK_OUTPUTS (already verified above)
345                output_slices.push_unchecked(slice);
346            }
347
348            self.blocks[block_id.0].process(
349                input_slices.as_slice(),
350                output_slices.as_mut_slice(),
351                &self.modulation_values,
352                &self.context,
353            );
354        }
355    }
356
357    /// Collect modulation values from modulator blocks.
358    ///
359    /// # Control-Rate Modulation
360    ///
361    /// Modulation operates at **control rate** (per-buffer), not audio rate (per-sample).
362    /// Only the first sample of each modulator's output is used as the modulation value
363    /// for the entire buffer. This has several implications:
364    ///
365    /// - **LFO frequency limit**: Maximum LFO frequency is `sample_rate / (2 * buffer_size)`. At 44.1kHz with 512
366    ///   samples, that's ~43Hz. Higher frequencies will alias.
367    /// - **Stepped modulation**: Fast parameter changes appear "stepped" at buffer boundaries.
368    /// - **Envelope precision**: Gate on/off detection only happens at buffer boundaries.
369    ///
370    /// This design is intentional for performance reasons: audio-rate modulation would
371    /// require per-sample parameter updates, significantly increasing CPU usage.
372    #[inline]
373    fn collect_modulation_values(&mut self, block_id: BlockId) {
374        // Bounds check to prevent panic in audio thread
375        if block_id.0 >= self.blocks.len() {
376            return;
377        }
378
379        let has_modulation = !self.blocks[block_id.0].modulation_outputs().is_empty();
380        if has_modulation {
381            let buffer_index = self.get_buffer_index(block_id, 0);
382            // Take only the first sample (control rate, not audio rate)
383            if let (Some(&first_sample), Some(mod_val)) = (
384                self.audio_buffers.get(buffer_index).and_then(|b| b.as_slice().first()),
385                self.modulation_values.get_mut(block_id.0),
386            ) {
387                *mod_val = first_sample;
388            }
389        }
390    }
391
392    fn copy_to_output_buffer(&self, output_buffer: &mut [&mut [S]]) {
393        // In a more complex system, there could be multiple output blocks...
394        if let Some(output_block_id) = self.output_block {
395            let output_count = self.blocks[output_block_id.0].output_count();
396            for channel in 0..output_count.min(output_buffer.len()) {
397                let internal_buffer_index = self.get_buffer_index(output_block_id, channel);
398                let internal_buffer = &self.audio_buffers[internal_buffer_index];
399
400                let copy_length = internal_buffer.len().min(output_buffer[channel].len());
401                output_buffer[channel][..copy_length].copy_from_slice(&internal_buffer.as_slice()[..copy_length]);
402            }
403        }
404    }
405
406    #[inline]
407    fn get_buffer_index(&self, block_id: BlockId, output_index: usize) -> usize {
408        self.block_buffer_start[block_id.0] + output_index
409    }
410}
411
412/// Fluent builder for constructing DSP graphs.
413///
414/// Provides methods to add blocks, create connections, and set up modulation.
415/// Call [`build`](Self::build) to finalize and prepare the graph.
416pub struct GraphBuilder<S: Sample> {
417    graph: Graph<S>,
418}
419
420impl<S: Sample> GraphBuilder<S> {
421    /// Create a `GraphBuilder` that will construct a `Graph` with a given
422    /// sample rate, buffer size, and number of channels.
423    pub fn new(sample_rate: f64, buffer_size: usize, num_channels: usize) -> Self {
424        Self {
425            graph: Graph::new(sample_rate, buffer_size, num_channels),
426        }
427    }
428
429    /// Create a `GraphBuilder` with a specific channel layout.
430    ///
431    /// This constructor sets both the channel count and the layout, which enables
432    /// layout-aware processing for blocks like panners and decoders.
433    pub fn with_layout(sample_rate: f64, buffer_size: usize, layout: ChannelLayout) -> Self {
434        let num_channels = layout.channel_count();
435        let mut builder = Self {
436            graph: Graph::new(sample_rate, buffer_size, num_channels),
437        };
438        builder.graph.context.channel_layout = layout;
439        builder
440    }
441
442    /// Add a block to the graph.
443    ///
444    /// Accepts any block type that implements `Into<BlockType<S>>`.
445    ///
446    /// # Example
447    ///
448    /// ```ignore
449    /// use bbx_dsp::prelude::*;
450    ///
451    /// let mut builder = GraphBuilder::<f32>::new(44100.0, 512, 2);
452    /// let osc = builder.add(OscillatorBlock::new(440.0, Waveform::Sine, None));
453    /// let gain = builder.add(GainBlock::new(-6.0, None));
454    /// builder.connect(osc, 0, gain, 0);
455    /// let graph = builder.build();
456    /// ```
457    pub fn add<B: Into<BlockType<S>>>(&mut self, block: B) -> BlockId {
458        self.graph.add_block(block.into())
459    }
460
461    /// Form a `Connection` between two particular blocks.
462    pub fn connect(&mut self, from: BlockId, from_output: usize, to: BlockId, to_input: usize) -> &mut Self {
463        self.graph.connect(from, from_output, to, to_input);
464        self
465    }
466
467    /// Specify a `Parameter` to be modulated by a `Modulator` block.
468    pub fn modulate(&mut self, source: BlockId, target: BlockId, parameter: &str) -> &mut Self {
469        if let Err(e) = self.graph.blocks[target.0].set_parameter(parameter, Parameter::Modulated(source)) {
470            eprintln!("Modulation error: {e}");
471        }
472        self
473    }
474
475    /// Capture a snapshot of the current graph topology for visualization.
476    ///
477    /// Returns owned data suitable for cross-thread transfer to a visualization
478    /// thread. Call this before `build()` to capture the user-defined topology
479    /// (the output block is added during build).
480    pub fn capture_topology(&self) -> GraphTopologySnapshot {
481        let blocks = self
482            .graph
483            .blocks
484            .iter()
485            .enumerate()
486            .map(|(id, block)| BlockSnapshot {
487                id,
488                name: block.name().to_string(),
489                category: block.category(),
490                input_count: block.input_count(),
491                output_count: block.output_count(),
492            })
493            .collect();
494
495        let connections = self
496            .graph
497            .connections
498            .iter()
499            .map(|conn| ConnectionSnapshot {
500                from_block: conn.from.0,
501                from_output: conn.from_output,
502                to_block: conn.to.0,
503                to_input: conn.to_input,
504            })
505            .collect();
506
507        let modulation_connections = self
508            .graph
509            .blocks
510            .iter()
511            .enumerate()
512            .flat_map(|(target_id, block)| {
513                block
514                    .get_modulated_parameters()
515                    .into_iter()
516                    .map(move |(param_name, source_id)| ModulationConnectionSnapshot {
517                        from_block: source_id.0,
518                        to_block: target_id,
519                        parameter_name: param_name.to_string(),
520                    })
521            })
522            .collect();
523
524        GraphTopologySnapshot {
525            blocks,
526            connections,
527            modulation_connections,
528        }
529    }
530
531    /// Prepare the final DSP `Graph`.
532    ///
533    /// Automatically inserts a mixer before the output block when multiple terminal
534    /// blocks exist, unless the developer has already provided their own mixer or
535    /// output block connections.
536    ///
537    /// # Panics
538    ///
539    /// Panics if any block has more inputs or outputs than the realtime-safe
540    /// limits (`MAX_BLOCK_INPUTS` or `MAX_BLOCK_OUTPUTS`).
541    pub fn build(mut self) -> Graph<S> {
542        let num_channels = self.graph.context.num_channels;
543
544        // Check if developer already added an output block
545        let existing_output = self.graph.blocks.iter().position(|b| b.is_output()).map(BlockId);
546
547        // Find all terminal blocks: blocks with no outgoing connections,
548        // excluding modulators (LFO, Envelope) and output-type blocks.
549        let terminal_blocks: Vec<BlockId> = self
550            .graph
551            .blocks
552            .iter()
553            .enumerate()
554            .filter(|(idx, block)| {
555                let block_id = BlockId(*idx);
556                let has_outgoing = self.graph.connections.iter().any(|c| c.from == block_id);
557                !has_outgoing && !block.is_modulator() && !block.is_output()
558            })
559            .map(|(idx, _)| BlockId(idx))
560            .collect();
561
562        // Check if developer already added a mixer that receives from terminal blocks
563        let explicit_mixer = self.find_explicit_mixer(&terminal_blocks);
564
565        // Determine what needs to be added
566        let output_id = existing_output.unwrap_or_else(|| self.graph.add_output_block());
567
568        match (explicit_mixer, terminal_blocks.len()) {
569            // Developer provided a mixer - connect it to output if not already connected
570            (Some(mixer_id), _) => {
571                let mixer_has_outgoing = self.graph.connections.iter().any(|c| c.from == mixer_id);
572                if !mixer_has_outgoing {
573                    self.connect_block_to_output(mixer_id, output_id, num_channels);
574                }
575            }
576
577            // No explicit mixer, no terminal blocks - nothing to connect
578            (None, 0) => {}
579
580            // No explicit mixer, single terminal block - connect directly to output
581            (None, 1) => {
582                let block_id = terminal_blocks[0];
583                self.connect_block_to_output(block_id, output_id, num_channels);
584            }
585
586            // No explicit mixer, multiple terminal blocks - insert a mixer
587            (None, num_sources) => {
588                let mixer_id = self
589                    .graph
590                    .add_block(BlockType::Mixer(MixerBlock::new(num_sources, num_channels)));
591
592                // Connect each terminal block's outputs to the mixer's inputs
593                for (source_idx, &block_id) in terminal_blocks.iter().enumerate() {
594                    let block_output_count = self.graph.blocks[block_id.0].output_count();
595                    for ch in 0..num_channels.min(block_output_count) {
596                        let mixer_input = source_idx * num_channels + ch;
597                        self.connect(block_id, ch, mixer_id, mixer_input);
598                    }
599                }
600
601                // Connect mixer to output
602                self.connect_block_to_output(mixer_id, output_id, num_channels);
603            }
604        }
605
606        self.graph.prepare_for_playback();
607
608        // Validate that all blocks are within realtime-safe I/O limits
609        for (idx, block) in self.graph.blocks.iter().enumerate() {
610            let connected_inputs = self.graph.block_input_buffers[idx].len();
611            let output_count = block.output_count();
612
613            assert!(
614                connected_inputs <= MAX_BLOCK_INPUTS,
615                "Block {idx} has {connected_inputs} connected inputs, exceeding MAX_BLOCK_INPUTS ({MAX_BLOCK_INPUTS})"
616            );
617            assert!(
618                output_count <= MAX_BLOCK_OUTPUTS,
619                "Block {idx} has {output_count} outputs, exceeding MAX_BLOCK_OUTPUTS ({MAX_BLOCK_OUTPUTS})"
620            );
621        }
622
623        self.graph
624    }
625
626    /// Find an explicit mixer (Mixer or MatrixMixer) that has connections from terminal blocks.
627    fn find_explicit_mixer(&self, terminal_blocks: &[BlockId]) -> Option<BlockId> {
628        for (idx, block) in self.graph.blocks.iter().enumerate() {
629            let is_mixer = matches!(block, BlockType::Mixer(_) | BlockType::MatrixMixer(_));
630            if !is_mixer {
631                continue;
632            }
633
634            let block_id = BlockId(idx);
635            let has_terminal_input = self
636                .graph
637                .connections
638                .iter()
639                .any(|c| c.to == block_id && terminal_blocks.contains(&c.from));
640
641            if has_terminal_input {
642                return Some(block_id);
643            }
644        }
645        None
646    }
647
648    /// Connect a block's outputs to the output block, channel by channel.
649    fn connect_block_to_output(&mut self, from: BlockId, to: BlockId, num_channels: usize) {
650        let output_count = self.graph.blocks[from.0].output_count();
651        for ch in 0..num_channels.min(output_count) {
652            self.connect(from, ch, to, ch);
653        }
654    }
655}