pipewire/
channel.rs

1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! This module provides a channel for communicating with a thread running a pipewire loop.
5//! The channel is split into two types, [`Sender`] and [`Receiver`].
6//!
7//! It can be created using the [`channel`] function.
8//! The returned receiver can then be attached to a pipewire loop, and the sender can be used to send messages to
9//! the receiver.
10//!
11//! # Examples
12//! This program will print "Hello" three times before terminating, using two threads:
13// ignored because https://gitlab.freedesktop.org/pipewire/pipewire-rs/-/issues/19
14//! ```no_run
15//! use std::{time::Duration, sync::mpsc, thread};
16//! use pipewire::main_loop::MainLoopRc;
17//!
18//! // Our message to the pipewire loop, this tells it to terminate.
19//! struct Terminate;
20//!
21//! fn main() {
22//!     let (main_sender, main_receiver) = mpsc::channel();
23//!     let (pw_sender, pw_receiver) = pipewire::channel::channel();
24//!
25//!     let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver));
26//!
27//!     // Count up to three "Hello"'s.
28//!     let mut n = 0;
29//!     while n < 3 {
30//!         println!("{}", main_receiver.recv().unwrap());
31//!         n += 1;
32//!     }
33//!
34//!     // We printed hello 3 times, terminate the pipewire thread now.
35//!     pw_sender.send(Terminate);
36//!
37//!     pw_thread.join();
38//! }
39//!
40//! // This is the code that will run in the pipewire thread.
41//! fn pw_thread(
42//!     main_sender: mpsc::Sender<String>,
43//!     pw_receiver: pipewire::channel::Receiver<Terminate>
44//! ) {
45//!     let mainloop = MainLoopRc::new(None).expect("Failed to create main loop");
46//!
47//!     // When we receive a `Terminate` message, quit the main loop.
48//!     let _receiver = pw_receiver.attach(mainloop.loop_(), {
49//!         let mainloop = mainloop.clone();
50//!         move |_| mainloop.quit()
51//!     });
52//!
53//!     // Every 100ms, send `"Hello"` to the main thread.
54//!     let timer = mainloop.loop_().add_timer(move |_| {
55//!         main_sender.send(String::from("Hello"));
56//!     });
57//!     timer.update_timer(
58//!         Some(Duration::from_millis(1)), // Send the first message immediately
59//!         Some(Duration::from_millis(100))
60//!     );
61//!
62//!     mainloop.run();
63//! }
64//! ```
65
66use std::{
67    collections::VecDeque,
68    os::unix::prelude::*,
69    sync::{Arc, Mutex},
70};
71
72use crate::loop_::{IoSource, Loop};
73use spa::support::system::IoFlags;
74
75/// A receiver that has not been attached to a loop.
76///
77/// Use its [`attach`](`Self::attach`) function to receive messages by attaching it to a loop.
78pub struct Receiver<T: 'static> {
79    channel: Arc<Mutex<Channel<T>>>,
80}
81
82impl<T: 'static> Receiver<T> {
83    /// Attach the receiver to a loop with a callback.
84    ///
85    /// This will make the loop call the callback with any messages that get sent to the receiver.
86    #[must_use]
87    pub fn attach<F>(self, loop_: &Loop, callback: F) -> AttachedReceiver<'_, T>
88    where
89        F: Fn(T) + 'static,
90    {
91        let channel = self.channel.clone();
92        let readfd = channel
93            .lock()
94            .expect("Channel mutex lock poisoned")
95            .readfd
96            .as_raw_fd();
97
98        // Attach the pipe as an IO source to the loop.
99        // Whenever the pipe is written to, call the users callback with each message in the queue.
100        let iosource = loop_.add_io(readfd, IoFlags::IN, move |_| {
101            let mut channel = channel.lock().expect("Channel mutex lock poisoned");
102
103            // Read from the pipe to make it block until written to again.
104            let _ = nix::unistd::read(&channel.readfd, &mut [0]);
105
106            channel.queue.drain(..).for_each(&callback);
107        });
108
109        AttachedReceiver {
110            _source: iosource,
111            receiver: self,
112        }
113    }
114}
115
116/// A [`Receiver`] that has been attached to a loop.
117///
118/// Dropping this will cause it to be detached from the loop, so no more messages will be received.
119pub struct AttachedReceiver<'l, T>
120where
121    T: 'static,
122{
123    _source: IoSource<'l, RawFd>,
124    receiver: Receiver<T>,
125}
126
127impl<'l, T> AttachedReceiver<'l, T>
128where
129    T: 'static,
130{
131    /// Detach the receiver from the loop.
132    ///
133    /// No more messages will be received until you attach it to a loop again.
134    #[must_use]
135    pub fn deattach(self) -> Receiver<T> {
136        self.receiver
137    }
138}
139
140/// A `Sender` can be used to send messages to its associated [`Receiver`].
141///
142/// It can be freely cloned, so you can send messages from multiple  places.
143pub struct Sender<T> {
144    channel: Arc<Mutex<Channel<T>>>,
145}
146
147impl<T> Clone for Sender<T> {
148    fn clone(&self) -> Self {
149        Self {
150            channel: self.channel.clone(),
151        }
152    }
153}
154
155impl<T> Sender<T> {
156    /// Send a message to the associated receiver.
157    ///
158    /// On any errors, this returns the message back to the caller.
159    pub fn send(&self, t: T) -> Result<(), T> {
160        // Lock the channel.
161        let mut channel = match self.channel.lock() {
162            Ok(chan) => chan,
163            Err(_) => return Err(t),
164        };
165
166        // If no messages are waiting already, signal the receiver to read some.
167        // Because the channel mutex is locked, it is alright to do this before pushing the message.
168        if channel.queue.is_empty() {
169            match nix::unistd::write(&channel.writefd, &[1u8]) {
170                Ok(_) => (),
171                Err(_) => return Err(t),
172            }
173        }
174
175        // Push the new message into the queue.
176        channel.queue.push_back(t);
177
178        Ok(())
179    }
180}
181
/// State shared between all [`Sender`]s and the [`Receiver`] of one channel.
///
/// The pipe carries no payload; the messages themselves travel through `queue`.
struct Channel<T> {
    /// Read end of the signalling pipe; the receiver registers it with its loop.
    readfd: OwnedFd,
    /// Write end of the signalling pipe; senders write one byte to signal waiting messages.
    writefd: OwnedFd,
    /// Messages that were sent but not yet handed to the receiver's callback, oldest first.
    queue: VecDeque<T>,
}
190
191/// Create a Sender-Receiver pair, where the sender can be used to send messages to the receiver.
192///
193/// This functions similar to [`std::sync::mpsc`], but with a receiver that can be attached to any
194/// [`Loop`](`crate::loop_::Loop`) to have the loop invoke a callback with any new messages.
195///
196/// This can be used for inter-thread communication without shared state and where [`std::sync::mpsc`] can not be used
197/// because the receiving thread is running the pipewire loop.
198pub fn channel<T>() -> (Sender<T>, Receiver<T>)
199where
200    T: 'static,
201{
202    let fds = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
203
204    let channel: Arc<Mutex<Channel<T>>> = Arc::new(Mutex::new(Channel {
205        readfd: fds.0,
206        writefd: fds.1,
207        queue: VecDeque::new(),
208    }));
209
210    (
211        Sender {
212            channel: channel.clone(),
213        },
214        Receiver { channel },
215    )
216}