Re: [FFmpeg-devel] [RFC][PATCH] avformat/fifo: add timeshift option to delay output

2020-05-08 Thread Tao Zhang
I have tested with below commands, It works fine. Thanks Marton.
ffmpeg -i input_rtmp_addr -map 0:v -map 0:a -c copy  -f fifo
-timeshift 20 -queue_size 600 -fifo_format flv output_rtmp_addr
ffmpeg -stream_loop -1 -re -i input_file -map 0:v -map 0:a -c copy  -f
fifo -timeshift 20 -queue_size 600 -fifo_format flv
output_rtmp_addr

Marton Balint  于2020年5月8日周五 上午4:28写道:
>
> Signed-off-by: Marton Balint 
> ---
>  libavformat/fifo.c | 59 
> +-
>  1 file changed, 58 insertions(+), 1 deletion(-)
>
> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
> index d11dc6626c..17748e94ce 100644
> --- a/libavformat/fifo.c
> +++ b/libavformat/fifo.c
> @@ -19,6 +19,8 @@
>   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>   */
>
> +#include 
> +
>  #include "libavutil/avassert.h"
>  #include "libavutil/opt.h"
>  #include "libavutil/time.h"
> @@ -77,6 +79,9 @@ typedef struct FifoContext {
>  /* Value > 0 signals queue overflow */
>  volatile uint8_t overflow_flag;
>
> +atomic_int_least64_t queue_duration;
> +int64_t last_sent_dts;
> +int64_t timeshift;
>  } FifoContext;
>
>  typedef struct FifoThreadContext {
> @@ -98,9 +103,12 @@ typedef struct FifoThreadContext {
>   * so finalization by calling write_trailer and ff_io_close must be done
>   * before exiting / reinitialization of underlying muxer */
>  uint8_t header_written;
> +
> +int64_t last_received_dts;
>  } FifoThreadContext;
>
>  typedef enum FifoMessageType {
> +FIFO_NOOP,
>  FIFO_WRITE_HEADER,
>  FIFO_WRITE_PACKET,
>  FIFO_FLUSH_OUTPUT
> @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext 
> *ctx)
>  return av_write_frame(avf2, NULL);
>  }
>
> +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t 
> *last_dts)
> +{
> +AVStream *st = avf->streams[pkt->stream_index];
> +int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
> +int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
> +*last_dts = dts;
> +return duration;
> +}
> +
>  static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
>  {
>  AVFormatContext *avf = ctx->avf;
> @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext 
> *ctx, AVPacket *pkt)
>  AVRational src_tb, dst_tb;
>  int ret, s_idx;
>
> +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
> +atomic_fetch_sub_explicit(>queue_duration, next_duration(avf, 
> pkt, >last_received_dts), memory_order_relaxed);
> +
>  if (ctx->drop_until_keyframe) {
>  if (pkt->flags & AV_PKT_FLAG_KEY) {
>  ctx->drop_until_keyframe = 0;
> @@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext 
> *ctx, FifoMessage *msg
>  {
>  int ret = AVERROR(EINVAL);
>
> +if (msg->type == FIFO_NOOP)
> +return 0;
> +
>  if (!ctx->header_written) {
>  ret = fifo_thread_write_header(ctx);
>  if (ret < 0)
> @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data)
>  AVFormatContext *avf = data;
>  FifoContext *fifo = avf->priv_data;
>  AVThreadMessageQueue *queue = fifo->queue;
> -FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
> +FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
>  int ret;
>
>  FifoThreadContext fifo_thread_ctx;
>  memset(_thread_ctx, 0, sizeof(FifoThreadContext));
>  fifo_thread_ctx.avf = avf;
> +fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
>
>  while (1) {
>  uint8_t just_flushed = 0;
> @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data)
>  if (just_flushed)
>  av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>
> +if (fifo->timeshift)
> +while (atomic_load_explicit(>queue_duration, 
> memory_order_relaxed) < fifo->timeshift)
> +av_usleep(1);
> +
>  ret = av_thread_message_queue_recv(queue, , 0);
>  if (ret < 0) {
>  av_thread_message_queue_set_err_send(queue, ret);
> @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf)
> " only when drop_pkts_on_overflow is also turned on\n");
>  return AVERROR(EINVAL);
>  }
> +atomic_init(>queue_duration, 0);
> +fifo->last_sent_dts = AV_NOPTS_VALUE;
>
>  oformat = av_guess_format(fifo->format, avf->url, NULL);
>  if (!oformat) {
> @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, 
> AVPacket *pkt)
>  goto fail;
>  }
>
> +if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
> +atomic_fetch_add_explicit(>queue_duration, next_duration(avf, 
> pkt, >last_sent_dts), memory_order_relaxed);
> +
>  return ret;
>  fail:
>  if (pkt)
> @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf)
>  int ret;
>
>  av_thread_message_queue_set_err_recv(fifo->queue, 

[FFmpeg-devel] [RFC][PATCH] avformat/fifo: add timeshift option to delay output

2020-05-07 Thread Marton Balint
Signed-off-by: Marton Balint 
---
 libavformat/fifo.c | 59 +-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index d11dc6626c..17748e94ce 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -19,6 +19,8 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include 
+
 #include "libavutil/avassert.h"
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
@@ -77,6 +79,9 @@ typedef struct FifoContext {
 /* Value > 0 signals queue overflow */
 volatile uint8_t overflow_flag;
 
+atomic_int_least64_t queue_duration;
+int64_t last_sent_dts;
+int64_t timeshift;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -98,9 +103,12 @@ typedef struct FifoThreadContext {
  * so finalization by calling write_trailer and ff_io_close must be done
  * before exiting / reinitialization of underlying muxer */
 uint8_t header_written;
+
+int64_t last_received_dts;
 } FifoThreadContext;
 
 typedef enum FifoMessageType {
+FIFO_NOOP,
 FIFO_WRITE_HEADER,
 FIFO_WRITE_PACKET,
 FIFO_FLUSH_OUTPUT
@@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx)
 return av_write_frame(avf2, NULL);
 }
 
+static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t 
*last_dts)
+{
+AVStream *st = avf->streams[pkt->stream_index];
+int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
+int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
+*last_dts = dts;
+return duration;
+}
+
 static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
 {
 AVFormatContext *avf = ctx->avf;
@@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, 
AVPacket *pkt)
 AVRational src_tb, dst_tb;
 int ret, s_idx;
 
+if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+atomic_fetch_sub_explicit(>queue_duration, next_duration(avf, 
pkt, >last_received_dts), memory_order_relaxed);
+
 if (ctx->drop_until_keyframe) {
 if (pkt->flags & AV_PKT_FLAG_KEY) {
 ctx->drop_until_keyframe = 0;
@@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext 
*ctx, FifoMessage *msg
 {
 int ret = AVERROR(EINVAL);
 
+if (msg->type == FIFO_NOOP)
+return 0;
+
 if (!ctx->header_written) {
 ret = fifo_thread_write_header(ctx);
 if (ret < 0)
@@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data)
 AVFormatContext *avf = data;
 FifoContext *fifo = avf->priv_data;
 AVThreadMessageQueue *queue = fifo->queue;
-FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
+FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
 int ret;
 
 FifoThreadContext fifo_thread_ctx;
 memset(_thread_ctx, 0, sizeof(FifoThreadContext));
 fifo_thread_ctx.avf = avf;
+fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
 
 while (1) {
 uint8_t just_flushed = 0;
@@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data)
 if (just_flushed)
 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
 
+if (fifo->timeshift)
+while (atomic_load_explicit(>queue_duration, 
memory_order_relaxed) < fifo->timeshift)
+av_usleep(1);
+
 ret = av_thread_message_queue_recv(queue, , 0);
 if (ret < 0) {
 av_thread_message_queue_set_err_send(queue, ret);
@@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf)
" only when drop_pkts_on_overflow is also turned on\n");
 return AVERROR(EINVAL);
 }
+atomic_init(>queue_duration, 0);
+fifo->last_sent_dts = AV_NOPTS_VALUE;
 
 oformat = av_guess_format(fifo->format, avf->url, NULL);
 if (!oformat) {
@@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket 
*pkt)
 goto fail;
 }
 
+if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
+atomic_fetch_add_explicit(>queue_duration, next_duration(avf, 
pkt, >last_sent_dts), memory_order_relaxed);
+
 return ret;
 fail:
 if (pkt)
@@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf)
 int ret;
 
 av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+if (fifo->timeshift) {
+int64_t now = av_gettime_relative();
+int64_t elapsed = 0;
+FifoMessage msg = {FIFO_NOOP};
+do {
+int64_t delay = av_gettime_relative() - now;
+if (delay < 0) { // Discontinuity?
+delay = 1;
+now = av_gettime_relative();
+} else {
+now += delay;
+}
+atomic_fetch_add_explicit(>queue_duration, delay, 
memory_order_relaxed);
+elapsed += delay;
+if (elapsed > fifo->timeshift)
+break;
+