pipewire/
node.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! Nodes are media processing elements that consume or produce buffers with data such as audio or
5//! video.
6//!
7//! For receiving and consuming or producting and sending data from/to PipeWire, see
8//! [streams](crate::stream).
9//!
10//! This module contains wrappers for [`pw_node`](pw_sys::pw_node) and related items.
11
12use bitflags::bitflags;
13use libc::c_void;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::{ffi::CStr, ptr};
17use std::{fmt, mem};
18
19use crate::{
20    proxy::{Listener, Proxy, ProxyT},
21    types::ObjectType,
22};
23use spa::{pod::Pod, spa_interface_call_method};
24
25/// A [proxy][Proxy] to a [node](self).
26#[derive(Debug)]
27pub struct Node {
28    proxy: Proxy,
29}
30
31impl Node {
32    // TODO: add non-local version when we'll bind pw_thread_loop_start()
33    #[must_use = "Use the builder to register event callbacks"]
34    pub fn add_listener_local(&self) -> NodeListenerLocalBuilder<'_> {
35        NodeListenerLocalBuilder {
36            node: self,
37            cbs: ListenerLocalCallbacks::default(),
38        }
39    }
40
41    /// Subscribe to parameter changes
42    ///
43    /// Automatically emit [`param`](NodeListenerLocalBuilder::param) events for the `ids` when they are changed
44    ///
45    /// # Permissions
46    /// Requires [`X`](crate::permissions::PermissionFlags::X) permissions on the node.
47    // FIXME: Return result?
48    pub fn subscribe_params(&self, ids: &[spa::param::ParamType]) {
49        unsafe {
50            spa_interface_call_method!(
51                self.proxy.as_ptr(),
52                pw_sys::pw_node_methods,
53                subscribe_params,
54                ids.as_ptr() as *mut _,
55                ids.len().try_into().unwrap()
56            );
57        }
58    }
59
60    /// Enumerate node parameters
61    ///
62    /// Start enumeration of node parameters. For each param, a
63    /// [`param`](NodeListenerLocalBuilder::param) event will be emitted.
64    ///
65    /// # Permissions
66    /// Requires [`X`](crate::permissions::PermissionFlags::X) permissions on the node.
67    ///
68    /// # Parameters
69    /// `seq`: a sequence number to place in the reply \
70    /// `id`: the parameter id to enum, or [`None`] to allow any id \
71    /// `start`: the start index or 0 for the first param \
72    /// `num`: the maximum number of params to retrieve ([`u32::MAX`] may be used to retrieve all params)
73    // FIXME: Add filter parameter
74    // FIXME: Return result?
75    pub fn enum_params(&self, seq: i32, id: Option<spa::param::ParamType>, start: u32, num: u32) {
76        let id = id.map(|id| id.as_raw()).unwrap_or(crate::constants::ID_ANY);
77
78        unsafe {
79            spa_interface_call_method!(
80                self.proxy.as_ptr(),
81                pw_sys::pw_node_methods,
82                enum_params,
83                seq,
84                id,
85                start,
86                num,
87                std::ptr::null()
88            );
89        }
90    }
91
92    /// Set a parameter on the node.
93    ///
94    /// # Permissions
95    /// Requires [`W`](crate::permissions::PermissionFlags::W) and
96    /// [`X`](crate::permissions::PermissionFlags::X) permissions on the node.
97    pub fn set_param(&self, id: spa::param::ParamType, flags: u32, param: &Pod) {
98        unsafe {
99            spa_interface_call_method!(
100                self.proxy.as_ptr(),
101                pw_sys::pw_node_methods,
102                set_param,
103                id.as_raw(),
104                flags,
105                param.as_raw_ptr()
106            );
107        }
108    }
109
110    /// Send a command to the node.
111    ///
112    /// # Permissions
113    /// Requires [`W`](crate::permissions::PermissionFlags::W) and
114    /// [`X`](crate::permissions::PermissionFlags::X) permissions on the node.
115    pub fn send_command(&self, command: &spa::node::command::NodeCommand) {
116        unsafe {
117            spa_interface_call_method!(
118                self.proxy.as_ptr(),
119                pw_sys::pw_node_methods,
120                send_command,
121                command.as_raw_ptr()
122            );
123        }
124    }
125}
126
127impl ProxyT for Node {
128    fn type_() -> ObjectType {
129        ObjectType::Node
130    }
131
132    fn upcast(self) -> Proxy {
133        self.proxy
134    }
135
136    fn upcast_ref(&self) -> &Proxy {
137        &self.proxy
138    }
139
140    unsafe fn from_proxy_unchecked(proxy: Proxy) -> Self
141    where
142        Self: Sized,
143    {
144        Self { proxy }
145    }
146}
147
148#[derive(Default)]
149struct ListenerLocalCallbacks {
150    #[allow(clippy::type_complexity)]
151    info: Option<Box<dyn Fn(&NodeInfoRef)>>,
152    #[allow(clippy::type_complexity)]
153    param: Option<Box<dyn Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>)>>,
154}
155
156/// A builder for registering node event callbacks.
157///
158/// Use [`Node::add_listener_local`] to create this and register callbacks that will be called when events of interest occur.
159/// After adding callbacks, use [`register`](Self::register) to get back a [`NodeListener`].
160///
161/// # Examples
162/// ```
163/// # use pipewire::node::Node;
164/// # use pipewire::spa::pod::Pod;
165/// # fn example(node: Node) {
166/// let node_listener = node.add_listener_local()
167///     .info(|info| println!("New node info: {info:?}"))
168///     .param(|seq, param_type, index, next, param| {
169///         println!("New node param: seq {seq}, param type {param_type:?}, index {index}, next {next}, param {:?}",
170///             param.map(Pod::as_bytes));
171///     })
172///     .register();
173/// # }
174/// ```
175pub struct NodeListenerLocalBuilder<'a> {
176    node: &'a Node,
177    cbs: ListenerLocalCallbacks,
178}
179
180#[repr(transparent)]
181pub struct NodeInfoRef(pw_sys::pw_node_info);
182
183impl NodeInfoRef {
184    pub fn as_raw(&self) -> &pw_sys::pw_node_info {
185        &self.0
186    }
187
188    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_node_info {
189        std::ptr::addr_of!(self.0).cast_mut()
190    }
191
192    pub fn id(&self) -> u32 {
193        self.0.id
194    }
195
196    pub fn max_input_ports(&self) -> u32 {
197        self.0.max_input_ports
198    }
199
200    pub fn max_output_ports(&self) -> u32 {
201        self.0.max_output_ports
202    }
203
204    pub fn change_mask(&self) -> NodeChangeMask {
205        NodeChangeMask::from_bits_retain(self.0.change_mask)
206    }
207
208    pub fn n_input_ports(&self) -> u32 {
209        self.0.n_input_ports
210    }
211
212    pub fn n_output_ports(&self) -> u32 {
213        self.0.n_output_ports
214    }
215
216    pub fn state(&self) -> NodeState<'_> {
217        let raw_state = self.0.state;
218        match raw_state {
219            pw_sys::pw_node_state_PW_NODE_STATE_ERROR => {
220                let error = unsafe {
221                    let error = self.0.error;
222                    CStr::from_ptr(error).to_str().unwrap()
223                };
224                NodeState::Error(error)
225            }
226            pw_sys::pw_node_state_PW_NODE_STATE_CREATING => NodeState::Creating,
227            pw_sys::pw_node_state_PW_NODE_STATE_SUSPENDED => NodeState::Suspended,
228            pw_sys::pw_node_state_PW_NODE_STATE_IDLE => NodeState::Idle,
229            pw_sys::pw_node_state_PW_NODE_STATE_RUNNING => NodeState::Running,
230            _ => panic!("Invalid node state: {raw_state}"),
231        }
232    }
233
234    pub fn props(&self) -> Option<&spa::utils::dict::DictRef> {
235        let props_ptr: *mut spa::utils::dict::DictRef = self.0.props.cast();
236        ptr::NonNull::new(props_ptr).map(|ptr| unsafe { ptr.as_ref() })
237    }
238
239    /// Get the param infos for the node.
240    pub fn params(&self) -> &[spa::param::ParamInfo] {
241        unsafe {
242            let params_ptr = self.0.params;
243
244            if params_ptr.is_null() {
245                &[]
246            } else {
247                std::slice::from_raw_parts(
248                    params_ptr as *const _,
249                    self.0.n_params.try_into().unwrap(),
250                )
251            }
252        }
253    }
254}
255
256impl fmt::Debug for NodeInfoRef {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("NodeInfoRef")
259            .field("id", &self.id())
260            .field("max-input-ports", &self.max_input_ports())
261            .field("max-output-ports", &self.max_output_ports())
262            .field("change-mask", &self.change_mask())
263            .field("n-input-ports", &self.n_input_ports())
264            .field("n-output-ports", &self.n_output_ports())
265            .field("state", &self.state())
266            .field("props", &self.props())
267            .field("params", &self.params())
268            .finish()
269    }
270}
271
272pub struct NodeInfo {
273    ptr: ptr::NonNull<pw_sys::pw_node_info>,
274}
275
276impl NodeInfo {
277    pub fn new(ptr: ptr::NonNull<pw_sys::pw_node_info>) -> Self {
278        Self { ptr }
279    }
280
281    /// Create a `NodeInfo` from a raw `pw_sys::pw_node_info`.
282    ///
283    /// # Safety
284    /// `ptr` must point to a valid, well aligned `pw_sys::pw_node_info`.
285    pub fn from_raw(raw: *mut pw_sys::pw_node_info) -> Self {
286        Self {
287            ptr: ptr::NonNull::new(raw).expect("Provided pointer is null"),
288        }
289    }
290
291    pub fn into_raw(self) -> *mut pw_sys::pw_node_info {
292        std::mem::ManuallyDrop::new(self).ptr.as_ptr()
293    }
294}
295
296impl Drop for NodeInfo {
297    fn drop(&mut self) {
298        unsafe { pw_sys::pw_node_info_free(self.ptr.as_ptr()) }
299    }
300}
301
302impl std::ops::Deref for NodeInfo {
303    type Target = NodeInfoRef;
304
305    fn deref(&self) -> &Self::Target {
306        unsafe { self.ptr.cast::<NodeInfoRef>().as_ref() }
307    }
308}
309
310impl AsRef<NodeInfoRef> for NodeInfo {
311    fn as_ref(&self) -> &NodeInfoRef {
312        self.deref()
313    }
314}
315
316bitflags! {
317    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
318    pub struct NodeChangeMask: u64 {
319        const INPUT_PORTS = pw_sys::PW_NODE_CHANGE_MASK_INPUT_PORTS as u64;
320        const OUTPUT_PORTS = pw_sys::PW_NODE_CHANGE_MASK_OUTPUT_PORTS as u64;
321        const STATE = pw_sys::PW_NODE_CHANGE_MASK_STATE as u64;
322        const PROPS = pw_sys::PW_NODE_CHANGE_MASK_PROPS as u64;
323        const PARAMS = pw_sys::PW_NODE_CHANGE_MASK_PARAMS as u64;
324    }
325}
326
327#[derive(Debug)]
328pub enum NodeState<'a> {
329    Error(&'a str),
330    Creating,
331    Suspended,
332    Idle,
333    Running,
334}
335
336impl fmt::Debug for NodeInfo {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        f.debug_struct("NodeInfo")
339            .field("id", &self.id())
340            .field("max-input-ports", &self.max_input_ports())
341            .field("max-output-ports", &self.max_output_ports())
342            .field("change-mask", &self.change_mask())
343            .field("n-input-ports", &self.n_input_ports())
344            .field("n-output-ports", &self.n_output_ports())
345            .field("state", &self.state())
346            .field("props", &self.props())
347            .field("params", &self.params())
348            .finish()
349    }
350}
351
352/// An owned listener for node events.
353///
354/// This is created by [`NodeListenerLocalBuilder`] and will receive events as long as it is alive.
355/// When this gets dropped, the listener gets unregistered and no events will be received by it.
356#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
357pub struct NodeListener {
358    // Need to stay allocated while the listener is registered
359    #[allow(dead_code)]
360    events: Pin<Box<pw_sys::pw_node_events>>,
361    listener: Pin<Box<spa_sys::spa_hook>>,
362    #[allow(dead_code)]
363    data: Box<ListenerLocalCallbacks>,
364}
365
366impl Listener for NodeListener {}
367
368impl Drop for NodeListener {
369    fn drop(&mut self) {
370        spa::utils::hook::remove(*self.listener);
371    }
372}
373
374impl<'a> NodeListenerLocalBuilder<'a> {
375    /// Set the node `info` event callback of the listener.
376    ///
377    /// # Callback parameters
378    /// `info`: Info about the node.
379    ///
380    /// # Examples
381    /// ```
382    /// # use pipewire::node::Node;
383    /// # fn example(node: Node) {
384    /// let node_listener = node.add_listener_local()
385    ///     .info(|info| println!("New node info: {info:?}"))
386    ///     .register();
387    /// # }
388    /// ```
389    #[must_use = "Call `.register()` to start receiving events"]
390    pub fn info<F>(mut self, info: F) -> Self
391    where
392        F: Fn(&NodeInfoRef) + 'static,
393    {
394        self.cbs.info = Some(Box::new(info));
395        self
396    }
397
398    /// Set the node `param` event callback of the listener.
399    ///
400    /// This event is emitted as a result of [`enum_params`](Node::enum_params) or
401    /// [`subscribe_params`](Node::subscribe_params).
402    ///
403    /// # Callback parameters
404    /// `seq`: The sequence number of the request  
405    /// `param_type`: The param type  
406    /// `index`: The param index  
407    /// `next`: The param index of the next param  
408    /// `param`: The param
409    /// # Examples
410    /// ```
411    /// # use pipewire::node::Node;
412    /// # use pipewire::spa::pod::Pod;
413    /// # fn example(node: Node) {
414    /// let node_listener = node.add_listener_local()
415    ///     .param(|seq, param_type, index, next, param| {
416    ///         println!("New node param: seq {seq}, param type {param_type:?}, index {index}, next {next}, param {:?}",
417    ///             param.map(Pod::as_bytes));
418    ///     })
419    ///     .register();
420    /// # }
421    /// ```
422    #[must_use = "Call `.register()` to start receiving events"]
423    pub fn param<F>(mut self, param: F) -> Self
424    where
425        F: Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>) + 'static,
426    {
427        self.cbs.param = Some(Box::new(param));
428        self
429    }
430
431    /// Subscribe to events and register any provided callbacks.
432    pub fn register(self) -> NodeListener {
433        unsafe extern "C" fn node_events_info(
434            data: *mut c_void,
435            info: *const pw_sys::pw_node_info,
436        ) {
437            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();
438            let info = ptr::NonNull::new(info as *mut pw_sys::pw_node_info).expect("info is NULL");
439            let info = info.cast::<NodeInfoRef>().as_ref();
440            callbacks.info.as_ref().unwrap()(info);
441        }
442
443        unsafe extern "C" fn node_events_param(
444            data: *mut c_void,
445            seq: i32,
446            id: u32,
447            index: u32,
448            next: u32,
449            param: *const spa_sys::spa_pod,
450        ) {
451            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();
452
453            let id = spa::param::ParamType::from_raw(id);
454            let param = if !param.is_null() {
455                unsafe { Some(Pod::from_raw(param)) }
456            } else {
457                None
458            };
459
460            callbacks.param.as_ref().unwrap()(seq, id, index, next, param);
461        }
462
463        let e = unsafe {
464            let mut e: Pin<Box<pw_sys::pw_node_events>> = Box::pin(mem::zeroed());
465            e.version = pw_sys::PW_VERSION_NODE_EVENTS;
466
467            if self.cbs.info.is_some() {
468                e.info = Some(node_events_info);
469            }
470            if self.cbs.param.is_some() {
471                e.param = Some(node_events_param);
472            }
473
474            e
475        };
476
477        let (listener, data) = unsafe {
478            let node = &self.node.proxy.as_ptr();
479
480            let data = Box::into_raw(Box::new(self.cbs));
481            let mut listener: Pin<Box<spa_sys::spa_hook>> = Box::pin(mem::zeroed());
482            let listener_ptr: *mut spa_sys::spa_hook = listener.as_mut().get_unchecked_mut();
483
484            spa_interface_call_method!(
485                node,
486                pw_sys::pw_node_methods,
487                add_listener,
488                listener_ptr.cast(),
489                e.as_ref().get_ref(),
490                data as *mut _
491            );
492
493            (listener, Box::from_raw(data))
494        };
495
496        NodeListener {
497            events: e,
498            listener,
499            data,
500        }
501    }
502}