1use crate::buffer::Buffer;
9use crate::{error::Error, properties::Properties};
10use bitflags::bitflags;
11use spa::utils::result::SpaResult;
12use std::{
13 ffi::{self, CStr, CString},
14 fmt::Debug,
15 mem, os,
16 pin::Pin,
17 ptr,
18};
19
20mod box_;
21pub use box_::*;
22mod rc;
23pub use rc::*;
24
25#[derive(Debug, PartialEq)]
26pub enum StreamState {
27 Error(String),
28 Unconnected,
29 Connecting,
30 Paused,
31 Streaming,
32}
33
34impl StreamState {
35 pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
36 match state {
37 pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
38 pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
39 pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
40 pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
41 _ => {
42 let error = if error.is_null() {
43 "".to_string()
44 } else {
45 unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
46 };
47
48 StreamState::Error(error)
49 }
50 }
51 }
52}
53
54#[repr(transparent)]
64pub struct Time(pw_sys::pw_time);
65
66impl Clone for Time {
67 fn clone(&self) -> Self {
68 Self(pw_sys::pw_time { ..self.0 })
69 }
70}
71
72impl Time {
73 pub fn as_raw(&self) -> &pw_sys::pw_time {
74 &self.0
75 }
76
77 pub fn now(&self) -> i64 {
79 self.0.now
80 }
81
82 pub fn rate(&self) -> spa::utils::Fraction {
84 self.0.rate
85 }
86
87 pub fn ticks(&self) -> u64 {
89 self.0.ticks
90 }
91
92 pub fn delay(&self) -> i64 {
97 self.0.delay
98 }
99
100 pub fn queued(&self) -> u64 {
102 self.0.queued
103 }
104
105 #[cfg(feature = "v0_3_50")]
107 pub fn buffered(&self) -> u64 {
108 self.0.buffered
109 }
110
111 #[cfg(feature = "v0_3_50")]
113 pub fn queued_buffers(&self) -> u32 {
114 self.0.queued_buffers
115 }
116
117 #[cfg(feature = "v0_3_50")]
119 pub fn avail_buffers(&self) -> u32 {
120 self.0.avail_buffers
121 }
122}
123
124impl Debug for Time {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 let mut s = f.debug_struct("Time");
127 s.field("now", &self.now())
128 .field("rate", &self.rate())
129 .field("ticks", &self.ticks())
130 .field("delay", &self.delay())
131 .field("queued", &self.queued());
132
133 #[cfg(feature = "v0_3_50")]
134 s.field("buffered", &self.buffered())
135 .field("queued_buffers", &self.queued_buffers())
136 .field("avail_buffers", &self.avail_buffers());
137
138 s.finish()
139 }
140}
141
142#[repr(transparent)]
151pub struct Stream(pw_sys::pw_stream);
152
153impl Stream {
154 pub fn as_raw(&self) -> &pw_sys::pw_stream {
155 &self.0
156 }
157
158 pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
159 ptr::addr_of!(self.0).cast_mut()
160 }
161
162 #[must_use = "Use the builder to register event callbacks"]
164 pub fn add_local_listener_with_user_data<D>(
165 &self,
166 user_data: D,
167 ) -> ListenerLocalBuilder<'_, D> {
168 let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
169 callbacks.stream =
170 Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
171 ListenerLocalBuilder {
172 stream: self,
173 callbacks,
174 }
175 }
176
177 #[must_use = "Use the builder to register event callbacks"]
179 pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
180 self.add_local_listener_with_user_data(Default::default())
181 }
182
183 pub fn connect(
189 &self,
190 direction: spa::utils::Direction,
191 id: Option<u32>,
192 flags: StreamFlags,
193 params: &mut [&spa::pod::Pod],
194 ) -> Result<(), Error> {
195 let r = unsafe {
196 pw_sys::pw_stream_connect(
197 self.as_raw_ptr(),
198 direction.as_raw(),
199 id.unwrap_or(crate::constants::ID_ANY),
200 flags.bits(),
201 params.as_mut_ptr().cast(),
204 params.len() as u32,
205 )
206 };
207
208 SpaResult::from_c(r).into_sync_result()?;
209 Ok(())
210 }
211
212 pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
218 let r = unsafe {
219 pw_sys::pw_stream_update_params(
220 self.as_raw_ptr(),
221 params.as_mut_ptr().cast(),
222 params.len() as u32,
223 )
224 };
225
226 SpaResult::from_c(r).into_sync_result()?;
227 Ok(())
228 }
229
230 pub fn set_active(&self, active: bool) -> Result<(), Error> {
232 let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
233
234 SpaResult::from_c(r).into_sync_result()?;
235 Ok(())
236 }
237
238 pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
249 pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
250 }
251
252 pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
253 unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
254 }
255
256 pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
267 pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
268 }
269
270 pub fn disconnect(&self) -> Result<(), Error> {
272 let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
273
274 SpaResult::from_c(r).into_sync_result()?;
275 Ok(())
276 }
277
278 pub fn set_error(&mut self, res: i32, error: &str) {
284 let error = CString::new(error).expect("failed to convert error to CString");
285 let error_cstr = error.as_c_str();
286 Stream::set_error_cstr(self, res, error_cstr)
287 }
288
289 pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
295 unsafe {
296 pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
297 }
298 }
299
300 pub fn flush(&self, drain: bool) -> Result<(), Error> {
303 let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
304
305 SpaResult::from_c(r).into_sync_result()?;
306 Ok(())
307 }
308
309 pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
310 let r = unsafe {
311 pw_sys::pw_stream_set_control(
312 self.as_raw_ptr(),
313 id,
314 values.len() as u32,
315 values.as_ptr() as *mut f32,
316 )
317 };
318 SpaResult::from_c(r).into_sync_result()?;
319 Ok(())
320 }
321
322 pub fn name(&self) -> String {
326 let name = unsafe {
327 let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
328 CStr::from_ptr(name)
329 };
330
331 name.to_string_lossy().to_string()
332 }
333
334 pub fn state(&self) -> StreamState {
336 let mut error: *const std::os::raw::c_char = ptr::null();
337 let state = unsafe {
338 pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
339 };
340 StreamState::from_raw(state, error)
341 }
342
343 pub fn properties(&self) -> &Properties {
345 unsafe {
346 let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
347 let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
348 props.cast().as_ref()
349 }
350 }
351
352 pub fn node_id(&self) -> u32 {
354 unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
355 }
356
357 #[cfg(feature = "v0_3_34")]
358 pub fn is_driving(&self) -> bool {
359 unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
360 }
361
362 #[cfg(feature = "v0_3_34")]
363 pub fn trigger_process(&self) -> Result<(), Error> {
364 let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
365
366 SpaResult::from_c(r).into_result()?;
367 Ok(())
368 }
369
370 pub fn time(&self) -> Result<Time, Error> {
383 unsafe {
384 let mut time = mem::MaybeUninit::<pw_sys::pw_time>::zeroed();
385
386 #[cfg(feature = "v0_3_50")]
387 let r = pw_sys::pw_stream_get_time_n(
388 self.as_raw_ptr(),
389 time.as_mut_ptr(),
390 mem::size_of::<pw_sys::pw_time>(),
391 );
392
393 #[cfg(not(feature = "v0_3_50"))]
394 let r = pw_sys::pw_stream_get_time(self.as_raw_ptr(), time.as_mut_ptr());
395
396 SpaResult::from_c(r).into_result()?;
397 Ok(Time(time.assume_init()))
398 }
399 }
400
401 }
404
405type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
406type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
407
408#[allow(clippy::type_complexity)]
409pub struct ListenerLocalCallbacks<D> {
410 pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
411 pub control_info:
412 Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
413 pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
414 pub param_changed: Option<Box<ParamChangedCB<D>>>,
415 pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
416 pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
417 pub process: Option<Box<ProcessCB<D>>>,
418 pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
419 #[cfg(feature = "v0_3_39")]
420 pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
421 #[cfg(feature = "v0_3_40")]
422 pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
423 pub user_data: D,
424 stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
425}
426
427unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
428 stream
429 .map(|ptr| ptr.cast::<Stream>().as_ref())
430 .expect("stream cannot be null")
431}
432
433impl<D> ListenerLocalCallbacks<D> {
434 fn with_user_data(user_data: D) -> Self {
435 ListenerLocalCallbacks {
436 process: Default::default(),
437 stream: Default::default(),
438 drained: Default::default(),
439 add_buffer: Default::default(),
440 control_info: Default::default(),
441 io_changed: Default::default(),
442 param_changed: Default::default(),
443 remove_buffer: Default::default(),
444 state_changed: Default::default(),
445 #[cfg(feature = "v0_3_39")]
446 command: Default::default(),
447 #[cfg(feature = "v0_3_40")]
448 trigger_done: Default::default(),
449 user_data,
450 }
451 }
452
453 pub(crate) fn into_raw(
454 self,
455 ) -> (
456 Pin<Box<pw_sys::pw_stream_events>>,
457 Box<ListenerLocalCallbacks<D>>,
458 ) {
459 let callbacks = Box::new(self);
460
461 unsafe extern "C" fn on_state_changed<D>(
462 data: *mut os::raw::c_void,
463 old: pw_sys::pw_stream_state,
464 new: pw_sys::pw_stream_state,
465 error: *const os::raw::c_char,
466 ) {
467 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
468 if let Some(cb) = &mut state.state_changed {
469 let stream = unwrap_stream_ptr(state.stream);
470 let old = StreamState::from_raw(old, error);
471 let new = StreamState::from_raw(new, error);
472 cb(stream, &mut state.user_data, old, new)
473 };
474 }
475 }
476
477 unsafe extern "C" fn on_control_info<D>(
478 data: *mut os::raw::c_void,
479 id: u32,
480 control: *const pw_sys::pw_stream_control,
481 ) {
482 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
483 if let Some(cb) = &mut state.control_info {
484 let stream = unwrap_stream_ptr(state.stream);
485 cb(stream, &mut state.user_data, id, control);
486 }
487 }
488 }
489
490 unsafe extern "C" fn on_io_changed<D>(
491 data: *mut os::raw::c_void,
492 id: u32,
493 area: *mut os::raw::c_void,
494 size: u32,
495 ) {
496 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
497 if let Some(cb) = &mut state.io_changed {
498 let stream = unwrap_stream_ptr(state.stream);
499 cb(stream, &mut state.user_data, id, area, size);
500 }
501 }
502 }
503
504 unsafe extern "C" fn on_param_changed<D>(
505 data: *mut os::raw::c_void,
506 id: u32,
507 param: *const spa_sys::spa_pod,
508 ) {
509 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
510 if let Some(cb) = &mut state.param_changed {
511 let stream = unwrap_stream_ptr(state.stream);
512 let param = if !param.is_null() {
513 Some(spa::pod::Pod::from_raw(param))
514 } else {
515 None
516 };
517
518 cb(stream, &mut state.user_data, id, param);
519 }
520 }
521 }
522
523 unsafe extern "C" fn on_add_buffer<D>(
524 data: *mut ::std::os::raw::c_void,
525 buffer: *mut pw_sys::pw_buffer,
526 ) {
527 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
528 if let Some(cb) = &mut state.add_buffer {
529 let stream = unwrap_stream_ptr(state.stream);
530 cb(stream, &mut state.user_data, buffer);
531 }
532 }
533 }
534
535 unsafe extern "C" fn on_remove_buffer<D>(
536 data: *mut ::std::os::raw::c_void,
537 buffer: *mut pw_sys::pw_buffer,
538 ) {
539 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
540 if let Some(cb) = &mut state.remove_buffer {
541 let stream = unwrap_stream_ptr(state.stream);
542 cb(stream, &mut state.user_data, buffer);
543 }
544 }
545 }
546
547 unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
548 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
549 if let Some(cb) = &mut state.process {
550 let stream = unwrap_stream_ptr(state.stream);
551 cb(stream, &mut state.user_data);
552 }
553 }
554 }
555
556 unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
557 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
558 if let Some(cb) = &mut state.drained {
559 let stream = unwrap_stream_ptr(state.stream);
560 cb(stream, &mut state.user_data);
561 }
562 }
563 }
564
565 #[cfg(feature = "v0_3_39")]
566 unsafe extern "C" fn on_command<D>(
567 data: *mut ::std::os::raw::c_void,
568 command: *const spa_sys::spa_command,
569 ) {
570 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
571 if let Some(cb) = &mut state.command {
572 let stream = unwrap_stream_ptr(state.stream);
573 cb(stream, &mut state.user_data, command);
574 }
575 }
576 }
577
578 #[cfg(feature = "v0_3_40")]
579 unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
580 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
581 if let Some(cb) = &mut state.trigger_done {
582 let stream = unwrap_stream_ptr(state.stream);
583 cb(stream, &mut state.user_data);
584 }
585 }
586 }
587
588 let events = unsafe {
589 let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
590 events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
591
592 if callbacks.state_changed.is_some() {
593 events.state_changed = Some(on_state_changed::<D>);
594 }
595 if callbacks.control_info.is_some() {
596 events.control_info = Some(on_control_info::<D>);
597 }
598 if callbacks.io_changed.is_some() {
599 events.io_changed = Some(on_io_changed::<D>);
600 }
601 if callbacks.param_changed.is_some() {
602 events.param_changed = Some(on_param_changed::<D>);
603 }
604 if callbacks.add_buffer.is_some() {
605 events.add_buffer = Some(on_add_buffer::<D>);
606 }
607 if callbacks.remove_buffer.is_some() {
608 events.remove_buffer = Some(on_remove_buffer::<D>);
609 }
610 if callbacks.process.is_some() {
611 events.process = Some(on_process::<D>);
612 }
613 if callbacks.drained.is_some() {
614 events.drained = Some(on_drained::<D>);
615 }
616 #[cfg(feature = "v0_3_39")]
617 if callbacks.command.is_some() {
618 events.command = Some(on_command::<D>);
619 }
620 #[cfg(feature = "v0_3_40")]
621 if callbacks.trigger_done.is_some() {
622 events.trigger_done = Some(on_trigger_done::<D>);
623 }
624
625 events
626 };
627
628 (events, callbacks)
629 }
630}
631
632pub struct ListenerLocalBuilder<'a, D> {
661 stream: &'a Stream,
662 callbacks: ListenerLocalCallbacks<D>,
663}
664
665impl<'a, D> ListenerLocalBuilder<'a, D> {
666 #[must_use = "Call `.register()` to start receiving events"]
686 pub fn state_changed<F>(mut self, callback: F) -> Self
687 where
688 F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
689 {
690 self.callbacks.state_changed = Some(Box::new(callback));
691 self
692 }
693
694 #[must_use = "Call `.register()` to start receiving events"]
714 pub fn control_info<F>(mut self, callback: F) -> Self
715 where
716 F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
717 {
718 self.callbacks.control_info = Some(Box::new(callback));
719 self
720 }
721
722 #[must_use = "Call `.register()` to start receiving events"]
743 pub fn io_changed<F>(mut self, callback: F) -> Self
744 where
745 F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
746 {
747 self.callbacks.io_changed = Some(Box::new(callback));
748 self
749 }
750
751 #[must_use = "Call `.register()` to start receiving events"]
773 pub fn param_changed<F>(mut self, callback: F) -> Self
774 where
775 F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
776 {
777 self.callbacks.param_changed = Some(Box::new(callback));
778 self
779 }
780
781 #[must_use = "Call `.register()` to start receiving events"]
800 pub fn add_buffer<F>(mut self, callback: F) -> Self
801 where
802 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
803 {
804 self.callbacks.add_buffer = Some(Box::new(callback));
805 self
806 }
807
808 #[must_use = "Call `.register()` to start receiving events"]
827 pub fn remove_buffer<F>(mut self, callback: F) -> Self
828 where
829 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
830 {
831 self.callbacks.remove_buffer = Some(Box::new(callback));
832 self
833 }
834
835 #[must_use = "Call `.register()` to start receiving events"]
860 pub fn process<F>(mut self, callback: F) -> Self
861 where
862 F: FnMut(&Stream, &mut D) + 'static,
863 {
864 self.callbacks.process = Some(Box::new(callback));
865 self
866 }
867
868 #[must_use = "Call `.register()` to start receiving events"]
886 pub fn drained<F>(mut self, callback: F) -> Self
887 where
888 F: FnMut(&Stream, &mut D) + 'static,
889 {
890 self.callbacks.drained = Some(Box::new(callback));
891 self
892 }
893
894 pub fn register(self) -> Result<StreamListener<D>, Error> {
896 let (events, data) = self.callbacks.into_raw();
897 let (listener, data) = unsafe {
898 let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
899 let raw_listener = Box::into_raw(listener);
900 let raw_data = Box::into_raw(data);
901 pw_sys::pw_stream_add_listener(
902 self.stream.as_raw_ptr(),
903 raw_listener,
904 events.as_ref().get_ref(),
905 raw_data as *mut _,
906 );
907 (Box::from_raw(raw_listener), Box::from_raw(raw_data))
908 };
909 Ok(StreamListener {
910 listener,
911 _events: events,
912 _data: data,
913 })
914 }
915}
916
917#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
922pub struct StreamListener<D> {
923 listener: Box<spa_sys::spa_hook>,
924 _events: Pin<Box<pw_sys::pw_stream_events>>,
926 _data: Box<ListenerLocalCallbacks<D>>,
927}
928
929impl<D> StreamListener<D> {
930 pub fn unregister(self) {
934 }
936}
937
938impl<D> std::ops::Drop for StreamListener<D> {
939 fn drop(&mut self) {
940 spa::utils::hook::remove(*self.listener);
941 }
942}
943
944bitflags! {
945 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
947 pub struct StreamFlags: pw_sys::pw_stream_flags {
948 const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
949 const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
950 const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
951 const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
952 const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
953 const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
954 const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
955 const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
956 const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
957 #[cfg(feature = "v0_3_41")]
958 const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
959 }
960}