Modified: releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp (236681 => 236682)
--- releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2018-10-01 19:17:10 UTC (rev 236681)
+++ releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2018-10-01 19:17:19 UTC (rev 236682)
@@ -108,6 +108,7 @@
, m_sourceBufferPrivate(sourceBufferPrivate.get())
, m_playerPrivate(&playerPrivate)
, m_id(0)
+ , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
, m_appsrcAtLeastABufferLeft(false)
, m_appsrcNeedDataReceived(false)
, m_appsrcDataLeavingProbeId(0)
@@ -188,11 +189,7 @@
{
ASSERT(WTF::isMainThread());
- {
- LockHolder locker(m_newSampleLock);
- setAppendState(AppendState::Invalid);
- m_newSampleCondition.notifyOne();
- }
+ setAppendState(AppendState::Invalid);
{
LockHolder locker(m_padAddRemoveLock);
@@ -253,16 +250,10 @@
ASSERT(WTF::isMainThread());
GST_DEBUG("cleaning private player");
- {
- LockHolder locker(m_newSampleLock);
- // Make sure that AppendPipeline won't process more data from now on and
- // instruct handleNewSample to abort itself from now on as well.
- setAppendState(AppendState::Invalid);
+ // Make sure that AppendPipeline won't process more data from now on and
+ // instruct handleNewSample to abort itself from now on as well.
+ setAppendState(AppendState::Invalid);
- // Awake any pending handleNewSample operation in the streaming thread.
- m_newSampleCondition.notifyOne();
- }
-
{
LockHolder locker(m_padAddRemoveLock);
m_playerPrivate = nullptr;
@@ -316,10 +307,8 @@
}
if (gst_structure_has_name(structure, "appsink-new-sample")) {
- GRefPtr<GstSample> newSample;
- gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
-
- appsinkNewSample(newSample.get());
+ m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
+ consumeAppsinkAvailableSamples();
return;
}
@@ -626,58 +615,48 @@
}
}
-void AppendPipeline::appsinkNewSample(GstSample* sample)
+void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
{
ASSERT(WTF::isMainThread());
- {
- LockHolder locker(m_newSampleLock);
+ // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
+ if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
+ GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
+ // FIXME: Return ERROR and find a more robust way to detect that all the
+ // data has been processed, so we don't need to resort to these hacks.
+ return;
+ }
- // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
- if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
- GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
- // FIXME: Return ERROR and find a more robust way to detect that all the
- // data has been processed, so we don't need to resort to these hacks.
- // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
- m_flowReturn = GST_FLOW_OK;
- m_newSampleCondition.notifyOne();
- return;
- }
+ if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
+ GST_WARNING("Received sample without buffer from appsink.");
+ return;
+ }
- RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(sample, m_presentationSize, trackId());
+ RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
- GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
- mediaSample->trackID().string().utf8().data(),
- mediaSample->presentationTime().toString().utf8().data(),
- mediaSample->decodeTime().toString().utf8().data(),
- mediaSample->duration().toString().utf8().data(),
- mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
+ GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
+ mediaSample->trackID().string().utf8().data(),
+ mediaSample->presentationTime().toString().utf8().data(),
+ mediaSample->decodeTime().toString().utf8().data(),
+ mediaSample->duration().toString().utf8().data(),
+ mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
- // If we're beyond the duration, ignore this sample and the remaining ones.
- MediaTime duration = m_mediaSourceClient->duration();
- if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
- GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
- setAppendState(AppendState::LastSample);
- m_flowReturn = GST_FLOW_OK;
- m_newSampleCondition.notifyOne();
- return;
- }
+ // If we're beyond the duration, ignore this sample and the remaining ones.
+ MediaTime duration = m_mediaSourceClient->duration();
+ if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
+ GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
+ setAppendState(AppendState::LastSample);
+ return;
+ }
- // Add a gap sample if a gap is detected before the first sample.
- if (mediaSample->decodeTime() == MediaTime::zeroTime()
- && mediaSample->presentationTime() > MediaTime::zeroTime()
- && mediaSample->presentationTime() <= MediaTime(1, 10)) {
- GST_DEBUG("Adding gap offset");
- mediaSample->applyPtsOffset(MediaTime::zeroTime());
- }
-
- m_sourceBufferPrivate->didReceiveSample(*mediaSample);
- setAppendState(AppendState::Sampling);
- m_flowReturn = GST_FLOW_OK;
- m_newSampleCondition.notifyOne();
+ // Add a gap sample if a gap is detected before the first sample.
+ if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
+ GST_DEBUG("Adding gap offset");
+ mediaSample->applyPtsOffset(MediaTime::zeroTime());
}
- checkEndOfAppend();
+ m_sourceBufferPrivate->didReceiveSample(*mediaSample);
+ setAppendState(AppendState::Sampling);
}
void AppendPipeline::appsinkEOS()
@@ -743,6 +722,23 @@
return m_track->id();
}
+void AppendPipeline::consumeAppsinkAvailableSamples()
+{
+ ASSERT(WTF::isMainThread());
+
+ GRefPtr<GstSample> sample;
+ int batchedSampleCount = 0;
+ while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
+ appsinkNewSample(WTFMove(sample));
+ batchedSampleCount++;
+ }
+
+ GST_TRACE_OBJECT(this, "batchedSampleCount = %d", batchedSampleCount);
+
+ if (batchedSampleCount > 0)
+ checkEndOfAppend();
+}
+
void AppendPipeline::resetPipeline()
{
ASSERT(WTF::isMainThread());
@@ -750,12 +746,8 @@
m_appsrcAtLeastABufferLeft = false;
setAppsrcDataLeavingProbe();
- {
- LockHolder locker(m_newSampleLock);
- m_newSampleCondition.notifyOne();
- gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
- gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
- }
+ gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
+ gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
#if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
{
@@ -844,27 +836,19 @@
{
ASSERT(!WTF::isMainThread());
- // Even if we're disabled, it's important to pull the sample out anyway to
- // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
- GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
- LockHolder locker(m_newSampleLock);
-
if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
return GST_FLOW_ERROR;
}
- GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
- GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
- gst_bus_post(m_bus.get(), message);
- GST_TRACE("appsink-new-sample message posted to bus");
+ if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
+ GstStructure* structure = gst_structure_new_empty("appsink-new-sample");
+ GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
+ gst_bus_post(m_bus.get(), message);
+ GST_TRACE("appsink-new-sample message posted to bus");
+ }
- m_newSampleCondition.wait(m_newSampleLock);
- // We've been awaken because the sample was processed or because of
- // an exceptional condition (entered in Invalid state, destructor, etc.).
- // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
-
- return m_flowReturn;
+ return GST_FLOW_OK;
}
static GRefPtr<GstElement>
Modified: releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h (236681 => 236682)
--- releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h 2018-10-01 19:17:10 UTC (rev 236681)
+++ releases/WebKitGTK/webkit-2.22/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h 2018-10-01 19:17:19 UTC (rev 236682)
@@ -27,6 +27,7 @@
#include "MediaSourceClientGStreamerMSE.h"
#include "SourceBufferPrivateGStreamer.h"
+#include <atomic>
#include <gst/gst.h>
#include <wtf/Condition.h>
@@ -61,7 +62,7 @@
// Takes ownership of caps.
void parseDemuxerSrcPadCaps(GstCaps*);
void appsinkCapsChanged();
- void appsinkNewSample(GstSample*);
+ void appsinkNewSample(GRefPtr<GstSample>&&);
void appsinkEOS();
void didReceiveInitializationSegment();
AtomicString trackId();
@@ -94,6 +95,8 @@
void removeAppsrcDataLeavingProbe();
void setAppsrcDataLeavingProbe();
+ void consumeAppsinkAvailableSamples();
+
Ref<MediaSourceClientGStreamerMSE> m_mediaSourceClient;
Ref<SourceBufferPrivateGStreamer> m_sourceBufferPrivate;
MediaPlayerPrivateGStreamerMSE* m_playerPrivate;
@@ -103,8 +106,6 @@
MediaTime m_initialDuration;
- GstFlowReturn m_flowReturn;
-
GRefPtr<GstElement> m_pipeline;
GRefPtr<GstBus> m_bus;
GRefPtr<GstElement> m_appsrc;
@@ -113,8 +114,13 @@
// The demuxer has one src stream only, so only one appsink is needed and linked to it.
GRefPtr<GstElement> m_appsink;
- Lock m_newSampleLock;
- Condition m_newSampleCondition;
+ // Used to avoid unnecessary notifications per sample.
+ // It is read and written from the streaming thread and written from the main thread.
+ // The main thread must set it to false before actually pulling samples.
+ // This strategy ensures that at any time, there are at most two notifications in the bus
+ // queue, instead of it growing unbounded.
+ std::atomic_flag m_wasBusAlreadyNotifiedOfAvailableSamples;
+
Lock m_padAddRemoveLock;
Condition m_padAddRemoveCondition;