On Fri, Apr 01, 2011 at 17:30:47 (CEST), Luca Barbato wrote:

> It is currently broken due bogus packet duplication within the queue.
> ---
>  ffmpeg.c |  183 
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 files changed, 176 insertions(+), 7 deletions(-)
>
> diff --git a/ffmpeg.c b/ffmpeg.c
> index 5e50db3..b7a84eb 100644
> --- a/ffmpeg.c
> +++ b/ffmpeg.c
> @@ -31,6 +31,7 @@
>  #include <signal.h>
>  #include <limits.h>
>  #include <unistd.h>
> +#include <pthread.h>
>  #include "libavformat/avformat.h"
>  #include "libavdevice/avdevice.h"
>  #include "libswscale/swscale.h"
> @@ -256,6 +257,20 @@ static AVBitStreamFilterContext 
> *subtitle_bitstream_filters=NULL;
>  
>  #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass"
>  
> +typedef struct AVPacketQueue {
> +    AVPacketList *first_pkt, *last_pkt;
> +    int nb_packets;
> +    int size;
> +    int abort_request;
> +    pthread_mutex_t mutex;
> +    pthread_cond_t cond;
> +} AVPacketQueue;

Err, does each element of the queue really need its own mutex and
condition variable? AFAIU your code, there is no race on single elements
of the queue, but you are synchronizing get() and put() calls on elements
of the queue.

> +typedef struct AVOutputQueue {
> +    AVPacketQueue queue;
> +    pthread_t th;
> +} AVOutputQueue;
> +
>  struct AVInputStream;
>  
>  typedef struct AVOutputStream {
> @@ -335,6 +350,104 @@ typedef struct AVInputFile {
>      int nb_streams;       /* nb streams we are aware of */
>  } AVInputFile;
>  
> +static AVPacket flush_pkt;
> +
> +static void avpacket_queue_init(AVPacketQueue *q)
> +{
> +    memset(q, 0, sizeof(AVPacketQueue));
> +    pthread_mutex_init(&q->mutex, NULL);
> +    pthread_cond_init(&q->cond, NULL);
> +}
> +
> +static void avpacket_queue_flush(AVPacketQueue *q)
> +{
> +    AVPacketList *pkt, *pkt1;
> +
> +    pthread_mutex_lock(&q->mutex);
> +    for(pkt = q->first_pkt; pkt != NULL; pkt = pkt1) {
> +        pkt1 = pkt->next;
> +        av_free_packet(&pkt->pkt);
> +        av_freep(&pkt);
> +    }
> +    q->last_pkt = NULL;
> +    q->first_pkt = NULL;
> +    q->nb_packets = 0;
> +    q->size = 0;
> +    pthread_mutex_unlock(&q->mutex);
> +}
> +
> +static void avpacket_queue_end(AVPacketQueue *q)
> +{
> +    avpacket_queue_flush(q);
> +    pthread_mutex_destroy(&q->mutex);
> +    pthread_cond_destroy(&q->cond);
> +}
> +
> +static int avpacket_queue_put(AVPacketQueue *q, AVPacket *pkt)
> +{
> +    AVPacketList *pkt1;
> +
> +    /* duplicate the packet */
> +    if (pkt!=&flush_pkt && av_dup_packet(pkt) < 0)
> +        return -1;
> +
> +    pkt1 = (AVPacketList *)av_malloc(sizeof(AVPacketList));
> +    if (!pkt1)
> +        return -1;
> +    pkt1->pkt = *pkt;
> +    pkt1->next = NULL;
> +
> +    pthread_mutex_lock(&q->mutex);
> +
> +    if (!q->last_pkt)
> +
> +        q->first_pkt = pkt1;

Unnecessary blank line.

> +    else
> +        q->last_pkt->next = pkt1;
> +    q->last_pkt = pkt1;
> +    q->nb_packets++;
> +    q->size += pkt1->pkt.size + sizeof(*pkt1);
> +
> +    pthread_cond_signal(&q->cond);
> +
> +    pthread_mutex_unlock(&q->mutex);
> +    return 0;
> +}

here as well

> +static int avpacket_queue_get(AVPacketQueue *q, AVPacket *pkt, int block)
> +{
> +    AVPacketList *pkt1;
> +    int ret;
> +
> +    pthread_mutex_lock(&q->mutex);
> +
> +    for(;;) {
> +        pkt1 = q->first_pkt;
> +        if (pkt1) {
> +            q->first_pkt = pkt1->next;
> +            if (!q->first_pkt)
> +                q->last_pkt = NULL;
> +            q->nb_packets--;
> +            q->size -= pkt1->pkt.size + sizeof(*pkt1);
> +            *pkt = pkt1->pkt;
> +            av_free(pkt1);
> +            ret = 1;
> +            break;
> +        } else if (!block) {
> +            ret = 0;
> +            break;
> +        } else {
> +            pthread_cond_wait(&q->cond, &q->mutex);
> +            if (q->first_pkt->pkt.data == flush_pkt.data) {
> +                ret = 0;
> +                break;
> +            }
> +        }
> +    }
> +    pthread_mutex_unlock(&q->mutex);
> +    return ret;
> +}
> +

I wonder if this AVPacketQueue wouldn't be better placed somewhere in
libavformat/

TBH, I'm actually a bit surprised that we don't have a synchronized list
implementation already.

>  #if CONFIG_AVFILTER
>  
>  static int configure_filters(AVInputStream *ist, AVOutputStream *ost)
> @@ -447,6 +560,15 @@ static int decode_interrupt_cb(void)
>      return q_pressed || (q_pressed = read_key() == 'q');
>  }
>  
> +// kill the thread and free the queue
> +static void stop_output_queue(AVOutputQueue *out)
> +{
> +    //pthread_cancel(out->th);

??

> +    avpacket_queue_put(&out->queue, &flush_pkt);
> +    pthread_join(out->th, NULL);
> +    avpacket_queue_end(&out->queue);
> +}
> +
>  static int ffmpeg_exit(int ret)
>  {
>      int i;
> @@ -454,6 +576,9 @@ static int ffmpeg_exit(int ret)
>      /* close files */
>      for(i=0;i<nb_output_files;i++) {
>          AVFormatContext *s = output_files[i];
> +        AVOutputQueue *out = s->opaque;
> +        if (out) stop_output_queue(out);
> +
missing newline after 'if (out)'

>          if (!(s->oformat->flags & AVFMT_NOFILE) && s->pb)
>              avio_close(s->pb);
>          avformat_free_context(s);
> @@ -666,10 +791,10 @@ get_sync_ipts(const AVOutputStream *ost)
>  }
>  
>  static void write_frame(AVFormatContext *s, AVPacket *pkt, AVCodecContext 
> *avctx, AVBitStreamFilterContext *bsfc){
> -    int ret;
> +    AVOutputQueue *out = s->opaque;
> +    AVPacket new_pkt= *pkt;
>  
>      while(bsfc){
> -        AVPacket new_pkt= *pkt;
>          int a= av_bitstream_filter_filter(bsfc, avctx, NULL,
>                                            &new_pkt.data, &new_pkt.size,
>                                            pkt->data, pkt->size,
> @@ -690,11 +815,7 @@ static void write_frame(AVFormatContext *s, AVPacket 
> *pkt, AVCodecContext *avctx
>          bsfc= bsfc->next;
>      }

I didn't test your patch, but you are changing the semantics of the
loop. Shouldn't a 'new_pkt = *pkt' remain at the head of the loop?

>  
> -    ret= av_interleaved_write_frame(s, pkt);
> -    if(ret < 0){
> -        print_error("av_interleaved_write_frame()", ret);
> -        ffmpeg_exit(1);
> -    }
> +    avpacket_queue_put(&out->queue, &new_pkt);
>  }
>  
>  #define MAX_AUDIO_PACKET_SIZE (128 * 1024)
> @@ -1857,6 +1978,45 @@ static void parse_forced_key_frames(char *kf, 
> AVOutputStream *ost,
>      }
>  }
>  
> +static void *remux(void *ctx)
> +{
> +    AVFormatContext *s = ctx;
> +    AVOutputQueue *out = s->opaque;
> +    AVPacket pkt;
> +    int ret;
> +
> +    while (avpacket_queue_get(&out->queue, &pkt, 1)) {
> +        ret = av_interleaved_write_frame(s, &pkt);
> +        if (ret < 0) {
> +            print_error("av_interleaved_write_frame()", ret);
> +            ffmpeg_exit(1);
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static int init_output_queue(AVFormatContext *s)
> +{
> +    AVOutputQueue *out = s->opaque;
> +    AVPacketQueue *queue = &out->queue;
> +    pthread_t *th = &out->th;
> +    int ret;
> +
> +    avpacket_queue_init(queue);
> +    ret = pthread_create(th, NULL, remux, s);
> +    if (ret) return AVERROR(ENOMEM);
> +    return 0;
> +}
missing newline after 'if (ret)'

> +
> +// kill the thread and consume all the packets
> +static void wait_output_queue(AVFormatContext *s)
> +{
> +    AVOutputQueue *out = s->opaque;
> +    avpacket_queue_put(&out->queue, &flush_pkt);
> +    pthread_join(out->th, NULL);
> +}
> +
> +
>  /*
>   * The following code is the main loop of the file converter
>   */
> @@ -2401,6 +2561,14 @@ static int transcode(AVFormatContext **output_files,
>          if (strcmp(output_files[i]->oformat->name, "rtp")) {
>              want_sdp = 0;
>          }
> +        /* initialize the output queues and thread */
> +        os->opaque = av_malloc(sizeof(AVOutputQueue));
> +        if (!os->opaque) {
> +            snprintf(error, sizeof(error), "Cannot allocate output queue");
> +            ret = AVERROR(ENOMEM);
> +        }
> +        init_output_queue(os);
> +
>      }
>  
>   dump_format:
> @@ -2609,6 +2777,7 @@ static int transcode(AVFormatContext **output_files,
>      /* write the trailer if needed and close file */
>      for(i=0;i<nb_output_files;i++) {
>          os = output_files[i];
> +        wait_output_queue(os);
>          av_write_trailer(os);
>      }

-- 
Gruesse/greetings,
Reinhard Tartler, KeyID 945348A4
_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel

Reply via email to