Re: [FFmpeg-devel] [RFC][PATCH] avformat/fifo: add timeshift option to delay output
I have tested with the commands below, and it works fine. Thanks, Marton. ffmpeg -i input_rtmp_addr -map 0:v -map 0:a -c copy -f fifo -timeshift 20 -queue_size 600 -fifo_format flv output_rtmp_addr ffmpeg -stream_loop -1 -re -i input_file -map 0:v -map 0:a -c copy -f fifo -timeshift 20 -queue_size 600 -fifo_format flv output_rtmp_addr Marton Balint 于2020年5月8日周五 上午4:28写道: > > Signed-off-by: Marton Balint > --- > libavformat/fifo.c | 59 > +- > 1 file changed, 58 insertions(+), 1 deletion(-) > > diff --git a/libavformat/fifo.c b/libavformat/fifo.c > index d11dc6626c..17748e94ce 100644 > --- a/libavformat/fifo.c > +++ b/libavformat/fifo.c > @@ -19,6 +19,8 @@ > * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA > */ > > +#include <stdatomic.h> > + > #include "libavutil/avassert.h" > #include "libavutil/opt.h" > #include "libavutil/time.h" > @@ -77,6 +79,9 @@ typedef struct FifoContext { > /* Value > 0 signals queue overflow */ > volatile uint8_t overflow_flag; > > +atomic_int_least64_t queue_duration; > +int64_t last_sent_dts; > +int64_t timeshift; > } FifoContext; > > typedef struct FifoThreadContext { > @@ -98,9 +103,12 @@ typedef struct FifoThreadContext { > * so finalization by calling write_trailer and ff_io_close must be done > * before exiting / reinitialization of underlying muxer */ > uint8_t header_written; > + > +int64_t last_received_dts; > } FifoThreadContext; > > typedef enum FifoMessageType { > +FIFO_NOOP, > FIFO_WRITE_HEADER, > FIFO_WRITE_PACKET, > FIFO_FLUSH_OUTPUT > @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext > *ctx) > return av_write_frame(avf2, NULL); > } > > +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t > *last_dts) > +{ > +AVStream *st = avf->streams[pkt->stream_index]; > +int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); > +int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 
0 : dts - *last_dts); > +*last_dts = dts; > +return duration; > +} > + > static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) > { > AVFormatContext *avf = ctx->avf; > @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext > *ctx, AVPacket *pkt) > AVRational src_tb, dst_tb; > int ret, s_idx; > > +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) > +atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, > pkt, &ctx->last_received_dts), memory_order_relaxed); > + > if (ctx->drop_until_keyframe) { > if (pkt->flags & AV_PKT_FLAG_KEY) { > ctx->drop_until_keyframe = 0; > @@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext > *ctx, FifoMessage *msg > { > int ret = AVERROR(EINVAL); > > +if (msg->type == FIFO_NOOP) > +return 0; > + > if (!ctx->header_written) { > ret = fifo_thread_write_header(ctx); > if (ret < 0) > @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data) > AVFormatContext *avf = data; > FifoContext *fifo = avf->priv_data; > AVThreadMessageQueue *queue = fifo->queue; > -FifoMessage msg = {FIFO_WRITE_HEADER, {0}}; > +FifoMessage msg = {fifo->timeshift ? 
FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; > int ret; > > FifoThreadContext fifo_thread_ctx; > memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); > fifo_thread_ctx.avf = avf; > +fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; > > while (1) { > uint8_t just_flushed = 0; > @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data) > if (just_flushed) > av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); > > +if (fifo->timeshift) > +while (atomic_load_explicit(&fifo->queue_duration, > memory_order_relaxed) < fifo->timeshift) > +av_usleep(1); > + > ret = av_thread_message_queue_recv(queue, &msg, 0); > if (ret < 0) { > av_thread_message_queue_set_err_send(queue, ret); > @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf) > " only when drop_pkts_on_overflow is also turned on\n"); > return AVERROR(EINVAL); > } > +atomic_init(&fifo->queue_duration, 0); > +fifo->last_sent_dts = AV_NOPTS_VALUE; > > oformat = av_guess_format(fifo->format, avf->url, NULL); > if (!oformat) { > @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, > AVPacket *pkt) > goto fail; > } > > +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) > +atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, > pkt, &fifo->last_sent_dts), memory_order_relaxed); > + > return ret; > fail: > if (pkt) > @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf) > int ret; > > av_thread_message_queue_set_err_recv(fifo->queue,
[FFmpeg-devel] [RFC][PATCH] avformat/fifo: add timeshift option to delay output
Signed-off-by: Marton Balint --- libavformat/fifo.c | 59 +- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/libavformat/fifo.c b/libavformat/fifo.c index d11dc6626c..17748e94ce 100644 --- a/libavformat/fifo.c +++ b/libavformat/fifo.c @@ -19,6 +19,8 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include <stdatomic.h> + #include "libavutil/avassert.h" #include "libavutil/opt.h" #include "libavutil/time.h" @@ -77,6 +79,9 @@ typedef struct FifoContext { /* Value > 0 signals queue overflow */ volatile uint8_t overflow_flag; +atomic_int_least64_t queue_duration; +int64_t last_sent_dts; +int64_t timeshift; } FifoContext; typedef struct FifoThreadContext { @@ -98,9 +103,12 @@ typedef struct FifoThreadContext { * so finalization by calling write_trailer and ff_io_close must be done * before exiting / reinitialization of underlying muxer */ uint8_t header_written; + +int64_t last_received_dts; } FifoThreadContext; typedef enum FifoMessageType { +FIFO_NOOP, FIFO_WRITE_HEADER, FIFO_WRITE_PACKET, FIFO_FLUSH_OUTPUT @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx) return av_write_frame(avf2, NULL); } +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) +{ +AVStream *st = avf->streams[pkt->stream_index]; +int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); +int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 
0 : dts - *last_dts); +*last_dts = dts; +return duration; +} + static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) { AVFormatContext *avf = ctx->avf; @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) AVRational src_tb, dst_tb; int ret, s_idx; +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) +atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); + if (ctx->drop_until_keyframe) { if (pkt->flags & AV_PKT_FLAG_KEY) { ctx->drop_until_keyframe = 0; @@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg { int ret = AVERROR(EINVAL); +if (msg->type == FIFO_NOOP) +return 0; + if (!ctx->header_written) { ret = fifo_thread_write_header(ctx); if (ret < 0) @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data) AVFormatContext *avf = data; FifoContext *fifo = avf->priv_data; AVThreadMessageQueue *queue = fifo->queue; -FifoMessage msg = {FIFO_WRITE_HEADER, {0}}; +FifoMessage msg = {fifo->timeshift ? 
FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; int ret; FifoThreadContext fifo_thread_ctx; memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); fifo_thread_ctx.avf = avf; +fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; while (1) { uint8_t just_flushed = 0; @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data) if (just_flushed) av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); +if (fifo->timeshift) +while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) +av_usleep(1); + ret = av_thread_message_queue_recv(queue, &msg, 0); if (ret < 0) { av_thread_message_queue_set_err_send(queue, ret); @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf) " only when drop_pkts_on_overflow is also turned on\n"); return AVERROR(EINVAL); } +atomic_init(&fifo->queue_duration, 0); +fifo->last_sent_dts = AV_NOPTS_VALUE; oformat = av_guess_format(fifo->format, avf->url, NULL); if (!oformat) { @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) goto fail; } +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) +atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); + return ret; fail: if (pkt) @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf) int ret; av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); +if (fifo->timeshift) { +int64_t now = av_gettime_relative(); +int64_t elapsed = 0; +FifoMessage msg = {FIFO_NOOP}; +do { +int64_t delay = av_gettime_relative() - now; +if (delay < 0) { // Discontinuity? +delay = 1; +now = av_gettime_relative(); +} else { +now += delay; +} +atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); +elapsed += delay; +if (elapsed > fifo->timeshift) +break; +