1use bbx_core::{
7 StackVec,
8 spsc::{Consumer, Producer, SpscRingBuffer},
9};
10
11use crate::message::NetMessage;
12
/// Capacity of the stack-allocated vector returned by
/// `NetBufferConsumer::drain_into_stack`, and therefore the upper bound on
/// the number of messages a single call to that method can return.
pub const MAX_NET_EVENTS_PER_BUFFER: usize = 64;
15
16pub struct NetBufferProducer {
18 producer: Producer<NetMessage>,
19}
20
21impl NetBufferProducer {
22 #[inline]
27 pub fn try_send(&mut self, message: NetMessage) -> bool {
28 self.producer.try_push(message).is_ok()
29 }
30
31 #[inline]
33 pub fn is_full(&self) -> bool {
34 self.producer.is_full()
35 }
36
37 #[inline]
39 pub fn len(&self) -> usize {
40 self.producer.len()
41 }
42
43 #[inline]
45 pub fn is_empty(&self) -> bool {
46 self.producer.is_empty()
47 }
48}
49
50pub struct NetBufferConsumer {
54 consumer: Consumer<NetMessage>,
55}
56
57impl NetBufferConsumer {
58 #[inline]
62 pub fn try_pop(&mut self) -> Option<NetMessage> {
63 self.consumer.try_pop()
64 }
65
66 #[inline]
74 pub fn drain_into_stack(&mut self) -> StackVec<NetMessage, MAX_NET_EVENTS_PER_BUFFER> {
75 let mut buffer: StackVec<NetMessage, MAX_NET_EVENTS_PER_BUFFER> = StackVec::new();
76 while !buffer.is_full() {
77 match self.consumer.try_pop() {
78 Some(msg) => {
79 let _ = buffer.push(msg);
80 }
81 None => break,
82 }
83 }
84 buffer
85 }
86
87 #[inline]
97 pub fn drain_into(&mut self, buffer: &mut Vec<NetMessage>) -> usize {
98 let mut count = 0;
99 while let Some(msg) = self.consumer.try_pop() {
100 buffer.push(msg);
101 count += 1;
102 }
103 count
104 }
105
106 #[inline]
108 pub fn is_empty(&self) -> bool {
109 self.consumer.is_empty()
110 }
111
112 #[inline]
114 pub fn len(&self) -> usize {
115 self.consumer.len()
116 }
117}
118
119pub fn net_buffer(capacity: usize) -> (NetBufferProducer, NetBufferConsumer) {
143 let (producer, consumer) = SpscRingBuffer::new(capacity);
144 (NetBufferProducer { producer }, NetBufferConsumer { consumer })
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::NodeId;

    /// A single message sent through the buffer is received with its value
    /// intact, after which nothing is left to pop.
    #[test]
    fn test_send_receive() {
        let (mut producer, mut consumer) = net_buffer(8);

        let node_id = NodeId::from_parts(1, 2);
        let msg = NetMessage::param_change("gain", 0.75, node_id);
        assert!(producer.try_send(msg));

        let received = consumer.try_pop().expect("should have message");
        assert!((received.payload.value().unwrap() - 0.75).abs() < f32::EPSILON);
        assert!(consumer.try_pop().is_none());
    }

    /// Once the buffer is at capacity, `try_send` reports failure and
    /// `is_full` reports `true`.
    #[test]
    fn test_buffer_overflow() {
        // Keep the consumer alive for the duration of the test.
        let (mut producer, _consumer) = net_buffer(2);

        let node_id = NodeId::from_parts(1, 2);
        let msg = NetMessage::param_change("test", 0.5, node_id);

        assert!(producer.try_send(msg));
        assert!(producer.try_send(msg));
        assert!(!producer.try_send(msg));
        assert!(producer.is_full());
    }

    /// `drain_into_stack` returns all pending messages in send order and
    /// leaves the buffer empty when fewer than the stack limit are pending.
    #[test]
    fn test_drain_into_stack() {
        let (mut producer, mut consumer) = net_buffer(8);

        let node_id = NodeId::from_parts(1, 2);
        // Assert every setup send so a failed push is reported here rather
        // than as a confusing length mismatch further down.
        assert!(
            producer.try_send(NetMessage::param_change("p1", 0.1, node_id)),
            "setup send p1 should succeed"
        );
        assert!(
            producer.try_send(NetMessage::param_change("p2", 0.2, node_id)),
            "setup send p2 should succeed"
        );
        assert!(
            producer.try_send(NetMessage::param_change("p3", 0.3, node_id)),
            "setup send p3 should succeed"
        );

        let stack = consumer.drain_into_stack();

        assert_eq!(stack.len(), 3);
        assert!((stack[0].payload.value().unwrap() - 0.1).abs() < f32::EPSILON);
        assert!((stack[1].payload.value().unwrap() - 0.2).abs() < f32::EPSILON);
        assert!((stack[2].payload.value().unwrap() - 0.3).abs() < f32::EPSILON);
        assert!(consumer.is_empty());
    }

    /// `drain_into` appends every pending message to the vector, returns the
    /// number appended, and leaves the buffer empty.
    #[test]
    fn test_drain_into_vec() {
        let (mut producer, mut consumer) = net_buffer(8);

        let node_id = NodeId::from_parts(3, 4);
        assert!(
            producer.try_send(NetMessage::trigger("a", node_id)),
            "setup send a should succeed"
        );
        assert!(
            producer.try_send(NetMessage::trigger("b", node_id)),
            "setup send b should succeed"
        );

        let mut buffer = Vec::new();
        let count = consumer.drain_into(&mut buffer);

        assert_eq!(count, 2);
        assert_eq!(buffer.len(), 2);
        assert!(consumer.is_empty());
    }

    /// `is_empty` on the consumer tracks sends and pops.
    #[test]
    fn test_is_empty() {
        let (mut producer, mut consumer) = net_buffer(4);

        assert!(consumer.is_empty());

        let node_id = NodeId::from_parts(5, 6);
        assert!(
            producer.try_send(NetMessage::param_change("x", 0.0, node_id)),
            "setup send should succeed"
        );
        assert!(!consumer.is_empty());

        assert!(
            consumer.try_pop().is_some(),
            "the message that was just sent should be poppable"
        );
        assert!(consumer.is_empty());
    }

    /// `len` agrees between the producer and consumer halves.
    #[test]
    fn test_len() {
        let (mut producer, consumer) = net_buffer(8);

        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        let node_id = NodeId::from_parts(7, 8);
        assert!(
            producer.try_send(NetMessage::param_change("a", 0.0, node_id)),
            "setup send a should succeed"
        );
        assert!(
            producer.try_send(NetMessage::param_change("b", 0.0, node_id)),
            "setup send b should succeed"
        );

        assert_eq!(producer.len(), 2);
        assert_eq!(consumer.len(), 2);
    }

    /// When more messages are pending than the stack vector can hold,
    /// `drain_into_stack` takes exactly `MAX_NET_EVENTS_PER_BUFFER` and the
    /// rest stay in the ring buffer.
    #[test]
    fn test_drain_stack_overflow_preserves_remaining() {
        let (mut producer, mut consumer) = net_buffer(128);

        let node_id = NodeId::from_parts(9, 10);

        for i in 0..100 {
            assert!(
                producer.try_send(NetMessage::param_change("x", i as f32, node_id)),
                "setup send {} should succeed",
                i
            );
        }

        let stack = consumer.drain_into_stack();
        assert_eq!(stack.len(), MAX_NET_EVENTS_PER_BUFFER);

        let remaining = 100 - MAX_NET_EVENTS_PER_BUFFER;
        assert_eq!(consumer.len(), remaining);
    }
}