pipewire/stream/
mod.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! Streams are higher-level objects providing a convenient way to send and receive data streams to/from PipeWire.
5//!
6//! This module contains wrappers for [`pw_stream`](pw_sys::pw_stream) and related itmes.
7
8use crate::buffer::Buffer;
9use crate::{error::Error, properties::Properties};
10use bitflags::bitflags;
11use spa::utils::result::SpaResult;
12use std::{
13    ffi::{self, CStr, CString},
14    fmt::Debug,
15    mem, os,
16    pin::Pin,
17    ptr,
18};
19
20mod box_;
21pub use box_::*;
22mod rc;
23pub use rc::*;
24
25#[derive(Debug, PartialEq)]
26pub enum StreamState {
27    Error(String),
28    Unconnected,
29    Connecting,
30    Paused,
31    Streaming,
32}
33
34impl StreamState {
35    pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
36        match state {
37            pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
38            pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
39            pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
40            pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
41            _ => {
42                let error = if error.is_null() {
43                    "".to_string()
44                } else {
45                    unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
46                };
47
48                StreamState::Error(error)
49            }
50        }
51    }
52}
53
54/// Stream timing information, updated every graph cycle.
55///
56/// Obtained from [`Stream::time()`].
57/// The [`now`](Self::now) field holds the timestamp of the last update;
58/// compare it with `pw_stream_get_nsec()` to interpolate the current position.
59///
60/// All timing values are relative to the stream's rate.
61///
62/// For a detailed description, see [`pw_time`'s documentation](https://docs.pipewire.org/structpw__time.html#details).
63#[repr(transparent)]
64pub struct Time(pw_sys::pw_time);
65
66impl Clone for Time {
67    fn clone(&self) -> Self {
68        Self(pw_sys::pw_time { ..self.0 })
69    }
70}
71
72impl Time {
73    pub fn as_raw(&self) -> &pw_sys::pw_time {
74        &self.0
75    }
76
77    /// The monotonic timestamp (in nanoseconds) of this report.
78    pub fn now(&self) -> i64 {
79        self.0.now
80    }
81
82    /// The rate of `ticks` and `delay`, usually expressed as 1/samplerate.
83    pub fn rate(&self) -> spa::utils::Fraction {
84        self.0.rate
85    }
86
87    /// Monotonically increasing stream position in ticks.
88    pub fn ticks(&self) -> u64 {
89        self.0.ticks
90    }
91
92    /// Delay to device in ticks, including all filters on the path. Can be
93    /// negative.
94    ///
95    /// Convert to seconds using `delay * rate.num / rate.denom`.
96    pub fn delay(&self) -> i64 {
97        self.0.delay
98    }
99
100    /// Total bytes queued in the stream.
101    pub fn queued(&self) -> u64 {
102        self.0.queued
103    }
104
105    /// Extra frames buffered in the resampler. Since PipeWire 0.3.50.
106    #[cfg(feature = "v0_3_50")]
107    pub fn buffered(&self) -> u64 {
108        self.0.buffered
109    }
110
111    /// Number of buffers currently queued. Since PipeWire 0.3.50.
112    #[cfg(feature = "v0_3_50")]
113    pub fn queued_buffers(&self) -> u32 {
114        self.0.queued_buffers
115    }
116
117    /// Number of buffers available to dequeue. Since PipeWire 0.3.50.
118    #[cfg(feature = "v0_3_50")]
119    pub fn avail_buffers(&self) -> u32 {
120        self.0.avail_buffers
121    }
122}
123
124impl Debug for Time {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        let mut s = f.debug_struct("Time");
127        s.field("now", &self.now())
128            .field("rate", &self.rate())
129            .field("ticks", &self.ticks())
130            .field("delay", &self.delay())
131            .field("queued", &self.queued());
132
133        #[cfg(feature = "v0_3_50")]
134        s.field("buffered", &self.buffered())
135            .field("queued_buffers", &self.queued_buffers())
136            .field("avail_buffers", &self.avail_buffers());
137
138        s.finish()
139    }
140}
141
142/// Transparent wrapper around a [stream](self).
143///
144/// This does not own the underlying object and is usually seen behind a `&` reference.
145///
146/// For owning wrappers that can construct streams, see [`StreamBox`] and [`StreamRc`].
147///
148/// For an explanation of these, see [Smart pointers to PipeWire
149/// objects](crate#smart-pointers-to-pipewire-objects).
150#[repr(transparent)]
151pub struct Stream(pw_sys::pw_stream);
152
153impl Stream {
154    pub fn as_raw(&self) -> &pw_sys::pw_stream {
155        &self.0
156    }
157
158    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
159        ptr::addr_of!(self.0).cast_mut()
160    }
161
162    /// Add a local listener builder
163    #[must_use = "Use the builder to register event callbacks"]
164    pub fn add_local_listener_with_user_data<D>(
165        &self,
166        user_data: D,
167    ) -> ListenerLocalBuilder<'_, D> {
168        let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
169        callbacks.stream =
170            Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
171        ListenerLocalBuilder {
172            stream: self,
173            callbacks,
174        }
175    }
176
177    /// Add a local listener builder. User data is initialized with its default value
178    #[must_use = "Use the builder to register event callbacks"]
179    pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
180        self.add_local_listener_with_user_data(Default::default())
181    }
182
183    /// Connect the stream
184    ///
185    /// Tries to connect to the node `id` in the given `direction`. If no node
186    /// is provided then any suitable node will be used.
187    // FIXME: high-level API for params
188    pub fn connect(
189        &self,
190        direction: spa::utils::Direction,
191        id: Option<u32>,
192        flags: StreamFlags,
193        params: &mut [&spa::pod::Pod],
194    ) -> Result<(), Error> {
195        let r = unsafe {
196            pw_sys::pw_stream_connect(
197                self.as_raw_ptr(),
198                direction.as_raw(),
199                id.unwrap_or(crate::constants::ID_ANY),
200                flags.bits(),
201                // We cast from *mut [&spa::pod::Pod] to *mut [*const spa_sys::spa_pod] here,
202                // which is valid because spa::pod::Pod is a transparent wrapper around spa_sys::spa_pod
203                params.as_mut_ptr().cast(),
204                params.len() as u32,
205            )
206        };
207
208        SpaResult::from_c(r).into_sync_result()?;
209        Ok(())
210    }
211
212    /// Update Parameters
213    ///
214    /// Call from the `param_changed` callback to negotiate a new set of
215    /// parameters for the stream.
216    // FIXME: high-level API for params
217    pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
218        let r = unsafe {
219            pw_sys::pw_stream_update_params(
220                self.as_raw_ptr(),
221                params.as_mut_ptr().cast(),
222                params.len() as u32,
223            )
224        };
225
226        SpaResult::from_c(r).into_sync_result()?;
227        Ok(())
228    }
229
230    /// Activate or deactivate the stream
231    pub fn set_active(&self, active: bool) -> Result<(), Error> {
232        let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
233
234        SpaResult::from_c(r).into_sync_result()?;
235        Ok(())
236    }
237
238    /// Take a Buffer from the Stream
239    ///
240    /// Removes a buffer from the stream. If this is an input stream the buffer
241    /// will contain data ready to process. If this is an output stream it can
242    /// be filled.
243    ///
244    /// # Safety
245    ///
246    /// The pointer returned could be NULL if no buffer is available. The buffer
247    /// should be returned to the stream once processing is complete.
248    pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
249        pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
250    }
251
252    pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
253        unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
254    }
255
256    /// Return a Buffer to the Stream
257    ///
258    /// Give back a buffer once processing is complete. Use this to queue up a
259    /// frame for an output stream, or return the buffer to the pool ready to
260    /// receive new data for an input stream.
261    ///
262    /// # Safety
263    ///
264    /// The buffer pointer should be one obtained from this stream instance by
265    /// a call to [Self::dequeue_raw_buffer()].
266    pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
267        pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
268    }
269
270    /// Disconnect the stream
271    pub fn disconnect(&self) -> Result<(), Error> {
272        let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
273
274        SpaResult::from_c(r).into_sync_result()?;
275        Ok(())
276    }
277
278    /// Set the stream in error state
279    ///
280    /// # Panics
281    /// Will panic if `error` contains a 0 byte.
282    ///
283    pub fn set_error(&mut self, res: i32, error: &str) {
284        let error = CString::new(error).expect("failed to convert error to CString");
285        let error_cstr = error.as_c_str();
286        Stream::set_error_cstr(self, res, error_cstr)
287    }
288
289    /// Set the stream in error state with CStr
290    ///
291    /// # Panics
292    /// Will panic if `error` contains a 0 byte.
293    ///
294    pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
295        unsafe {
296            pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
297        }
298    }
299
300    /// Flush the stream. When  `drain` is `true`, the `drained` callback will
301    /// be called when all data is played or recorded.
302    pub fn flush(&self, drain: bool) -> Result<(), Error> {
303        let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
304
305        SpaResult::from_c(r).into_sync_result()?;
306        Ok(())
307    }
308
309    pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
310        let r = unsafe {
311            pw_sys::pw_stream_set_control(
312                self.as_raw_ptr(),
313                id,
314                values.len() as u32,
315                values.as_ptr() as *mut f32,
316            )
317        };
318        SpaResult::from_c(r).into_sync_result()?;
319        Ok(())
320    }
321
322    // getters
323
324    /// Get the name of the stream.
325    pub fn name(&self) -> String {
326        let name = unsafe {
327            let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
328            CStr::from_ptr(name)
329        };
330
331        name.to_string_lossy().to_string()
332    }
333
334    /// Get the current state of the stream.
335    pub fn state(&self) -> StreamState {
336        let mut error: *const std::os::raw::c_char = ptr::null();
337        let state = unsafe {
338            pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
339        };
340        StreamState::from_raw(state, error)
341    }
342
343    /// Get the properties of the stream.
344    pub fn properties(&self) -> &Properties {
345        unsafe {
346            let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
347            let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
348            props.cast().as_ref()
349        }
350    }
351
352    /// Get the node ID of the stream.
353    pub fn node_id(&self) -> u32 {
354        unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
355    }
356
357    #[cfg(feature = "v0_3_34")]
358    pub fn is_driving(&self) -> bool {
359        unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
360    }
361
362    #[cfg(feature = "v0_3_34")]
363    pub fn trigger_process(&self) -> Result<(), Error> {
364        let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
365
366        SpaResult::from_c(r).into_result()?;
367        Ok(())
368    }
369
370    /// Query the time on the stream.
371    ///
372    /// The time reported by this function is updated every graph cycle, usually
373    /// from the process callback. Values such as `ticks` and `delay` are
374    /// meaningful when used together with `now`: use `pw_stream_get_nsec()` to
375    /// get the current timestamp and interpolate.
376    ///
377    /// With the `v0_3_50` feature enabled, this uses `pw_stream_get_time_n()`
378    /// which also populates the `buffered`, `queued_buffers`, and
379    /// `avail_buffers` fields.
380    ///
381    /// This function is RT-safe.
382    pub fn time(&self) -> Result<Time, Error> {
383        unsafe {
384            let mut time = mem::MaybeUninit::<pw_sys::pw_time>::zeroed();
385
386            #[cfg(feature = "v0_3_50")]
387            let r = pw_sys::pw_stream_get_time_n(
388                self.as_raw_ptr(),
389                time.as_mut_ptr(),
390                mem::size_of::<pw_sys::pw_time>(),
391            );
392
393            #[cfg(not(feature = "v0_3_50"))]
394            let r = pw_sys::pw_stream_get_time(self.as_raw_ptr(), time.as_mut_ptr());
395
396            SpaResult::from_c(r).into_result()?;
397            Ok(Time(time.assume_init()))
398        }
399    }
400
401    // TODO: pw_stream_get_core()
402    // TODO: pw_stream_get_nsec() (since PipeWire 1.1.0, needs v1_1 feature)
403}
404
405type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
406type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
407
408#[allow(clippy::type_complexity)]
409pub struct ListenerLocalCallbacks<D> {
410    pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
411    pub control_info:
412        Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
413    pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
414    pub param_changed: Option<Box<ParamChangedCB<D>>>,
415    pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
416    pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
417    pub process: Option<Box<ProcessCB<D>>>,
418    pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
419    #[cfg(feature = "v0_3_39")]
420    pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
421    #[cfg(feature = "v0_3_40")]
422    pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
423    pub user_data: D,
424    stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
425}
426
427unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
428    stream
429        .map(|ptr| ptr.cast::<Stream>().as_ref())
430        .expect("stream cannot be null")
431}
432
433impl<D> ListenerLocalCallbacks<D> {
434    fn with_user_data(user_data: D) -> Self {
435        ListenerLocalCallbacks {
436            process: Default::default(),
437            stream: Default::default(),
438            drained: Default::default(),
439            add_buffer: Default::default(),
440            control_info: Default::default(),
441            io_changed: Default::default(),
442            param_changed: Default::default(),
443            remove_buffer: Default::default(),
444            state_changed: Default::default(),
445            #[cfg(feature = "v0_3_39")]
446            command: Default::default(),
447            #[cfg(feature = "v0_3_40")]
448            trigger_done: Default::default(),
449            user_data,
450        }
451    }
452
453    pub(crate) fn into_raw(
454        self,
455    ) -> (
456        Pin<Box<pw_sys::pw_stream_events>>,
457        Box<ListenerLocalCallbacks<D>>,
458    ) {
459        let callbacks = Box::new(self);
460
461        unsafe extern "C" fn on_state_changed<D>(
462            data: *mut os::raw::c_void,
463            old: pw_sys::pw_stream_state,
464            new: pw_sys::pw_stream_state,
465            error: *const os::raw::c_char,
466        ) {
467            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
468                if let Some(cb) = &mut state.state_changed {
469                    let stream = unwrap_stream_ptr(state.stream);
470                    let old = StreamState::from_raw(old, error);
471                    let new = StreamState::from_raw(new, error);
472                    cb(stream, &mut state.user_data, old, new)
473                };
474            }
475        }
476
477        unsafe extern "C" fn on_control_info<D>(
478            data: *mut os::raw::c_void,
479            id: u32,
480            control: *const pw_sys::pw_stream_control,
481        ) {
482            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
483                if let Some(cb) = &mut state.control_info {
484                    let stream = unwrap_stream_ptr(state.stream);
485                    cb(stream, &mut state.user_data, id, control);
486                }
487            }
488        }
489
490        unsafe extern "C" fn on_io_changed<D>(
491            data: *mut os::raw::c_void,
492            id: u32,
493            area: *mut os::raw::c_void,
494            size: u32,
495        ) {
496            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
497                if let Some(cb) = &mut state.io_changed {
498                    let stream = unwrap_stream_ptr(state.stream);
499                    cb(stream, &mut state.user_data, id, area, size);
500                }
501            }
502        }
503
504        unsafe extern "C" fn on_param_changed<D>(
505            data: *mut os::raw::c_void,
506            id: u32,
507            param: *const spa_sys::spa_pod,
508        ) {
509            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
510                if let Some(cb) = &mut state.param_changed {
511                    let stream = unwrap_stream_ptr(state.stream);
512                    let param = if !param.is_null() {
513                        Some(spa::pod::Pod::from_raw(param))
514                    } else {
515                        None
516                    };
517
518                    cb(stream, &mut state.user_data, id, param);
519                }
520            }
521        }
522
523        unsafe extern "C" fn on_add_buffer<D>(
524            data: *mut ::std::os::raw::c_void,
525            buffer: *mut pw_sys::pw_buffer,
526        ) {
527            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
528                if let Some(cb) = &mut state.add_buffer {
529                    let stream = unwrap_stream_ptr(state.stream);
530                    cb(stream, &mut state.user_data, buffer);
531                }
532            }
533        }
534
535        unsafe extern "C" fn on_remove_buffer<D>(
536            data: *mut ::std::os::raw::c_void,
537            buffer: *mut pw_sys::pw_buffer,
538        ) {
539            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
540                if let Some(cb) = &mut state.remove_buffer {
541                    let stream = unwrap_stream_ptr(state.stream);
542                    cb(stream, &mut state.user_data, buffer);
543                }
544            }
545        }
546
547        unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
548            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
549                if let Some(cb) = &mut state.process {
550                    let stream = unwrap_stream_ptr(state.stream);
551                    cb(stream, &mut state.user_data);
552                }
553            }
554        }
555
556        unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
557            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
558                if let Some(cb) = &mut state.drained {
559                    let stream = unwrap_stream_ptr(state.stream);
560                    cb(stream, &mut state.user_data);
561                }
562            }
563        }
564
565        #[cfg(feature = "v0_3_39")]
566        unsafe extern "C" fn on_command<D>(
567            data: *mut ::std::os::raw::c_void,
568            command: *const spa_sys::spa_command,
569        ) {
570            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
571                if let Some(cb) = &mut state.command {
572                    let stream = unwrap_stream_ptr(state.stream);
573                    cb(stream, &mut state.user_data, command);
574                }
575            }
576        }
577
578        #[cfg(feature = "v0_3_40")]
579        unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
580            if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
581                if let Some(cb) = &mut state.trigger_done {
582                    let stream = unwrap_stream_ptr(state.stream);
583                    cb(stream, &mut state.user_data);
584                }
585            }
586        }
587
588        let events = unsafe {
589            let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
590            events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
591
592            if callbacks.state_changed.is_some() {
593                events.state_changed = Some(on_state_changed::<D>);
594            }
595            if callbacks.control_info.is_some() {
596                events.control_info = Some(on_control_info::<D>);
597            }
598            if callbacks.io_changed.is_some() {
599                events.io_changed = Some(on_io_changed::<D>);
600            }
601            if callbacks.param_changed.is_some() {
602                events.param_changed = Some(on_param_changed::<D>);
603            }
604            if callbacks.add_buffer.is_some() {
605                events.add_buffer = Some(on_add_buffer::<D>);
606            }
607            if callbacks.remove_buffer.is_some() {
608                events.remove_buffer = Some(on_remove_buffer::<D>);
609            }
610            if callbacks.process.is_some() {
611                events.process = Some(on_process::<D>);
612            }
613            if callbacks.drained.is_some() {
614                events.drained = Some(on_drained::<D>);
615            }
616            #[cfg(feature = "v0_3_39")]
617            if callbacks.command.is_some() {
618                events.command = Some(on_command::<D>);
619            }
620            #[cfg(feature = "v0_3_40")]
621            if callbacks.trigger_done.is_some() {
622                events.trigger_done = Some(on_trigger_done::<D>);
623            }
624
625            events
626        };
627
628        (events, callbacks)
629    }
630}
631
632/// A builder for registering stream event callbacks.
633///
634/// Use [`Stream::add_local_listener`] or [`Stream::add_local_listener_with_user_data`] to create this and register callbacks that will be called when events of interest occur.
635/// After adding callbacks, use [`register`](Self::register) to get back a [`StreamListener`].
636///
637/// # Examples
638/// ```
639/// # use pipewire::stream::Stream;
640/// # use pipewire::spa::pod::Pod;
641/// # fn example(stream: Stream) {
642/// let stream_listener = stream.add_local_listener::<()>()
643///     .state_changed(|_stream, _user_data, old, new| println!("Stream state changed from {old:?} to {new:?}"))
644///     .control_info(|_stream, _user_data, id, control| println!("Stream control info: id {id}, control {control:?}"))
645///     .io_changed(|_stream, _user_data, id, area, size| println!("Stream IO change: IO type {id}, area {area:?}, size {size}"))
646///     .param_changed(|_stream, _user_data, id, param| println!("Stream param change: id {id}, param {:?}",
647///         param.map(Pod::as_bytes)))
648///     .add_buffer(|_stream, _user_data, buffer| println!("Stream buffer added {buffer:?}"))
649///     .remove_buffer(|_stream, _user_data, buffer| println!("Stream buffer removed {buffer:?}"))
650///     .process(|stream, _user_data| {
651///         println!("Stream can be processed");
652///         let buf = stream.dequeue_buffer();
653///         // Produce or consume data using the buffer
654///         // The buffer is enqueued back to the stream when it's dropped
655///     })
656///     .drained(|_stream, _user_data| println!("Stream is drained"))
657///     .register();
658/// # }
659/// ```
660pub struct ListenerLocalBuilder<'a, D> {
661    stream: &'a Stream,
662    callbacks: ListenerLocalCallbacks<D>,
663}
664
665impl<'a, D> ListenerLocalBuilder<'a, D> {
666    /// Set the stream `state_changed` event callback of the listener.
667    ///
668    /// This event is emitted when the stream state changes.
669    ///
670    /// # Callback parameters
671    /// `stream`: The stream  
672    /// `data`: User data  
673    /// `old`: Old stream state  
674    /// `new`: New stream state
675    ///
676    /// # Examples
677    /// ```
678    /// # use pipewire::stream::Stream;
679    /// # fn example(stream: Stream) {
680    /// let stream_listener = stream.add_local_listener::<()>()
681    ///     .state_changed(|_stream, _user_data, old, new| println!("Stream state changed from {old:?} to {new:?}"))
682    ///     .register();
683    /// # }
684    /// ```
685    #[must_use = "Call `.register()` to start receiving events"]
686    pub fn state_changed<F>(mut self, callback: F) -> Self
687    where
688        F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
689    {
690        self.callbacks.state_changed = Some(Box::new(callback));
691        self
692    }
693
694    /// Set the stream `control_info` event callback of the listener.
695    ///
696    /// This event is emitted when there is information about a control.
697    ///
698    /// # Callback parameters
699    /// `stream`: The stream  
700    /// `user_data`: User data  
701    /// `id`: Type of the control  
702    /// `control`: The control
703    ///
704    /// # Examples
705    /// ```
706    /// # use pipewire::stream::Stream;
707    /// # fn example(stream: Stream) {
708    /// let stream_listener = stream.add_local_listener::<()>()
709    ///     .control_info(|_stream, _user_data, id, _control| println!("Stream control info {id}"))
710    ///     .register();
711    /// # }
712    /// ```
713    #[must_use = "Call `.register()` to start receiving events"]
714    pub fn control_info<F>(mut self, callback: F) -> Self
715    where
716        F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
717    {
718        self.callbacks.control_info = Some(Box::new(callback));
719        self
720    }
721
722    /// Set the stream `io_changed` event callback of the listener.
723    ///
724    /// This event is emitted when IO is changed on the stream.
725    ///
726    /// # Callback parameters
727    /// `stream`: The stream  
728    /// `user_data`: User data  
729    /// `id`: Type of the IO area  
730    /// `area`: The IO area  
731    /// `size`: The IO area size
732    ///
733    /// # Examples
734    /// ```
735    /// # use pipewire::stream::Stream;
736    /// # fn example(stream: Stream) {
737    /// let stream_listener = stream.add_local_listener::<()>()
738    ///     .io_changed(|_stream, _user_data, id, _area, size| println!("Stream IO change: IO type {id}"))
739    ///     .register();
740    /// # }
741    /// ```
742    #[must_use = "Call `.register()` to start receiving events"]
743    pub fn io_changed<F>(mut self, callback: F) -> Self
744    where
745        F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
746    {
747        self.callbacks.io_changed = Some(Box::new(callback));
748        self
749    }
750
751    /// Set the stream `param_changed` event callback of the listener.
752    ///
753    /// This event is emitted when a param is changed.
754    ///
755    /// # Callback parameters
756    /// `stream`: The stream  
757    /// `user_data`: User data  
758    /// `id`: Type of the param  
759    /// `param`: The param
760    ///
761    /// # Examples
762    /// ```
763    /// # use pipewire::stream::Stream;
764    /// # use pipewire::spa::pod::Pod;
765    /// # fn example(stream: Stream) {
766    /// let stream_listener = stream.add_local_listener::<()>()
767    ///     .param_changed(|_stream, _user_data, id, param| println!("Stream param change: id {id}, param {:?}",
768    ///         param.map(Pod::as_bytes)))
769    ///     .register();
770    /// # }
771    /// ```
772    #[must_use = "Call `.register()` to start receiving events"]
773    pub fn param_changed<F>(mut self, callback: F) -> Self
774    where
775        F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
776    {
777        self.callbacks.param_changed = Some(Box::new(callback));
778        self
779    }
780
781    /// Set the stream `add_buffer` event callback of the listener.
782    ///
783    /// This event is emitted when a buffer was added for this stream.
784    ///
785    /// # Callback parameters
786    /// `stream`: The stream  
787    /// `user_data`: User data  
788    /// `buffer`: The buffer
789    ///
790    /// # Examples
791    /// ```
792    /// # use pipewire::stream::Stream;
793    /// # fn example(stream: Stream) {
794    /// let stream_listener = stream.add_local_listener::<()>()
795    ///     .add_buffer(|_stream, _user_data, buffer| println!("Stream buffer added {buffer:?}"))
796    ///     .register();
797    /// # }
798    /// ```
799    #[must_use = "Call `.register()` to start receiving events"]
800    pub fn add_buffer<F>(mut self, callback: F) -> Self
801    where
802        F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
803    {
804        self.callbacks.add_buffer = Some(Box::new(callback));
805        self
806    }
807
808    /// Set the stream `remove_buffer` event callback of the listener.
809    ///
810    /// This event is emitted when a buffer was removed for this stream.
811    ///
812    /// # Callback parameters
813    /// `stream`: The stream  
814    /// `user_data`: User data  
815    /// `buffer`: The buffer
816    ///
817    /// # Examples
818    /// ```
819    /// # use pipewire::stream::Stream;
820    /// # fn example(stream: Stream) {
821    /// let stream_listener = stream.add_local_listener::<()>()
822    ///     .remove_buffer(|_stream, _user_data, buffer| println!("Stream buffer removed {buffer:?}"))
823    ///     .register();
824    /// # }
825    /// ```
826    #[must_use = "Call `.register()` to start receiving events"]
827    pub fn remove_buffer<F>(mut self, callback: F) -> Self
828    where
829        F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
830    {
831        self.callbacks.remove_buffer = Some(Box::new(callback));
832        self
833    }
834
835    /// Set the stream `process` event callback of the listener.
836    ///
837    /// This event is emitted when a buffer can be queued (for playback streams) or dequeued (for capture streams).
838    ///
839    /// This is normally called from the mainloop but can also be called directly from the realtime data thread if the user is prepared to deal with this.
840    ///
841    /// # Callback parameters
842    /// `stream`: The stream  
843    /// `user_data`: User data
844    ///
845    /// # Examples
846    /// ```
847    /// # use pipewire::stream::Stream;
848    /// # fn example(stream: Stream) {
849    /// let stream_listener = stream.add_local_listener::<()>()
850    ///     .process(|stream, _user_data| {
851    ///         println!("Stream can be processed");
852    ///         let buf = stream.dequeue_buffer();
853    ///         // Produce or consume data using the buffer
854    ///         // The buffer is enqueued back to the stream when it's dropped
855    ///     })
856    ///     .register();
857    /// # }
858    /// ```
859    #[must_use = "Call `.register()` to start receiving events"]
860    pub fn process<F>(mut self, callback: F) -> Self
861    where
862        F: FnMut(&Stream, &mut D) + 'static,
863    {
864        self.callbacks.process = Some(Box::new(callback));
865        self
866    }
867
868    /// Set the stream `drained` event callback of the listener.
869    ///
870    /// This event is emitted when the stream is drained.
871    ///
872    /// # Callback parameters
873    /// `stream`: The stream  
874    /// `user_data`: User data
875    ///
876    /// # Examples
877    /// ```
878    /// # use pipewire::stream::Stream;
879    /// # fn example(stream: Stream) {
880    /// let stream_listener = stream.add_local_listener::<()>()
881    ///     .drained(|_stream, _user_data| println!("Stream is drained"))
882    ///     .register();
883    /// # }
884    /// ```
885    #[must_use = "Call `.register()` to start receiving events"]
886    pub fn drained<F>(mut self, callback: F) -> Self
887    where
888        F: FnMut(&Stream, &mut D) + 'static,
889    {
890        self.callbacks.drained = Some(Box::new(callback));
891        self
892    }
893
894    /// Subscribe to events and register any provided callbacks.
895    pub fn register(self) -> Result<StreamListener<D>, Error> {
896        let (events, data) = self.callbacks.into_raw();
897        let (listener, data) = unsafe {
898            let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
899            let raw_listener = Box::into_raw(listener);
900            let raw_data = Box::into_raw(data);
901            pw_sys::pw_stream_add_listener(
902                self.stream.as_raw_ptr(),
903                raw_listener,
904                events.as_ref().get_ref(),
905                raw_data as *mut _,
906            );
907            (Box::from_raw(raw_listener), Box::from_raw(raw_data))
908        };
909        Ok(StreamListener {
910            listener,
911            _events: events,
912            _data: data,
913        })
914    }
915}
916
917/// An owned listener for stream events.
918///
919/// This is created by [`stream::ListenerLocalBuilder`][ListenerLocalBuilder] and will receive events as long as it is alive.
920/// When this gets dropped, the listener gets unregistered and no events will be received by it.
921#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
922pub struct StreamListener<D> {
923    listener: Box<spa_sys::spa_hook>,
924    // Need to stay allocated while the listener is registered
925    _events: Pin<Box<pw_sys::pw_stream_events>>,
926    _data: Box<ListenerLocalCallbacks<D>>,
927}
928
929impl<D> StreamListener<D> {
930    /// Stop the listener from receiving any events
931    ///
932    /// Removes the listener registration and cleans up allocated resources.
933    pub fn unregister(self) {
934        // do nothing, drop will clean up.
935    }
936}
937
938impl<D> std::ops::Drop for StreamListener<D> {
939    fn drop(&mut self) {
940        spa::utils::hook::remove(*self.listener);
941    }
942}
943
944bitflags! {
945    /// Extra flags that can be used in [`Stream::connect()`]
946    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
947    pub struct StreamFlags: pw_sys::pw_stream_flags {
948        const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
949        const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
950        const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
951        const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
952        const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
953        const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
954        const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
955        const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
956        const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
957        #[cfg(feature = "v0_3_41")]
958        const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
959    }
960}