Modified: trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp (236546 => 236547)
--- trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2018-09-27 15:57:08 UTC (rev 236546)
+++ trunk/Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp 2018-09-27 16:03:48 UTC (rev 236547)
@@ -46,6 +46,26 @@
namespace WebCore {
+GType AppendPipeline::s_endOfAppendMetaType = 0;
+const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
+std::once_flag AppendPipeline::s_staticInitializationFlag;
+
+struct EndOfAppendMeta {
+ GstMeta base;
+ static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
+ static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
+ static void free(GstMeta*, GstBuffer*) { }
+};
+
+void AppendPipeline::staticInitialization()
+{
+ ASSERT(WTF::isMainThread());
+
+ const char* tags[] = { nullptr };
+ s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
+ s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
+}
+
static const char* dumpAppendState(AppendPipeline::AppendState appendState)
{
switch (appendState) {
@@ -68,11 +88,9 @@
}
}
-static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
-static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
#if !LOG_DISABLED
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
#endif
@@ -110,14 +128,12 @@
, m_playerPrivate(&playerPrivate)
, m_id(0)
, m_wasBusAlreadyNotifiedOfAvailableSamples(false)
- , m_appsrcAtLeastABufferLeft(false)
- , m_appsrcNeedDataReceived(false)
- , m_appsrcDataLeavingProbeId(0)
, m_appendState(AppendState::NotStarted)
, m_abortPending(false)
, m_streamType(Unknown)
{
ASSERT(WTF::isMainThread());
+ std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
GST_TRACE("Creating AppendPipeline (%p)", this);
@@ -137,6 +153,11 @@
// below will already take the initial reference and we need an additional one for us.
m_appsrc = gst_element_factory_make("appsrc", nullptr);
+ GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
+ gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
+ return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
+ }, this, nullptr);
+
const String& type = m_sourceBufferPrivate->type().containerType();
if (type.endsWith("mp4"))
m_demux = gst_element_factory_make("qtdemux", nullptr);
@@ -154,8 +175,6 @@
GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
- setAppsrcDataLeavingProbe();
-
#if !LOG_DISABLED
GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
@@ -173,7 +192,6 @@
#endif
// These signals won't be connected outside of the lifetime of "this".
- g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(appendPipelineDemuxerNoMorePads), this);
@@ -213,7 +231,6 @@
}
if (m_appsrc) {
- removeAppsrcDataLeavingProbe();
g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
m_appsrc = nullptr;
}
@@ -247,6 +264,27 @@
m_demuxerSrcPadCaps = nullptr;
};
+GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
+{
+ ASSERT(!WTF::isMainThread());
+ m_streamingThread = &WTF::Thread::current();
+
+ GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
+ ASSERT(GST_IS_BUFFER(buffer));
+
+ EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
+ if (!endOfAppendMeta) {
+ // Normal buffer, nothing to do.
+ return GST_PAD_PROBE_OK;
+ }
+
+ GST_TRACE_OBJECT(this, "posting end-of-append request to bus");
+ GstStructure* structure = gst_structure_new_empty("end-of-append");
+ GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
+ gst_bus_post(m_bus.get(), message);
+ return GST_PAD_PROBE_DROP;
+}
+
void AppendPipeline::clearPlayerPrivate()
{
ASSERT(WTF::isMainThread());
@@ -285,16 +323,6 @@
const GstStructure* structure = gst_message_get_structure(message);
- if (gst_structure_has_name(structure, "appsrc-need-data")) {
- handleAppsrcNeedDataReceived();
- return;
- }
-
- if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
- handleAppsrcAtLeastABufferLeft();
- return;
- }
-
if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
GRefPtr<GstPad> demuxerSrcPad;
gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
@@ -324,6 +352,11 @@
return;
}
+ if (gst_structure_has_name(structure, "end-of-append")) {
+ handleEndOfAppend();
+ return;
+ }
+
ASSERT_NOT_REACHED();
}
@@ -353,31 +386,6 @@
}
}
-void AppendPipeline::handleAppsrcNeedDataReceived()
-{
- if (!m_appsrcAtLeastABufferLeft) {
- GST_TRACE("discarding until at least a buffer leaves appsrc");
- return;
- }
-
- ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
- ASSERT(!m_appsrcNeedDataReceived);
-
- GST_TRACE("received need-data from appsrc");
-
- m_appsrcNeedDataReceived = true;
- checkEndOfAppend();
-}
-
-void AppendPipeline::handleAppsrcAtLeastABufferLeft()
-{
- m_appsrcAtLeastABufferLeft = true;
- GST_TRACE("received buffer-left from appsrc");
-#if LOG_DISABLED
- removeAppsrcDataLeavingProbe();
-#endif
-}
-
gint AppendPipeline::id()
{
ASSERT(WTF::isMainThread());
@@ -438,7 +446,7 @@
case AppendState::NotStarted:
ok = true;
if (m_pendingBuffer) {
- GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
+ GST_TRACE("pushing pending buffer %" GST_PTR_FORMAT, m_pendingBuffer.get());
gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
nextAppendState = AppendState::Ongoing;
}
@@ -603,24 +611,19 @@
}
}
-void AppendPipeline::checkEndOfAppend()
+void AppendPipeline::handleEndOfAppend()
{
ASSERT(WTF::isMainThread());
+ GST_TRACE_OBJECT(this, "received end-of-append");
- if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
- return;
-
- GST_TRACE("end of append data mark was received");
-
+ // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
switch (m_appendState) {
case AppendState::Ongoing:
GST_TRACE("DataStarve");
- m_appsrcNeedDataReceived = false;
setAppendState(AppendState::DataStarve);
break;
case AppendState::Sampling:
GST_TRACE("LastSample");
- m_appsrcNeedDataReceived = false;
setAppendState(AppendState::LastSample);
break;
default:
@@ -633,14 +636,6 @@
{
ASSERT(WTF::isMainThread());
- // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
- if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
- GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
- // FIXME: Return ERROR and find a more robust way to detect that all the
- // data has been processed, so we don't need to resort to these hacks.
- return;
- }
-
if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
GST_WARNING("Received sample without buffer from appsink.");
return;
@@ -748,9 +743,6 @@
}
GST_TRACE_OBJECT(this, "batchedSampleCount = %d", batchedSampleCount);
-
- if (batchedSampleCount > 0)
- checkEndOfAppend();
}
void AppendPipeline::resetPipeline()
@@ -757,8 +749,6 @@
{
ASSERT(WTF::isMainThread());
GST_DEBUG("resetting pipeline");
- m_appsrcAtLeastABufferLeft = false;
- setAppsrcDataLeavingProbe();
gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
@@ -774,29 +764,6 @@
}
-void AppendPipeline::setAppsrcDataLeavingProbe()
-{
- if (m_appsrcDataLeavingProbeId)
- return;
-
- GST_TRACE("setting appsrc data leaving probe");
-
- GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
- m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
-}
-
-void AppendPipeline::removeAppsrcDataLeavingProbe()
-{
- if (!m_appsrcDataLeavingProbeId)
- return;
-
- GST_TRACE("removing appsrc data leaving probe");
-
- GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
- gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
- m_appsrcDataLeavingProbeId = 0;
-}
-
void AppendPipeline::abort()
{
ASSERT(WTF::isMainThread());
@@ -816,39 +783,51 @@
GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
{
- GstFlowReturn result;
-
if (m_abortPending) {
m_pendingBuffer = adoptGRef(buffer);
- result = GST_FLOW_OK;
- } else {
- setAppendState(AppendPipeline::AppendState::Ongoing);
- GST_TRACE("pushing new buffer %p", buffer);
- result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
+ return GST_FLOW_OK;
}
- return result;
-}
+ setAppendState(AppendPipeline::AppendState::Ongoing);
-void AppendPipeline::reportAppsrcAtLeastABufferLeft()
-{
- GST_TRACE("buffer left appsrc, reposting to bus");
- GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
- GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
- gst_bus_post(m_bus.get(), message);
-}
+ GST_TRACE_OBJECT(this, "pushing data buffer %" GST_PTR_FORMAT, buffer);
+ GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer);
+ // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. None of these should
+ // be true at this point.
+ g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
-void AppendPipeline::reportAppsrcNeedDataReceived()
-{
- GST_TRACE("received need-data signal at appsrc, reposting to bus");
- GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
- GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
- gst_bus_post(m_bus.get(), message);
+ // Push an additional empty buffer that marks the end of the append.
+ // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
+ // completion of the append.
+ //
+ // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
+ // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
+ // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
+ // buffer has completed.
+
+ GstBuffer* endOfAppendBuffer = gst_buffer_new();
+ gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
+
+ GST_TRACE_OBJECT(this, "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
+ GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
+ g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
+
+ return GST_FLOW_OK;
}
GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
{
ASSERT(!WTF::isMainThread());
+ if (&WTF::Thread::current() != m_streamingThread) {
+ // m_streamingThread has been initialized in appsrcEndOfAppendCheckerProbe().
+ // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
+ // This error will only be raised if someone modifies the pipeline to include more than one streaming thread or
+ // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
+ // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an append has
+ // been demuxed completely.
+ g_critical("Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
+ RELEASE_ASSERT_NOT_REACHED();
+ }
if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
@@ -1033,9 +1012,7 @@
didReceiveInitializationSegment();
return;
default:
- // No useful data, but notify anyway to complete the append operation.
- GST_DEBUG("Received all pending samples (no data)");
- m_sourceBufferPrivate->didReceiveAllPendingSamples();
+ // No useful data.
break;
}
@@ -1078,20 +1055,6 @@
GST_TRACE("appsink-caps-changed message posted to bus");
}
-static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
-{
- ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
-
- GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
- gsize bufferSize = gst_buffer_get_size(buffer);
-
- GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
-
- appendPipeline->reportAppsrcAtLeastABufferLeft();
-
- return GST_PAD_PROBE_OK;
-}
-
#if !LOG_DISABLED
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
{
@@ -1131,11 +1094,6 @@
return GST_PAD_PROBE_DROP;
}
-static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
-{
- appendPipeline->reportAppsrcNeedDataReceived();
-}
-
static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
{
appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);