1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
// Copyright The pipewire-rs Contributors.
// SPDX-License-Identifier: MIT
//! This module provides a channel for communicating with a thread running a pipewire loop.
//! The channel is split into two types, [`Sender`] and [`Receiver`].
//!
//! It can be created using the [`channel`] function.
//! The returned receiver can then be attached to a pipewire loop, and the sender can be used to send messages to
//! the receiver.
//!
//! # Examples
//! This program will print "Hello" three times before terminating, using two threads:
// ignored because https://gitlab.freedesktop.org/pipewire/pipewire-rs/-/issues/19
//! ```no_run
//! use std::{time::Duration, sync::mpsc, thread};
//! use pipewire::main_loop::MainLoop;
//!
//! // Our message to the pipewire loop, this tells it to terminate.
//! struct Terminate;
//!
//! fn main() {
//! let (main_sender, main_receiver) = mpsc::channel();
//! let (pw_sender, pw_receiver) = pipewire::channel::channel();
//!
//! let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver));
//!
//! // Count up to three "Hello"'s.
//! let mut n = 0;
//! while n < 3 {
//! println!("{}", main_receiver.recv().unwrap());
//! n += 1;
//! }
//!
//! // We printed hello 3 times, terminate the pipewire thread now.
//! pw_sender.send(Terminate);
//!
//! pw_thread.join();
//! }
//!
//! // This is the code that will run in the pipewire thread.
//! fn pw_thread(
//! main_sender: mpsc::Sender<String>,
//! pw_receiver: pipewire::channel::Receiver<Terminate>
//! ) {
//! let mainloop = MainLoop::new(None).expect("Failed to create main loop");
//!
//! // When we receive a `Terminate` message, quit the main loop.
//! let _receiver = pw_receiver.attach(mainloop.loop_(), {
//! let mainloop = mainloop.clone();
//! move |_| mainloop.quit()
//! });
//!
//! // Every 100ms, send `"Hello"` to the main thread.
//! let timer = mainloop.loop_().add_timer(move |_| {
//! main_sender.send(String::from("Hello"));
//! });
//! timer.update_timer(
//! Some(Duration::from_millis(1)), // Send the first message immediately
//! Some(Duration::from_millis(100))
//! );
//!
//! mainloop.run();
//! }
//! ```
use std::{
collections::VecDeque,
os::unix::prelude::*,
sync::{Arc, Mutex},
};
use crate::loop_::{IoSource, LoopRef};
use spa::support::system::IoFlags;
/// A receiver that has not been attached to a loop.
///
/// Use its [`attach`](`Self::attach`) function to receive messages by attaching it to a loop.
pub struct Receiver<T: 'static> {
channel: Arc<Mutex<Channel<T>>>,
}
impl<T: 'static> Receiver<T> {
/// Attach the receiver to a loop with a callback.
///
/// This will make the loop call the callback with any messages that get sent to the receiver.
#[must_use]
pub fn attach<F>(self, loop_: &LoopRef, callback: F) -> AttachedReceiver<T>
where
F: Fn(T) + 'static,
{
let channel = self.channel.clone();
let readfd = channel.lock().expect("Channel mutex lock poisoned").readfd;
// Attach the pipe as an IO source to the loop.
// Whenever the pipe is written to, call the users callback with each message in the queue.
let iosource = loop_.add_io(readfd, IoFlags::IN, move |_| {
let mut channel = channel.lock().expect("Channel mutex lock poisoned");
// Read from the pipe to make it block until written to again.
let _ = nix::unistd::read(channel.readfd, &mut [0]);
channel.queue.drain(..).for_each(&callback);
});
AttachedReceiver {
_source: iosource,
receiver: self,
}
}
}
/// A [`Receiver`] that has been attached to a loop.
///
/// Dropping this will cause it to be detached from the loop, so no more messages will be received.
pub struct AttachedReceiver<'l, T>
where
T: 'static,
{
_source: IoSource<'l, RawFd>,
receiver: Receiver<T>,
}
impl<'l, T> AttachedReceiver<'l, T>
where
T: 'static,
{
/// Detach the receiver from the loop.
///
/// No more messages will be received until you attach it to a loop again.
#[must_use]
pub fn deattach(self) -> Receiver<T> {
self.receiver
}
}
/// A `Sender` can be used to send messages to its associated [`Receiver`].
///
/// It can be freely cloned, so you can send messages from multiple places.
pub struct Sender<T> {
channel: Arc<Mutex<Channel<T>>>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
channel: self.channel.clone(),
}
}
}
impl<T> Sender<T> {
/// Send a message to the associated receiver.
///
/// On any errors, this returns the message back to the caller.
pub fn send(&self, t: T) -> Result<(), T> {
// Lock the channel.
let mut channel = match self.channel.lock() {
Ok(chan) => chan,
Err(_) => return Err(t),
};
// If no messages are waiting already, signal the receiver to read some.
// Because the channel mutex is locked, it is alright to do this before pushing the message.
if channel.queue.is_empty() {
match nix::unistd::write(channel.writefd, &[1u8]) {
Ok(_) => (),
Err(_) => return Err(t),
}
}
// Push the new message into the queue.
channel.queue.push_back(t);
Ok(())
}
}
/// Shared state between the [`Sender`]s and the [`Receiver`].
struct Channel<T> {
/// A pipe used to signal the loop the receiver is attached to that messages are waiting.
readfd: RawFd,
writefd: RawFd,
/// Queue of any messages waiting to be received.
queue: VecDeque<T>,
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// We do not error check here, because the pipe does not contain any data that might be lost,
// and because there is no way to handle an error in a `Drop` implementation anyways.
let _ = nix::unistd::close(self.readfd);
let _ = nix::unistd::close(self.writefd);
}
}
/// Create a Sender-Receiver pair, where the sender can be used to send messages to the receiver.
///
/// This functions similar to [`std::sync::mpsc`], but with a receiver that can be attached to any
/// [`LoopRef`](`crate::loop_::LoopRef`) to have the loop invoke a callback with any new messages.
///
/// This can be used for inter-thread communication without shared state and where [`std::sync::mpsc`] can not be used
/// because the receiving thread is running the pipewire loop.
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
where
T: 'static,
{
let fds = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();
let channel: Arc<Mutex<Channel<T>>> = Arc::new(Mutex::new(Channel {
readfd: fds.0,
writefd: fds.1,
queue: VecDeque::new(),
}));
(
Sender {
channel: channel.clone(),
},
Receiver { channel },
)
}