1use crate::buffer::Buffer;
7use crate::{error::Error, properties::Properties};
8use bitflags::bitflags;
9use spa::utils::result::SpaResult;
10use std::{
11 ffi::{self, CStr, CString},
12 fmt::Debug,
13 mem, os,
14 pin::Pin,
15 ptr,
16};
17
18mod box_;
19pub use box_::*;
20mod rc;
21pub use rc::*;
22
23#[derive(Debug, PartialEq)]
24pub enum StreamState {
25 Error(String),
26 Unconnected,
27 Connecting,
28 Paused,
29 Streaming,
30}
31
32impl StreamState {
33 pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
34 match state {
35 pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
36 pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
37 pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
38 pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
39 _ => {
40 let error = if error.is_null() {
41 "".to_string()
42 } else {
43 unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
44 };
45
46 StreamState::Error(error)
47 }
48 }
49 }
50}
51
52#[repr(transparent)]
57pub struct Stream(pw_sys::pw_stream);
58
59impl Stream {
60 pub fn as_raw(&self) -> &pw_sys::pw_stream {
61 &self.0
62 }
63
64 pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
65 ptr::addr_of!(self.0).cast_mut()
66 }
67
68 #[must_use = "Fluent builder API"]
70 pub fn add_local_listener_with_user_data<D>(
71 &self,
72 user_data: D,
73 ) -> ListenerLocalBuilder<'_, D> {
74 let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
75 callbacks.stream =
76 Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
77 ListenerLocalBuilder {
78 stream: self,
79 callbacks,
80 }
81 }
82
83 #[must_use = "Fluent builder API"]
85 pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
86 self.add_local_listener_with_user_data(Default::default())
87 }
88
89 pub fn connect(
95 &self,
96 direction: spa::utils::Direction,
97 id: Option<u32>,
98 flags: StreamFlags,
99 params: &mut [&spa::pod::Pod],
100 ) -> Result<(), Error> {
101 let r = unsafe {
102 pw_sys::pw_stream_connect(
103 self.as_raw_ptr(),
104 direction.as_raw(),
105 id.unwrap_or(crate::constants::ID_ANY),
106 flags.bits(),
107 params.as_mut_ptr().cast(),
110 params.len() as u32,
111 )
112 };
113
114 SpaResult::from_c(r).into_sync_result()?;
115 Ok(())
116 }
117
118 pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
124 let r = unsafe {
125 pw_sys::pw_stream_update_params(
126 self.as_raw_ptr(),
127 params.as_mut_ptr().cast(),
128 params.len() as u32,
129 )
130 };
131
132 SpaResult::from_c(r).into_sync_result()?;
133 Ok(())
134 }
135
136 pub fn set_active(&self, active: bool) -> Result<(), Error> {
138 let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
139
140 SpaResult::from_c(r).into_sync_result()?;
141 Ok(())
142 }
143
144 pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
155 pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
156 }
157
158 pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
159 unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
160 }
161
162 pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
173 pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
174 }
175
176 pub fn disconnect(&self) -> Result<(), Error> {
178 let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
179
180 SpaResult::from_c(r).into_sync_result()?;
181 Ok(())
182 }
183
184 pub fn set_error(&mut self, res: i32, error: &str) {
190 let error = CString::new(error).expect("failed to convert error to CString");
191 let error_cstr = error.as_c_str();
192 Stream::set_error_cstr(self, res, error_cstr)
193 }
194
195 pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
201 unsafe {
202 pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
203 }
204 }
205
206 pub fn flush(&self, drain: bool) -> Result<(), Error> {
209 let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
210
211 SpaResult::from_c(r).into_sync_result()?;
212 Ok(())
213 }
214
215 pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
216 let r = unsafe {
217 pw_sys::pw_stream_set_control(
218 self.as_raw_ptr(),
219 id,
220 values.len() as u32,
221 values.as_ptr() as *mut f32,
222 )
223 };
224 SpaResult::from_c(r).into_sync_result()?;
225 Ok(())
226 }
227
228 pub fn name(&self) -> String {
232 let name = unsafe {
233 let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
234 CStr::from_ptr(name)
235 };
236
237 name.to_string_lossy().to_string()
238 }
239
240 pub fn state(&self) -> StreamState {
242 let mut error: *const std::os::raw::c_char = ptr::null();
243 let state = unsafe {
244 pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
245 };
246 StreamState::from_raw(state, error)
247 }
248
249 pub fn properties(&self) -> &Properties {
251 unsafe {
252 let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
253 let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
254 props.cast().as_ref()
255 }
256 }
257
258 pub fn node_id(&self) -> u32 {
260 unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
261 }
262
263 #[cfg(feature = "v0_3_34")]
264 pub fn is_driving(&self) -> bool {
265 unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
266 }
267
268 #[cfg(feature = "v0_3_34")]
269 pub fn trigger_process(&self) -> Result<(), Error> {
270 let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
271
272 SpaResult::from_c(r).into_result()?;
273 Ok(())
274 }
275
276 }
279
280type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
281type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
282
283#[allow(clippy::type_complexity)]
284pub struct ListenerLocalCallbacks<D> {
285 pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
286 pub control_info:
287 Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
288 pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
289 pub param_changed: Option<Box<ParamChangedCB<D>>>,
290 pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
291 pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
292 pub process: Option<Box<ProcessCB<D>>>,
293 pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
294 #[cfg(feature = "v0_3_39")]
295 pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
296 #[cfg(feature = "v0_3_40")]
297 pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
298 pub user_data: D,
299 stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
300}
301
302unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
303 stream
304 .map(|ptr| ptr.cast::<Stream>().as_ref())
305 .expect("stream cannot be null")
306}
307
308impl<D> ListenerLocalCallbacks<D> {
309 fn with_user_data(user_data: D) -> Self {
310 ListenerLocalCallbacks {
311 process: Default::default(),
312 stream: Default::default(),
313 drained: Default::default(),
314 add_buffer: Default::default(),
315 control_info: Default::default(),
316 io_changed: Default::default(),
317 param_changed: Default::default(),
318 remove_buffer: Default::default(),
319 state_changed: Default::default(),
320 #[cfg(feature = "v0_3_39")]
321 command: Default::default(),
322 #[cfg(feature = "v0_3_40")]
323 trigger_done: Default::default(),
324 user_data,
325 }
326 }
327
328 pub(crate) fn into_raw(
329 self,
330 ) -> (
331 Pin<Box<pw_sys::pw_stream_events>>,
332 Box<ListenerLocalCallbacks<D>>,
333 ) {
334 let callbacks = Box::new(self);
335
336 unsafe extern "C" fn on_state_changed<D>(
337 data: *mut os::raw::c_void,
338 old: pw_sys::pw_stream_state,
339 new: pw_sys::pw_stream_state,
340 error: *const os::raw::c_char,
341 ) {
342 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
343 if let Some(cb) = &mut state.state_changed {
344 let stream = unwrap_stream_ptr(state.stream);
345 let old = StreamState::from_raw(old, error);
346 let new = StreamState::from_raw(new, error);
347 cb(stream, &mut state.user_data, old, new)
348 };
349 }
350 }
351
352 unsafe extern "C" fn on_control_info<D>(
353 data: *mut os::raw::c_void,
354 id: u32,
355 control: *const pw_sys::pw_stream_control,
356 ) {
357 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
358 if let Some(cb) = &mut state.control_info {
359 let stream = unwrap_stream_ptr(state.stream);
360 cb(stream, &mut state.user_data, id, control);
361 }
362 }
363 }
364
365 unsafe extern "C" fn on_io_changed<D>(
366 data: *mut os::raw::c_void,
367 id: u32,
368 area: *mut os::raw::c_void,
369 size: u32,
370 ) {
371 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
372 if let Some(cb) = &mut state.io_changed {
373 let stream = unwrap_stream_ptr(state.stream);
374 cb(stream, &mut state.user_data, id, area, size);
375 }
376 }
377 }
378
379 unsafe extern "C" fn on_param_changed<D>(
380 data: *mut os::raw::c_void,
381 id: u32,
382 param: *const spa_sys::spa_pod,
383 ) {
384 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
385 if let Some(cb) = &mut state.param_changed {
386 let stream = unwrap_stream_ptr(state.stream);
387 let param = if !param.is_null() {
388 Some(spa::pod::Pod::from_raw(param))
389 } else {
390 None
391 };
392
393 cb(stream, &mut state.user_data, id, param);
394 }
395 }
396 }
397
398 unsafe extern "C" fn on_add_buffer<D>(
399 data: *mut ::std::os::raw::c_void,
400 buffer: *mut pw_sys::pw_buffer,
401 ) {
402 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
403 if let Some(cb) = &mut state.add_buffer {
404 let stream = unwrap_stream_ptr(state.stream);
405 cb(stream, &mut state.user_data, buffer);
406 }
407 }
408 }
409
410 unsafe extern "C" fn on_remove_buffer<D>(
411 data: *mut ::std::os::raw::c_void,
412 buffer: *mut pw_sys::pw_buffer,
413 ) {
414 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
415 if let Some(cb) = &mut state.remove_buffer {
416 let stream = unwrap_stream_ptr(state.stream);
417 cb(stream, &mut state.user_data, buffer);
418 }
419 }
420 }
421
422 unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
423 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
424 if let Some(cb) = &mut state.process {
425 let stream = unwrap_stream_ptr(state.stream);
426 cb(stream, &mut state.user_data);
427 }
428 }
429 }
430
431 unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
432 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
433 if let Some(cb) = &mut state.drained {
434 let stream = unwrap_stream_ptr(state.stream);
435 cb(stream, &mut state.user_data);
436 }
437 }
438 }
439
440 #[cfg(feature = "v0_3_39")]
441 unsafe extern "C" fn on_command<D>(
442 data: *mut ::std::os::raw::c_void,
443 command: *const spa_sys::spa_command,
444 ) {
445 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
446 if let Some(cb) = &mut state.command {
447 let stream = unwrap_stream_ptr(state.stream);
448 cb(stream, &mut state.user_data, command);
449 }
450 }
451 }
452
453 #[cfg(feature = "v0_3_40")]
454 unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
455 if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
456 if let Some(cb) = &mut state.trigger_done {
457 let stream = unwrap_stream_ptr(state.stream);
458 cb(stream, &mut state.user_data);
459 }
460 }
461 }
462
463 let events = unsafe {
464 let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
465 events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
466
467 if callbacks.state_changed.is_some() {
468 events.state_changed = Some(on_state_changed::<D>);
469 }
470 if callbacks.control_info.is_some() {
471 events.control_info = Some(on_control_info::<D>);
472 }
473 if callbacks.io_changed.is_some() {
474 events.io_changed = Some(on_io_changed::<D>);
475 }
476 if callbacks.param_changed.is_some() {
477 events.param_changed = Some(on_param_changed::<D>);
478 }
479 if callbacks.add_buffer.is_some() {
480 events.add_buffer = Some(on_add_buffer::<D>);
481 }
482 if callbacks.remove_buffer.is_some() {
483 events.remove_buffer = Some(on_remove_buffer::<D>);
484 }
485 if callbacks.process.is_some() {
486 events.process = Some(on_process::<D>);
487 }
488 if callbacks.drained.is_some() {
489 events.drained = Some(on_drained::<D>);
490 }
491 #[cfg(feature = "v0_3_39")]
492 if callbacks.command.is_some() {
493 events.command = Some(on_command::<D>);
494 }
495 #[cfg(feature = "v0_3_40")]
496 if callbacks.trigger_done.is_some() {
497 events.trigger_done = Some(on_trigger_done::<D>);
498 }
499
500 events
501 };
502
503 (events, callbacks)
504 }
505}
506
507#[must_use]
508pub struct ListenerLocalBuilder<'a, D> {
509 stream: &'a Stream,
510 callbacks: ListenerLocalCallbacks<D>,
511}
512
513impl<'a, D> ListenerLocalBuilder<'a, D> {
514 pub fn state_changed<F>(mut self, callback: F) -> Self
516 where
517 F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
518 {
519 self.callbacks.state_changed = Some(Box::new(callback));
520 self
521 }
522
523 pub fn control_info<F>(mut self, callback: F) -> Self
525 where
526 F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
527 {
528 self.callbacks.control_info = Some(Box::new(callback));
529 self
530 }
531
532 pub fn io_changed<F>(mut self, callback: F) -> Self
534 where
535 F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
536 {
537 self.callbacks.io_changed = Some(Box::new(callback));
538 self
539 }
540
541 pub fn param_changed<F>(mut self, callback: F) -> Self
543 where
544 F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
545 {
546 self.callbacks.param_changed = Some(Box::new(callback));
547 self
548 }
549
550 pub fn add_buffer<F>(mut self, callback: F) -> Self
552 where
553 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
554 {
555 self.callbacks.add_buffer = Some(Box::new(callback));
556 self
557 }
558
559 pub fn remove_buffer<F>(mut self, callback: F) -> Self
561 where
562 F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
563 {
564 self.callbacks.remove_buffer = Some(Box::new(callback));
565 self
566 }
567
568 pub fn process<F>(mut self, callback: F) -> Self
570 where
571 F: FnMut(&Stream, &mut D) + 'static,
572 {
573 self.callbacks.process = Some(Box::new(callback));
574 self
575 }
576
577 pub fn drained<F>(mut self, callback: F) -> Self
579 where
580 F: FnMut(&Stream, &mut D) + 'static,
581 {
582 self.callbacks.drained = Some(Box::new(callback));
583 self
584 }
585
586 pub fn register(self) -> Result<StreamListener<D>, Error> {
591 let (events, data) = self.callbacks.into_raw();
592 let (listener, data) = unsafe {
593 let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
594 let raw_listener = Box::into_raw(listener);
595 let raw_data = Box::into_raw(data);
596 pw_sys::pw_stream_add_listener(
597 self.stream.as_raw_ptr(),
598 raw_listener,
599 events.as_ref().get_ref(),
600 raw_data as *mut _,
601 );
602 (Box::from_raw(raw_listener), Box::from_raw(raw_data))
603 };
604 Ok(StreamListener {
605 listener,
606 _events: events,
607 _data: data,
608 })
609 }
610}
611
612pub struct StreamListener<D> {
613 listener: Box<spa_sys::spa_hook>,
614 _events: Pin<Box<pw_sys::pw_stream_events>>,
616 _data: Box<ListenerLocalCallbacks<D>>,
617}
618
619impl<D> StreamListener<D> {
620 pub fn unregister(self) {
624 }
626}
627
628impl<D> std::ops::Drop for StreamListener<D> {
629 fn drop(&mut self) {
630 spa::utils::hook::remove(*self.listener);
631 }
632}
633
634bitflags! {
635 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
637 pub struct StreamFlags: pw_sys::pw_stream_flags {
638 const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
639 const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
640 const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
641 const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
642 const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
643 const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
644 const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
645 const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
646 const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
647 #[cfg(feature = "v0_3_41")]
648 const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
649 }
650}