bbx_net/
buffer.rs

1//! Lock-free network buffer for thread-safe message passing.
2//!
3//! Provides a realtime-safe channel for passing network messages between threads,
4//! suitable for network input thread to audio thread communication.
5
6use bbx_core::{
7    StackVec,
8    spsc::{Consumer, Producer, SpscRingBuffer},
9};
10
11use crate::message::NetMessage;
12
13/// Maximum network events per audio buffer for stack allocation.
14pub const MAX_NET_EVENTS_PER_BUFFER: usize = 64;
15
16/// Producer side of the network buffer (used in network thread).
17pub struct NetBufferProducer {
18    producer: Producer<NetMessage>,
19}
20
21impl NetBufferProducer {
22    /// Try to send a network message to the consumer.
23    ///
24    /// Returns `true` if the message was sent, `false` if the buffer was full.
25    /// This method is non-blocking and safe to call from any thread.
26    #[inline]
27    pub fn try_send(&mut self, message: NetMessage) -> bool {
28        self.producer.try_push(message).is_ok()
29    }
30
31    /// Check if the buffer is full.
32    #[inline]
33    pub fn is_full(&self) -> bool {
34        self.producer.is_full()
35    }
36
37    /// Returns the approximate number of messages in the buffer.
38    #[inline]
39    pub fn len(&self) -> usize {
40        self.producer.len()
41    }
42
43    /// Returns `true` if the buffer is empty.
44    #[inline]
45    pub fn is_empty(&self) -> bool {
46        self.producer.is_empty()
47    }
48}
49
50/// Consumer side of the network buffer (used in audio thread).
51///
52/// All methods are realtime-safe and do not allocate.
53pub struct NetBufferConsumer {
54    consumer: Consumer<NetMessage>,
55}
56
57impl NetBufferConsumer {
58    /// Pop a single network message from the buffer (realtime-safe).
59    ///
60    /// Returns `Some(NetMessage)` if available, `None` if empty.
61    #[inline]
62    pub fn try_pop(&mut self) -> Option<NetMessage> {
63        self.consumer.try_pop()
64    }
65
66    /// Drain all available messages into a stack-allocated buffer.
67    ///
68    /// Returns a `StackVec` of messages. This is realtime-safe as it uses
69    /// stack allocation with a fixed maximum capacity of `MAX_NET_EVENTS_PER_BUFFER`.
70    ///
71    /// If more messages are available than the stack buffer can hold,
72    /// the remaining messages stay in the ring buffer for the next call.
73    #[inline]
74    pub fn drain_into_stack(&mut self) -> StackVec<NetMessage, MAX_NET_EVENTS_PER_BUFFER> {
75        let mut buffer: StackVec<NetMessage, MAX_NET_EVENTS_PER_BUFFER> = StackVec::new();
76        while !buffer.is_full() {
77            match self.consumer.try_pop() {
78                Some(msg) => {
79                    let _ = buffer.push(msg);
80                }
81                None => break,
82            }
83        }
84        buffer
85    }
86
87    /// Drain all available messages into a provided Vec.
88    ///
89    /// Returns the number of messages drained.
90    ///
91    /// # Warning
92    ///
93    /// This method allocates heap memory and is **NOT suitable for realtime
94    /// audio processing**. Use [`drain_into_stack()`](Self::drain_into_stack)
95    /// instead for audio thread code.
96    #[inline]
97    pub fn drain_into(&mut self, buffer: &mut Vec<NetMessage>) -> usize {
98        let mut count = 0;
99        while let Some(msg) = self.consumer.try_pop() {
100            buffer.push(msg);
101            count += 1;
102        }
103        count
104    }
105
106    /// Check if the buffer is empty.
107    #[inline]
108    pub fn is_empty(&self) -> bool {
109        self.consumer.is_empty()
110    }
111
112    /// Returns the approximate number of messages in the buffer.
113    #[inline]
114    pub fn len(&self) -> usize {
115        self.consumer.len()
116    }
117}
118
119/// Create a network buffer pair for thread-safe message transfer.
120///
121/// The `capacity` determines how many network messages can be buffered.
122/// A typical value is 256-1024 messages for high-throughput scenarios.
123///
124/// # Examples
125///
126/// ```
127/// use bbx_net::{NetMessage, NodeId, net_buffer};
128///
129/// let (mut producer, mut consumer) = net_buffer(256);
130///
131/// // In network thread
132/// let node_id = NodeId::default();
133/// let msg = NetMessage::param_change("gain", 0.5, node_id);
134/// producer.try_send(msg);
135///
136/// // In audio thread (realtime-safe)
137/// let events = consumer.drain_into_stack();
138/// for event in events {
139///     // Process network message
140/// }
141/// ```
142pub fn net_buffer(capacity: usize) -> (NetBufferProducer, NetBufferConsumer) {
143    let (producer, consumer) = SpscRingBuffer::new(capacity);
144    (NetBufferProducer { producer }, NetBufferConsumer { consumer })
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use crate::address::NodeId;
151
152    #[test]
153    fn test_send_receive() {
154        let (mut producer, mut consumer) = net_buffer(8);
155
156        let node_id = NodeId::from_parts(1, 2);
157        let msg = NetMessage::param_change("gain", 0.75, node_id);
158        assert!(producer.try_send(msg));
159
160        let received = consumer.try_pop().expect("should have message");
161        assert!((received.payload.value().unwrap() - 0.75).abs() < f32::EPSILON);
162        assert!(consumer.try_pop().is_none());
163    }
164
165    #[test]
166    fn test_buffer_overflow() {
167        let (mut producer, _consumer) = net_buffer(2);
168
169        let node_id = NodeId::from_parts(1, 2);
170        let msg = NetMessage::param_change("test", 0.5, node_id);
171
172        assert!(producer.try_send(msg));
173        assert!(producer.try_send(msg));
174        assert!(!producer.try_send(msg));
175        assert!(producer.is_full());
176    }
177
178    #[test]
179    fn test_drain_into_stack() {
180        let (mut producer, mut consumer) = net_buffer(8);
181
182        let node_id = NodeId::from_parts(1, 2);
183        producer.try_send(NetMessage::param_change("p1", 0.1, node_id));
184        producer.try_send(NetMessage::param_change("p2", 0.2, node_id));
185        producer.try_send(NetMessage::param_change("p3", 0.3, node_id));
186
187        let stack = consumer.drain_into_stack();
188
189        assert_eq!(stack.len(), 3);
190        assert!((stack[0].payload.value().unwrap() - 0.1).abs() < f32::EPSILON);
191        assert!((stack[1].payload.value().unwrap() - 0.2).abs() < f32::EPSILON);
192        assert!((stack[2].payload.value().unwrap() - 0.3).abs() < f32::EPSILON);
193        assert!(consumer.is_empty());
194    }
195
196    #[test]
197    fn test_drain_into_vec() {
198        let (mut producer, mut consumer) = net_buffer(8);
199
200        let node_id = NodeId::from_parts(3, 4);
201        producer.try_send(NetMessage::trigger("a", node_id));
202        producer.try_send(NetMessage::trigger("b", node_id));
203
204        let mut buffer = Vec::new();
205        let count = consumer.drain_into(&mut buffer);
206
207        assert_eq!(count, 2);
208        assert_eq!(buffer.len(), 2);
209        assert!(consumer.is_empty());
210    }
211
212    #[test]
213    fn test_is_empty() {
214        let (mut producer, mut consumer) = net_buffer(4);
215
216        assert!(consumer.is_empty());
217
218        let node_id = NodeId::from_parts(5, 6);
219        producer.try_send(NetMessage::param_change("x", 0.0, node_id));
220        assert!(!consumer.is_empty());
221
222        consumer.try_pop();
223        assert!(consumer.is_empty());
224    }
225
226    #[test]
227    fn test_len() {
228        let (mut producer, consumer) = net_buffer(8);
229
230        assert_eq!(producer.len(), 0);
231        assert_eq!(consumer.len(), 0);
232
233        let node_id = NodeId::from_parts(7, 8);
234        producer.try_send(NetMessage::param_change("a", 0.0, node_id));
235        producer.try_send(NetMessage::param_change("b", 0.0, node_id));
236
237        assert_eq!(producer.len(), 2);
238        assert_eq!(consumer.len(), 2);
239    }
240
241    #[test]
242    fn test_drain_stack_overflow_preserves_remaining() {
243        let (mut producer, mut consumer) = net_buffer(128);
244
245        let node_id = NodeId::from_parts(9, 10);
246
247        for i in 0..100 {
248            producer.try_send(NetMessage::param_change("x", i as f32, node_id));
249        }
250
251        let stack = consumer.drain_into_stack();
252        assert_eq!(stack.len(), MAX_NET_EVENTS_PER_BUFFER);
253
254        let remaining = 100 - MAX_NET_EVENTS_PER_BUFFER;
255        assert_eq!(consumer.len(), remaining);
256    }
257}