// pipewire/node.rs

// Copyright The pipewire-rs Contributors.
// SPDX-License-Identifier: MIT
3
4use bitflags::bitflags;
5use libc::c_void;
6use std::ops::Deref;
7use std::pin::Pin;
8use std::{ffi::CStr, ptr};
9use std::{fmt, mem};
10
11use crate::{
12    proxy::{Listener, Proxy, ProxyT},
13    types::ObjectType,
14};
15use spa::{pod::Pod, spa_interface_call_method};
16
/// Handle to a PipeWire node object.
///
/// This is a thin wrapper around a [`Proxy`]: every node method in this file
/// is dispatched through the proxy's raw pointer.
#[derive(Debug)]
pub struct Node {
    // Proxy through which all node method calls and listener registrations go.
    proxy: Proxy,
}
21
22impl Node {
23    // TODO: add non-local version when we'll bind pw_thread_loop_start()
24    #[must_use]
25    pub fn add_listener_local(&self) -> NodeListenerLocalBuilder<'_> {
26        NodeListenerLocalBuilder {
27            node: self,
28            cbs: ListenerLocalCallbacks::default(),
29        }
30    }
31
32    /// Subscribe to parameter changes
33    ///
34    /// Automatically emit `param` events for the given ids when they are changed
35    // FIXME: Return result?
36    pub fn subscribe_params(&self, ids: &[spa::param::ParamType]) {
37        unsafe {
38            spa_interface_call_method!(
39                self.proxy.as_ptr(),
40                pw_sys::pw_node_methods,
41                subscribe_params,
42                ids.as_ptr() as *mut _,
43                ids.len().try_into().unwrap()
44            );
45        }
46    }
47
48    /// Enumerate node parameters
49    ///
50    /// Start enumeration of node parameters. For each param, a
51    /// param event will be emitted.
52    ///
53    /// # Parameters
54    /// `seq`: a sequence number to place in the reply \
55    /// `id`: the parameter id to enum, or [`None`] to allow any id \
56    /// `start`: the start index or 0 for the first param \
57    /// `num`: the maximum number of params to retrieve ([`u32::MAX`] may be used to retrieve all params)
58    // FIXME: Add filter parameter
59    // FIXME: Return result?
60    pub fn enum_params(&self, seq: i32, id: Option<spa::param::ParamType>, start: u32, num: u32) {
61        let id = id.map(|id| id.as_raw()).unwrap_or(crate::constants::ID_ANY);
62
63        unsafe {
64            spa_interface_call_method!(
65                self.proxy.as_ptr(),
66                pw_sys::pw_node_methods,
67                enum_params,
68                seq,
69                id,
70                start,
71                num,
72                std::ptr::null()
73            );
74        }
75    }
76
77    pub fn set_param(&self, id: spa::param::ParamType, flags: u32, param: &Pod) {
78        unsafe {
79            spa_interface_call_method!(
80                self.proxy.as_ptr(),
81                pw_sys::pw_node_methods,
82                set_param,
83                id.as_raw(),
84                flags,
85                param.as_raw_ptr()
86            );
87        }
88    }
89
90    pub fn send_command(&self, command: &spa::node::command::NodeCommand) {
91        unsafe {
92            spa_interface_call_method!(
93                self.proxy.as_ptr(),
94                pw_sys::pw_node_methods,
95                send_command,
96                command.as_raw_ptr()
97            );
98        }
99    }
100}
101
102impl ProxyT for Node {
103    fn type_() -> ObjectType {
104        ObjectType::Node
105    }
106
107    fn upcast(self) -> Proxy {
108        self.proxy
109    }
110
111    fn upcast_ref(&self) -> &Proxy {
112        &self.proxy
113    }
114
115    unsafe fn from_proxy_unchecked(proxy: Proxy) -> Self
116    where
117        Self: Sized,
118    {
119        Self { proxy }
120    }
121}
122
/// User callbacks collected by [`NodeListenerLocalBuilder`].
///
/// Both callbacks are optional; `Default` leaves them unset.
#[derive(Default)]
struct ListenerLocalCallbacks {
    // Invoked with the node info delivered by the `info` event.
    #[allow(clippy::type_complexity)]
    info: Option<Box<dyn Fn(&NodeInfoRef)>>,
    // Invoked by the `param` event with (seq, id, index, next, param);
    // `param` is `None` when the event carries a null pod pointer.
    #[allow(clippy::type_complexity)]
    param: Option<Box<dyn Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>)>>,
}
130
/// Builder for a [`NodeListener`], created by [`Node::add_listener_local`].
///
/// Borrows the node for `'a` and accumulates the callbacks to register.
pub struct NodeListenerLocalBuilder<'a> {
    // Node the listener will be attached to.
    node: &'a Node,
    // Callbacks configured so far.
    cbs: ListenerLocalCallbacks,
}
135
/// Borrowed view of a raw `pw_sys::pw_node_info`.
///
/// `#[repr(transparent)]` gives this type the same layout as the wrapped
/// struct, which is what allows pointers to `pw_node_info` to be cast to
/// pointers to `NodeInfoRef` elsewhere in this file.
#[repr(transparent)]
pub struct NodeInfoRef(pw_sys::pw_node_info);
138
139impl NodeInfoRef {
140    pub fn as_raw(&self) -> &pw_sys::pw_node_info {
141        &self.0
142    }
143
144    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_node_info {
145        std::ptr::addr_of!(self.0).cast_mut()
146    }
147
148    pub fn id(&self) -> u32 {
149        self.0.id
150    }
151
152    pub fn max_input_ports(&self) -> u32 {
153        self.0.max_input_ports
154    }
155
156    pub fn max_output_ports(&self) -> u32 {
157        self.0.max_output_ports
158    }
159
160    pub fn change_mask(&self) -> NodeChangeMask {
161        NodeChangeMask::from_bits_retain(self.0.change_mask)
162    }
163
164    pub fn n_input_ports(&self) -> u32 {
165        self.0.n_input_ports
166    }
167
168    pub fn n_output_ports(&self) -> u32 {
169        self.0.n_output_ports
170    }
171
172    pub fn state(&self) -> NodeState<'_> {
173        let raw_state = self.0.state;
174        match raw_state {
175            pw_sys::pw_node_state_PW_NODE_STATE_ERROR => {
176                let error = unsafe {
177                    let error = self.0.error;
178                    CStr::from_ptr(error).to_str().unwrap()
179                };
180                NodeState::Error(error)
181            }
182            pw_sys::pw_node_state_PW_NODE_STATE_CREATING => NodeState::Creating,
183            pw_sys::pw_node_state_PW_NODE_STATE_SUSPENDED => NodeState::Suspended,
184            pw_sys::pw_node_state_PW_NODE_STATE_IDLE => NodeState::Idle,
185            pw_sys::pw_node_state_PW_NODE_STATE_RUNNING => NodeState::Running,
186            _ => panic!("Invalid node state: {raw_state}"),
187        }
188    }
189
190    pub fn props(&self) -> Option<&spa::utils::dict::DictRef> {
191        let props_ptr: *mut spa::utils::dict::DictRef = self.0.props.cast();
192        ptr::NonNull::new(props_ptr).map(|ptr| unsafe { ptr.as_ref() })
193    }
194
195    /// Get the param infos for the node.
196    pub fn params(&self) -> &[spa::param::ParamInfo] {
197        unsafe {
198            let params_ptr = self.0.params;
199
200            if params_ptr.is_null() {
201                &[]
202            } else {
203                std::slice::from_raw_parts(
204                    params_ptr as *const _,
205                    self.0.n_params.try_into().unwrap(),
206                )
207            }
208        }
209    }
210}
211
212impl fmt::Debug for NodeInfoRef {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        f.debug_struct("NodeInfoRef")
215            .field("id", &self.id())
216            .field("max-input-ports", &self.max_input_ports())
217            .field("max-output-ports", &self.max_output_ports())
218            .field("change-mask", &self.change_mask())
219            .field("n-input-ports", &self.n_input_ports())
220            .field("n-output-ports", &self.n_output_ports())
221            .field("state", &self.state())
222            .field("props", &self.props())
223            .field("params", &self.params())
224            .finish()
225    }
226}
227
/// Owned `pw_sys::pw_node_info`.
///
/// Dereferences to [`NodeInfoRef`] for read access; the pointed-to info is
/// released with `pw_node_info_free` when this value is dropped.
pub struct NodeInfo {
    // Non-null pointer to the owned raw node info.
    ptr: ptr::NonNull<pw_sys::pw_node_info>,
}
231
232impl NodeInfo {
233    pub fn new(ptr: ptr::NonNull<pw_sys::pw_node_info>) -> Self {
234        Self { ptr }
235    }
236
237    /// Create a `NodeInfo` from a raw `pw_sys::pw_node_info`.
238    ///
239    /// # Safety
240    /// `ptr` must point to a valid, well aligned `pw_sys::pw_node_info`.
241    pub fn from_raw(raw: *mut pw_sys::pw_node_info) -> Self {
242        Self {
243            ptr: ptr::NonNull::new(raw).expect("Provided pointer is null"),
244        }
245    }
246
247    pub fn into_raw(self) -> *mut pw_sys::pw_node_info {
248        std::mem::ManuallyDrop::new(self).ptr.as_ptr()
249    }
250}
251
252impl Drop for NodeInfo {
253    fn drop(&mut self) {
254        unsafe { pw_sys::pw_node_info_free(self.ptr.as_ptr()) }
255    }
256}
257
258impl std::ops::Deref for NodeInfo {
259    type Target = NodeInfoRef;
260
261    fn deref(&self) -> &Self::Target {
262        unsafe { self.ptr.cast::<NodeInfoRef>().as_ref() }
263    }
264}
265
266impl AsRef<NodeInfoRef> for NodeInfo {
267    fn as_ref(&self) -> &NodeInfoRef {
268        self.deref()
269    }
270}
271
bitflags! {
    /// Bit mask returned by [`NodeInfoRef::change_mask`].
    ///
    /// Each flag mirrors the `pw_sys` constant of the same name.
    // NOTE(review): presumably each bit marks which part of the node info
    // changed since the previous `info` event — confirm against PipeWire docs.
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    pub struct NodeChangeMask: u64 {
        /// Mirrors `PW_NODE_CHANGE_MASK_INPUT_PORTS`.
        const INPUT_PORTS = pw_sys::PW_NODE_CHANGE_MASK_INPUT_PORTS as u64;
        /// Mirrors `PW_NODE_CHANGE_MASK_OUTPUT_PORTS`.
        const OUTPUT_PORTS = pw_sys::PW_NODE_CHANGE_MASK_OUTPUT_PORTS as u64;
        /// Mirrors `PW_NODE_CHANGE_MASK_STATE`.
        const STATE = pw_sys::PW_NODE_CHANGE_MASK_STATE as u64;
        /// Mirrors `PW_NODE_CHANGE_MASK_PROPS`.
        const PROPS = pw_sys::PW_NODE_CHANGE_MASK_PROPS as u64;
        /// Mirrors `PW_NODE_CHANGE_MASK_PARAMS`.
        const PARAMS = pw_sys::PW_NODE_CHANGE_MASK_PARAMS as u64;
    }
}
282
/// State of a node, as returned by [`NodeInfoRef::state`].
///
/// Each variant corresponds to the `PW_NODE_STATE_*` constant of the same
/// name. The type is `Copy` and comparable, so a state can be checked with
/// `==` (for example `state == NodeState::Running`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState<'a> {
    /// The node is in the error state; carries the error message borrowed
    /// from the node info.
    Error(&'a str),
    /// Corresponds to `PW_NODE_STATE_CREATING`.
    Creating,
    /// Corresponds to `PW_NODE_STATE_SUSPENDED`.
    Suspended,
    /// Corresponds to `PW_NODE_STATE_IDLE`.
    Idle,
    /// Corresponds to `PW_NODE_STATE_RUNNING`.
    Running,
}
291
292impl fmt::Debug for NodeInfo {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("NodeInfo")
295            .field("id", &self.id())
296            .field("max-input-ports", &self.max_input_ports())
297            .field("max-output-ports", &self.max_output_ports())
298            .field("change-mask", &self.change_mask())
299            .field("n-input-ports", &self.n_input_ports())
300            .field("n-output-ports", &self.n_output_ports())
301            .field("state", &self.state())
302            .field("props", &self.props())
303            .field("params", &self.params())
304            .finish()
305    }
306}
307
/// A registered node event listener.
///
/// Returned by [`NodeListenerLocalBuilder::register`]. It owns everything the
/// registration points at; dropping it removes the hook.
pub struct NodeListener {
    // Need to stay allocated while the listener is registered
    #[allow(dead_code)]
    events: Pin<Box<pw_sys::pw_node_events>>,
    // Hook that was handed to `add_listener`; removed again on drop.
    listener: Pin<Box<spa_sys::spa_hook>>,
    // User callbacks; their address is the `data` pointer passed to the C
    // event trampolines, so they must stay allocated as well.
    #[allow(dead_code)]
    data: Box<ListenerLocalCallbacks>,
}
316
// Marks `NodeListener` as a `Listener`; no trait items are overridden here.
impl Listener for NodeListener {}
318
319impl Drop for NodeListener {
320    fn drop(&mut self) {
321        spa::utils::hook::remove(*self.listener);
322    }
323}
324
impl<'a> NodeListenerLocalBuilder<'a> {
    /// Set the callback invoked for the node's `info` event.
    ///
    /// A previously set `info` callback is replaced.
    #[must_use]
    pub fn info<F>(mut self, info: F) -> Self
    where
        F: Fn(&NodeInfoRef) + 'static,
    {
        self.cbs.info = Some(Box::new(info));
        self
    }

    /// Set the callback invoked for the node's `param` event.
    ///
    /// The callback receives `(seq, id, index, next, param)`; `param` is
    /// [`None`] when the event carries a null pod pointer. A previously set
    /// `param` callback is replaced.
    // NOTE(review): `index` / `next` are forwarded untouched from the C event;
    // presumably they are the current and next param index — confirm.
    #[must_use]
    pub fn param<F>(mut self, param: F) -> Self
    where
        F: Fn(i32, spa::param::ParamType, u32, u32, Option<&Pod>) + 'static,
    {
        self.cbs.param = Some(Box::new(param));
        self
    }

    /// Register the configured callbacks on the node.
    ///
    /// Only events for which a callback was set are hooked up. The returned
    /// [`NodeListener`] owns the event table, the hook and the callbacks.
    #[must_use]
    pub fn register(self) -> NodeListener {
        // C trampoline for the `info` event: recovers the callbacks from
        // `data` and forwards the info as a `NodeInfoRef`.
        // Panics if `data` or `info` is null.
        unsafe extern "C" fn node_events_info(
            data: *mut c_void,
            info: *const pw_sys::pw_node_info,
        ) {
            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();
            let info = ptr::NonNull::new(info as *mut pw_sys::pw_node_info).expect("info is NULL");
            let info = info.cast::<NodeInfoRef>().as_ref();
            // The trampoline is only installed when `info` is `Some` (see below).
            callbacks.info.as_ref().unwrap()(info);
        }

        // C trampoline for the `param` event: converts the raw id and the
        // (possibly null) pod pointer before calling the user callback.
        unsafe extern "C" fn node_events_param(
            data: *mut c_void,
            seq: i32,
            id: u32,
            index: u32,
            next: u32,
            param: *const spa_sys::spa_pod,
        ) {
            let callbacks = (data as *mut ListenerLocalCallbacks).as_ref().unwrap();

            let id = spa::param::ParamType::from_raw(id);
            let param = if !param.is_null() {
                unsafe { Some(Pod::from_raw(param)) }
            } else {
                None
            };

            // The trampoline is only installed when `param` is `Some` (see below).
            callbacks.param.as_ref().unwrap()(seq, id, index, next, param);
        }

        // Build the event table: start from all-zero (no handlers) and fill
        // in a trampoline only for each callback the user provided.
        let e = unsafe {
            let mut e: Pin<Box<pw_sys::pw_node_events>> = Box::pin(mem::zeroed());
            e.version = pw_sys::PW_VERSION_NODE_EVENTS;

            if self.cbs.info.is_some() {
                e.info = Some(node_events_info);
            }
            if self.cbs.param.is_some() {
                e.param = Some(node_events_param);
            }

            e
        };

        let (listener, data) = unsafe {
            let node = &self.node.proxy.as_ptr();

            // Move the callbacks to the heap and take a raw pointer to them;
            // that pointer is the `data` argument the trampolines receive.
            let data = Box::into_raw(Box::new(self.cbs));
            let mut listener: Pin<Box<spa_sys::spa_hook>> = Box::pin(mem::zeroed());
            let listener_ptr: *mut spa_sys::spa_hook = listener.as_mut().get_unchecked_mut();

            spa_interface_call_method!(
                node,
                pw_sys::pw_node_methods,
                add_listener,
                listener_ptr.cast(),
                e.as_ref().get_ref(),
                data as *mut _
            );

            // Re-own the callbacks allocation so it is freed together with
            // the returned `NodeListener`; the address stays the same.
            (listener, Box::from_raw(data))
        };

        NodeListener {
            events: e,
            listener,
            data,
        }
    }
}
415}