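jack_ringbuffers are used instead of AVFifoBuffers because they are safe for
lock-free use in single-writer, single-reader scenarios. The packet-count
semaphore is replaced by a mutex and condition variable to signal that a
filled packet is ready.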

Bug-Id: 312
---
 libavdevice/jack_audio.c | 118 +++++++++++++++++++++++++++++------------------
 1 file changed, 74 insertions(+), 44 deletions(-)

diff --git a/libavdevice/jack_audio.c b/libavdevice/jack_audio.c
index c261514..53f75a1 100644
--- a/libavdevice/jack_audio.c
+++ b/libavdevice/jack_audio.c
@@ -21,11 +21,11 @@
  */
 
 #include "config.h"
-#include <semaphore.h>
+#include <pthread.h>
 #include <jack/jack.h>
+#include <jack/ringbuffer.h>
 
 #include "libavutil/log.h"
-#include "libavutil/fifo.h"
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavcodec/avcodec.h"
@@ -34,22 +34,23 @@
 #include "timefilter.h"
 
 /**
- * Size of the internal FIFO buffers as a number of audio packets
+ * Size of the internal ringbuffers as a number of audio packets
  */
-#define FIFO_PACKETS_NUM 16
+#define RING_PACKETS_NUM 16
 
 typedef struct {
     AVClass        *class;
     jack_client_t * client;
     int             activated;
-    sem_t           packet_count;
+    pthread_mutex_t read_lock;
+    pthread_cond_t  packet_ready;
     jack_nframes_t  sample_rate;
     jack_nframes_t  buffer_size;
     jack_port_t **  ports;
     int             nports;
     TimeFilter *    timefilter;
-    AVFifoBuffer *  new_pkts;
-    AVFifoBuffer *  filled_pkts;
+    jack_ringbuffer_t *new_pkts;
+    jack_ringbuffer_t *filled_pkts;
     int             pkt_xrun;
     int             jack_xrun;
 } JackData;
@@ -79,13 +80,14 @@ static int process_callback(jack_nframes_t nframes, void 
*arg)
                                       self->buffer_size);
 
     /* Check if an empty packet is available, and if there's enough space to 
send it back once filled */
-    if ((av_fifo_size(self->new_pkts) < sizeof(pkt)) || 
(av_fifo_space(self->filled_pkts) < sizeof(pkt))) {
+    if ((jack_ringbuffer_read_space(self->new_pkts) < sizeof(pkt)) ||
+        (jack_ringbuffer_write_space(self->filled_pkts) < sizeof(pkt))) {
         self->pkt_xrun = 1;
         return 0;
     }
 
     /* Retrieve empty (but allocated) packet */
-    av_fifo_generic_read(self->new_pkts, &pkt, sizeof(pkt), NULL);
+    jack_ringbuffer_read(self->new_pkts, (char *) &pkt, sizeof(pkt));
 
     pkt_data  = (float *) pkt.data;
     latency   = 0;
@@ -107,9 +109,13 @@ static int process_callback(jack_nframes_t nframes, void 
*arg)
     /* Timestamp the packet with the cycle start time minus the average 
latency */
     pkt.pts = (cycle_time - (double) latency / (self->nports * 
self->sample_rate)) * 1000000.0;
 
-    /* Send the now filled packet back, and increase packet counter */
-    av_fifo_generic_write(self->filled_pkts, &pkt, sizeof(pkt), NULL);
-    sem_post(&self->packet_count);
+    /* Send the now filled packet back, and signal that it's ready */
+    jack_ringbuffer_write(self->filled_pkts, (char *) &pkt, sizeof(pkt));
+
+    if (pthread_mutex_trylock(&self->read_lock) == 0) {
+        pthread_cond_signal(&self->packet_ready);
+        pthread_mutex_unlock(&self->read_lock);
+    }
 
     return 0;
 }
@@ -136,12 +142,12 @@ static int supply_new_packets(JackData *self, 
AVFormatContext *context)
     /* Supply the process callback with new empty packets, by filling the new
      * packets FIFO buffer with as many packets as possible. process_callback()
      * can't do this by itself, because it can't allocate memory in realtime. 
*/
-    while (av_fifo_space(self->new_pkts) >= sizeof(pkt)) {
+    while (jack_ringbuffer_write_space(self->new_pkts) >= sizeof(pkt)) {
         if ((test = av_new_packet(&pkt, pkt_size)) < 0) {
             av_log(context, AV_LOG_ERROR, "Could not create packet of size 
%d\n", pkt_size);
             return test;
         }
-        av_fifo_generic_write(self->new_pkts, &pkt, sizeof(pkt), NULL);
+        jack_ringbuffer_write(self->new_pkts, (char *) &pkt, sizeof(pkt));
     }
     return 0;
 }
@@ -160,7 +166,8 @@ static int start_jack(AVFormatContext *context)
         return AVERROR(EIO);
     }
 
-    sem_init(&self->packet_count, 0, 0);
+    pthread_mutex_init(&self->read_lock, NULL);
+    pthread_cond_init(&self->packet_ready, NULL);
 
     self->sample_rate = jack_get_sample_rate(self->client);
     self->ports       = av_malloc(self->nports * sizeof(*self->ports));
@@ -181,11 +188,6 @@ static int start_jack(AVFormatContext *context)
         }
     }
 
-    /* Register JACK callbacks */
-    jack_set_process_callback(self->client, process_callback, self);
-    jack_on_shutdown(self->client, shutdown_callback, self);
-    jack_set_xrun_callback(self->client, xrun_callback, self);
-
     /* Create time filter */
     period            = (double) self->buffer_size / self->sample_rate;
     o                 = 2 * M_PI * 1.5 * period; /// bandwidth: 1.5Hz
@@ -195,27 +197,45 @@ static int start_jack(AVFormatContext *context)
         return AVERROR(ENOMEM);
     }
 
-    /* Create FIFO buffers */
-    self->filled_pkts = av_fifo_alloc(FIFO_PACKETS_NUM * sizeof(AVPacket));
-    /* New packets FIFO with one extra packet for safety against underruns */
-    self->new_pkts    = av_fifo_alloc((FIFO_PACKETS_NUM + 1) * 
sizeof(AVPacket));
+    /* Create ringbuffers */
+    self->filled_pkts = jack_ringbuffer_create(RING_PACKETS_NUM * 
sizeof(AVPacket));
+    if (!self->filled_pkts) {
+        jack_client_close(self->client);
+        return AVERROR(ENOMEM);
+    }
+
+    /* New packets ringbuffer with one extra packet for safety against 
underruns */
+    self->new_pkts    = jack_ringbuffer_create((RING_PACKETS_NUM + 1) * 
sizeof(AVPacket));
+    if (!self->new_pkts) {
+        jack_client_close(self->client);
+        return AVERROR(ENOMEM);
+    }
+
+    if (jack_ringbuffer_mlock(self->filled_pkts) || 
jack_ringbuffer_mlock(self->new_pkts))
+        av_log(context, AV_LOG_WARNING, "Unable to lock JACK ringbuffer in 
memory\n");
+
     if ((test = supply_new_packets(self, context))) {
         jack_client_close(self->client);
         return test;
     }
 
+    /* Register JACK callbacks */
+    jack_set_process_callback(self->client, process_callback, self);
+    jack_on_shutdown(self->client, shutdown_callback, self);
+    jack_set_xrun_callback(self->client, xrun_callback, self);
+
     return 0;
 
 }
 
-static void free_pkt_fifo(AVFifoBuffer *fifo)
+static void free_pkt_ringbuffer(jack_ringbuffer_t *rb)
 {
     AVPacket pkt;
-    while (av_fifo_size(fifo)) {
-        av_fifo_generic_read(fifo, &pkt, sizeof(pkt), NULL);
+    while (jack_ringbuffer_read_space(rb)) {
+        jack_ringbuffer_read(rb, (char *) &pkt, sizeof(pkt));
         av_free_packet(&pkt);
     }
-    av_fifo_free(fifo);
+    jack_ringbuffer_free(rb);
 }
 
 static void stop_jack(JackData *self)
@@ -225,9 +245,12 @@ static void stop_jack(JackData *self)
             jack_deactivate(self->client);
         jack_client_close(self->client);
     }
-    sem_destroy(&self->packet_count);
-    free_pkt_fifo(self->new_pkts);
-    free_pkt_fifo(self->filled_pkts);
+    pthread_cond_destroy(&self->packet_ready);
+    pthread_mutex_destroy(&self->read_lock);
+    if (self->new_pkts)
+        free_pkt_ringbuffer(self->new_pkts);
+    if (self->filled_pkts)
+        free_pkt_ringbuffer(self->filled_pkts);
     av_freep(&self->ports);
     ff_timefilter_destroy(self->timefilter);
 }
@@ -282,20 +305,27 @@ static int audio_read_packet(AVFormatContext *context, 
AVPacket *pkt)
         }
     }
 
-    /* Wait for a packet coming back from process_callback(), if one isn't 
available yet */
-    timeout.tv_sec = av_gettime() / 1000000 + 2;
-    if (sem_timedwait(&self->packet_count, &timeout)) {
-        if (errno == ETIMEDOUT) {
-            av_log(context, AV_LOG_ERROR,
-                   "Input error: timed out when waiting for JACK process 
callback output\n");
-        } else {
-            av_log(context, AV_LOG_ERROR, "Error while waiting for audio 
packet: %s\n",
-                   strerror(errno));
+    if (jack_ringbuffer_read_space(self->filled_pkts) < sizeof(*pkt)) {
+
+        pthread_mutex_lock(&self->read_lock);
+
+        /* Wait for a packet coming back from process_callback(), if one isn't 
available yet */
+        timeout.tv_sec = av_gettime() / 1000000 + 2;
+        if (pthread_cond_timedwait(&self->packet_ready, &self->read_lock, 
&timeout)) {
+            if (errno == ETIMEDOUT) {
+                av_log(context, AV_LOG_ERROR,
+                        "Input error: timed out when waiting for JACK process 
callback output\n");
+            } else {
+                av_log(context, AV_LOG_ERROR, "Error while waiting for audio 
packet: %s\n",
+                        strerror(errno));
+            }
+            if (!self->client)
+                av_log(context, AV_LOG_ERROR, "Input error: JACK server is 
gone\n");
+
+            return AVERROR(EIO);
         }
-        if (!self->client)
-            av_log(context, AV_LOG_ERROR, "Input error: JACK server is 
gone\n");
 
-        return AVERROR(EIO);
+        pthread_mutex_unlock(&self->read_lock);
     }
 
     if (self->pkt_xrun) {
@@ -309,7 +339,7 @@ static int audio_read_packet(AVFormatContext *context, 
AVPacket *pkt)
     }
 
     /* Retrieve the packet filled with audio data by process_callback() */
-    av_fifo_generic_read(self->filled_pkts, pkt, sizeof(*pkt), NULL);
+    jack_ringbuffer_read(self->filled_pkts, (char *) pkt, sizeof(*pkt));
 
     if ((test = supply_new_packets(self, context)))
         return test;
-- 
1.8.1.2

_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel

Reply via email to