pipewire/stream/
mod.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! Streams are higher-level objects providing a convenient way to send and receive data streams to/from PipeWire.
5//!
//! This module contains wrappers for [`pw_stream`](pw_sys::pw_stream) and related items.
7
8use crate::buffer::Buffer;
9use crate::{error::Error, properties::Properties};
10use bitflags::bitflags;
11use spa::utils::result::SpaResult;
12use std::{
13    ffi::{self, CStr, CString},
14    fmt::Debug,
15    mem, os,
16    pin::Pin,
17    ptr,
18};
19
20mod box_;
21pub use box_::*;
22mod rc;
23pub use rc::*;
24
/// The state of a stream.
///
/// Values of this type are produced from a raw `pw_stream_state` by
/// `StreamState::from_raw`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamState {
    /// The stream is in an error state. Carries the error message, which is
    /// empty when no message was provided.
    Error(String),
    /// Corresponds to `PW_STREAM_STATE_UNCONNECTED`.
    Unconnected,
    /// Corresponds to `PW_STREAM_STATE_CONNECTING`.
    Connecting,
    /// Corresponds to `PW_STREAM_STATE_PAUSED`.
    Paused,
    /// Corresponds to `PW_STREAM_STATE_STREAMING`.
    Streaming,
}
33
34impl StreamState {
35    pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
36        match state {
37            pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
38            pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
39            pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
40            pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
41            _ => {
42                let error = if error.is_null() {
43                    "".to_string()
44                } else {
45                    unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
46                };
47
48                StreamState::Error(error)
49            }
50        }
51    }
52}
53
/// Transparent wrapper around a [stream](self).
///
/// This does not own the underlying object and is usually seen behind a `&` reference.
///
/// For owning wrappers that can construct streams, see [`StreamBox`] and [`StreamRc`].
///
/// For an explanation of these, see [Smart pointers to PipeWire
/// objects](crate#smart-pointers-to-pipewire-objects).
// `repr(transparent)` gives `Stream` the same layout as `pw_sys::pw_stream`; the pointer
// cast from `pw_sys::pw_stream` to `Stream` in `unwrap_stream_ptr` relies on that.
#[repr(transparent)]
pub struct Stream(pw_sys::pw_stream);
64
65impl Stream {
66    pub fn as_raw(&self) -> &pw_sys::pw_stream {
67        &self.0
68    }
69
70    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
71        ptr::addr_of!(self.0).cast_mut()
72    }
73
74    /// Add a local listener builder
75    #[must_use = "Use the builder to register event callbacks"]
76    pub fn add_local_listener_with_user_data<D>(
77        &self,
78        user_data: D,
79    ) -> ListenerLocalBuilder<'_, D> {
80        let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
81        callbacks.stream =
82            Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
83        ListenerLocalBuilder {
84            stream: self,
85            callbacks,
86        }
87    }
88
89    /// Add a local listener builder. User data is initialized with its default value
90    #[must_use = "Use the builder to register event callbacks"]
91    pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
92        self.add_local_listener_with_user_data(Default::default())
93    }
94
95    /// Connect the stream
96    ///
97    /// Tries to connect to the node `id` in the given `direction`. If no node
98    /// is provided then any suitable node will be used.
99    // FIXME: high-level API for params
100    pub fn connect(
101        &self,
102        direction: spa::utils::Direction,
103        id: Option<u32>,
104        flags: StreamFlags,
105        params: &mut [&spa::pod::Pod],
106    ) -> Result<(), Error> {
107        let r = unsafe {
108            pw_sys::pw_stream_connect(
109                self.as_raw_ptr(),
110                direction.as_raw(),
111                id.unwrap_or(crate::constants::ID_ANY),
112                flags.bits(),
113                // We cast from *mut [&spa::pod::Pod] to *mut [*const spa_sys::spa_pod] here,
114                // which is valid because spa::pod::Pod is a transparent wrapper around spa_sys::spa_pod
115                params.as_mut_ptr().cast(),
116                params.len() as u32,
117            )
118        };
119
120        SpaResult::from_c(r).into_sync_result()?;
121        Ok(())
122    }
123
124    /// Update Parameters
125    ///
126    /// Call from the `param_changed` callback to negotiate a new set of
127    /// parameters for the stream.
128    // FIXME: high-level API for params
129    pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
130        let r = unsafe {
131            pw_sys::pw_stream_update_params(
132                self.as_raw_ptr(),
133                params.as_mut_ptr().cast(),
134                params.len() as u32,
135            )
136        };
137
138        SpaResult::from_c(r).into_sync_result()?;
139        Ok(())
140    }
141
142    /// Activate or deactivate the stream
143    pub fn set_active(&self, active: bool) -> Result<(), Error> {
144        let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
145
146        SpaResult::from_c(r).into_sync_result()?;
147        Ok(())
148    }
149
150    /// Take a Buffer from the Stream
151    ///
152    /// Removes a buffer from the stream. If this is an input stream the buffer
153    /// will contain data ready to process. If this is an output stream it can
154    /// be filled.
155    ///
156    /// # Safety
157    ///
158    /// The pointer returned could be NULL if no buffer is available. The buffer
159    /// should be returned to the stream once processing is complete.
160    pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
161        pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
162    }
163
164    pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
165        unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
166    }
167
168    /// Return a Buffer to the Stream
169    ///
170    /// Give back a buffer once processing is complete. Use this to queue up a
171    /// frame for an output stream, or return the buffer to the pool ready to
172    /// receive new data for an input stream.
173    ///
174    /// # Safety
175    ///
176    /// The buffer pointer should be one obtained from this stream instance by
177    /// a call to [Self::dequeue_raw_buffer()].
178    pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
179        pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
180    }
181
182    /// Disconnect the stream
183    pub fn disconnect(&self) -> Result<(), Error> {
184        let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
185
186        SpaResult::from_c(r).into_sync_result()?;
187        Ok(())
188    }
189
190    /// Set the stream in error state
191    ///
192    /// # Panics
193    /// Will panic if `error` contains a 0 byte.
194    ///
195    pub fn set_error(&mut self, res: i32, error: &str) {
196        let error = CString::new(error).expect("failed to convert error to CString");
197        let error_cstr = error.as_c_str();
198        Stream::set_error_cstr(self, res, error_cstr)
199    }
200
201    /// Set the stream in error state with CStr
202    ///
203    /// # Panics
204    /// Will panic if `error` contains a 0 byte.
205    ///
206    pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
207        unsafe {
208            pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
209        }
210    }
211
212    /// Flush the stream. When  `drain` is `true`, the `drained` callback will
213    /// be called when all data is played or recorded.
214    pub fn flush(&self, drain: bool) -> Result<(), Error> {
215        let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
216
217        SpaResult::from_c(r).into_sync_result()?;
218        Ok(())
219    }
220
221    pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
222        let r = unsafe {
223            pw_sys::pw_stream_set_control(
224                self.as_raw_ptr(),
225                id,
226                values.len() as u32,
227                values.as_ptr() as *mut f32,
228            )
229        };
230        SpaResult::from_c(r).into_sync_result()?;
231        Ok(())
232    }
233
234    // getters
235
236    /// Get the name of the stream.
237    pub fn name(&self) -> String {
238        let name = unsafe {
239            let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
240            CStr::from_ptr(name)
241        };
242
243        name.to_string_lossy().to_string()
244    }
245
246    /// Get the current state of the stream.
247    pub fn state(&self) -> StreamState {
248        let mut error: *const std::os::raw::c_char = ptr::null();
249        let state = unsafe {
250            pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
251        };
252        StreamState::from_raw(state, error)
253    }
254
255    /// Get the properties of the stream.
256    pub fn properties(&self) -> &Properties {
257        unsafe {
258            let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
259            let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
260            props.cast().as_ref()
261        }
262    }
263
264    /// Get the node ID of the stream.
265    pub fn node_id(&self) -> u32 {
266        unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
267    }
268
269    #[cfg(feature = "v0_3_34")]
270    pub fn is_driving(&self) -> bool {
271        unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
272    }
273
274    #[cfg(feature = "v0_3_34")]
275    pub fn trigger_process(&self) -> Result<(), Error> {
276        let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
277
278        SpaResult::from_c(r).into_result()?;
279        Ok(())
280    }
281
282    // TODO: pw_stream_get_core()
283    // TODO: pw_stream_get_time()
284}
285
/// Closure type stored (boxed) for the `param_changed` callback: receives the stream,
/// the user data, the param id and the param itself (if any).
type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
/// Closure type stored (boxed) for the `process` callback: receives the stream and
/// the user data.
type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
288
/// The set of callbacks of a local stream listener, together with the user data
/// that is passed to each of them.
///
/// Every callback is optional; `into_raw` only installs a C event handler for
/// the callbacks that are set.
// The boxed closure signatures are long by nature, hence the lint exemption.
#[allow(clippy::type_complexity)]
pub struct ListenerLocalCallbacks<D> {
    /// Called with the old and the new state when the stream state changes.
    pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
    /// Called with a control id and a raw pointer to the control.
    pub control_info:
        Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
    /// Called with an IO id, a raw pointer to the IO area and the area's size.
    pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
    /// Called with a param id and the param (`None` when the raw param pointer is null).
    pub param_changed: Option<Box<ParamChangedCB<D>>>,
    /// Called with the raw buffer that was added.
    pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
    /// Called with the raw buffer that was removed.
    pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
    /// Called for the `process` event.
    pub process: Option<Box<ProcessCB<D>>>,
    /// Called for the `drained` event.
    pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
    /// Called with a raw pointer to the received command.
    #[cfg(feature = "v0_3_39")]
    pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
    /// Called for the `trigger_done` event.
    #[cfg(feature = "v0_3_40")]
    pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
    /// User data; a mutable reference to it is passed to every callback.
    pub user_data: D,
    // Raw pointer to the stream these callbacks belong to. It is turned back into the
    // `&Stream` that each callback receives as its first argument.
    stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
}
307
308unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
309    stream
310        .map(|ptr| ptr.cast::<Stream>().as_ref())
311        .expect("stream cannot be null")
312}
313
314impl<D> ListenerLocalCallbacks<D> {
315    fn with_user_data(user_data: D) -> Self {
316        ListenerLocalCallbacks {
317            process: Default::default(),
318            stream: Default::default(),
319            drained: Default::default(),
320            add_buffer: Default::default(),
321            control_info: Default::default(),
322            io_changed: Default::default(),
323            param_changed: Default::default(),
324            remove_buffer: Default::default(),
325            state_changed: Default::default(),
326            #[cfg(feature = "v0_3_39")]
327            command: Default::default(),
328            #[cfg(feature = "v0_3_40")]
329            trigger_done: Default::default(),
330            user_data,
331        }
332    }
333
334    pub(crate) fn into_raw(
335        self,
336    ) -> (
337        Pin<Box<pw_sys::pw_stream_events>>,
338        Box<ListenerLocalCallbacks<D>>,
339    ) {
340        let callbacks = Box::new(self);
341
342        unsafe extern "C" fn on_state_changed<D>(
343            data: *mut os::raw::c_void,
344            old: pw_sys::pw_stream_state,
345            new: pw_sys::pw_stream_state,
346            error: *const os::raw::c_char,
347        ) {
348            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
349                if let Some(cb) = &mut state.state_changed {
350                    let stream = unwrap_stream_ptr(state.stream);
351                    let old = StreamState::from_raw(old, error);
352                    let new = StreamState::from_raw(new, error);
353                    cb(stream, &mut state.user_data, old, new)
354                };
355            }
356        }
357
358        unsafe extern "C" fn on_control_info<D>(
359            data: *mut os::raw::c_void,
360            id: u32,
361            control: *const pw_sys::pw_stream_control,
362        ) {
363            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
364                if let Some(cb) = &mut state.control_info {
365                    let stream = unwrap_stream_ptr(state.stream);
366                    cb(stream, &mut state.user_data, id, control);
367                }
368            }
369        }
370
371        unsafe extern "C" fn on_io_changed<D>(
372            data: *mut os::raw::c_void,
373            id: u32,
374            area: *mut os::raw::c_void,
375            size: u32,
376        ) {
377            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
378                if let Some(cb) = &mut state.io_changed {
379                    let stream = unwrap_stream_ptr(state.stream);
380                    cb(stream, &mut state.user_data, id, area, size);
381                }
382            }
383        }
384
385        unsafe extern "C" fn on_param_changed<D>(
386            data: *mut os::raw::c_void,
387            id: u32,
388            param: *const spa_sys::spa_pod,
389        ) {
390            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
391                if let Some(cb) = &mut state.param_changed {
392                    let stream = unwrap_stream_ptr(state.stream);
393                    let param = if !param.is_null() {
394                        Some(spa::pod::Pod::from_raw(param))
395                    } else {
396                        None
397                    };
398
399                    cb(stream, &mut state.user_data, id, param);
400                }
401            }
402        }
403
404        unsafe extern "C" fn on_add_buffer<D>(
405            data: *mut ::std::os::raw::c_void,
406            buffer: *mut pw_sys::pw_buffer,
407        ) {
408            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
409                if let Some(cb) = &mut state.add_buffer {
410                    let stream = unwrap_stream_ptr(state.stream);
411                    cb(stream, &mut state.user_data, buffer);
412                }
413            }
414        }
415
416        unsafe extern "C" fn on_remove_buffer<D>(
417            data: *mut ::std::os::raw::c_void,
418            buffer: *mut pw_sys::pw_buffer,
419        ) {
420            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
421                if let Some(cb) = &mut state.remove_buffer {
422                    let stream = unwrap_stream_ptr(state.stream);
423                    cb(stream, &mut state.user_data, buffer);
424                }
425            }
426        }
427
428        unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
429            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
430                if let Some(cb) = &mut state.process {
431                    let stream = unwrap_stream_ptr(state.stream);
432                    cb(stream, &mut state.user_data);
433                }
434            }
435        }
436
437        unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
438            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
439                if let Some(cb) = &mut state.drained {
440                    let stream = unwrap_stream_ptr(state.stream);
441                    cb(stream, &mut state.user_data);
442                }
443            }
444        }
445
446        #[cfg(feature = "v0_3_39")]
447        unsafe extern "C" fn on_command<D>(
448            data: *mut ::std::os::raw::c_void,
449            command: *const spa_sys::spa_command,
450        ) {
451            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
452                if let Some(cb) = &mut state.command {
453                    let stream = unwrap_stream_ptr(state.stream);
454                    cb(stream, &mut state.user_data, command);
455                }
456            }
457        }
458
459        #[cfg(feature = "v0_3_40")]
460        unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
461            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
462                if let Some(cb) = &mut state.trigger_done {
463                    let stream = unwrap_stream_ptr(state.stream);
464                    cb(stream, &mut state.user_data);
465                }
466            }
467        }
468
469        let events = unsafe {
470            let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
471            events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
472
473            if callbacks.state_changed.is_some() {
474                events.state_changed = Some(on_state_changed::<D>);
475            }
476            if callbacks.control_info.is_some() {
477                events.control_info = Some(on_control_info::<D>);
478            }
479            if callbacks.io_changed.is_some() {
480                events.io_changed = Some(on_io_changed::<D>);
481            }
482            if callbacks.param_changed.is_some() {
483                events.param_changed = Some(on_param_changed::<D>);
484            }
485            if callbacks.add_buffer.is_some() {
486                events.add_buffer = Some(on_add_buffer::<D>);
487            }
488            if callbacks.remove_buffer.is_some() {
489                events.remove_buffer = Some(on_remove_buffer::<D>);
490            }
491            if callbacks.process.is_some() {
492                events.process = Some(on_process::<D>);
493            }
494            if callbacks.drained.is_some() {
495                events.drained = Some(on_drained::<D>);
496            }
497            #[cfg(feature = "v0_3_39")]
498            if callbacks.command.is_some() {
499                events.command = Some(on_command::<D>);
500            }
501            #[cfg(feature = "v0_3_40")]
502            if callbacks.trigger_done.is_some() {
503                events.trigger_done = Some(on_trigger_done::<D>);
504            }
505
506            events
507        };
508
509        (events, callbacks)
510    }
511}
512
/// A builder for registering stream event callbacks.
///
/// Use [`Stream::add_local_listener`] or [`Stream::add_local_listener_with_user_data`] to create this and register callbacks that will be called when events of interest occur.
/// After adding callbacks, use [`register`](Self::register) to get back a [`StreamListener`].
///
/// Setting the same callback twice replaces the previously set closure.
///
/// # Examples
/// ```
/// # use pipewire::stream::Stream;
/// # use pipewire::spa::pod::Pod;
/// # fn example(stream: Stream) {
/// let stream_listener = stream.add_local_listener::<()>()
///     .state_changed(|_stream, _user_data, old, new| println!("Stream state changed from {old:?} to {new:?}"))
///     .control_info(|_stream, _user_data, id, control| println!("Stream control info: id {id}, control {control:?}"))
///     .io_changed(|_stream, _user_data, id, area, size| println!("Stream IO change: IO type {id}, area {area:?}, size {size}"))
///     .param_changed(|_stream, _user_data, id, param| println!("Stream param change: id {id}, param {:?}",
///         param.map(Pod::as_bytes)))
///     .add_buffer(|_stream, _user_data, buffer| println!("Stream buffer added {buffer:?}"))
///     .remove_buffer(|_stream, _user_data, buffer| println!("Stream buffer removed {buffer:?}"))
///     .process(|stream, _user_data| {
///         println!("Stream can be processed");
///         let buf = stream.dequeue_buffer();
///         // Produce or consume data using the buffer
///         // The buffer is enqueued back to the stream when it's dropped
///     })
///     .drained(|_stream, _user_data| println!("Stream is drained"))
///     .register();
/// # }
/// ```
pub struct ListenerLocalBuilder<'a, D> {
    // The stream the listener will be added to by `register`.
    stream: &'a Stream,
    // The callbacks (and user data) collected so far.
    callbacks: ListenerLocalCallbacks<D>,
}
545
546impl<'a, D> ListenerLocalBuilder<'a, D> {
547    /// Set the stream `state_changed` event callback of the listener.
548    ///
549    /// This event is emitted when the stream state changes.
550    ///
551    /// # Callback parameters
552    /// `stream`: The stream  
553    /// `data`: User data  
554    /// `old`: Old stream state  
555    /// `new`: New stream state
556    ///
557    /// # Examples
558    /// ```
559    /// # use pipewire::stream::Stream;
560    /// # fn example(stream: Stream) {
561    /// let stream_listener = stream.add_local_listener::<()>()
562    ///     .state_changed(|_stream, _user_data, old, new| println!("Stream state changed from {old:?} to {new:?}"))
563    ///     .register();
564    /// # }
565    /// ```
566    #[must_use = "Call `.register()` to start receiving events"]
567    pub fn state_changed<F>(mut self, callback: F) -> Self
568    where
569        F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
570    {
571        self.callbacks.state_changed = Some(Box::new(callback));
572        self
573    }
574
575    /// Set the stream `control_info` event callback of the listener.
576    ///
577    /// This event is emitted when there is information about a control.
578    ///
579    /// # Callback parameters
580    /// `stream`: The stream  
581    /// `user_data`: User data  
582    /// `id`: Type of the control  
583    /// `control`: The control
584    ///
585    /// # Examples
586    /// ```
587    /// # use pipewire::stream::Stream;
588    /// # fn example(stream: Stream) {
589    /// let stream_listener = stream.add_local_listener::<()>()
590    ///     .control_info(|_stream, _user_data, id, _control| println!("Stream control info {id}"))
591    ///     .register();
592    /// # }
593    /// ```
594    #[must_use = "Call `.register()` to start receiving events"]
595    pub fn control_info<F>(mut self, callback: F) -> Self
596    where
597        F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
598    {
599        self.callbacks.control_info = Some(Box::new(callback));
600        self
601    }
602
603    /// Set the stream `io_changed` event callback of the listener.
604    ///
605    /// This event is emitted when IO is changed on the stream.
606    ///
607    /// # Callback parameters
608    /// `stream`: The stream  
609    /// `user_data`: User data  
610    /// `id`: Type of the IO area  
611    /// `area`: The IO area  
612    /// `size`: The IO area size
613    ///
614    /// # Examples
615    /// ```
616    /// # use pipewire::stream::Stream;
617    /// # fn example(stream: Stream) {
618    /// let stream_listener = stream.add_local_listener::<()>()
619    ///     .io_changed(|_stream, _user_data, id, _area, size| println!("Stream IO change: IO type {id}"))
620    ///     .register();
621    /// # }
622    /// ```
623    #[must_use = "Call `.register()` to start receiving events"]
624    pub fn io_changed<F>(mut self, callback: F) -> Self
625    where
626        F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
627    {
628        self.callbacks.io_changed = Some(Box::new(callback));
629        self
630    }
631
632    /// Set the stream `param_changed` event callback of the listener.
633    ///
634    /// This event is emitted when a param is changed.
635    ///
636    /// # Callback parameters
637    /// `stream`: The stream  
638    /// `user_data`: User data  
639    /// `id`: Type of the param  
640    /// `param`: The param
641    ///
642    /// # Examples
643    /// ```
644    /// # use pipewire::stream::Stream;
645    /// # use pipewire::spa::pod::Pod;
646    /// # fn example(stream: Stream) {
647    /// let stream_listener = stream.add_local_listener::<()>()
648    ///     .param_changed(|_stream, _user_data, id, param| println!("Stream param change: id {id}, param {:?}",
649    ///         param.map(Pod::as_bytes)))
650    ///     .register();
651    /// # }
652    /// ```
653    #[must_use = "Call `.register()` to start receiving events"]
654    pub fn param_changed<F>(mut self, callback: F) -> Self
655    where
656        F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
657    {
658        self.callbacks.param_changed = Some(Box::new(callback));
659        self
660    }
661
662    /// Set the stream `add_buffer` event callback of the listener.
663    ///
664    /// This event is emitted when a buffer was added for this stream.
665    ///
666    /// # Callback parameters
667    /// `stream`: The stream  
668    /// `user_data`: User data  
669    /// `buffer`: The buffer
670    ///
671    /// # Examples
672    /// ```
673    /// # use pipewire::stream::Stream;
674    /// # fn example(stream: Stream) {
675    /// let stream_listener = stream.add_local_listener::<()>()
676    ///     .add_buffer(|_stream, _user_data, buffer| println!("Stream buffer added {buffer:?}"))
677    ///     .register();
678    /// # }
679    /// ```
680    #[must_use = "Call `.register()` to start receiving events"]
681    pub fn add_buffer<F>(mut self, callback: F) -> Self
682    where
683        F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
684    {
685        self.callbacks.add_buffer = Some(Box::new(callback));
686        self
687    }
688
689    /// Set the stream `remove_buffer` event callback of the listener.
690    ///
691    /// This event is emitted when a buffer was removed for this stream.
692    ///
693    /// # Callback parameters
694    /// `stream`: The stream  
695    /// `user_data`: User data  
696    /// `buffer`: The buffer
697    ///
698    /// # Examples
699    /// ```
700    /// # use pipewire::stream::Stream;
701    /// # fn example(stream: Stream) {
702    /// let stream_listener = stream.add_local_listener::<()>()
703    ///     .remove_buffer(|_stream, _user_data, buffer| println!("Stream buffer removed {buffer:?}"))
704    ///     .register();
705    /// # }
706    /// ```
707    #[must_use = "Call `.register()` to start receiving events"]
708    pub fn remove_buffer<F>(mut self, callback: F) -> Self
709    where
710        F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
711    {
712        self.callbacks.remove_buffer = Some(Box::new(callback));
713        self
714    }
715
716    /// Set the stream `process` event callback of the listener.
717    ///
718    /// This event is emitted when a buffer can be queued (for playback streams) or dequeued (for capture streams).
719    ///
720    /// This is normally called from the mainloop but can also be called directly from the realtime data thread if the user is prepared to deal with this.
721    ///
722    /// # Callback parameters
723    /// `stream`: The stream  
724    /// `user_data`: User data
725    ///
726    /// # Examples
727    /// ```
728    /// # use pipewire::stream::Stream;
729    /// # fn example(stream: Stream) {
730    /// let stream_listener = stream.add_local_listener::<()>()
731    ///     .process(|stream, _user_data| {
732    ///         println!("Stream can be processed");
733    ///         let buf = stream.dequeue_buffer();
734    ///         // Produce or consume data using the buffer
735    ///         // The buffer is enqueued back to the stream when it's dropped
736    ///     })
737    ///     .register();
738    /// # }
739    /// ```
740    #[must_use = "Call `.register()` to start receiving events"]
741    pub fn process<F>(mut self, callback: F) -> Self
742    where
743        F: FnMut(&Stream, &mut D) + 'static,
744    {
745        self.callbacks.process = Some(Box::new(callback));
746        self
747    }
748
749    /// Set the stream `drained` event callback of the listener.
750    ///
751    /// This event is emitted when the stream is drained.
752    ///
753    /// # Callback parameters
754    /// `stream`: The stream  
755    /// `user_data`: User data
756    ///
757    /// # Examples
758    /// ```
759    /// # use pipewire::stream::Stream;
760    /// # fn example(stream: Stream) {
761    /// let stream_listener = stream.add_local_listener::<()>()
762    ///     .drained(|_stream, _user_data| println!("Stream is drained"))
763    ///     .register();
764    /// # }
765    /// ```
766    #[must_use = "Call `.register()` to start receiving events"]
767    pub fn drained<F>(mut self, callback: F) -> Self
768    where
769        F: FnMut(&Stream, &mut D) + 'static,
770    {
771        self.callbacks.drained = Some(Box::new(callback));
772        self
773    }
774
775    /// Subscribe to events and register any provided callbacks.
776    pub fn register(self) -> Result<StreamListener<D>, Error> {
777        let (events, data) = self.callbacks.into_raw();
778        let (listener, data) = unsafe {
779            let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
780            let raw_listener = Box::into_raw(listener);
781            let raw_data = Box::into_raw(data);
782            pw_sys::pw_stream_add_listener(
783                self.stream.as_raw_ptr(),
784                raw_listener,
785                events.as_ref().get_ref(),
786                raw_data as *mut _,
787            );
788            (Box::from_raw(raw_listener), Box::from_raw(raw_data))
789        };
790        Ok(StreamListener {
791            listener,
792            _events: events,
793            _data: data,
794        })
795    }
796}
797
/// An owned listener for stream events.
///
/// This is created by [`stream::ListenerLocalBuilder`][ListenerLocalBuilder] and will receive events as long as it is alive.
/// When this gets dropped, the listener gets unregistered and no events will be received by it.
#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
pub struct StreamListener<D> {
    // The hook that was passed to `pw_stream_add_listener`; removed again on drop.
    listener: Box<spa_sys::spa_hook>,
    // Need to stay allocated while the listener is registered
    // (`_events` is the event table and `_data` the callback state, both of which were
    // passed to `pw_stream_add_listener` by pointer).
    _events: Pin<Box<pw_sys::pw_stream_events>>,
    _data: Box<ListenerLocalCallbacks<D>>,
}
809
810impl<D> StreamListener<D> {
811    /// Stop the listener from receiving any events
812    ///
813    /// Removes the listener registration and cleans up allocated resources.
814    pub fn unregister(self) {
815        // do nothing, drop will clean up.
816    }
817}
818
819impl<D> std::ops::Drop for StreamListener<D> {
820    fn drop(&mut self) {
821        spa::utils::hook::remove(*self.listener);
822    }
823}
824
bitflags! {
    /// Extra flags that can be used in [`Stream::connect()`]
    // Each constant mirrors the `PW_STREAM_FLAG_*` value of the same name from `pw_sys`;
    // `Stream::connect()` passes the combined flags on as raw bits.
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    pub struct StreamFlags: pw_sys::pw_stream_flags {
        const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
        const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
        const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
        const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
        const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
        const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
        const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
        const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
        const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
        // Only present when the `v0_3_41` feature is enabled.
        #[cfg(feature = "v0_3_41")]
        const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
    }
}