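Use jack_ringbuffers instead of AVFifoBuffers to pass packets between the
JACK process callback and the reading thread, as they are safe for lock-free
use in single-writer, single-reader scenarios. The packet-count semaphore is
replaced by a mutex and condition variable that signal when a filled packet
is ready.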
Bug-Id: 312
---
libavdevice/jack_audio.c | 118 +++++++++++++++++++++++++++++------------------
1 file changed, 74 insertions(+), 44 deletions(-)
diff --git a/libavdevice/jack_audio.c b/libavdevice/jack_audio.c
index c261514..53f75a1 100644
--- a/libavdevice/jack_audio.c
+++ b/libavdevice/jack_audio.c
@@ -21,11 +21,11 @@
*/
#include "config.h"
-#include <semaphore.h>
+#include <pthread.h>
#include <jack/jack.h>
+#include <jack/ringbuffer.h>
#include "libavutil/log.h"
-#include "libavutil/fifo.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavcodec/avcodec.h"
@@ -34,22 +34,23 @@
#include "timefilter.h"
/**
- * Size of the internal FIFO buffers as a number of audio packets
+ * Size of the internal ringbuffers as a number of audio packets
*/
-#define FIFO_PACKETS_NUM 16
+#define RING_PACKETS_NUM 16
typedef struct {
AVClass *class;
jack_client_t * client;
int activated;
- sem_t packet_count;
+ pthread_mutex_t read_lock;
+ pthread_cond_t packet_ready;
jack_nframes_t sample_rate;
jack_nframes_t buffer_size;
jack_port_t ** ports;
int nports;
TimeFilter * timefilter;
- AVFifoBuffer * new_pkts;
- AVFifoBuffer * filled_pkts;
+ jack_ringbuffer_t *new_pkts;
+ jack_ringbuffer_t *filled_pkts;
int pkt_xrun;
int jack_xrun;
} JackData;
@@ -79,13 +80,14 @@ static int process_callback(jack_nframes_t nframes, void
*arg)
self->buffer_size);
/* Check if an empty packet is available, and if there's enough space to
send it back once filled */
- if ((av_fifo_size(self->new_pkts) < sizeof(pkt)) ||
(av_fifo_space(self->filled_pkts) < sizeof(pkt))) {
+ if ((jack_ringbuffer_read_space(self->new_pkts) < sizeof(pkt)) ||
+ (jack_ringbuffer_write_space(self->filled_pkts) < sizeof(pkt))) {
self->pkt_xrun = 1;
return 0;
}
/* Retrieve empty (but allocated) packet */
- av_fifo_generic_read(self->new_pkts, &pkt, sizeof(pkt), NULL);
+ jack_ringbuffer_read(self->new_pkts, (char *) &pkt, sizeof(pkt));
pkt_data = (float *) pkt.data;
latency = 0;
@@ -107,9 +109,13 @@ static int process_callback(jack_nframes_t nframes, void
*arg)
/* Timestamp the packet with the cycle start time minus the average
latency */
pkt.pts = (cycle_time - (double) latency / (self->nports *
self->sample_rate)) * 1000000.0;
- /* Send the now filled packet back, and increase packet counter */
- av_fifo_generic_write(self->filled_pkts, &pkt, sizeof(pkt), NULL);
- sem_post(&self->packet_count);
+ /* Send the now filled packet back, and signal that it's ready */
+ jack_ringbuffer_write(self->filled_pkts, (char *) &pkt, sizeof(pkt));
+
+ if (pthread_mutex_trylock(&self->read_lock) == 0) {
+ pthread_cond_signal(&self->packet_ready);
+ pthread_mutex_unlock(&self->read_lock);
+ }
return 0;
}
@@ -136,12 +142,12 @@ static int supply_new_packets(JackData *self,
AVFormatContext *context)
/* Supply the process callback with new empty packets, by filling the new
* packets FIFO buffer with as many packets as possible. process_callback()
* can't do this by itself, because it can't allocate memory in realtime.
*/
- while (av_fifo_space(self->new_pkts) >= sizeof(pkt)) {
+ while (jack_ringbuffer_write_space(self->new_pkts) >= sizeof(pkt)) {
if ((test = av_new_packet(&pkt, pkt_size)) < 0) {
av_log(context, AV_LOG_ERROR, "Could not create packet of size
%d\n", pkt_size);
return test;
}
- av_fifo_generic_write(self->new_pkts, &pkt, sizeof(pkt), NULL);
+ jack_ringbuffer_write(self->new_pkts, (char *) &pkt, sizeof(pkt));
}
return 0;
}
@@ -160,7 +166,8 @@ static int start_jack(AVFormatContext *context)
return AVERROR(EIO);
}
- sem_init(&self->packet_count, 0, 0);
+ pthread_mutex_init(&self->read_lock, NULL);
+ pthread_cond_init(&self->packet_ready, NULL);
self->sample_rate = jack_get_sample_rate(self->client);
self->ports = av_malloc(self->nports * sizeof(*self->ports));
@@ -181,11 +188,6 @@ static int start_jack(AVFormatContext *context)
}
}
- /* Register JACK callbacks */
- jack_set_process_callback(self->client, process_callback, self);
- jack_on_shutdown(self->client, shutdown_callback, self);
- jack_set_xrun_callback(self->client, xrun_callback, self);
-
/* Create time filter */
period = (double) self->buffer_size / self->sample_rate;
o = 2 * M_PI * 1.5 * period; /// bandwidth: 1.5Hz
@@ -195,27 +197,45 @@ static int start_jack(AVFormatContext *context)
return AVERROR(ENOMEM);
}
- /* Create FIFO buffers */
- self->filled_pkts = av_fifo_alloc(FIFO_PACKETS_NUM * sizeof(AVPacket));
- /* New packets FIFO with one extra packet for safety against underruns */
- self->new_pkts = av_fifo_alloc((FIFO_PACKETS_NUM + 1) *
sizeof(AVPacket));
+ /* Create ringbuffers */
+ self->filled_pkts = jack_ringbuffer_create(RING_PACKETS_NUM *
sizeof(AVPacket));
+ if (!self->filled_pkts) {
+ jack_client_close(self->client);
+ return AVERROR(ENOMEM);
+ }
+
+ /* New packets ringbuffer with one extra packet for safety against
underruns */
+ self->new_pkts = jack_ringbuffer_create((RING_PACKETS_NUM + 1) *
sizeof(AVPacket));
+ if (!self->new_pkts) {
+ jack_client_close(self->client);
+ return AVERROR(ENOMEM);
+ }
+
+ if (jack_ringbuffer_mlock(self->filled_pkts) ||
jack_ringbuffer_mlock(self->new_pkts))
+ av_log(context, AV_LOG_WARNING, "Unable to lock JACK ringbuffer in
memory\n");
+
if ((test = supply_new_packets(self, context))) {
jack_client_close(self->client);
return test;
}
+ /* Register JACK callbacks */
+ jack_set_process_callback(self->client, process_callback, self);
+ jack_on_shutdown(self->client, shutdown_callback, self);
+ jack_set_xrun_callback(self->client, xrun_callback, self);
+
return 0;
}
-static void free_pkt_fifo(AVFifoBuffer *fifo)
+static void free_pkt_ringbuffer(jack_ringbuffer_t *rb)
{
AVPacket pkt;
- while (av_fifo_size(fifo)) {
- av_fifo_generic_read(fifo, &pkt, sizeof(pkt), NULL);
+ while (jack_ringbuffer_read_space(rb)) {
+ jack_ringbuffer_read(rb, (char *) &pkt, sizeof(pkt));
av_free_packet(&pkt);
}
- av_fifo_free(fifo);
+ jack_ringbuffer_free(rb);
}
static void stop_jack(JackData *self)
@@ -225,9 +245,12 @@ static void stop_jack(JackData *self)
jack_deactivate(self->client);
jack_client_close(self->client);
}
- sem_destroy(&self->packet_count);
- free_pkt_fifo(self->new_pkts);
- free_pkt_fifo(self->filled_pkts);
+ pthread_cond_destroy(&self->packet_ready);
+ pthread_mutex_destroy(&self->read_lock);
+ if (self->new_pkts)
+ free_pkt_ringbuffer(self->new_pkts);
+ if (self->filled_pkts)
+ free_pkt_ringbuffer(self->filled_pkts);
av_freep(&self->ports);
ff_timefilter_destroy(self->timefilter);
}
@@ -282,20 +305,27 @@ static int audio_read_packet(AVFormatContext *context,
AVPacket *pkt)
}
}
- /* Wait for a packet coming back from process_callback(), if one isn't
available yet */
- timeout.tv_sec = av_gettime() / 1000000 + 2;
- if (sem_timedwait(&self->packet_count, &timeout)) {
- if (errno == ETIMEDOUT) {
- av_log(context, AV_LOG_ERROR,
- "Input error: timed out when waiting for JACK process
callback output\n");
- } else {
- av_log(context, AV_LOG_ERROR, "Error while waiting for audio
packet: %s\n",
- strerror(errno));
+ if (jack_ringbuffer_read_space(self->filled_pkts) < sizeof(*pkt)) {
+
+ pthread_mutex_lock(&self->read_lock);
+
+ /* Wait for a packet coming back from process_callback(), if one isn't
available yet */
+ timeout.tv_sec = av_gettime() / 1000000 + 2;
+ if (pthread_cond_timedwait(&self->packet_ready, &self->read_lock,
&timeout)) {
+ if (errno == ETIMEDOUT) {
+ av_log(context, AV_LOG_ERROR,
+ "Input error: timed out when waiting for JACK process
callback output\n");
+ } else {
+ av_log(context, AV_LOG_ERROR, "Error while waiting for audio
packet: %s\n",
+ strerror(errno));
+ }
+ if (!self->client)
+ av_log(context, AV_LOG_ERROR, "Input error: JACK server is
gone\n");
+
+ return AVERROR(EIO);
}
- if (!self->client)
- av_log(context, AV_LOG_ERROR, "Input error: JACK server is
gone\n");
- return AVERROR(EIO);
+ pthread_mutex_unlock(&self->read_lock);
}
if (self->pkt_xrun) {
@@ -309,7 +339,7 @@ static int audio_read_packet(AVFormatContext *context,
AVPacket *pkt)
}
/* Retrieve the packet filled with audio data by process_callback() */
- av_fifo_generic_read(self->filled_pkts, pkt, sizeof(*pkt), NULL);
+ jack_ringbuffer_read(self->filled_pkts, (char *) pkt, sizeof(*pkt));
if ((test = supply_new_packets(self, context)))
return test;
--
1.8.1.2
_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel