1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// Copyright The pipewire-rs Contributors.
// SPDX-License-Identifier: MIT

//! This module provides a channel for communicating with a thread running a pipewire loop.
//! The channel is split into two types, [`Sender`] and [`Receiver`].
//!
//! It can be created using the [`channel`] function.
//! The returned receiver can then be attached to a pipewire loop, and the sender can be used to send messages to
//! the receiver.
//!
//! # Examples
//! This program will print "Hello" three times before terminating, using two threads:
// ignored because https://gitlab.freedesktop.org/pipewire/pipewire-rs/-/issues/19
//! ```no_run
//! use std::{time::Duration, sync::mpsc, thread};
//! use pipewire::main_loop::MainLoop;
//!
//! // Our message to the pipewire loop, this tells it to terminate.
//! struct Terminate;
//!
//! fn main() {
//!     let (main_sender, main_receiver) = mpsc::channel();
//!     let (pw_sender, pw_receiver) = pipewire::channel::channel();
//!
//!     let pw_thread = thread::spawn(move || pw_thread(main_sender, pw_receiver));
//!
//!     // Count up to three "Hello"'s.
//!     let mut n = 0;
//!     while n < 3 {
//!         println!("{}", main_receiver.recv().unwrap());
//!         n += 1;
//!     }
//!
//!     // We printed hello 3 times, terminate the pipewire thread now.
//!     pw_sender.send(Terminate);
//!
//!     pw_thread.join();
//! }
//!
//! // This is the code that will run in the pipewire thread.
//! fn pw_thread(
//!     main_sender: mpsc::Sender<String>,
//!     pw_receiver: pipewire::channel::Receiver<Terminate>
//! ) {
//!     let mainloop = MainLoop::new(None).expect("Failed to create main loop");
//!
//!     // When we receive a `Terminate` message, quit the main loop.
//!     let _receiver = pw_receiver.attach(mainloop.loop_(), {
//!         let mainloop = mainloop.clone();
//!         move |_| mainloop.quit()
//!     });
//!
//!     // Every 100ms, send `"Hello"` to the main thread.
//!     let timer = mainloop.loop_().add_timer(move |_| {
//!         main_sender.send(String::from("Hello"));
//!     });
//!     timer.update_timer(
//!         Some(Duration::from_millis(1)), // Send the first message immediately
//!         Some(Duration::from_millis(100))
//!     );
//!
//!     mainloop.run();
//! }
//! ```

use std::{
    collections::VecDeque,
    os::unix::prelude::*,
    sync::{Arc, Mutex},
};

use crate::loop_::{IoSource, LoopRef};
use spa::support::system::IoFlags;

/// A receiver that has not been attached to a loop.
///
/// Use its [`attach`](`Self::attach`) function to receive messages by attaching it to a loop.
pub struct Receiver<T: 'static> {
    channel: Arc<Mutex<Channel<T>>>,
}

impl<T: 'static> Receiver<T> {
    /// Attach the receiver to a loop with a callback.
    ///
    /// This will make the loop call the callback with any messages that get sent to the receiver.
    #[must_use]
    pub fn attach<F>(self, loop_: &LoopRef, callback: F) -> AttachedReceiver<T>
    where
        F: Fn(T) + 'static,
    {
        let channel = self.channel.clone();
        let readfd = channel.lock().expect("Channel mutex lock poisoned").readfd;

        // Attach the pipe as an IO source to the loop.
        // Whenever the pipe is written to, call the users callback with each message in the queue.
        let iosource = loop_.add_io(readfd, IoFlags::IN, move |_| {
            let mut channel = channel.lock().expect("Channel mutex lock poisoned");

            // Read from the pipe to make it block until written to again.
            let _ = nix::unistd::read(channel.readfd, &mut [0]);

            channel.queue.drain(..).for_each(&callback);
        });

        AttachedReceiver {
            _source: iosource,
            receiver: self,
        }
    }
}

/// A [`Receiver`] that has been attached to a loop.
///
/// Dropping this will cause it to be detached from the loop, so no more messages will be received.
pub struct AttachedReceiver<'l, T>
where
    T: 'static,
{
    _source: IoSource<'l, RawFd>,
    receiver: Receiver<T>,
}

impl<'l, T> AttachedReceiver<'l, T>
where
    T: 'static,
{
    /// Detach the receiver from the loop.
    ///
    /// No more messages will be received until you attach it to a loop again.
    #[must_use]
    pub fn deattach(self) -> Receiver<T> {
        self.receiver
    }
}

/// A `Sender` can be used to send messages to its associated [`Receiver`].
///
/// It can be freely cloned, so you can send messages from multiple  places.
pub struct Sender<T> {
    channel: Arc<Mutex<Channel<T>>>,
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Self {
            channel: self.channel.clone(),
        }
    }
}

impl<T> Sender<T> {
    /// Send a message to the associated receiver.
    ///
    /// On any errors, this returns the message back to the caller.
    pub fn send(&self, t: T) -> Result<(), T> {
        // Lock the channel.
        let mut channel = match self.channel.lock() {
            Ok(chan) => chan,
            Err(_) => return Err(t),
        };

        // If no messages are waiting already, signal the receiver to read some.
        // Because the channel mutex is locked, it is alright to do this before pushing the message.
        if channel.queue.is_empty() {
            match nix::unistd::write(channel.writefd, &[1u8]) {
                Ok(_) => (),
                Err(_) => return Err(t),
            }
        }

        // Push the new message into the queue.
        channel.queue.push_back(t);

        Ok(())
    }
}

/// Shared state between the [`Sender`]s and the [`Receiver`].
struct Channel<T> {
    /// A pipe used to signal the loop the receiver is attached to that messages are waiting.
    readfd: RawFd,
    writefd: RawFd,
    /// Queue of any messages waiting to be received.
    queue: VecDeque<T>,
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        // We do not error check here, because the pipe does not contain any data that might be lost,
        // and because there is no way to handle an error in a `Drop` implementation anyways.
        let _ = nix::unistd::close(self.readfd);
        let _ = nix::unistd::close(self.writefd);
    }
}

/// Create a Sender-Receiver pair, where the sender can be used to send messages to the receiver.
///
/// This functions similar to [`std::sync::mpsc`], but with a receiver that can be attached to any
/// [`LoopRef`](`crate::loop_::LoopRef`) to have the loop invoke a callback with any new messages.
///
/// This can be used for inter-thread communication without shared state and where [`std::sync::mpsc`] can not be used
/// because the receiving thread is running the pipewire loop.
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
where
    T: 'static,
{
    let fds = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC).unwrap();

    let channel: Arc<Mutex<Channel<T>>> = Arc::new(Mutex::new(Channel {
        readfd: fds.0,
        writefd: fds.1,
        queue: VecDeque::new(),
    }));

    (
        Sender {
            channel: channel.clone(),
        },
        Receiver { channel },
    )
}