pipewire/loop_/
mod.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4use std::{convert::TryInto, os::unix::prelude::*, ptr, time::Duration};
5
6use libc::{c_int, c_void};
7pub use rustix::process::Signal;
8use spa::{spa_interface_call_method, support::system::IoFlags, utils::result::SpaResult};
9
10use crate::utils::assert_main_thread;
11
12mod box_;
13pub use box_::*;
14mod rc;
15pub use rc::*;
16
/// A transparent wrapper around a raw [`pw_loop`](`pw_sys::pw_loop`).
/// It is usually only seen in a reference (`&Loop`), and does not own the `pw_loop`.
///
/// Owned versions, [`LoopRc`] for shared ownership and [`LoopBox`] for unique ownership, are available,
/// which let you create and own a [`pw_loop`](`pw_sys::pw_loop`).
///
/// Other objects, such as [`MainLoop`](`crate::main_loop::MainLoop`), can also contain loops.
// `repr(transparent)` gives `Loop` the same layout as its single `pw_loop` field.
#[repr(transparent)]
pub struct Loop(pw_sys::pw_loop);
26
27impl Loop {
28    pub fn as_raw(&self) -> &pw_sys::pw_loop {
29        &self.0
30    }
31
32    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_loop {
33        std::ptr::addr_of!(self.0).cast_mut()
34    }
35
36    /// Get the file descriptor backing this loop.
37    pub fn fd(&self) -> BorrowedFd<'_> {
38        unsafe {
39            let mut iface = self.as_raw().control.as_ref().unwrap().iface;
40
41            let raw_fd = spa_interface_call_method!(
42                &mut iface as *mut spa_sys::spa_interface,
43                spa_sys::spa_loop_control_methods,
44                get_fd,
45            );
46
47            BorrowedFd::borrow_raw(raw_fd)
48        }
49    }
50
51    /// Enter a loop
52    ///
53    /// Start an iteration of the loop. This function should be called
54    /// before calling iterate and is typically used to capture the thread
55    /// that this loop will run in.
56    ///
57    /// # Safety
58    /// Each call of `enter` must be paired with a call of `leave`.
59    pub unsafe fn enter(&self) {
60        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
61
62        spa_interface_call_method!(
63            &mut iface as *mut spa_sys::spa_interface,
64            spa_sys::spa_loop_control_methods,
65            enter,
66        )
67    }
68
69    /// Leave a loop
70    ///
71    /// Ends the iteration of a loop. This should be called after calling
72    /// iterate.
73    ///
74    /// # Safety
75    /// Each call of `leave` must be paired with a call of `enter`.
76    pub unsafe fn leave(&self) {
77        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
78
79        spa_interface_call_method!(
80            &mut iface as *mut spa_sys::spa_interface,
81            spa_sys::spa_loop_control_methods,
82            leave,
83        )
84    }
85
86    /// Perform one iteration of the loop.
87    ///
88    /// Timeout must be provided, see [`Timeout`].
89    ///
90    /// This function will block
91    /// up to the provided timeout and then dispatch the fds with activity.
92    /// The number of dispatched fds is returned.
93    ///
94    /// This will automatically call [`Self::enter()`] on the loop before iterating, and [`Self::leave()`] afterwards.
95    ///
96    /// # Panics
97    /// This function will panic if the provided timeout as milliseconds does not fit inside a
98    /// `c_int` integer.
99    pub fn iterate(&self, timeout: Timeout) -> i32 {
100        struct LeaveGuard<'a>(&'a Loop);
101
102        impl Drop for LeaveGuard<'_> {
103            fn drop(&mut self) {
104                unsafe {
105                    self.0.leave();
106                }
107            }
108        }
109
110        unsafe {
111            self.enter();
112
113            let _guard = LeaveGuard(self);
114
115            self.iterate_unguarded(timeout)
116        }
117    }
118
119    /// A variant of [`iterate()`](`Self::iterate()`) that does not call [`Self::enter()`]  and [`Self::leave()`] on the loop.
120    ///
121    /// # Safety
122    /// Before calling this, [`Self::enter()`] must be called, and [`Self::leave()`] must be called afterwards.
123    pub unsafe fn iterate_unguarded(&self, timeout: Timeout) -> i32 {
124        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
125
126        let timeout: c_int =
127            c_int::try_from(timeout).expect("Provided timeout does not fit in a c_int");
128
129        spa_interface_call_method!(
130            &mut iface as *mut spa_sys::spa_interface,
131            spa_sys::spa_loop_control_methods,
132            iterate,
133            timeout
134        )
135    }
136
137    /// Register some type of IO object with a callback that is called when reading/writing on the IO object
138    /// is available.
139    ///
140    /// The specified `event_mask` determines whether to trigger when either input, output, or any of the two is available.
141    ///
142    /// The returned IoSource needs to take ownership of the IO object, but will provide a reference to the callback when called.
143    #[must_use]
144    pub fn add_io<I, F>(&self, io: I, event_mask: IoFlags, callback: F) -> IoSource<'_, I>
145    where
146        I: AsRawFd,
147        F: Fn(&mut I) + 'static,
148        Self: Sized,
149    {
150        unsafe extern "C" fn call_closure<I>(data: *mut c_void, _fd: RawFd, _mask: u32)
151        where
152            I: AsRawFd,
153        {
154            let (io, callback) = (data as *mut IoSourceData<I>).as_mut().unwrap();
155            callback(io);
156        }
157
158        let fd = io.as_raw_fd();
159        let data = Box::into_raw(Box::new((io, Box::new(callback) as Box<dyn Fn(&mut I)>)));
160
161        let (source, data) = unsafe {
162            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
163
164            let source = spa_interface_call_method!(
165                &mut iface as *mut spa_sys::spa_interface,
166                spa_sys::spa_loop_utils_methods,
167                add_io,
168                fd,
169                event_mask.bits(),
170                // Never let the loop close the fd, this should be handled via `Drop` implementations.
171                false,
172                Some(call_closure::<I>),
173                data as *mut _
174            );
175
176            (source, Box::from_raw(data))
177        };
178
179        let ptr = ptr::NonNull::new(source).expect("source is NULL");
180
181        IoSource {
182            ptr,
183            loop_: self,
184            _data: data,
185        }
186    }
187
188    /// Register a callback to be called whenever the loop is idle.
189    ///
190    /// This can be enabled and disabled as needed with the `enabled` parameter,
191    /// and also with the `enable` method on the returned source.
192    #[must_use]
193    pub fn add_idle<F>(&self, enabled: bool, callback: F) -> IdleSource<'_>
194    where
195        F: Fn() + 'static,
196    {
197        unsafe extern "C" fn call_closure<F>(data: *mut c_void)
198        where
199            F: Fn(),
200        {
201            let callback = (data as *mut F).as_ref().unwrap();
202            callback();
203        }
204
205        let data = Box::into_raw(Box::new(callback));
206
207        let (source, data) = unsafe {
208            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
209
210            let source = spa_interface_call_method!(
211                &mut iface as *mut spa_sys::spa_interface,
212                spa_sys::spa_loop_utils_methods,
213                add_idle,
214                enabled,
215                Some(call_closure::<F>),
216                data as *mut _
217            );
218
219            (source, Box::from_raw(data))
220        };
221
222        let ptr = ptr::NonNull::new(source).expect("source is NULL");
223
224        IdleSource {
225            ptr,
226            loop_: self,
227            _data: data,
228        }
229    }
230
231    /// Register a signal with a callback that is called when the signal is sent.
232    ///
233    /// For example, this can be used to quit the loop when the process receives the `SIGTERM` signal.
234    #[must_use]
235    pub fn add_signal_local<F>(&self, signal: Signal, callback: F) -> SignalSource<'_>
236    where
237        F: Fn() + 'static,
238        Self: Sized,
239    {
240        assert_main_thread();
241
242        unsafe extern "C" fn call_closure<F>(data: *mut c_void, _signal: c_int)
243        where
244            F: Fn(),
245        {
246            let callback = (data as *mut F).as_ref().unwrap();
247            callback();
248        }
249
250        let data = Box::into_raw(Box::new(callback));
251
252        let (source, data) = unsafe {
253            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
254
255            let source = spa_interface_call_method!(
256                &mut iface as *mut spa_sys::spa_interface,
257                spa_sys::spa_loop_utils_methods,
258                add_signal,
259                signal.as_raw(),
260                Some(call_closure::<F>),
261                data as *mut _
262            );
263
264            (source, Box::from_raw(data))
265        };
266
267        let ptr = ptr::NonNull::new(source).expect("source is NULL");
268
269        SignalSource {
270            ptr,
271            loop_: self,
272            _data: data,
273        }
274    }
275
276    /// Register a new event with a callback that is called when the event happens.
277    ///
278    /// The returned [`EventSource`] can be used to trigger the event.
279    #[must_use]
280    pub fn add_event<F>(&self, callback: F) -> EventSource<'_>
281    where
282        F: Fn() + 'static,
283        Self: Sized,
284    {
285        unsafe extern "C" fn call_closure<F>(data: *mut c_void, _count: u64)
286        where
287            F: Fn(),
288        {
289            let callback = (data as *mut F).as_ref().unwrap();
290            callback();
291        }
292
293        let data = Box::into_raw(Box::new(callback));
294
295        let (source, data) = unsafe {
296            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
297
298            let source = spa_interface_call_method!(
299                &mut iface as *mut spa_sys::spa_interface,
300                spa_sys::spa_loop_utils_methods,
301                add_event,
302                Some(call_closure::<F>),
303                data as *mut _
304            );
305            (source, Box::from_raw(data))
306        };
307
308        let ptr = ptr::NonNull::new(source).expect("source is NULL");
309
310        EventSource {
311            ptr,
312            loop_: self,
313            _data: data,
314        }
315    }
316
317    /// Register a timer with the loop with a callback that is called after the timer expired.
318    ///
319    /// The timer will start out inactive, and the returned [`TimerSource`] can be used to arm the timer, or disarm it again.
320    ///
321    /// The callback will be provided with the number of timer expirations since the callback was last called.
322    #[must_use]
323    pub fn add_timer<F>(&self, callback: F) -> TimerSource<'_>
324    where
325        F: Fn(u64) + 'static,
326        Self: Sized,
327    {
328        unsafe extern "C" fn call_closure<F>(data: *mut c_void, expirations: u64)
329        where
330            F: Fn(u64),
331        {
332            let callback = (data as *mut F).as_ref().unwrap();
333            callback(expirations);
334        }
335
336        let data = Box::into_raw(Box::new(callback));
337
338        let (source, data) = unsafe {
339            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
340
341            let source = spa_interface_call_method!(
342                &mut iface as *mut spa_sys::spa_interface,
343                spa_sys::spa_loop_utils_methods,
344                add_timer,
345                Some(call_closure::<F>),
346                data as *mut _
347            );
348            (source, Box::from_raw(data))
349        };
350
351        let ptr = ptr::NonNull::new(source).expect("source is NULL");
352
353        TimerSource {
354            ptr,
355            loop_: self,
356            _data: data,
357        }
358    }
359
360    /// Destroy a source that belongs to this loop.
361    ///
362    /// # Safety
363    /// The provided source must belong to this loop.
364    unsafe fn destroy_source<S>(&self, source: &S)
365    where
366        S: IsSource,
367        Self: Sized,
368    {
369        let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
370
371        spa_interface_call_method!(
372            &mut iface as *mut spa_sys::spa_interface,
373            spa_sys::spa_loop_utils_methods,
374            destroy_source,
375            source.as_ptr()
376        )
377    }
378}
379
/// Timeout for [`Loop::iterate()`]
///
/// The value is turned into a millisecond count (a `c_int`) through the
/// [`TryFrom`] implementation below before it is handed to the loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Timeout {
    /// No timeout; converts to `0`.
    None,
    /// Unbounded timeout; converts to `-1`.
    Infinite,
    /// A bounded timeout; converts to the duration's whole milliseconds.
    ///
    /// Any sub-millisecond remainder is dropped, so durations shorter than one
    /// millisecond convert to `0`.
    Finite(Duration),
}

impl TryFrom<Timeout> for c_int {
    type Error = <u128 as TryInto<c_int>>::Error;

    /// Convert the timeout to the millisecond value expected by the loop.
    ///
    /// # Errors
    /// This function will return an error if the provided timeout as milliseconds does not fit inside a
    /// `c_int` integer.
    fn try_from(value: Timeout) -> Result<Self, Self::Error> {
        match value {
            Timeout::None => Ok(0),
            Timeout::Infinite => Ok(-1),
            Timeout::Finite(duration) => duration.as_millis().try_into(),
        }
    }
}
401
/// Implemented by the source handles in this module to expose their raw `spa_source` pointer.
pub trait IsSource {
    /// Return a valid pointer to a raw `spa_source`.
    fn as_ptr(&self) -> *mut spa_sys::spa_source;
}
406
/// Heap-allocated state of an [`IoSource`]: the IO object paired with its boxed callback.
type IoSourceData<I> = (I, Box<dyn Fn(&mut I)>);
408
409/// A source that can be used to react to IO events.
410///
411/// This source can be obtained by calling [`add_io`](`Loop::add_io`) on a loop, registering a callback to it.
412pub struct IoSource<'l, I>
413where
414    I: AsRawFd,
415{
416    ptr: ptr::NonNull<spa_sys::spa_source>,
417    loop_: &'l Loop,
418    // Store data wrapper to prevent leak
419    _data: Box<IoSourceData<I>>,
420}
421
422impl<'l, I> IsSource for IoSource<'l, I>
423where
424    I: AsRawFd,
425{
426    fn as_ptr(&self) -> *mut spa_sys::spa_source {
427        self.ptr.as_ptr()
428    }
429}
430
431impl<'l, I> Drop for IoSource<'l, I>
432where
433    I: AsRawFd,
434{
435    fn drop(&mut self) {
436        unsafe { self.loop_.destroy_source(self) }
437    }
438}
439
440/// A source that can be used to have a callback called when the loop is idle.
441///
442/// This source can be obtained by calling [`add_idle`](`Loop::add_idle`) on a loop, registering a callback to it.
443pub struct IdleSource<'l> {
444    ptr: ptr::NonNull<spa_sys::spa_source>,
445    loop_: &'l Loop,
446    // Store data wrapper to prevent leak
447    _data: Box<dyn Fn() + 'static>,
448}
449
450impl<'l> IdleSource<'l> {
451    /// Set the source as enabled or disabled, allowing or preventing the callback from being called.
452    pub fn enable(&self, enable: bool) {
453        unsafe {
454            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
455
456            spa_interface_call_method!(
457                &mut iface as *mut spa_sys::spa_interface,
458                spa_sys::spa_loop_utils_methods,
459                enable_idle,
460                self.as_ptr(),
461                enable
462            );
463        }
464    }
465}
466
467impl<'l> IsSource for IdleSource<'l> {
468    fn as_ptr(&self) -> *mut spa_sys::spa_source {
469        self.ptr.as_ptr()
470    }
471}
472
473impl<'l> Drop for IdleSource<'l> {
474    fn drop(&mut self) {
475        unsafe { self.loop_.destroy_source(self) }
476    }
477}
478
479/// A source that can be used to react to signals.
480///
481/// This source can be obtained by calling [`add_signal_local`](`Loop::add_signal_local`) on a loop, registering a callback to it.
482pub struct SignalSource<'l> {
483    ptr: ptr::NonNull<spa_sys::spa_source>,
484    loop_: &'l Loop,
485    // Store data wrapper to prevent leak
486    _data: Box<dyn Fn() + 'static>,
487}
488
489impl<'l> IsSource for SignalSource<'l> {
490    fn as_ptr(&self) -> *mut spa_sys::spa_source {
491        self.ptr.as_ptr()
492    }
493}
494
495impl<'l> Drop for SignalSource<'l> {
496    fn drop(&mut self) {
497        unsafe { self.loop_.destroy_source(self) }
498    }
499}
500
501/// A source that can be used to signal to a loop that an event has occurred.
502///
503/// This source can be obtained by calling [`add_event`](`Loop::add_event`) on a loop, registering a callback to it.
504///
505/// By calling [`signal`](`EventSource::signal`) on the `EventSource`, the loop is signaled that the event has occurred.
506/// It will then call the callback at the next possible occasion.
507pub struct EventSource<'l> {
508    ptr: ptr::NonNull<spa_sys::spa_source>,
509    loop_: &'l Loop,
510    // Store data wrapper to prevent leak
511    _data: Box<dyn Fn() + 'static>,
512}
513
514impl<'l> IsSource for EventSource<'l> {
515    fn as_ptr(&self) -> *mut spa_sys::spa_source {
516        self.ptr.as_ptr()
517    }
518}
519
520impl<'l> EventSource<'l> {
521    /// Signal the loop associated with this source that the event has occurred,
522    /// to make the loop call the callback at the next possible occasion.
523    pub fn signal(&self) -> SpaResult {
524        let res = unsafe {
525            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
526
527            spa_interface_call_method!(
528                &mut iface as *mut spa_sys::spa_interface,
529                spa_sys::spa_loop_utils_methods,
530                signal_event,
531                self.as_ptr()
532            )
533        };
534
535        SpaResult::from_c(res)
536    }
537}
538
539impl<'l> Drop for EventSource<'l> {
540    fn drop(&mut self) {
541        unsafe { self.loop_.destroy_source(self) }
542    }
543}
544
545/// A source that can be used to have a callback called on a timer.
546///
547/// This source can be obtained by calling [`add_timer`](`Loop::add_timer`) on a loop, registering a callback to it.
548///
549/// The timer starts out inactive.
550/// You can arm or disarm the timer by calling [`update_timer`](`Self::update_timer`).
551pub struct TimerSource<'l> {
552    ptr: ptr::NonNull<spa_sys::spa_source>,
553    loop_: &'l Loop,
554    // Store data wrapper to prevent leak
555    _data: Box<dyn Fn(u64) + 'static>,
556}
557
558impl<'l> TimerSource<'l> {
559    /// Arm or disarm the timer.
560    ///
561    /// The timer will be called the next time after the provided `value` duration.
562    /// After that, the timer will be repeatedly called again at the the specified `interval`.
563    ///
564    /// If `interval` is `None` or zero, the timer will only be called once. \
565    /// If `value` is `None` or zero, the timer will be disabled.
566    ///
567    /// # Panics
568    /// The provided durations seconds must fit in an i64. Otherwise, this function will panic.
569    pub fn update_timer(&self, value: Option<Duration>, interval: Option<Duration>) -> SpaResult {
570        fn duration_to_timespec(duration: Duration) -> spa_sys::timespec {
571            // Some 32-bit systems e.g. musl add padding fields for 64-bit time compatibility
572            let mut timespec =
573                unsafe { std::mem::MaybeUninit::<spa_sys::timespec>::zeroed().assume_init() };
574
575            timespec.tv_sec = duration.as_secs().try_into().expect("Duration too long");
576
577            #[allow(clippy::unnecessary_fallible_conversions)] // Architecture dependent
578            {
579                timespec.tv_nsec = duration
580                    .subsec_nanos()
581                    .try_into()
582                    .expect("Nanoseconds should fit into timespec");
583            }
584
585            timespec
586        }
587
588        let value = duration_to_timespec(value.unwrap_or_default());
589        let interval = duration_to_timespec(interval.unwrap_or_default());
590
591        let res = unsafe {
592            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
593
594            spa_interface_call_method!(
595                &mut iface as *mut spa_sys::spa_interface,
596                spa_sys::spa_loop_utils_methods,
597                update_timer,
598                self.as_ptr(),
599                &value as *const _ as *mut _,
600                &interval as *const _ as *mut _,
601                false
602            )
603        };
604
605        SpaResult::from_c(res)
606    }
607}
608
609impl<'l> IsSource for TimerSource<'l> {
610    fn as_ptr(&self) -> *mut spa_sys::spa_source {
611        self.ptr.as_ptr()
612    }
613}
614
615impl<'l> Drop for TimerSource<'l> {
616    fn drop(&mut self) {
617        unsafe { self.loop_.destroy_source(self) }
618    }
619}