pipewire/
port.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! Ports are attached on [nodes](crate::node) and provide interfaces for input or output of data
5//! on the node.
6//!
7//! This module contains wrappers for [`pw_port`](pw_sys::pw_port) and related items.
8
9use bitflags::bitflags;
10use libc::c_void;
11use std::ops::Deref;
12use std::{fmt, mem};
13use std::{pin::Pin, ptr};
14
15use crate::{
16    proxy::{Listener, Proxy, ProxyT},
17    spa::utils::Direction,
18    types::ObjectType,
19};
20use spa::{pod::Pod, spa_interface_call_method};
21
22/// A [proxy][Proxy] to a [port](self).
23#[derive(Debug)]
24pub struct Port {
25    proxy: Proxy,
26}
27
28impl Port {
29    // TODO: add non-local version when we'll bind pw_thread_loop_start()
30    #[must_use = "Use the builder to register event callbacks"]
31    pub fn add_listener_local(&self) -> PortListenerLocalBuilder<'_> {
32        PortListenerLocalBuilder {
33            port: self,
34            cbs: ListenerLocalCallbacks::default(),
35        }
36    }
37
38    /// Subscribe to parameter changes.
39    ///
40    /// Automatically emit [`param`](PortListenerLocalBuilder::param) events for the `ids` when they are changed.
41    ///
42    /// # Permissions
43    /// Requires [`X`](crate::permissions::PermissionFlags::X) permissions on the port.
44    // FIXME: Return result?
45    pub fn subscribe_params(&self, ids: &[spa::param::ParamType]) {
46        unsafe {
47            spa_interface_call_method!(
48                self.proxy.as_ptr(),
49                pw_sys::pw_port_methods,
50                subscribe_params,
51                ids.as_ptr() as *mut _,
52                ids.len().try_into().unwrap()
53            );
54        }
55    }
56
57    /// Enumerate node parameters
58    ///
59    /// Start enumeration of node parameters. For each param, a
60    /// [`param`](PortListenerLocalBuilder::param) event will be emitted.
61    ///
62    /// # Permissions
63    /// Requires [`X`](crate::permissions::PermissionFlags::X) permissions on the port.
64    ///
65    /// # Parameters
66    /// `seq`: a sequence number to place in the reply \
67    /// `id`: the parameter id to enum, or [`None`] to allow any id \
68    /// `start`: the start index or 0 for the first param \
69    /// `num`: the maximum number of params to retrieve ([`u32::MAX`] may be used to retrieve all params)
70    // FIXME: Add filter parameter
71    // FIXME: Return result?
72    pub fn enum_params(&self, seq: i32, id: Option<spa::param::ParamType>, start: u32, num: u32) {
73        let id = id.map(|id| id.as_raw()).unwrap_or(crate::constants::ID_ANY);
74
75        unsafe {
76            spa_interface_call_method!(
77                self.proxy.as_ptr(),
78                pw_sys::pw_node_methods,
79                enum_params,
80                seq,
81                id,
82                start,
83                num,
84                std::ptr::null()
85            );
86        }
87    }
88}
89
90impl ProxyT for Port {
91    fn type_() -> ObjectType {
92        ObjectType::Port
93    }
94
95    fn upcast(self) -> Proxy {
96        self.proxy
97    }
98
99    fn upcast_ref(&self) -> &Proxy {
100        &self.proxy
101    }
102
103    unsafe fn from_proxy_unchecked(proxy: Proxy) -> Self
104    where
105        Self: Sized,
106    {
107        Self { proxy }
108    }
109}
110
111#[derive(Default)]
112struct ListenerLocalCallbacks {
113    #[allow(clippy::type_complexity)]
114    info: Option<Box<dyn Fn(&PortInfoRef)>>,
115    #[allow(clippy::type_complexity)]
116    param: Option<Box<dyn Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>)>>,
117}
118
119/// A builder for registering port event callbacks.
120///
121/// Use [`Port::add_listener_local`] to create this and register callbacks that will be called when events of interest occur.
122/// After adding callbacks, use [`register`](Self::register) to get back a [`PortListener`].
123///
124/// # Examples
125/// ```
126/// # use pipewire::port::Port;
127/// # use pipewire::spa::pod::Pod;
128/// # fn example(port: Port) {
129/// let port_listener = port.add_listener_local()
130///     .info(|info| println!("New port info: {info:?}"))
131///     .param(|seq, param_type, index, next, param| {
132///         println!("New port param: seq {seq}, param type {param_type:?}, index {index}, next {next}, param {:?}",
133///             param.map(Pod::as_bytes));
134///     })
135///     .register();
136/// # }
137/// ```
138pub struct PortListenerLocalBuilder<'a> {
139    port: &'a Port,
140    cbs: ListenerLocalCallbacks,
141}
142
143#[repr(transparent)]
144pub struct PortInfoRef(pw_sys::pw_port_info);
145
146impl PortInfoRef {
147    pub fn as_raw(&self) -> &pw_sys::pw_port_info {
148        &self.0
149    }
150
151    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_port_info {
152        std::ptr::addr_of!(self.0).cast_mut()
153    }
154
155    pub fn id(&self) -> u32 {
156        self.0.id
157    }
158
159    pub fn direction(&self) -> Direction {
160        Direction::from_raw(self.0.direction)
161    }
162
163    pub fn change_mask(&self) -> PortChangeMask {
164        PortChangeMask::from_bits_retain(self.0.change_mask)
165    }
166
167    pub fn props(&self) -> Option<&spa::utils::dict::DictRef> {
168        let props_ptr: *mut spa::utils::dict::DictRef = self.0.props.cast();
169        ptr::NonNull::new(props_ptr).map(|ptr| unsafe { ptr.as_ref() })
170    }
171
172    /// Get the param infos for the port.
173    pub fn params(&self) -> &[spa::param::ParamInfo] {
174        let params = self.0.params;
175        if params.is_null() {
176            &[]
177        } else {
178            unsafe {
179                std::slice::from_raw_parts(params as *const _, self.0.n_params.try_into().unwrap())
180            }
181        }
182    }
183}
184
185impl fmt::Debug for PortInfoRef {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        f.debug_struct("PortInfoRef")
188            .field("id", &self.id())
189            .field("direction", &self.direction())
190            .field("change-mask", &self.change_mask())
191            .field("props", &self.props())
192            .field("params", &self.params())
193            .finish()
194    }
195}
196
197pub struct PortInfo {
198    ptr: ptr::NonNull<pw_sys::pw_port_info>,
199}
200
201impl PortInfo {
202    pub fn new(ptr: ptr::NonNull<pw_sys::pw_port_info>) -> Self {
203        Self { ptr }
204    }
205
206    pub fn from_raw(raw: *mut pw_sys::pw_port_info) -> Self {
207        Self {
208            ptr: ptr::NonNull::new(raw).expect("Provided pointer is null"),
209        }
210    }
211
212    pub fn into_raw(self) -> *mut pw_sys::pw_port_info {
213        std::mem::ManuallyDrop::new(self).ptr.as_ptr()
214    }
215}
216
217impl Drop for PortInfo {
218    fn drop(&mut self) {
219        unsafe { pw_sys::pw_port_info_free(self.ptr.as_ptr()) }
220    }
221}
222
223impl std::ops::Deref for PortInfo {
224    type Target = PortInfoRef;
225
226    fn deref(&self) -> &Self::Target {
227        unsafe { self.ptr.cast::<PortInfoRef>().as_ref() }
228    }
229}
230
231impl AsRef<PortInfoRef> for PortInfo {
232    fn as_ref(&self) -> &PortInfoRef {
233        self.deref()
234    }
235}
236
237bitflags! {
238    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
239    pub struct PortChangeMask: u64 {
240        const PROPS = pw_sys::PW_PORT_CHANGE_MASK_PROPS as u64;
241        const PARAMS = pw_sys::PW_PORT_CHANGE_MASK_PARAMS as u64;
242    }
243}
244
245impl fmt::Debug for PortInfo {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        f.debug_struct("PortInfo")
248            .field("id", &self.id())
249            .field("direction", &self.direction())
250            .field("change-mask", &self.change_mask())
251            .field("props", &self.props())
252            .field("params", &self.params())
253            .finish()
254    }
255}
256
257/// An owned listener for port events.
258///
259/// This is created by [`PortListenerLocalBuilder`] and will receive events as long as it is alive.
260/// When this gets dropped, the listener gets unregistered and no events will be received by it.
261#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
262pub struct PortListener {
263    // Need to stay allocated while the listener is registered
264    #[allow(dead_code)]
265    events: Pin<Box<pw_sys::pw_port_events>>,
266    listener: Pin<Box<spa_sys::spa_hook>>,
267    #[allow(dead_code)]
268    data: Box<ListenerLocalCallbacks>,
269}
270
271impl Listener for PortListener {}
272
273impl Drop for PortListener {
274    fn drop(&mut self) {
275        spa::utils::hook::remove(*self.listener);
276    }
277}
278
279impl<'a> PortListenerLocalBuilder<'a> {
280    /// Set the port `info` event callback of the listener.
281    ///
282    /// # Callback parameters
283    /// `info`: Info about the port.
284    ///
285    /// # Examples
286    /// ```
287    /// # use pipewire::port::Port;
288    /// # fn example(port: Port) {
289    /// let port_listener = port.add_listener_local()
290    ///     .info(|info| println!("New port info: {info:?}"))
291    ///     .register();
292    /// # }
293    /// ```
294    #[must_use = "Call `.register()` to start receiving events"]
295    pub fn info<F>(mut self, info: F) -> Self
296    where
297        F: Fn(&PortInfoRef) + 'static,
298    {
299        self.cbs.info = Some(Box::new(info));
300        self
301    }
302
303    /// Set the port `param` event callback of the listener.
304    ///
305    /// This event is emitted as a result of [`enum_params`](Port::enum_params) or
306    /// [`subscribe_params`](Port::subscribe_params).
307    ///
308    /// # Callback parameters
309    /// `seq`: The sequence number of the request  
310    /// `param_type`: The param type  
311    /// `index`: The param index  
312    /// `next`: The param index of the next param  
313    /// `param`: The param
314    ///
315    /// # Examples
316    /// ```
317    /// # use pipewire::port::Port;
318    /// # use pipewire::spa::pod::Pod;
319    /// # fn example(port: Port) {
320    /// let port_listener = port.add_listener_local()
321    ///     .param(|seq, param_type, index, next, param| {
322    ///         println!("New port param: seq {seq}, param type {param_type:?}, index {index}, next {next}, param {:?}",
323    ///             param.map(Pod::as_bytes));
324    ///     })
325    ///     .register();
326    /// # }
327    /// ```
328    #[must_use = "Call `.register()` to start receiving events"]
329    pub fn param<F>(mut self, param: F) -> Self
330    where
331        F: Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>) + 'static,
332    {
333        self.cbs.param = Some(Box::new(param));
334        self
335    }
336
337    /// Subscribe to events and register any provided callbacks.
338    pub fn register(self) -> PortListener {
339        unsafe extern "C" fn port_events_info(
340            data: *mut c_void,
341            info: *const pw_sys::pw_port_info,
342        ) {
343            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();
344            let info = ptr::NonNull::new(info as *mut pw_sys::pw_port_info).expect("info is NULL");
345            let info = info.cast::<PortInfoRef>().as_ref();
346            callbacks.info.as_ref().unwrap()(info);
347        }
348
349        unsafe extern "C" fn port_events_param(
350            data: *mut c_void,
351            seq: i32,
352            id: u32,
353            index: u32,
354            next: u32,
355            param: *const spa_sys::spa_pod,
356        ) {
357            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();
358
359            let id = spa::param::ParamType::from_raw(id);
360            let param = if !param.is_null() {
361                unsafe { Some(Pod::from_raw(param)) }
362            } else {
363                None
364            };
365
366            callbacks.param.as_ref().unwrap()(seq, id, index, next, param);
367        }
368
369        let e = unsafe {
370            let mut e: Pin<Box<pw_sys::pw_port_events>> = Box::pin(mem::zeroed());
371            e.version = pw_sys::PW_VERSION_PORT_EVENTS;
372
373            if self.cbs.info.is_some() {
374                e.info = Some(port_events_info);
375            }
376            if self.cbs.param.is_some() {
377                e.param = Some(port_events_param);
378            }
379
380            e
381        };
382
383        let (listener, data) = unsafe {
384            let port = &self.port.proxy.as_ptr();
385
386            let data = Box::into_raw(Box::new(self.cbs));
387            let mut listener: Pin<Box<spa_sys::spa_hook>> = Box::pin(mem::zeroed());
388            let listener_ptr: *mut spa_sys::spa_hook = listener.as_mut().get_unchecked_mut();
389
390            spa_interface_call_method!(
391                port,
392                pw_sys::pw_port_methods,
393                add_listener,
394                listener_ptr.cast(),
395                e.as_ref().get_ref(),
396                data as *mut _
397            );
398
399            (listener, Box::from_raw(data))
400        };
401
402        PortListener {
403            events: e,
404            listener,
405            data,
406        }
407    }
408}