pipewire/loop_/
mod.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! The PipeWire event loop responsible for listening to and handling events.
5//!
6//! For a wrapper that continuously runs a loop in the current thread, see [`main_loop`](crate::main_loop).
7//!
8//! This module contains wrappers for [`pw_loop`](pw_sys::pw_loop) and related items.
9
10use std::{convert::TryInto, os::unix::prelude::*, ptr, time::Duration};
11
12use libc::{c_int, c_void};
13pub use rustix::process::Signal;
14use spa::{spa_interface_call_method, support::system::IoFlags, utils::result::SpaResult};
15
16use crate::utils::assert_main_thread;
17
18mod box_;
19pub use box_::*;
20mod rc;
21pub use rc::*;
22
23/// Transparent wrapper around a [loop](self).
24///
25/// This does not own the underlying object and is usually seen behind a `&` reference.
26///
27/// For owning wrappers that can construct loops, see [`LoopBox`] and [`LoopRc`].
28///
29/// For an explanation of these, see [Smart pointers to PipeWire objects](crate#smart-pointers-to-pipewire-objects).
30#[repr(transparent)]
31pub struct Loop(pw_sys::pw_loop);
32
33impl Loop {
34    pub fn as_raw(&self) -> &pw_sys::pw_loop {
35        &self.0
36    }
37
38    pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_loop {
39        std::ptr::addr_of!(self.0).cast_mut()
40    }
41
42    /// Get the file descriptor backing this loop.
43    pub fn fd(&self) -> BorrowedFd<'_> {
44        unsafe {
45            let mut iface = self.as_raw().control.as_ref().unwrap().iface;
46
47            let raw_fd = spa_interface_call_method!(
48                &mut iface as *mut spa_sys::spa_interface,
49                spa_sys::spa_loop_control_methods,
50                get_fd,
51            );
52
53            BorrowedFd::borrow_raw(raw_fd)
54        }
55    }
56
57    /// Enter a loop
58    ///
59    /// Start an iteration of the loop. This function should be called
60    /// before calling iterate and is typically used to capture the thread
61    /// that this loop will run in.
62    ///
63    /// # Safety
64    /// Each call of `enter` must be paired with a call of `leave`.
65    pub unsafe fn enter(&self) {
66        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
67
68        spa_interface_call_method!(
69            &mut iface as *mut spa_sys::spa_interface,
70            spa_sys::spa_loop_control_methods,
71            enter,
72        )
73    }
74
75    /// Leave a loop
76    ///
77    /// Ends the iteration of a loop. This should be called after calling
78    /// iterate.
79    ///
80    /// # Safety
81    /// Each call of `leave` must be paired with a call of `enter`.
82    pub unsafe fn leave(&self) {
83        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
84
85        spa_interface_call_method!(
86            &mut iface as *mut spa_sys::spa_interface,
87            spa_sys::spa_loop_control_methods,
88            leave,
89        )
90    }
91
92    /// Perform one iteration of the loop.
93    ///
94    /// Timeout must be provided, see [`Timeout`].
95    ///
96    /// This function will block
97    /// up to the provided timeout and then dispatch the fds with activity.
98    /// The number of dispatched fds is returned.
99    ///
100    /// This will automatically call [`Self::enter()`] on the loop before iterating, and [`Self::leave()`] afterwards.
101    ///
102    /// # Panics
103    /// This function will panic if the provided timeout as milliseconds does not fit inside a
104    /// `c_int` integer.
105    pub fn iterate(&self, timeout: Timeout) -> i32 {
106        struct LeaveGuard<'a>(&'a Loop);
107
108        impl Drop for LeaveGuard<'_> {
109            fn drop(&mut self) {
110                unsafe {
111                    self.0.leave();
112                }
113            }
114        }
115
116        unsafe {
117            self.enter();
118
119            let _guard = LeaveGuard(self);
120
121            self.iterate_unguarded(timeout)
122        }
123    }
124
125    /// A variant of [`iterate()`](`Self::iterate()`) that does not call [`Self::enter()`]  and [`Self::leave()`] on the loop.
126    ///
127    /// # Safety
128    /// Before calling this, [`Self::enter()`] must be called, and [`Self::leave()`] must be called afterwards.
129    pub unsafe fn iterate_unguarded(&self, timeout: Timeout) -> i32 {
130        let mut iface = self.as_raw().control.as_ref().unwrap().iface;
131
132        let timeout: c_int =
133            c_int::try_from(timeout).expect("Provided timeout does not fit in a c_int");
134
135        spa_interface_call_method!(
136            &mut iface as *mut spa_sys::spa_interface,
137            spa_sys::spa_loop_control_methods,
138            iterate,
139            timeout
140        )
141    }
142
143    /// Register some type of IO object with a callback that is called when reading/writing on the IO object
144    /// is available.
145    ///
146    /// The specified `event_mask` determines whether to trigger when either input, output, or any of the two is available.
147    ///
148    /// The returned IoSource needs to take ownership of the IO object, but will provide a reference to the callback when called.
149    #[must_use]
150    pub fn add_io<I, F>(&self, io: I, event_mask: IoFlags, callback: F) -> IoSource<'_, I>
151    where
152        I: AsRawFd,
153        F: Fn(&mut I) + 'static,
154        Self: Sized,
155    {
156        unsafe extern "C" fn call_closure<I>(data: *mut c_void, _fd: RawFd, _mask: u32)
157        where
158            I: AsRawFd,
159        {
160            let (io, callback) = (data as *mut IoSourceData<I>).as_mut().unwrap();
161            callback(io);
162        }
163
164        let fd = io.as_raw_fd();
165        let data = Box::into_raw(Box::new((io, Box::new(callback) as Box<dyn Fn(&mut I)>)));
166
167        let (source, data) = unsafe {
168            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
169
170            let source = spa_interface_call_method!(
171                &mut iface as *mut spa_sys::spa_interface,
172                spa_sys::spa_loop_utils_methods,
173                add_io,
174                fd,
175                event_mask.bits(),
176                // Never let the loop close the fd, this should be handled via `Drop` implementations.
177                false,
178                Some(call_closure::<I>),
179                data as *mut _
180            );
181
182            (source, Box::from_raw(data))
183        };
184
185        let ptr = ptr::NonNull::new(source).expect("source is NULL");
186
187        IoSource {
188            ptr,
189            loop_: self,
190            _data: data,
191        }
192    }
193
194    /// Register a callback to be called whenever the loop is idle.
195    ///
196    /// This can be enabled and disabled as needed with the `enabled` parameter,
197    /// and also with the `enable` method on the returned source.
198    #[must_use]
199    pub fn add_idle<F>(&self, enabled: bool, callback: F) -> IdleSource<'_>
200    where
201        F: Fn() + 'static,
202    {
203        unsafe extern "C" fn call_closure<F>(data: *mut c_void)
204        where
205            F: Fn(),
206        {
207            let callback = (data as *mut F).as_ref().unwrap();
208            callback();
209        }
210
211        let data = Box::into_raw(Box::new(callback));
212
213        let (source, data) = unsafe {
214            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
215
216            let source = spa_interface_call_method!(
217                &mut iface as *mut spa_sys::spa_interface,
218                spa_sys::spa_loop_utils_methods,
219                add_idle,
220                enabled,
221                Some(call_closure::<F>),
222                data as *mut _
223            );
224
225            (source, Box::from_raw(data))
226        };
227
228        let ptr = ptr::NonNull::new(source).expect("source is NULL");
229
230        IdleSource {
231            ptr,
232            loop_: self,
233            _data: data,
234        }
235    }
236
237    /// Register a signal with a callback that is called when the signal is sent.
238    ///
239    /// For example, this can be used to quit the loop when the process receives the `SIGTERM` signal.
240    #[must_use]
241    pub fn add_signal_local<F>(&self, signal: Signal, callback: F) -> SignalSource<'_>
242    where
243        F: Fn() + 'static,
244        Self: Sized,
245    {
246        assert_main_thread();
247
248        unsafe extern "C" fn call_closure<F>(data: *mut c_void, _signal: c_int)
249        where
250            F: Fn(),
251        {
252            let callback = (data as *mut F).as_ref().unwrap();
253            callback();
254        }
255
256        let data = Box::into_raw(Box::new(callback));
257
258        let (source, data) = unsafe {
259            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
260
261            let source = spa_interface_call_method!(
262                &mut iface as *mut spa_sys::spa_interface,
263                spa_sys::spa_loop_utils_methods,
264                add_signal,
265                signal.as_raw(),
266                Some(call_closure::<F>),
267                data as *mut _
268            );
269
270            (source, Box::from_raw(data))
271        };
272
273        let ptr = ptr::NonNull::new(source).expect("source is NULL");
274
275        SignalSource {
276            ptr,
277            loop_: self,
278            _data: data,
279        }
280    }
281
282    /// Register a new event with a callback that is called when the event happens.
283    ///
284    /// The returned [`EventSource`] can be used to trigger the event.
285    #[must_use]
286    pub fn add_event<F>(&self, callback: F) -> EventSource<'_>
287    where
288        F: Fn() + 'static,
289        Self: Sized,
290    {
291        unsafe extern "C" fn call_closure<F>(data: *mut c_void, _count: u64)
292        where
293            F: Fn(),
294        {
295            let callback = (data as *mut F).as_ref().unwrap();
296            callback();
297        }
298
299        let data = Box::into_raw(Box::new(callback));
300
301        let (source, data) = unsafe {
302            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
303
304            let source = spa_interface_call_method!(
305                &mut iface as *mut spa_sys::spa_interface,
306                spa_sys::spa_loop_utils_methods,
307                add_event,
308                Some(call_closure::<F>),
309                data as *mut _
310            );
311            (source, Box::from_raw(data))
312        };
313
314        let ptr = ptr::NonNull::new(source).expect("source is NULL");
315
316        EventSource {
317            ptr,
318            loop_: self,
319            _data: data,
320        }
321    }
322
323    /// Register a timer with the loop with a callback that is called after the timer expired.
324    ///
325    /// The timer will start out inactive, and the returned [`TimerSource`] can be used to arm the timer, or disarm it again.
326    ///
327    /// The callback will be provided with the number of timer expirations since the callback was last called.
328    #[must_use]
329    pub fn add_timer<F>(&self, callback: F) -> TimerSource<'_>
330    where
331        F: Fn(u64) + 'static,
332        Self: Sized,
333    {
334        unsafe extern "C" fn call_closure<F>(data: *mut c_void, expirations: u64)
335        where
336            F: Fn(u64),
337        {
338            let callback = (data as *mut F).as_ref().unwrap();
339            callback(expirations);
340        }
341
342        let data = Box::into_raw(Box::new(callback));
343
344        let (source, data) = unsafe {
345            let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
346
347            let source = spa_interface_call_method!(
348                &mut iface as *mut spa_sys::spa_interface,
349                spa_sys::spa_loop_utils_methods,
350                add_timer,
351                Some(call_closure::<F>),
352                data as *mut _
353            );
354            (source, Box::from_raw(data))
355        };
356
357        let ptr = ptr::NonNull::new(source).expect("source is NULL");
358
359        TimerSource {
360            ptr,
361            loop_: self,
362            _data: data,
363        }
364    }
365
366    /// Destroy a source that belongs to this loop.
367    ///
368    /// # Safety
369    /// The provided source must belong to this loop.
370    unsafe fn destroy_source<S>(&self, source: &S)
371    where
372        S: IsSource,
373        Self: Sized,
374    {
375        let mut iface = self.as_raw().utils.as_ref().unwrap().iface;
376
377        spa_interface_call_method!(
378            &mut iface as *mut spa_sys::spa_interface,
379            spa_sys::spa_loop_utils_methods,
380            destroy_source,
381            source.as_ptr()
382        )
383    }
384}
385
386/// Timeout for [`Loop::iterate()`]
387#[derive(Debug, Clone)]
388pub enum Timeout {
389    None,
390    Infinite,
391    Finite(Duration),
392}
393
394impl TryFrom<Timeout> for c_int {
395    type Error = <u128 as TryInto<c_int>>::Error;
396    /// # Errors
397    /// This function will return an error if the provided timeout as milliseconds does not fit inside a
398    /// `c_int` integer.
399    fn try_from(value: Timeout) -> Result<Self, Self::Error> {
400        match value {
401            Timeout::None => Ok(0),
402            Timeout::Infinite => Ok(-1),
403            Timeout::Finite(duration) => duration.as_millis().try_into(),
404        }
405    }
406}
407
408pub trait IsSource {
409    /// Return a valid pointer to a raw `spa_source`.
410    fn as_ptr(&self) -> *mut spa_sys::spa_source;
411}
412
413type IoSourceData<I> = (I, Box<dyn Fn(&mut I) + 'static>);
414
415/// A source that can be used to react to IO events.
416///
417/// This source can be obtained by calling [`add_io`](`Loop::add_io`) on a loop, registering a callback to it.
418pub struct IoSource<'l, I>
419where
420    I: AsRawFd,
421{
422    ptr: ptr::NonNull<spa_sys::spa_source>,
423    loop_: &'l Loop,
424    // Store data wrapper to prevent leak
425    _data: Box<IoSourceData<I>>,
426}
427
428impl<'l, I> IsSource for IoSource<'l, I>
429where
430    I: AsRawFd,
431{
432    fn as_ptr(&self) -> *mut spa_sys::spa_source {
433        self.ptr.as_ptr()
434    }
435}
436
437impl<'l, I> Drop for IoSource<'l, I>
438where
439    I: AsRawFd,
440{
441    fn drop(&mut self) {
442        unsafe { self.loop_.destroy_source(self) }
443    }
444}
445
446/// A source that can be used to have a callback called when the loop is idle.
447///
448/// This source can be obtained by calling [`add_idle`](`Loop::add_idle`) on a loop, registering a callback to it.
449pub struct IdleSource<'l> {
450    ptr: ptr::NonNull<spa_sys::spa_source>,
451    loop_: &'l Loop,
452    // Store data wrapper to prevent leak
453    _data: Box<dyn Fn() + 'static>,
454}
455
456impl<'l> IdleSource<'l> {
457    /// Set the source as enabled or disabled, allowing or preventing the callback from being called.
458    pub fn enable(&self, enable: bool) {
459        unsafe {
460            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
461
462            spa_interface_call_method!(
463                &mut iface as *mut spa_sys::spa_interface,
464                spa_sys::spa_loop_utils_methods,
465                enable_idle,
466                self.as_ptr(),
467                enable
468            );
469        }
470    }
471}
472
473impl<'l> IsSource for IdleSource<'l> {
474    fn as_ptr(&self) -> *mut spa_sys::spa_source {
475        self.ptr.as_ptr()
476    }
477}
478
479impl<'l> Drop for IdleSource<'l> {
480    fn drop(&mut self) {
481        unsafe { self.loop_.destroy_source(self) }
482    }
483}
484
485/// A source that can be used to react to signals.
486///
487/// This source can be obtained by calling [`add_signal_local`](`Loop::add_signal_local`) on a loop, registering a callback to it.
488pub struct SignalSource<'l> {
489    ptr: ptr::NonNull<spa_sys::spa_source>,
490    loop_: &'l Loop,
491    // Store data wrapper to prevent leak
492    _data: Box<dyn Fn() + 'static>,
493}
494
495impl<'l> IsSource for SignalSource<'l> {
496    fn as_ptr(&self) -> *mut spa_sys::spa_source {
497        self.ptr.as_ptr()
498    }
499}
500
501impl<'l> Drop for SignalSource<'l> {
502    fn drop(&mut self) {
503        unsafe { self.loop_.destroy_source(self) }
504    }
505}
506
507/// A source that can be used to signal to a loop that an event has occurred.
508///
509/// This source can be obtained by calling [`add_event`](`Loop::add_event`) on a loop, registering a callback to it.
510///
511/// By calling [`signal`](`EventSource::signal`) on the `EventSource`, the loop is signaled that the event has occurred.
512/// It will then call the callback at the next possible occasion.
513pub struct EventSource<'l> {
514    ptr: ptr::NonNull<spa_sys::spa_source>,
515    loop_: &'l Loop,
516    // Store data wrapper to prevent leak
517    _data: Box<dyn Fn() + 'static>,
518}
519
520impl<'l> IsSource for EventSource<'l> {
521    fn as_ptr(&self) -> *mut spa_sys::spa_source {
522        self.ptr.as_ptr()
523    }
524}
525
526impl<'l> EventSource<'l> {
527    /// Signal the loop associated with this source that the event has occurred,
528    /// to make the loop call the callback at the next possible occasion.
529    pub fn signal(&self) -> SpaResult {
530        let res = unsafe {
531            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
532
533            spa_interface_call_method!(
534                &mut iface as *mut spa_sys::spa_interface,
535                spa_sys::spa_loop_utils_methods,
536                signal_event,
537                self.as_ptr()
538            )
539        };
540
541        SpaResult::from_c(res)
542    }
543}
544
545impl<'l> Drop for EventSource<'l> {
546    fn drop(&mut self) {
547        unsafe { self.loop_.destroy_source(self) }
548    }
549}
550
551/// A source that can be used to have a callback called on a timer.
552///
553/// This source can be obtained by calling [`add_timer`](`Loop::add_timer`) on a loop, registering a callback to it.
554///
555/// The timer starts out inactive.
556/// You can arm or disarm the timer by calling [`update_timer`](`Self::update_timer`).
557pub struct TimerSource<'l> {
558    ptr: ptr::NonNull<spa_sys::spa_source>,
559    loop_: &'l Loop,
560    // Store data wrapper to prevent leak
561    _data: Box<dyn Fn(u64) + 'static>,
562}
563
564impl<'l> TimerSource<'l> {
565    /// Arm or disarm the timer.
566    ///
567    /// The timer will be called the next time after the provided `value` duration.
568    /// After that, the timer will be repeatedly called again at the the specified `interval`.
569    ///
570    /// If `interval` is `None` or zero, the timer will only be called once. \
571    /// If `value` is `None` or zero, the timer will be disabled.
572    ///
573    /// # Panics
574    /// The provided durations seconds must fit in an i64. Otherwise, this function will panic.
575    pub fn update_timer(&self, value: Option<Duration>, interval: Option<Duration>) -> SpaResult {
576        fn duration_to_timespec(duration: Duration) -> spa_sys::timespec {
577            // Some 32-bit systems e.g. musl add padding fields for 64-bit time compatibility
578            let mut timespec =
579                unsafe { std::mem::MaybeUninit::<spa_sys::timespec>::zeroed().assume_init() };
580
581            timespec.tv_sec = duration.as_secs().try_into().expect("Duration too long");
582
583            #[allow(clippy::unnecessary_fallible_conversions)] // Architecture dependent
584            {
585                timespec.tv_nsec = duration
586                    .subsec_nanos()
587                    .try_into()
588                    .expect("Nanoseconds should fit into timespec");
589            }
590
591            timespec
592        }
593
594        let value = duration_to_timespec(value.unwrap_or_default());
595        let interval = duration_to_timespec(interval.unwrap_or_default());
596
597        let res = unsafe {
598            let mut iface = self.loop_.as_raw().utils.as_ref().unwrap().iface;
599
600            spa_interface_call_method!(
601                &mut iface as *mut spa_sys::spa_interface,
602                spa_sys::spa_loop_utils_methods,
603                update_timer,
604                self.as_ptr(),
605                &value as *const _ as *mut _,
606                &interval as *const _ as *mut _,
607                false
608            )
609        };
610
611        SpaResult::from_c(res)
612    }
613}
614
615impl<'l> IsSource for TimerSource<'l> {
616    fn as_ptr(&self) -> *mut spa_sys::spa_source {
617        self.ptr.as_ptr()
618    }
619}
620
621impl<'l> Drop for TimerSource<'l> {
622    fn drop(&mut self) {
623        unsafe { self.loop_.destroy_source(self) }
624    }
625}