PipeWire 1.4.1
All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
audio-src-ring2.c

Audio source using pw_stream and ringbuffer.

Audio source using pw_stream and ringbuffer. This one uses a thread-loop and does a blocking push into a ringbuffer.


/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2024 Wim Taymans */
/* SPDX-License-Identifier: MIT */
/*
[title]
Audio source using \ref pw_stream "pw_stream" and ringbuffer.
This one uses a thread-loop and does a blocking push into a
ringbuffer.
[title]
*/
#include <errno.h>
#include <math.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <spa/param/audio/format-utils.h>
#include <spa/utils/ringbuffer.h>

#include <pipewire/pipewire.h>
/* 2*pi as a float: one full period of the sine oscillator. The expansion is
 * fully parenthesized so it stays a single operand wherever it is used. */
#define M_PI_M2f ((float)(M_PI+M_PI))

/* Fixed stream format produced by this example. */
#define DEFAULT_RATE		44100
#define DEFAULT_CHANNELS	2
#define DEFAULT_VOLUME		0.7f

/* Ringbuffer capacity, counted in frames (one frame holds DEFAULT_CHANNELS
 * float values). */
#define BUFFER_SIZE		(16*1024)

/* Bounds, in frames, for the randomly sized blocks that main() pushes. */
#define MIN_SIZE	256
#define MAX_SIZE	BUFFER_SIZE

/* Scratch area in which a block of interleaved samples is generated before
 * it is pushed into the ringbuffer. */
static float samples[BUFFER_SIZE * DEFAULT_CHANNELS];
/* State shared between main(), which produces samples, and on_process(),
 * which consumes them. */
struct data {
/* thread loop created in main() and the pw_loop obtained from it */
struct pw_thread_loop *thread_loop;
struct pw_loop *loop;
/* the stream whose buffers on_process() fills */
struct pw_stream *stream;
/* written by on_process() after every cycle; push_samples() blocks reading
 * it while the ringbuffer is full */
int eventfd;
/* main() pushes samples while this is true; do_quit() clears it.
 * NOTE(review): plain bool written from the signal callback and read in the
 * main() loop -- confirm this needs no atomic access. */
bool running;
/* current phase of the sine oscillator, kept in [0, M_PI_M2f) by fill_f32() */
float accumulator;
/* ringbuffer indices; the code advances them in frames */
struct spa_ringbuffer ring;
/* backing storage of the ringbuffer: BUFFER_SIZE interleaved frames */
float buffer[BUFFER_SIZE * DEFAULT_CHANNELS];
};
/* Generate n_frames frames of a 440 Hz sine wave into samples.
 *
 * Every frame carries the same value on all DEFAULT_CHANNELS channels, so
 * samples must have room for n_frames * DEFAULT_CHANNELS floats. The
 * oscillator phase lives in d->accumulator, which makes consecutive calls
 * continue the same waveform.
 */
static void fill_f32(struct data *d, float *samples, int n_frames)
{
	float *out = samples;
	int frame;

	for (frame = 0; frame < n_frames; frame++) {
		float level;
		int ch;

		/* advance the phase by one sample period and wrap at 2*pi */
		d->accumulator += M_PI_M2f * 440 / DEFAULT_RATE;
		if (d->accumulator >= M_PI_M2f)
			d->accumulator -= M_PI_M2f;

		level = sinf(d->accumulator) * DEFAULT_VOLUME;

		/* interleaved layout: write the value once per channel */
		for (ch = 0; ch < DEFAULT_CHANNELS; ch++)
			*out++ = level;
	}
}
/* This can be called from any thread with a block of samples to write into
 * the ringbuffer. It will block until all data has been written.
 *
 * userdata:  the struct data owning the ringbuffer and the eventfd
 * samples:   interleaved floats, DEFAULT_CHANNELS values per frame
 * n_samples: number of frames to write
 */
static void push_samples(void *userdata, float *samples, uint32_t n_samples)
{
	struct data *data = userdata;
	int32_t filled;
	uint32_t index, avail, stride = sizeof(float) * DEFAULT_CHANNELS;
	uint64_t count;
	float *s = samples;

	while (n_samples > 0) {
		while (true) {
			filled = spa_ringbuffer_get_write_index(&data->ring, &index);
			/* an xrun can not happen here because we never read more
			 * than what there is in the ringbuffer and we never write
			 * more than what is left */
			spa_assert(filled >= 0);
			spa_assert(filled <= BUFFER_SIZE);

			/* this is how many frames we can write */
			avail = BUFFER_SIZE - filled;
			if (avail > 0)
				break;

			/* no space.. block and wait for free space; on_process()
			 * writes to the eventfd after every cycle */
			spa_system_eventfd_read(data->loop->system, data->eventfd, &count);
		}
		if (avail > n_samples)
			avail = n_samples;

		/* copy the frames into the backing storage. The ring indices
		 * count frames, so sizes and the wrapped offset are scaled by
		 * the stride to get bytes */
		spa_ringbuffer_write_data(&data->ring,
				data->buffer, BUFFER_SIZE * stride,
				(index % BUFFER_SIZE) * stride,
				s, avail * stride);

		s += avail * DEFAULT_CHANNELS;
		n_samples -= avail;

		/* and advance the ringbuffer */
		spa_ringbuffer_write_update(&data->ring, index + avail);
	}
}
/* our data processing function is in general:
 *
 *  struct pw_buffer *b;
 *  b = pw_stream_dequeue_buffer(stream);
 *
 *  .. generate stuff in the buffer ...
 *  In this case we read samples from a ringbuffer. The ringbuffer is
 *  filled up by another thread.
 *
 *  pw_stream_queue_buffer(stream, b);
 *
 * userdata is the struct data registered with the stream in main().
 */
static void on_process(void *userdata)
{
	struct data *data = userdata;
	struct pw_buffer *b;
	struct spa_buffer *buf;
	uint8_t *p;
	uint32_t index, to_read, to_silence;
	int32_t avail, n_frames, stride;

	if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) {
		pw_log_warn("out of buffers: %m");
		return;
	}

	buf = b->buffer;
	/* NOTE(review): this path returns without queueing b again -- confirm
	 * that is intended for buffers without a data pointer. */
	if ((p = buf->datas[0].data) == NULL)
		return;

	/* the number of frames available for reading and the read index */
	avail = spa_ringbuffer_get_read_index(&data->ring, &index);

	stride = sizeof(float) * DEFAULT_CHANNELS;
	n_frames = buf->datas[0].maxsize / stride;
	/* honour the suggested amount of data when the stream provides one */
	if (b->requested)
		n_frames = SPA_MIN((int32_t)b->requested, n_frames);

	/* we can read if there is something available */
	to_read = avail > 0 ? SPA_MIN(avail, n_frames) : 0;
	/* and fill the remainder with silence */
	to_silence = n_frames - to_read;

	if (to_read > 0) {
		/* read data into the buffer. The ring indices count frames, so
		 * sizes and the wrapped offset are scaled by the stride to get
		 * bytes */
		spa_ringbuffer_read_data(&data->ring,
				data->buffer, BUFFER_SIZE * stride,
				(index % BUFFER_SIZE) * stride,
				p, to_read * stride);
		/* update the read pointer */
		spa_ringbuffer_read_update(&data->ring, index + to_read);
	}
	if (to_silence > 0)
		/* set the rest of the buffer to silence */
		memset(SPA_PTROFF(p, to_read * stride, void), 0, to_silence * stride);

	buf->datas[0].chunk->offset = 0;
	buf->datas[0].chunk->stride = stride;
	buf->datas[0].chunk->size = n_frames * stride;

	pw_stream_queue_buffer(data->stream, b);

	/* signal the main thread to fill the ringbuffer. We could instead do
	 * this only when, for example, the amount of data left in the
	 * ringbuffer falls below a certain level. */
	spa_system_eventfd_write(data->loop->system, data->eventfd, 1);
}
static const struct pw_stream_events stream_events = {
.process = on_process,
};
/* Signal callback registered for SIGINT and SIGTERM in main(): clears the
 * running flag so that the push loop in main() terminates. The signal
 * number itself is not used. */
static void do_quit(void *userdata, int signal_number)
{
	((struct data *)userdata)->running = false;
}
int main(int argc, char *argv[])
{
struct data data = { 0, };
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct pw_properties *props;
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
pw_init(&argc, &argv);
data.thread_loop = pw_thread_loop_new("audio-src", NULL);
data.loop = pw_thread_loop_get_loop(data.thread_loop);
data.running = true;
pw_thread_loop_lock(data.thread_loop);
pw_loop_add_signal(data.loop, SIGINT, do_quit, &data);
pw_loop_add_signal(data.loop, SIGTERM, do_quit, &data);
spa_ringbuffer_init(&data.ring);
if ((data.eventfd = spa_system_eventfd_create(data.loop->system, SPA_FD_CLOEXEC)) < 0)
return data.eventfd;
pw_thread_loop_start(data.thread_loop);
PW_KEY_MEDIA_CATEGORY, "Playback",
NULL);
if (argc > 1)
/* Set stream target if given on command line */
data.stream = pw_stream_new_simple(
data.loop,
"audio-src-ring",
props,
&stream_events,
&data);
/* Make one parameter with the supported formats. The SPA_PARAM_EnumFormat
* id means that this is a format enumeration (of 1 value). */
.channels = DEFAULT_CHANNELS,
.rate = DEFAULT_RATE ));
/* Now connect this stream. We ask that our process function is
* called in a realtime thread. */
pw_stream_connect(data.stream,
params, 1);
/* prefill the ringbuffer */
fill_f32(&data, samples, BUFFER_SIZE);
push_samples(&data, samples, BUFFER_SIZE);
srand(time(NULL));
pw_thread_loop_start(data.thread_loop);
pw_thread_loop_unlock(data.thread_loop);
while (data.running) {
uint32_t size = rand() % ((MAX_SIZE - MIN_SIZE + 1) + MIN_SIZE);
/* make new random sized block of samples and push */
fill_f32(&data, samples, size);
push_samples(&data, samples, size);
}
pw_thread_loop_lock(data.thread_loop);
pw_stream_destroy(data.stream);
pw_thread_loop_unlock(data.thread_loop);
pw_thread_loop_destroy(data.thread_loop);
close(data.eventfd);
return 0;
}
#define PW_ID_ANY
Definition core.h:77
#define PW_KEY_MEDIA_TYPE
Media.
Definition keys.h:499
#define PW_KEY_TARGET_OBJECT
a target object to link to.
Definition keys.h:566
#define PW_KEY_MEDIA_ROLE
Role: Movie, Music, Camera, Screen, Communication, Game, Notification, DSP, Production,...
Definition keys.h:505
#define PW_KEY_MEDIA_CATEGORY
Media Category: Playback, Capture, Duplex, Monitor, Manager.
Definition keys.h:502
#define pw_log_warn(...)
Definition log.h:179
PW_API_LOOP_IMPL struct spa_source * pw_loop_add_signal(struct pw_loop *object, int signal_number, spa_source_signal_func_t func, void *data)
Definition loop.h:141
void pw_init(int *argc, char **argv[])
Initialize PipeWire.
Definition pipewire.c:488
void pw_deinit(void)
Deinitialize PipeWire.
Definition pipewire.c:603
#define PW_DIRECTION_OUTPUT
Definition port.h:55
struct pw_properties * pw_properties_new(const char *key,...)
Make a new properties object.
Definition properties.c:97
int pw_properties_set(struct pw_properties *properties, const char *key, const char *value)
Set a property value.
Definition properties.c:585
int pw_stream_connect(struct pw_stream *stream, enum pw_direction direction, uint32_t target_id, enum pw_stream_flags flags, const struct spa_pod **params, uint32_t n_params)
Connect a stream for input or output on port_path.
Definition stream.c:1902
struct pw_buffer * pw_stream_dequeue_buffer(struct pw_stream *stream)
Get a buffer that can be filled for playback streams or consumed for capture streams.
Definition stream.c:2458
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
Submit a buffer for playback or recycle a buffer for capture.
Definition stream.c:2486
struct pw_stream * pw_stream_new_simple(struct pw_loop *loop, const char *name, struct pw_properties *props, const struct pw_stream_events *events, void *data)
Definition stream.c:1599
#define PW_VERSION_STREAM_EVENTS
Definition stream.h:417
void pw_stream_destroy(struct pw_stream *stream)
Destroy a stream.
Definition stream.c:1693
@ PW_STREAM_FLAG_MAP_BUFFERS
mmap the buffers except DmaBuf that is not explicitly marked as mappable.
Definition stream.h:469
@ PW_STREAM_FLAG_AUTOCONNECT
try to automatically connect this stream
Definition stream.h:464
@ PW_STREAM_FLAG_RT_PROCESS
call process from the realtime thread.
Definition stream.h:472
void pw_thread_loop_unlock(struct pw_thread_loop *loop)
Unlock the loop.
Definition thread-loop.c:381
int pw_thread_loop_start(struct pw_thread_loop *loop)
Start the thread loop.
Definition thread-loop.c:316
struct pw_loop * pw_thread_loop_get_loop(struct pw_thread_loop *loop)
Get the loop implementation of the thread loop.
Definition thread-loop.c:277
void pw_thread_loop_destroy(struct pw_thread_loop *loop)
Destroy a thread loop.
Definition thread-loop.c:243
void pw_thread_loop_lock(struct pw_thread_loop *loop)
Lock the loop.
Definition thread-loop.c:369
struct pw_thread_loop * pw_thread_loop_new(const char *name, const struct spa_dict *props)
Make a new thread loop with the given name and optional properties.
Definition thread-loop.c:214
#define SPA_AUDIO_INFO_RAW_INIT(...)
Definition raw.h:291
SPA_API_AUDIO_RAW_UTILS struct spa_pod * spa_format_audio_raw_build(struct spa_pod_builder *builder, uint32_t id, const struct spa_audio_info_raw *info)
Definition raw-utils.h:57
@ SPA_PARAM_EnumFormat
available formats as SPA_TYPE_OBJECT_Format
Definition param.h:33
@ SPA_AUDIO_FORMAT_F32
Definition raw.h:104
#define SPA_POD_BUILDER_INIT(buffer, size)
Definition builder.h:72
SPA_API_RINGBUFFER int32_t spa_ringbuffer_get_read_index(struct spa_ringbuffer *rbuf, uint32_t *index)
Get the read index and available bytes for reading.
Definition ringbuffer.h:87
SPA_API_RINGBUFFER void spa_ringbuffer_write_update(struct spa_ringbuffer *rbuf, int32_t index)
Update the write pointer to index.
Definition ringbuffer.h:171
SPA_API_RINGBUFFER void spa_ringbuffer_read_update(struct spa_ringbuffer *rbuf, int32_t index)
Update the read pointer to index.
Definition ringbuffer.h:121
SPA_API_RINGBUFFER int32_t spa_ringbuffer_get_write_index(struct spa_ringbuffer *rbuf, uint32_t *index)
Get the write index and the number of bytes inside the ringbuffer.
Definition ringbuffer.h:137
SPA_API_RINGBUFFER void spa_ringbuffer_write_data(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, uint32_t offset, const void *data, uint32_t len)
Write len bytes to buffer starting offset.
Definition ringbuffer.h:155
SPA_API_RINGBUFFER void spa_ringbuffer_init(struct spa_ringbuffer *rbuf)
Initialize a spa_ringbuffer.
Definition ringbuffer.h:60
SPA_API_RINGBUFFER void spa_ringbuffer_read_data(struct spa_ringbuffer *rbuf, const void *buffer, uint32_t size, uint32_t offset, void *data, uint32_t len)
Read len bytes from rbuf starting offset.
Definition ringbuffer.h:105
#define SPA_FD_CLOEXEC
Definition system.h:70
SPA_API_SYSTEM int spa_system_eventfd_write(struct spa_system *object, int fd, uint64_t count)
Definition system.h:208
SPA_API_SYSTEM int spa_system_eventfd_read(struct spa_system *object, int fd, uint64_t *count)
Definition system.h:213
SPA_API_SYSTEM int spa_system_eventfd_create(struct spa_system *object, int flags)
Definition system.h:204
#define SPA_MIN(a, b)
Definition defs.h:165
#define SPA_PTROFF(ptr_, offset_, type_)
Return the address (buffer + offset) as pointer of type.
Definition defs.h:222
#define spa_assert(expr)
Definition defs.h:493
pipewire/pipewire.h
spa/utils/ringbuffer.h
a buffer structure obtained from pw_stream_dequeue_buffer().
Definition stream.h:261
uint64_t requested
For playback streams, this field contains the suggested amount of data to provide.
Definition stream.h:273
struct spa_buffer * buffer
the spa buffer
Definition stream.h:262
Definition loop.h:33
Definition properties.h:39
Events for a stream.
Definition stream.h:415
A Buffer.
Definition buffer.h:110
struct spa_data * datas
array of data members
Definition buffer.h:114
int32_t stride
stride of valid data
Definition buffer.h:65
uint32_t size
size of valid data.
Definition buffer.h:63
uint32_t offset
offset of valid data.
Definition buffer.h:60
struct spa_chunk * chunk
valid chunk of memory
Definition buffer.h:106
void * data
optional data pointer
Definition buffer.h:105
uint32_t maxsize
max size of data
Definition buffer.h:104
Definition builder.h:63
uint32_t size
Definition builder.h:65
Definition pod.h:43
A ringbuffer type.
Definition ringbuffer.h:47