pipewire/channel.rs
1// Copyright The pipewire-rs Contributors.
2// SPDX-License-Identifier: MIT
3
4//! This module provides a channel for communicating with a thread running a pipewire loop.
5//! The channel is split into two types, [`Sender`] and [`Receiver`].
6//!
7//! It can be created using the [`channel`] function.
8//! The returned receiver can then be attached to a pipewire loop, and the sender can be used to send messages to
9//! the receiver.
10//!
11//! # Examples
12//! This program will print "Hello" three times before terminating, using two threads:
13// ignored because https://gitlab.freedesktop.org/pipewire/pipewire-rs/-/issues/19
14//! ```no_run
15//! use std::{time::Duration, sync::mpsc, thread};
16//! use pipewire::main_loop::MainLoopRc;
17//!
18//! // Our message to the pipewire loop, this tells it to terminate.
19//! struct Terminate;
20//!
21//! fn main() {
22//! let (main_sender, main_receiver) = mpsc::channel();
23//! let (pw_sender, pw_receiver) = pipewire::channel::channel();
24//!
25//! let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver));
26//!
27//! // Count up to three "Hello"'s.
28//! let mut n = 0;
29//! while n < 3 {
30//! println!("{}", main_receiver.recv().unwrap());
31//! n += 1;
32//! }
33//!
34//! // We printed hello 3 times, terminate the pipewire thread now.
35//! pw_sender.send(Terminate);
36//!
37//! pw_thread.join();
38//! }
39//!
40//! // This is the code that will run in the pipewire thread.
41//! fn pw_thread(
42//! main_sender: mpsc::Sender<String>,
43//! pw_receiver: pipewire::channel::Receiver<Terminate>
44//! ) {
45//! let mainloop = MainLoopRc::new(None).expect("Failed to create main loop");
46//!
47//! // When we receive a `Terminate` message, quit the main loop.
48//! let _receiver = pw_receiver.attach(mainloop.loop_(), {
49//! let mainloop = mainloop.clone();
50//! move |_| mainloop.quit()
51//! });
52//!
53//! // Every 100ms, send `"Hello"` to the main thread.
54//! let timer = mainloop.loop_().add_timer(move |_| {
55//! main_sender.send(String::from("Hello"));
56//! });
57//! timer.update_timer(
58//! Some(Duration::from_millis(1)), // Send the first message immediately
59//! Some(Duration::from_millis(100))
60//! );
61//!
62//! mainloop.run();
63//! }
64//! ```
65
66use std::{
67 collections::VecDeque,
68 os::unix::prelude::*,
69 sync::{Arc, Mutex},
70};
71
72use crate::loop_::{IoSource, Loop};
73use spa::support::system::IoFlags;
74
75/// A receiver that has not been attached to a loop.
76///
77/// Use its [`attach`](`Self::attach`) function to receive messages by attaching it to a loop.
78pub struct Receiver<T: 'static> {
79 channel: Arc<Mutex<Channel<T>>>,
80}
81
82impl<T: 'static> Receiver<T> {
83 /// Attach the receiver to a loop with a callback.
84 ///
85 /// This will make the loop call the callback with any messages that get sent to the receiver.
86 #[must_use]
87 pub fn attach<F>(self, loop_: &Loop, callback: F) -> AttachedReceiver<'_, T>
88 where
89 F: Fn(T) + 'static,
90 {
91 let channel = self.channel.clone();
92 let readfd = channel
93 .lock()
94 .expect("Channel mutex lock poisoned")
95 .readfd
96 .as_raw_fd();
97
98 // Attach the pipe as an IO source to the loop.
99 // Whenever the pipe is written to, call the users callback with each message in the queue.
100 let iosource = loop_.add_io(readfd, IoFlags::IN, move |_| {
101 let mut channel = channel.lock().expect("Channel mutex lock poisoned");
102
103 // Read from the pipe to make it block until written to again.
104 let _ = nix::unistd::read(&channel.readfd, &mut [0]);
105
106 channel.queue.drain(..).for_each(&callback);
107 });
108
109 AttachedReceiver {
110 _source: iosource,
111 receiver: self,
112 }
113 }
114}
115
116/// A [`Receiver`] that has been attached to a loop.
117///
118/// Dropping this will cause it to be detached from the loop, so no more messages will be received.
119pub struct AttachedReceiver<'l, T>
120where
121 T: 'static,
122{
123 _source: IoSource<'l, RawFd>,
124 receiver: Receiver<T>,
125}
126
127impl<'l, T> AttachedReceiver<'l, T>
128where
129 T: 'static,
130{
131 /// Detach the receiver from the loop.
132 ///
133 /// No more messages will be received until you attach it to a loop again.
134 #[must_use]
135 pub fn deattach(self) -> Receiver<T> {
136 self.receiver
137 }
138}
139
140/// A `Sender` can be used to send messages to its associated [`Receiver`].
141///
142/// It can be freely cloned, so you can send messages from multiple places.
143pub struct Sender<T> {
144 channel: Arc<Mutex<Channel<T>>>,
145}
146
147impl<T> Clone for Sender<T> {
148 fn clone(&self) -> Self {
149 Self {
150 channel: self.channel.clone(),
151 }
152 }
153}
154
155impl<T> Sender<T> {
156 /// Send a message to the associated receiver.
157 ///
158 /// On any errors, this returns the message back to the caller.
159 pub fn send(&self, t: T) -> Result<(), T> {
160 // Lock the channel.
161 let mut channel = match self.channel.lock() {
162 Ok(chan) => chan,
163 Err(_) => return Err(t),
164 };
165
166 // If no messages are waiting already, signal the receiver to read some.
167 // Because the channel mutex is locked, it is alright to do this before pushing the message.
168 if channel.queue.is_empty() {
169 match nix::unistd::write(&channel.writefd, &[1u8]) {
170 Ok(_) => (),
171 Err(_) => return Err(t),
172 }
173 }
174
175 // Push the new message into the queue.
176 channel.queue.push_back(t);
177
178 Ok(())
179 }
180}
181
182/// Shared state between the [`Sender`]s and the [`Receiver`].
183struct Channel<T> {
184 /// A pipe used to signal the loop the receiver is attached to that messages are waiting.
185 readfd: OwnedFd,
186 writefd: OwnedFd,
187 /// Queue of any messages waiting to be received.
188 queue: VecDeque<T>,
189}
190
191/// Create a Sender-Receiver pair, where the sender can be used to send messages to the receiver.
192///
193/// This functions similar to [`std::sync::mpsc`], but with a receiver that can be attached to any
194/// [`Loop`](`crate::loop_::Loop`) to have the loop invoke a callback with any new messages.
195///
196/// This can be used for inter-thread communication without shared state and where [`std::sync::mpsc`] can not be used
197/// because the receiving thread is running the pipewire loop.
198pub fn channel<T>() -> (Sender<T>, Receiver<T>)
199where
200 T: 'static,
201{
202 let fds = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
203
204 let channel: Arc<Mutex<Channel<T>>> = Arc::new(Mutex::new(Channel {
205 readfd: fds.0,
206 writefd: fds.1,
207 queue: VecDeque::new(),
208 }));
209
210 (
211 Sender {
212 channel: channel.clone(),
213 },
214 Receiver { channel },
215 )
216}