ffmpeg, improve wait for close logic
Both audio and video destructors have been succesfully using the logic,
if(isRunning())
{
m_exit = true;
join();
}
since it was introduced,
but the close routines are using,
m_exit = true;
if(isRunning() && waitForThreadToExit)
{
while(isRunning()) { OpenThreads::Thread::YieldCurrentThread(); }
}
which not only is it doing an unnecessary busy wait, but it doesn't
guaranteed that the other thread has terminated, just that it has
progressed far enough that OpenThreads has set the thread status as
not running. Like the destructor set the m_exit after checking
isRunning() to avoid the race condition of not getting to join()
because the thread was running, but isRunning() returns false.
Now that FFmpeg*close is fixed, call it from the destructor as well
to have that code in only one location.
---
src/osgPlugins/ffmpeg/FFmpegDecoderAudio.cpp | 18 +++++-------------
src/osgPlugins/ffmpeg/FFmpegDecoderVideo.cpp | 19 +++++--------------
2 files changed, 10 insertions(+), 27 deletions(-)
diff --git a/src/osgPlugins/ffmpeg/FFmpegDecoderAudio.cpp
b/src/osgPlugins/ffmpeg/FFmpegDecoderAudio.cpp
index 769bda1..b738a3b 100644
--- a/src/osgPlugins/ffmpeg/FFmpegDecoderAudio.cpp
+++ b/src/osgPlugins/ffmpeg/FFmpegDecoderAudio.cpp
@@ -54,15 +54,7 @@ FFmpegDecoderAudio::FFmpegDecoderAudio(PacketQueue &
packets, FFmpegClocks & clo
FFmpegDecoderAudio::~FFmpegDecoderAudio()
{
- if (isRunning())
- {
- m_exit = true;
-#if 0
- while(isRunning()) { OpenThreads::YieldCurrentThread(); }
-#else
- join();
-#endif
- }
+ this->close(true);
}
@@ -123,11 +115,11 @@ void FFmpegDecoderAudio::pause(bool pause)
void FFmpegDecoderAudio::close(bool waitForThreadToExit)
{
- m_exit = true;
-
- if (isRunning() && waitForThreadToExit)
+ if (isRunning())
{
- while(isRunning()) { OpenThreads::Thread::YieldCurrentThread(); }
+ m_exit = true;
+ if (waitForThreadToExit)
+ join();
}
}
diff --git a/src/osgPlugins/ffmpeg/FFmpegDecoderVideo.cpp
b/src/osgPlugins/ffmpeg/FFmpegDecoderVideo.cpp
index eed5825..e2bc11e 100644
--- a/src/osgPlugins/ffmpeg/FFmpegDecoderVideo.cpp
+++ b/src/osgPlugins/ffmpeg/FFmpegDecoderVideo.cpp
@@ -57,17 +57,8 @@ FFmpegDecoderVideo::~FFmpegDecoderVideo()
{
OSG_INFO<<"Destructing FFmpegDecoderVideo..."<<std::endl;
+ this->close(true);
- if (isRunning())
- {
- m_exit = true;
-#if 0
- while(isRunning()) { OpenThreads::YieldCurrentThread(); }
-#else
- join();
-#endif
- }
-
#ifdef USE_SWSCALE
if (m_swscale_ctx)
{
@@ -132,11 +123,11 @@ void FFmpegDecoderVideo::open(AVStream * const stream)
void FFmpegDecoderVideo::close(bool waitForThreadToExit)
{
- m_exit = true;
-
- if (isRunning() && waitForThreadToExit)
+ if (isRunning())
{
- while(isRunning()) { OpenThreads::Thread::YieldCurrentThread(); }
+ m_exit = true;
+ if (waitForThreadToExit)
+ join();
}
}
--
1.7.0
#include "FFmpegDecoderAudio.hpp"
#include <osg/Notify>
#include <stdexcept>
#include <string.h>
//DEBUG
//#include <iostream>
namespace osgFFmpeg {
static int decode_audio(AVCodecContext *avctx, int16_t *samples,
int *frame_size_ptr,
const uint8_t *buf, int buf_size)
{
#if LIBAVCODEC_VERSION_MAJOR >= 53 || (LIBAVCODEC_VERSION_MAJOR==52 && LIBAVCODEC_VERSION_MINOR>=32)
// following code segment copied from ffmpeg's avcodec_decode_audio2()
// implementation to avoid warnings about deprecated function usage.
AVPacket avpkt;
av_init_packet(&avpkt);
avpkt.data = const_cast<uint8_t *>(buf);
avpkt.size = buf_size;
return avcodec_decode_audio3(avctx, samples, frame_size_ptr, &avpkt);
#else
// fallback for older versions of ffmpeg that don't have avcodec_decode_audio3.
return avcodec_decode_audio2(avctx, samples, frame_size_ptr, buf, buf_size);
#endif
}
FFmpegDecoderAudio::FFmpegDecoderAudio(PacketQueue & packets, FFmpegClocks & clocks) :
m_packets(packets),
m_clocks(clocks),
m_stream(0),
m_context(0),
m_packet_data(0),
m_bytes_remaining(0),
m_audio_buffer((AVCODEC_MAX_AUDIO_FRAME_SIZE * 3) / 2),
m_audio_buf_size(0),
m_audio_buf_index(0),
m_end_of_stream(false),
m_paused(true),
m_exit(false)
{
}
FFmpegDecoderAudio::~FFmpegDecoderAudio()
{
this->close(true);
}
void FFmpegDecoderAudio::open(AVStream * const stream)
{
try
{
// Sound can be optional (i.e. no audio stream is present)
if (stream == 0)
return;
m_stream = stream;
m_context = stream->codec;
m_frequency = m_context->sample_rate;
m_nb_channels = m_context->channels;
m_sample_format = osg::AudioStream::SampleFormat(m_context->sample_fmt);
// Check stream sanity
if (m_context->codec_id == CODEC_ID_NONE)
throw std::runtime_error("invalid audio codec");;
// Find the decoder for the audio stream
AVCodec * const p_codec = avcodec_find_decoder(m_context->codec_id);
if (p_codec == 0)
throw std::runtime_error("avcodec_find_decoder() failed");
// Inform the codec that we can handle truncated bitstreams
//if (p_codec->capabilities & CODEC_CAP_TRUNCATED)
// m_context->flags |= CODEC_FLAG_TRUNCATED;
// Open codec
if (avcodec_open(m_context, p_codec) < 0)
throw std::runtime_error("avcodec_open() failed");
}
catch (...)
{
m_context = 0;
throw;
}
}
void FFmpegDecoderAudio::pause(bool pause)
{
if (pause != m_paused)
{
m_paused = pause;
if (m_audio_sink.valid())
{
if (m_paused) m_audio_sink->pause();
else m_audio_sink->play();
}
}
}
void FFmpegDecoderAudio::close(bool waitForThreadToExit)
{
if (isRunning())
{
m_exit = true;
if (waitForThreadToExit)
join();
}
}
void FFmpegDecoderAudio::setVolume(float volume)
{
if (m_audio_sink.valid())
{
m_audio_sink->setVolume(volume);
}
}
float FFmpegDecoderAudio::getVolume() const
{
if (m_audio_sink.valid())
{
return m_audio_sink->getVolume();
}
return 0.0f;
}
void FFmpegDecoderAudio::run()
{
try
{
decodeLoop();
}
catch (const std::exception & error)
{
OSG_WARN << "FFmpegDecoderAudio::run : " << error.what() << std::endl;
}
catch (...)
{
OSG_WARN << "FFmpegDecoderAudio::run : unhandled exception" << std::endl;
}
}
void FFmpegDecoderAudio::setAudioSink(osg::ref_ptr<osg::AudioSink> audio_sink)
{
// The FFmpegDecoderAudio object takes the responsability of destroying the audio_sink.
OSG_NOTICE<<"Assigning "<<audio_sink<<std::endl;
m_audio_sink = audio_sink;
}
void FFmpegDecoderAudio::fillBuffer(void * const buffer, size_t size)
{
uint8_t * dst_buffer = reinterpret_cast<uint8_t*>(buffer);
while (size != 0)
{
if (m_audio_buf_index == m_audio_buf_size)
{
m_audio_buf_index = 0;
// Pre-fetch audio buffer is empty, refill it.
const size_t bytes_decoded = decodeFrame(&m_audio_buffer[0], m_audio_buffer.size());
// If nothing could be decoded (e.g. error or no packet available), output a bit of silence
if (bytes_decoded == 0)
{
m_audio_buf_size = std::min(Buffer::size_type(1024), m_audio_buffer.size());
memset(&m_audio_buffer[0], 0, m_audio_buf_size);
}
else
{
m_audio_buf_size = bytes_decoded;
}
}
const size_t fill_size = std::min(m_audio_buf_size - m_audio_buf_index, size);
memcpy(dst_buffer, &m_audio_buffer[m_audio_buf_index], fill_size);
size -= fill_size;
dst_buffer += fill_size;
m_audio_buf_index += fill_size;
adjustBufferEndTps(fill_size);
}
}
void FFmpegDecoderAudio::decodeLoop()
{
const bool skip_audio = ! validContext() || ! m_audio_sink.valid();
if (! skip_audio && ! m_audio_sink->playing())
{
m_clocks.audioSetDelay(m_audio_sink->getDelay());
m_audio_sink->play();
}
else
{
m_clocks.audioDisable();
}
while (! m_exit)
{
if(m_paused)
{
m_clocks.pause(true);
m_pause_timer.setStartTick();
while(m_paused)
{
microSleep(10000);
}
m_clocks.setPauseTime(m_pause_timer.time_s());
m_clocks.pause(false);
}
// If skipping audio, make sure the audio stream is still consumed.
if (skip_audio)
{
bool is_empty;
FFmpegPacket packet = m_packets.timedPop(is_empty, 10);
if (packet.valid())
packet.clear();
}
// Else, just idle in this thread.
// Note: If m_audio_sink has an audio callback, this thread will still be awaken
// from time to time to refill the audio buffer.
else
{
OpenThreads::Thread::microSleep(10000);
}
}
}
void FFmpegDecoderAudio::adjustBufferEndTps(const size_t buffer_size)
{
int sample_size = nbChannels() * frequency();
switch (sampleFormat())
{
case osg::AudioStream::SAMPLE_FORMAT_U8:
sample_size *= 1;
break;
case osg::AudioStream::SAMPLE_FORMAT_S16:
sample_size *= 2;
break;
case osg::AudioStream::SAMPLE_FORMAT_S24:
sample_size *= 3;
break;
case osg::AudioStream::SAMPLE_FORMAT_S32:
sample_size *= 4;
break;
case osg::AudioStream::SAMPLE_FORMAT_F32:
sample_size *= 4;
break;
default:
throw std::runtime_error("unsupported audio sample format");
}
m_clocks.audioAdjustBufferEndPts(double(buffer_size) / double(sample_size));
}
size_t FFmpegDecoderAudio::decodeFrame(void * const buffer, const size_t size)
{
for (;;)
{
// Decode current packet
while (m_bytes_remaining > 0)
{
int data_size = size;
const int bytes_decoded = decode_audio(m_context, reinterpret_cast<int16_t*>(buffer), &data_size, m_packet_data, m_bytes_remaining);
if (bytes_decoded < 0)
{
// if error, skip frame
m_bytes_remaining = 0;
break;
}
m_bytes_remaining -= bytes_decoded;
m_packet_data += bytes_decoded;
// If we have some data, return it and come back for more later.
if (data_size > 0)
return data_size;
}
// Get next packet
if (m_packet.valid())
m_packet.clear();
if (m_exit)
return 0;
bool is_empty = true;
m_packet = m_packets.tryPop(is_empty);
if (is_empty)
return 0;
if (m_packet.type == FFmpegPacket::PACKET_DATA)
{
if (m_packet.packet.pts != int64_t(AV_NOPTS_VALUE))
{
const double pts = av_q2d(m_stream->time_base) * m_packet.packet.pts;
m_clocks.audioSetBufferEndPts(pts);
}
m_bytes_remaining = m_packet.packet.size;
m_packet_data = m_packet.packet.data;
}
else if (m_packet.type == FFmpegPacket::PACKET_END_OF_STREAM)
{
m_end_of_stream = true;
}
else if (m_packet.type == FFmpegPacket::PACKET_FLUSH)
{
avcodec_flush_buffers(m_context);
}
// just output silence when we reached the end of stream
if (m_end_of_stream)
{
memset(buffer, 0, size);
return size;
}
}
}
} // namespace osgFFmpeg
#include "FFmpegDecoderVideo.hpp"
#include <osg/Notify>
#include <osg/Timer>
#include <stdexcept>
#include <string.h>
namespace osgFFmpeg {
static int decode_video(AVCodecContext *avctx, AVFrame *picture,
int *got_picture_ptr,
const uint8_t *buf, int buf_size)
{
#if LIBAVCODEC_VERSION_MAJOR >= 53 || (LIBAVCODEC_VERSION_MAJOR==52 && LIBAVCODEC_VERSION_MINOR>=32)
// following code segment copied from ffmpeg avcodec_decode_video() implementation
// to avoid warnings about deprecated function usage.
AVPacket avpkt;
av_init_packet(&avpkt);
avpkt.data = const_cast<uint8_t *>(buf);
avpkt.size = buf_size;
// HACK for CorePNG to decode as normal PNG by default
avpkt.flags = AV_PKT_FLAG_KEY;
return avcodec_decode_video2(avctx, picture, got_picture_ptr, &avpkt);
#else
// fallback for older versions of ffmpeg that don't have avcodec_decode_video2.
return avcodec_decode_video(avctx, picture, got_picture_ptr, buf, buf_size);
#endif
}
FFmpegDecoderVideo::FFmpegDecoderVideo(PacketQueue & packets, FFmpegClocks & clocks) :
m_packets(packets),
m_clocks(clocks),
m_stream(0),
m_context(0),
m_codec(0),
m_packet_data(0),
m_bytes_remaining(0),
m_packet_pts(AV_NOPTS_VALUE),
m_writeBuffer(0),
m_user_data(0),
m_publish_func(0),
m_paused(true),
m_exit(false)
#ifdef USE_SWSCALE
,m_swscale_ctx(0)
#endif
{
}
FFmpegDecoderVideo::~FFmpegDecoderVideo()
{
OSG_INFO<<"Destructing FFmpegDecoderVideo..."<<std::endl;
this->close(true);
#ifdef USE_SWSCALE
if (m_swscale_ctx)
{
sws_freeContext(m_swscale_ctx);
m_swscale_ctx = 0;
}
#endif
OSG_INFO<<"Destructed FFmpegDecoderVideo"<<std::endl;
}
void FFmpegDecoderVideo::open(AVStream * const stream)
{
m_stream = stream;
m_context = stream->codec;
// Trust the video size given at this point
// (avcodec_open seems to sometimes return a 0x0 size)
m_width = m_context->width;
m_height = m_context->height;
findAspectRatio();
// Find out whether we support Alpha channel
m_alpha_channel = (m_context->pix_fmt == PIX_FMT_YUVA420P);
// Find out the framerate
m_frame_rate = av_q2d(stream->r_frame_rate);
// Find the decoder for the video stream
m_codec = avcodec_find_decoder(m_context->codec_id);
if (m_codec == 0)
throw std::runtime_error("avcodec_find_decoder() failed");
// Inform the codec that we can handle truncated bitstreams
//if (p_codec->capabilities & CODEC_CAP_TRUNCATED)
// m_context->flags |= CODEC_FLAG_TRUNCATED;
// Open codec
if (avcodec_open(m_context, m_codec) < 0)
throw std::runtime_error("avcodec_open() failed");
// Allocate video frame
m_frame.reset(avcodec_alloc_frame());
// Allocate converted RGB frame
m_frame_rgba.reset(avcodec_alloc_frame());
m_buffer_rgba[0].resize(avpicture_get_size(PIX_FMT_RGB32, width(), height()));
m_buffer_rgba[1].resize(m_buffer_rgba[0].size());
// Assign appropriate parts of the buffer to image planes in m_frame_rgba
avpicture_fill((AVPicture *) (m_frame_rgba).get(), &(m_buffer_rgba[0])[0], PIX_FMT_RGB32, width(), height());
// Override get_buffer()/release_buffer() from codec context in order to retrieve the PTS of each frame.
m_context->opaque = this;
m_context->get_buffer = getBuffer;
m_context->release_buffer = releaseBuffer;
}
void FFmpegDecoderVideo::close(bool waitForThreadToExit)
{
if (isRunning())
{
m_exit = true;
if (waitForThreadToExit)
join();
}
}
void FFmpegDecoderVideo::pause(bool pause)
{
if(pause)
m_paused = true;
else
m_paused = false;
}
void FFmpegDecoderVideo::run()
{
try
{
decodeLoop();
}
catch (const std::exception & error)
{
OSG_WARN << "FFmpegDecoderVideo::run : " << error.what() << std::endl;
}
catch (...)
{
OSG_WARN << "FFmpegDecoderVideo::run : unhandled exception" << std::endl;
}
}
void FFmpegDecoderVideo::decodeLoop()
{
FFmpegPacket packet;
double pts;
while (! m_exit)
{
// Work on the current packet until we have decoded all of it
while (m_bytes_remaining > 0)
{
// Save global PTS to be stored in m_frame via getBuffer()
m_packet_pts = packet.packet.pts;
// Decode video frame
int frame_finished = 0;
const int bytes_decoded = decode_video(m_context, m_frame.get(), &frame_finished, m_packet_data, m_bytes_remaining);
if (bytes_decoded < 0)
throw std::runtime_error("avcodec_decode_video failed()");
m_bytes_remaining -= bytes_decoded;
m_packet_data += bytes_decoded;
// Find out the frame pts
if (packet.packet.dts == int64_t(AV_NOPTS_VALUE) &&
m_frame->opaque != 0 &&
*reinterpret_cast<const int64_t*>(m_frame->opaque) != int64_t(AV_NOPTS_VALUE))
{
pts = *reinterpret_cast<const int64_t*>(m_frame->opaque);
}
else if (packet.packet.dts != int64_t(AV_NOPTS_VALUE))
{
pts = packet.packet.dts;
}
else
{
pts = 0;
}
pts *= av_q2d(m_stream->time_base);
// Publish the frame if we have decoded a complete frame
if (frame_finished)
{
const double synched_pts = m_clocks.videoSynchClock(m_frame.get(), av_q2d(m_stream->time_base), pts);
const double frame_delay = m_clocks.videoRefreshSchedule(synched_pts);
publishFrame(frame_delay, m_clocks.audioDisabled());
}
}
while(m_paused && !m_exit)
{
microSleep(10000);
}
// Get the next packet
pts = 0;
if (packet.valid())
packet.clear();
bool is_empty = true;
packet = m_packets.timedPop(is_empty, 10);
if (! is_empty)
{
if (packet.type == FFmpegPacket::PACKET_DATA)
{
m_bytes_remaining = packet.packet.size;
m_packet_data = packet.packet.data;
}
else if (packet.type == FFmpegPacket::PACKET_FLUSH)
{
avcodec_flush_buffers(m_context);
}
}
}
}
void FFmpegDecoderVideo::findAspectRatio()
{
float ratio = 0.0f;
if (m_context->sample_aspect_ratio.num != 0)
ratio = float(av_q2d(m_context->sample_aspect_ratio));
if (ratio <= 0.0f)
ratio = 1.0f;
m_pixel_aspect_ratio = ratio;
}
int FFmpegDecoderVideo::convert(AVPicture *dst, int dst_pix_fmt, AVPicture *src,
int src_pix_fmt, int src_width, int src_height)
{
osg::Timer_t startTick = osg::Timer::instance()->tick();
#ifdef USE_SWSCALE
if (m_swscale_ctx==0)
{
m_swscale_ctx = sws_getContext(src_width, src_height, (PixelFormat) src_pix_fmt,
src_width, src_height, (PixelFormat) dst_pix_fmt,
/*SWS_BILINEAR*/ SWS_BICUBIC, NULL, NULL, NULL);
}
OSG_INFO<<"Using sws_scale ";
int result = sws_scale(m_swscale_ctx,
(src->data), (src->linesize), 0, src_height,
(dst->data), (dst->linesize));
#else
OSG_INFO<<"Using img_convert ";
int result = img_convert(dst, dst_pix_fmt, src,
src_pix_fmt, src_width, src_height);
#endif
osg::Timer_t endTick = osg::Timer::instance()->tick();
OSG_INFO<<" time = "<<osg::Timer::instance()->delta_m(startTick,endTick)<<"ms"<<std::endl;
return result;
}
void FFmpegDecoderVideo::publishFrame(const double delay, bool audio_disabled)
{
// If no publishing function, just ignore the frame
if (m_publish_func == 0)
return;
#if 1
// new code from Jean-Sebasiten Guay - needs testing as we're unclear on the best solution
// If the display delay is too small, we better skip the frame.
if (!audio_disabled && delay < -0.010)
return;
#else
// original solution that hung on video stream over web.
// If the display delay is too small, we better skip the frame.
if (delay < -0.010)
return;
#endif
AVPicture * const src = (AVPicture *) m_frame.get();
AVPicture * const dst = (AVPicture *) m_frame_rgba.get();
// Assign appropriate parts of the buffer to image planes in m_frame_rgba
avpicture_fill((AVPicture *) (m_frame_rgba).get(), &(m_buffer_rgba[m_writeBuffer])[0], PIX_FMT_RGB32, width(), height());
// Convert YUVA420p (i.e. YUV420p plus alpha channel) using our own routine
if (m_context->pix_fmt == PIX_FMT_YUVA420P)
yuva420pToRgba(dst, src, width(), height());
else
convert(dst, PIX_FMT_RGB32, src, m_context->pix_fmt, width(), height());
// Wait 'delay' seconds before publishing the picture.
int i_delay = static_cast<int>(delay * 1000000 + 0.5);
while (i_delay > 1000)
{
// Avoid infinite/very long loops
if (m_exit)
return;
const int micro_delay = (std::min)(1000000, i_delay);
OpenThreads::Thread::microSleep(micro_delay);
i_delay -= micro_delay;
}
m_writeBuffer = 1-m_writeBuffer;
m_publish_func(* this, m_user_data);
}
void FFmpegDecoderVideo::yuva420pToRgba(AVPicture * const dst, AVPicture * const src, int width, int height)
{
convert(dst, PIX_FMT_RGB32, src, m_context->pix_fmt, width, height);
const size_t bpp = 4;
uint8_t * a_dst = dst->data[0] + 3;
for (int h = 0; h < height; ++h) {
const uint8_t * a_src = src->data[3] + h * src->linesize[3];
for (int w = 0; w < width; ++w) {
*a_dst = *a_src;
a_dst += bpp;
a_src += 1;
}
}
}
int FFmpegDecoderVideo::getBuffer(AVCodecContext * const context, AVFrame * const picture)
{
const FFmpegDecoderVideo * const this_ = reinterpret_cast<const FFmpegDecoderVideo*>(context->opaque);
const int result = avcodec_default_get_buffer(context, picture);
int64_t * p_pts = reinterpret_cast<int64_t*>( av_malloc(sizeof(int64_t)) );
*p_pts = this_->m_packet_pts;
picture->opaque = p_pts;
return result;
}
void FFmpegDecoderVideo::releaseBuffer(AVCodecContext * const context, AVFrame * const picture)
{
if (picture != 0)
av_freep(&picture->opaque);
avcodec_default_release_buffer(context, picture);
}
} // namespace osgFFmpeg
_______________________________________________
osg-submissions mailing list
[email protected]
http://lists.openscenegraph.org/listinfo.cgi/osg-submissions-openscenegraph.org