1use crate::buffer::Buffer;
9use crate::{error::Error, properties::Properties};
10use bitflags::bitflags;
11use spa::utils::result::SpaResult;
12use std::{
13 ffi::{self, CStr, CString},
14 fmt::Debug,
15 mem, os,
16 pin::Pin,
17 ptr,
18};
19
20mod box_;
21pub use box_::*;
22mod rc;
23pub use rc::*;
24
/// State of a [`Stream`], as converted from the raw `pw_stream_state` value
/// by `StreamState::from_raw`.
#[derive(Debug, PartialEq)]
pub enum StreamState {
    /// The stream is in an error state (or reported a state this binding does
    /// not recognise). Carries the error message, which is empty when no
    /// message was supplied.
    Error(String),
    /// The stream is not connected.
    Unconnected,
    /// The stream is in the process of connecting.
    Connecting,
    /// The stream is connected but paused.
    Paused,
    /// The stream is streaming.
    Streaming,
}
33
34impl StreamState {
35 pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
36 match state {
37 pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
38 pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
39 pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
40 pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
41 _ => {
42 let error = if error.is_null() {
43 "".to_string()
44 } else {
45 unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
46 };
47
48 StreamState::Error(error)
49 }
50 }
51 }
52}
53
/// Transparent wrapper around a raw `pw_sys::pw_stream`.
///
/// Because of `#[repr(transparent)]`, a pointer to a `pw_stream` can be
/// reinterpreted as a pointer to a `Stream` (this file does so in
/// `unwrap_stream_ptr`), and `as_raw_ptr` hands the address back to the C API.
#[repr(transparent)]
pub struct Stream(pw_sys::pw_stream);
64
65impl Stream {
66 pub fn as_raw(&self) -> &pw_sys::pw_stream {
67 &self.0
68 }
69
70 pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
71 ptr::addr_of!(self.0).cast_mut()
72 }
73
74 #[must_use = "Use the builder to register event callbacks"]
76 pub fn add_local_listener_with_user_data<D>(
77 &self,
78 user_data: D,
79 ) -> ListenerLocalBuilder<'_, D> {
80 let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
81 callbacks.stream =
82 Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
83 ListenerLocalBuilder {
84 stream: self,
85 callbacks,
86 }
87 }
88
89 #[must_use = "Use the builder to register event callbacks"]
91 pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
92 self.add_local_listener_with_user_data(Default::default())
93 }
94
95 pub fn connect(
101 &self,
102 direction: spa::utils::Direction,
103 id: Option<u32>,
104 flags: StreamFlags,
105 params: &mut [&spa::pod::Pod],
106 ) -> Result<(), Error> {
107 let r = unsafe {
108 pw_sys::pw_stream_connect(
109 self.as_raw_ptr(),
110 direction.as_raw(),
111 id.unwrap_or(crate::constants::ID_ANY),
112 flags.bits(),
113 params.as_mut_ptr().cast(),
116 params.len() as u32,
117 )
118 };
119
120 SpaResult::from_c(r).into_sync_result()?;
121 Ok(())
122 }
123
124 pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
130 let r = unsafe {
131 pw_sys::pw_stream_update_params(
132 self.as_raw_ptr(),
133 params.as_mut_ptr().cast(),
134 params.len() as u32,
135 )
136 };
137
138 SpaResult::from_c(r).into_sync_result()?;
139 Ok(())
140 }
141
142 pub fn set_active(&self, active: bool) -> Result<(), Error> {
144 let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
145
146 SpaResult::from_c(r).into_sync_result()?;
147 Ok(())
148 }
149
150 pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
161 pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
162 }
163
164 pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
165 unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
166 }
167
168 pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
179 pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
180 }
181
182 pub fn disconnect(&self) -> Result<(), Error> {
184 let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
185
186 SpaResult::from_c(r).into_sync_result()?;
187 Ok(())
188 }
189
190 pub fn set_error(&mut self, res: i32, error: &str) {
196 let error = CString::new(error).expect("failed to convert error to CString");
197 let error_cstr = error.as_c_str();
198 Stream::set_error_cstr(self, res, error_cstr)
199 }
200
201 pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
207 unsafe {
208 pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
209 }
210 }
211
212 pub fn flush(&self, drain: bool) -> Result<(), Error> {
215 let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
216
217 SpaResult::from_c(r).into_sync_result()?;
218 Ok(())
219 }
220
221 pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
222 let r = unsafe {
223 pw_sys::pw_stream_set_control(
224 self.as_raw_ptr(),
225 id,
226 values.len() as u32,
227 values.as_ptr() as *mut f32,
228 )
229 };
230 SpaResult::from_c(r).into_sync_result()?;
231 Ok(())
232 }
233
234 pub fn name(&self) -> String {
238 let name = unsafe {
239 let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
240 CStr::from_ptr(name)
241 };
242
243 name.to_string_lossy().to_string()
244 }
245
246 pub fn state(&self) -> StreamState {
248 let mut error: *const std::os::raw::c_char = ptr::null();
249 let state = unsafe {
250 pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
251 };
252 StreamState::from_raw(state, error)
253 }
254
255 pub fn properties(&self) -> &Properties {
257 unsafe {
258 let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
259 let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
260 props.cast().as_ref()
261 }
262 }
263
264 pub fn node_id(&self) -> u32 {
266 unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
267 }
268
269 #[cfg(feature = "v0_3_34")]
270 pub fn is_driving(&self) -> bool {
271 unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
272 }
273
274 #[cfg(feature = "v0_3_34")]
275 pub fn trigger_process(&self) -> Result<(), Error> {
276 let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
277
278 SpaResult::from_c(r).into_result()?;
279 Ok(())
280 }
281
282 }
285
/// Signature of the `param_changed` callback: stream, user data, parameter id
/// and the new parameter pod (if any).
type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
/// Signature of the `process` callback: stream and user data.
type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
288
/// Rust-side storage for the callbacks of a stream listener.
///
/// A pointer to this struct is handed to PipeWire as the listener's user data;
/// the C trampolines created in `into_raw` cast it back and dispatch to
/// whichever of the optional closures below are set. Every closure receives
/// the stream and a mutable reference to `user_data`.
// The boxed closure types are long but spelled out on purpose, one per event.
#[allow(clippy::type_complexity)]
pub struct ListenerLocalCallbacks<D> {
    /// Called with the old and the new stream state.
    pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
    /// Called with a control id and the raw control description.
    pub control_info:
        Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
    /// Called with an io id, the raw io area pointer and its size.
    pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
    /// Called with a parameter id and the parameter pod (`None` when the raw
    /// pointer is null).
    pub param_changed: Option<Box<ParamChangedCB<D>>>,
    /// Called with the raw buffer that was added.
    pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
    /// Called with the raw buffer that was removed.
    pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
    /// Called for the `process` event.
    pub process: Option<Box<ProcessCB<D>>>,
    /// Called for the `drained` event.
    pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
    /// Called with the raw command for the `command` event.
    #[cfg(feature = "v0_3_39")]
    pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
    /// Called for the `trigger_done` event.
    #[cfg(feature = "v0_3_40")]
    pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
    /// User data passed mutably to every callback.
    pub user_data: D,
    /// The stream the listener belongs to; set by
    /// `Stream::add_local_listener_with_user_data` and read by the trampolines.
    stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
}
307
308unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
309 stream
310 .map(|ptr| ptr.cast::<Stream>().as_ref())
311 .expect("stream cannot be null")
312}
313
314impl<D> ListenerLocalCallbacks<D> {
315 fn with_user_data(user_data: D) -> Self {
316 ListenerLocalCallbacks {
317 process: Default::default(),
318 stream: Default::default(),
319 drained: Default::default(),
320 add_buffer: Default::default(),
321 control_info: Default::default(),
322 io_changed: Default::default(),
323 param_changed: Default::default(),
324 remove_buffer: Default::default(),
325 state_changed: Default::default(),
326 #[cfg(feature = "v0_3_39")]
327 command: Default::default(),
328 #[cfg(feature = "v0_3_40")]
329 trigger_done: Default::default(),
330 user_data,
331 }
332 }
333
334 pub(crate) fn into_raw(
335 self,
336 ) -> (
337 Pin<Box<pw_sys::pw_stream_events>>,
338 Box<ListenerLocalCallbacks<D>>,
339 ) {
340 let callbacks = Box::new(self);
341
342 unsafe extern "C" fn on_state_changed<D>(
343 data: *mut os::raw::c_void,
344 old: pw_sys::pw_stream_state,
345 new: pw_sys::pw_stream_state,
346 error: *const os::raw::c_char,
347 ) {
348 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
349 if let Some(cb) = &mut state.state_changed {
350 let stream = unwrap_stream_ptr(state.stream);
351 let old = StreamState::from_raw(old, error);
352 let new = StreamState::from_raw(new, error);
353 cb(stream, &mut state.user_data, old, new)
354 };
355 }
356 }
357
358 unsafe extern "C" fn on_control_info<D>(
359 data: *mut os::raw::c_void,
360 id: u32,
361 control: *const pw_sys::pw_stream_control,
362 ) {
363 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
364 if let Some(cb) = &mut state.control_info {
365 let stream = unwrap_stream_ptr(state.stream);
366 cb(stream, &mut state.user_data, id, control);
367 }
368 }
369 }
370
371 unsafe extern "C" fn on_io_changed<D>(
372 data: *mut os::raw::c_void,
373 id: u32,
374 area: *mut os::raw::c_void,
375 size: u32,
376 ) {
377 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
378 if let Some(cb) = &mut state.io_changed {
379 let stream = unwrap_stream_ptr(state.stream);
380 cb(stream, &mut state.user_data, id, area, size);
381 }
382 }
383 }
384
385 unsafe extern "C" fn on_param_changed<D>(
386 data: *mut os::raw::c_void,
387 id: u32,
388 param: *const spa_sys::spa_pod,
389 ) {
390 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
391 if let Some(cb) = &mut state.param_changed {
392 let stream = unwrap_stream_ptr(state.stream);
393 let param = if !param.is_null() {
394 Some(spa::pod::Pod::from_raw(param))
395 } else {
396 None
397 };
398
399 cb(stream, &mut state.user_data, id, param);
400 }
401 }
402 }
403
404 unsafe extern "C" fn on_add_buffer<D>(
405 data: *mut ::std::os::raw::c_void,
406 buffer: *mut pw_sys::pw_buffer,
407 ) {
408 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
409 if let Some(cb) = &mut state.add_buffer {
410 let stream = unwrap_stream_ptr(state.stream);
411 cb(stream, &mut state.user_data, buffer);
412 }
413 }
414 }
415
416 unsafe extern "C" fn on_remove_buffer<D>(
417 data: *mut ::std::os::raw::c_void,
418 buffer: *mut pw_sys::pw_buffer,
419 ) {
420 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
421 if let Some(cb) = &mut state.remove_buffer {
422 let stream = unwrap_stream_ptr(state.stream);
423 cb(stream, &mut state.user_data, buffer);
424 }
425 }
426 }
427
428 unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
429 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
430 if let Some(cb) = &mut state.process {
431 let stream = unwrap_stream_ptr(state.stream);
432 cb(stream, &mut state.user_data);
433 }
434 }
435 }
436
437 unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
438 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
439 if let Some(cb) = &mut state.drained {
440 let stream = unwrap_stream_ptr(state.stream);
441 cb(stream, &mut state.user_data);
442 }
443 }
444 }
445
446 #[cfg(feature = "v0_3_39")]
447 unsafe extern "C" fn on_command<D>(
448 data: *mut ::std::os::raw::c_void,
449 command: *const spa_sys::spa_command,
450 ) {
451 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
452 if let Some(cb) = &mut state.command {
453 let stream = unwrap_stream_ptr(state.stream);
454 cb(stream, &mut state.user_data, command);
455 }
456 }
457 }
458
459 #[cfg(feature = "v0_3_40")]
460 unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
461 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
462 if let Some(cb) = &mut state.trigger_done {
463 let stream = unwrap_stream_ptr(state.stream);
464 cb(stream, &mut state.user_data);
465 }
466 }
467 }
468
469 let events = unsafe {
470 let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
471 events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
472
473 if callbacks.state_changed.is_some() {
474 events.state_changed = Some(on_state_changed::<D>);
475 }
476 if callbacks.control_info.is_some() {
477 events.control_info = Some(on_control_info::<D>);
478 }
479 if callbacks.io_changed.is_some() {
480 events.io_changed = Some(on_io_changed::<D>);
481 }
482 if callbacks.param_changed.is_some() {
483 events.param_changed = Some(on_param_changed::<D>);
484 }
485 if callbacks.add_buffer.is_some() {
486 events.add_buffer = Some(on_add_buffer::<D>);
487 }
488 if callbacks.remove_buffer.is_some() {
489 events.remove_buffer = Some(on_remove_buffer::<D>);
490 }
491 if callbacks.process.is_some() {
492 events.process = Some(on_process::<D>);
493 }
494 if callbacks.drained.is_some() {
495 events.drained = Some(on_drained::<D>);
496 }
497 #[cfg(feature = "v0_3_39")]
498 if callbacks.command.is_some() {
499 events.command = Some(on_command::<D>);
500 }
501 #[cfg(feature = "v0_3_40")]
502 if callbacks.trigger_done.is_some() {
503 events.trigger_done = Some(on_trigger_done::<D>);
504 }
505
506 events
507 };
508
509 (events, callbacks)
510 }
511}
512
/// Builder that collects event callbacks for a [`Stream`] and registers them
/// as a listener with `register()`.
///
/// Obtained from `Stream::add_local_listener` or
/// `Stream::add_local_listener_with_user_data`.
pub struct ListenerLocalBuilder<'a, D> {
    /// The stream the listener will be registered on.
    stream: &'a Stream,
    /// The callbacks (and user data) collected so far.
    callbacks: ListenerLocalCallbacks<D>,
}
545
546impl<'a, D> ListenerLocalBuilder<'a, D> {
547 #[must_use = "Call `.register()` to start receiving events"]
567 pub fn state_changed<F>(mut self, callback: F) -> Self
568 where
569 F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
570 {
571 self.callbacks.state_changed = Some(Box::new(callback));
572 self
573 }
574
575 #[must_use = "Call `.register()` to start receiving events"]
595 pub fn control_info<F>(mut self, callback: F) -> Self
596 where
597 F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
598 {
599 self.callbacks.control_info = Some(Box::new(callback));
600 self
601 }
602
603 #[must_use = "Call `.register()` to start receiving events"]
624 pub fn io_changed<F>(mut self, callback: F) -> Self
625 where
626 F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
627 {
628 self.callbacks.io_changed = Some(Box::new(callback));
629 self
630 }
631
632 #[must_use = "Call `.register()` to start receiving events"]
654 pub fn param_changed<F>(mut self, callback: F) -> Self
655 where
656 F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
657 {
658 self.callbacks.param_changed = Some(Box::new(callback));
659 self
660 }
661
662 #[must_use = "Call `.register()` to start receiving events"]
681 pub fn add_buffer<F>(mut self, callback: F) -> Self
682 where
683 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
684 {
685 self.callbacks.add_buffer = Some(Box::new(callback));
686 self
687 }
688
689 #[must_use = "Call `.register()` to start receiving events"]
708 pub fn remove_buffer<F>(mut self, callback: F) -> Self
709 where
710 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
711 {
712 self.callbacks.remove_buffer = Some(Box::new(callback));
713 self
714 }
715
716 #[must_use = "Call `.register()` to start receiving events"]
741 pub fn process<F>(mut self, callback: F) -> Self
742 where
743 F: FnMut(&Stream, &mut D) + 'static,
744 {
745 self.callbacks.process = Some(Box::new(callback));
746 self
747 }
748
749 #[must_use = "Call `.register()` to start receiving events"]
767 pub fn drained<F>(mut self, callback: F) -> Self
768 where
769 F: FnMut(&Stream, &mut D) + 'static,
770 {
771 self.callbacks.drained = Some(Box::new(callback));
772 self
773 }
774
775 pub fn register(self) -> Result<StreamListener<D>, Error> {
777 let (events, data) = self.callbacks.into_raw();
778 let (listener, data) = unsafe {
779 let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
780 let raw_listener = Box::into_raw(listener);
781 let raw_data = Box::into_raw(data);
782 pw_sys::pw_stream_add_listener(
783 self.stream.as_raw_ptr(),
784 raw_listener,
785 events.as_ref().get_ref(),
786 raw_data as *mut _,
787 );
788 (Box::from_raw(raw_listener), Box::from_raw(raw_data))
789 };
790 Ok(StreamListener {
791 listener,
792 _events: events,
793 _data: data,
794 })
795 }
796}
797
/// A listener registered on a [`Stream`].
///
/// Owns everything PipeWire holds pointers to for this listener: the hook,
/// the event table and the boxed callbacks. The hook is removed when the
/// listener is dropped.
#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
pub struct StreamListener<D> {
    /// The hook that was passed to `pw_stream_add_listener`.
    listener: Box<spa_sys::spa_hook>,
    /// The event table that was passed to `pw_stream_add_listener`; only kept
    /// so that it stays alive.
    _events: Pin<Box<pw_sys::pw_stream_events>>,
    /// The callbacks passed as listener user data; only kept so that they
    /// stay alive.
    _data: Box<ListenerLocalCallbacks<D>>,
}
809
impl<D> StreamListener<D> {
    /// Stop the listener from receiving any events.
    ///
    /// Consumes the listener; the actual removal happens in its `Drop`
    /// implementation, so the body is intentionally empty.
    pub fn unregister(self) {
    }
}
818
impl<D> std::ops::Drop for StreamListener<D> {
    /// Remove the hook when the listener goes away.
    fn drop(&mut self) {
        // The hook is passed by value, i.e. a copy of the boxed `spa_hook`.
        // NOTE(review): presumably `hook::remove` unlinks it from the
        // stream's listener list - confirm against `spa::utils::hook`.
        spa::utils::hook::remove(*self.listener);
    }
}
824
bitflags! {
    /// Flags that can be passed to `Stream::connect`.
    ///
    /// Each flag mirrors the `PW_STREAM_FLAG_*` constant of the same name.
    /// The short descriptions follow the PipeWire documentation; consult it
    /// for the authoritative semantics.
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    pub struct StreamFlags: pw_sys::pw_stream_flags {
        /// `PW_STREAM_FLAG_AUTOCONNECT`: try to connect the stream automatically.
        const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
        /// `PW_STREAM_FLAG_INACTIVE`: start the stream inactive.
        const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
        /// `PW_STREAM_FLAG_MAP_BUFFERS`: map the buffers.
        const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
        /// `PW_STREAM_FLAG_DRIVER`: act as a driver.
        const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
        /// `PW_STREAM_FLAG_RT_PROCESS`: call the process callback from the
        /// realtime thread.
        const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
        /// `PW_STREAM_FLAG_NO_CONVERT`: do not convert the format.
        const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
        /// `PW_STREAM_FLAG_EXCLUSIVE`: require exclusive access to the device.
        const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
        /// `PW_STREAM_FLAG_DONT_RECONNECT`: do not try to reconnect the stream.
        const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
        /// `PW_STREAM_FLAG_ALLOC_BUFFERS`: the application allocates the
        /// buffer memory.
        const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
        /// `PW_STREAM_FLAG_TRIGGER`: the stream is not scheduled automatically;
        /// presumably paired with `Stream::trigger_process`.
        #[cfg(feature = "v0_3_41")]
        const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
    }
}