It is currently broken due to bogus packet duplication within the queue.
---
 ffmpeg.c |  183 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 176 insertions(+), 7 deletions(-)

diff --git a/ffmpeg.c b/ffmpeg.c
index 5e50db3..b7a84eb 100644
--- a/ffmpeg.c
+++ b/ffmpeg.c
@@ -31,6 +31,7 @@
 #include <signal.h>
 #include <limits.h>
 #include <unistd.h>
+#include <pthread.h>
 #include "libavformat/avformat.h"
 #include "libavdevice/avdevice.h"
 #include "libswscale/swscale.h"
@@ -256,6 +257,20 @@ static AVBitStreamFilterContext 
*subtitle_bitstream_filters=NULL;
 
 #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass"
 
+/* Singly linked FIFO of packets shared between threads. */
+typedef struct AVPacketQueue {
+    AVPacketList *first_pkt, *last_pkt; /* head (dequeue side) and tail (enqueue side); both NULL when empty */
+    int nb_packets;         /* number of list nodes currently queued */
+    int size;               /* sum over queued nodes of pkt.size + sizeof(AVPacketList) */
+    int abort_request;      /* NOTE(review): zeroed at init, not read by any queue function in this patch */
+    pthread_mutex_t mutex;  /* held while the list pointers and counters are read or modified */
+    pthread_cond_t cond;    /* signalled by avpacket_queue_put(), waited on by a blocking avpacket_queue_get() */
+} AVPacketQueue;
+
+/* Packet queue of one output file together with the thread that drains it;
+ * stored in the opaque field of the output AVFormatContext. */
+typedef struct AVOutputQueue {
+    AVPacketQueue queue;    /* packets handed over by write_frame() */
+    pthread_t th;           /* thread started by init_output_queue(), running remux() */
+} AVOutputQueue;
+
 struct AVInputStream;
 
 typedef struct AVOutputStream {
@@ -335,6 +350,104 @@ typedef struct AVInputFile {
     int nb_streams;       /* nb streams we are aware of */
 } AVInputFile;
 
+/* Sentinel packet queued by stop_output_queue() and wait_output_queue() to
+ * make the consumer stop; avpacket_queue_get() recognizes it by comparing
+ * data pointers.
+ * NOTE(review): nothing visible in this patch assigns flush_pkt.data, so it
+ * stays NULL as a zero-initialized static -- confirm that is intended. */
+static AVPacket flush_pkt;
+
+/* Zero every field of the queue, then create its condition variable and
+ * mutex with default attributes. */
+static void avpacket_queue_init(AVPacketQueue *q)
+{
+    memset(q, 0, sizeof(*q));
+    pthread_cond_init(&q->cond, NULL);
+    pthread_mutex_init(&q->mutex, NULL);
+}
+
+/* Drop every queued packet: release its payload and its list node, then
+ * reset the queue to the empty state. The queue mutex is held throughout. */
+static void avpacket_queue_flush(AVPacketQueue *q)
+{
+    AVPacketList *node, *next;
+
+    pthread_mutex_lock(&q->mutex);
+    node = q->first_pkt;
+    while (node != NULL) {
+        next = node->next;          /* save the link before the node is freed */
+        av_free_packet(&node->pkt);
+        av_freep(&node);
+        node = next;
+    }
+    q->first_pkt  = NULL;
+    q->last_pkt   = NULL;
+    q->size       = 0;
+    q->nb_packets = 0;
+    pthread_mutex_unlock(&q->mutex);
+}
+
+/* Empty the queue, then destroy its condition variable and mutex. */
+static void avpacket_queue_end(AVPacketQueue *q)
+{
+    avpacket_queue_flush(q);        /* needs the mutex, so it must run first */
+    pthread_cond_destroy(&q->cond);
+    pthread_mutex_destroy(&q->mutex);
+}
+
+/* Append a struct copy of *pkt to the tail of the queue and signal the
+ * condition variable. Unless pkt is the flush_pkt sentinel, av_dup_packet()
+ * is called on it first.
+ * Returns 0 on success, -1 if duplicating the packet or allocating the list
+ * node fails. */
+static int avpacket_queue_put(AVPacketQueue *q, AVPacket *pkt)
+{
+    AVPacketList *node;
+
+    if (pkt != &flush_pkt) {
+        if (av_dup_packet(pkt) < 0)
+            return -1;
+    }
+
+    node = av_malloc(sizeof(*node));
+    if (!node)
+        return -1;
+    node->next = NULL;
+    node->pkt  = *pkt;
+
+    pthread_mutex_lock(&q->mutex);
+
+    if (q->last_pkt)
+        q->last_pkt->next = node;
+    else
+        q->first_pkt = node;        /* queue was empty: node is also the head */
+    q->last_pkt = node;
+    q->nb_packets++;
+    q->size += node->pkt.size + sizeof(*node);
+
+    pthread_cond_signal(&q->cond);
+    pthread_mutex_unlock(&q->mutex);
+
+    return 0;
+}
+
+/* Remove the packet at the head of the queue.
+ *
+ * q     queue to read from
+ * pkt   receives a struct copy of the dequeued packet when 1 is returned
+ * block if non-zero, wait on the queue condition until a packet is available
+ *
+ * Returns 1 when a packet was stored in *pkt. Returns 0 when the queue is
+ * empty and block is zero, or when the dequeued entry is the flush sentinel
+ * (its data pointer equals flush_pkt.data); the sentinel node is removed
+ * from the queue and freed, and *pkt is left untouched.
+ */
+static int avpacket_queue_get(AVPacketQueue *q, AVPacket *pkt, int block)
+{
+    AVPacketList *pkt1;
+    int ret;
+
+    pthread_mutex_lock(&q->mutex);
+
+    for(;;) {
+        pkt1 = q->first_pkt;
+        if (pkt1) {
+            q->first_pkt = pkt1->next;
+            if (!q->first_pkt)
+                q->last_pkt = NULL;
+            q->nb_packets--;
+            q->size -= pkt1->pkt.size + sizeof(*pkt1);
+            /* Test for the sentinel at dequeue time so that it is honoured
+             * in FIFO order: packets queued before it are still returned,
+             * and the sentinel itself never reaches the caller as data. */
+            if (pkt1->pkt.data == flush_pkt.data) {
+                ret = 0;
+            } else {
+                *pkt = pkt1->pkt;
+                ret = 1;
+            }
+            av_free(pkt1);
+            break;
+        } else if (!block) {
+            ret = 0;
+            break;
+        } else {
+            /* pthread_cond_wait() may return without a packet having been
+             * queued (spurious wakeup), so only loop back and re-check
+             * first_pkt instead of dereferencing it here. */
+            pthread_cond_wait(&q->cond, &q->mutex);
+        }
+    }
+    pthread_mutex_unlock(&q->mutex);
+    return ret;
+}
+
 #if CONFIG_AVFILTER
 
 static int configure_filters(AVInputStream *ist, AVOutputStream *ost)
@@ -447,6 +560,15 @@ static int decode_interrupt_cb(void)
     return q_pressed || (q_pressed = read_key() == 'q');
 }
 
+/* Queue the flush_pkt sentinel, wait for the output thread to terminate,
+ * then empty and destroy the queue.
+ * NOTE(review): wait_output_queue() also joins out->th; if both run for the
+ * same output the thread is joined twice -- confirm against the callers. */
+static void stop_output_queue(AVOutputQueue *out)
+{
+    AVPacketQueue *queue = &out->queue;
+
+    avpacket_queue_put(queue, &flush_pkt);
+    pthread_join(out->th, NULL);
+    avpacket_queue_end(queue);
+}
+
 static int ffmpeg_exit(int ret)
 {
     int i;
@@ -454,6 +576,9 @@ static int ffmpeg_exit(int ret)
     /* close files */
     for(i=0;i<nb_output_files;i++) {
         AVFormatContext *s = output_files[i];
+        AVOutputQueue *out = s->opaque;
+        if (out) stop_output_queue(out);
+
         if (!(s->oformat->flags & AVFMT_NOFILE) && s->pb)
             avio_close(s->pb);
         avformat_free_context(s);
@@ -666,10 +791,10 @@ get_sync_ipts(const AVOutputStream *ost)
 }
 
 static void write_frame(AVFormatContext *s, AVPacket *pkt, AVCodecContext 
*avctx, AVBitStreamFilterContext *bsfc){
-    int ret;
+    AVOutputQueue *out = s->opaque;
+    AVPacket new_pkt= *pkt;
 
     while(bsfc){
-        AVPacket new_pkt= *pkt;
         int a= av_bitstream_filter_filter(bsfc, avctx, NULL,
                                           &new_pkt.data, &new_pkt.size,
                                           pkt->data, pkt->size,
@@ -690,11 +815,7 @@ static void write_frame(AVFormatContext *s, AVPacket *pkt, 
AVCodecContext *avctx
         bsfc= bsfc->next;
     }
 
-    ret= av_interleaved_write_frame(s, pkt);
-    if(ret < 0){
-        print_error("av_interleaved_write_frame()", ret);
-        ffmpeg_exit(1);
-    }
+    avpacket_queue_put(&out->queue, &new_pkt);
 }
 
 #define MAX_AUDIO_PACKET_SIZE (128 * 1024)
@@ -1857,6 +1978,45 @@ static void parse_forced_key_frames(char *kf, 
AVOutputStream *ost,
     }
 }
 
+/* Thread entry point. ctx is the output AVFormatContext whose opaque field
+ * holds the AVOutputQueue. Packets are taken from the queue (blocking) and
+ * passed to av_interleaved_write_frame() until avpacket_queue_get()
+ * returns 0. A write error is printed and followed by ffmpeg_exit(1).
+ * NOTE(review): ffmpeg_exit() calls stop_output_queue(), which
+ * pthread_join()s this very thread -- confirm the error path cannot
+ * self-join. */
+static void *remux(void *ctx)
+{
+    AVFormatContext *s = ctx;
+    AVOutputQueue *out = s->opaque;
+    AVPacket pkt;
+
+    for (;;) {
+        int ret;
+
+        if (!avpacket_queue_get(&out->queue, &pkt, 1))
+            break;
+
+        ret = av_interleaved_write_frame(s, &pkt);
+        if (ret >= 0)
+            continue;
+
+        print_error("av_interleaved_write_frame()", ret);
+        ffmpeg_exit(1);
+    }
+    return NULL;
+}
+
+/* Initialize the packet queue stored in s->opaque and start the remux()
+ * thread that drains it.
+ *
+ * Returns 0 on success, AVERROR(ENOMEM) if s->opaque is NULL, or AVERROR()
+ * of the error number returned by pthread_create().
+ */
+static int init_output_queue(AVFormatContext *s)
+{
+    AVOutputQueue *out = s->opaque;
+    int ret;
+
+    /* transcode() calls this even when allocating s->opaque failed; do not
+     * dereference a NULL queue in that case. */
+    if (!out)
+        return AVERROR(ENOMEM);
+
+    avpacket_queue_init(&out->queue);
+
+    /* pthread_create() returns 0 or a positive error number; report the
+     * actual cause instead of always claiming ENOMEM. */
+    ret = pthread_create(&out->th, NULL, remux, s);
+    if (ret)
+        return AVERROR(ret);
+    return 0;
+}
+
+/* Append the flush_pkt sentinel to the output queue of s and block until
+ * the output thread has terminated. The queue itself is left intact. */
+static void wait_output_queue(AVFormatContext *s)
+{
+    AVOutputQueue *oq = s->opaque;
+
+    avpacket_queue_put(&oq->queue, &flush_pkt);
+    pthread_join(oq->th, NULL);
+}
+
+
 /*
  * The following code is the main loop of the file converter
  */
@@ -2401,6 +2561,14 @@ static int transcode(AVFormatContext **output_files,
         if (strcmp(output_files[i]->oformat->name, "rtp")) {
             want_sdp = 0;
         }
+        /* initialize the output queues and thread */
+        os->opaque = av_malloc(sizeof(AVOutputQueue));
+        if (!os->opaque) {
+            snprintf(error, sizeof(error), "Cannot allocate output queue");
+            ret = AVERROR(ENOMEM);
+        }
+        init_output_queue(os);
+
     }
 
  dump_format:
@@ -2609,6 +2777,7 @@ static int transcode(AVFormatContext **output_files,
     /* write the trailer if needed and close file */
     for(i=0;i<nb_output_files;i++) {
         os = output_files[i];
+        wait_output_queue(os);
         av_write_trailer(os);
     }
 
-- 
1.7.4.1

_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel

Reply via email to