Title: [254503] trunk/Source/WebCore
Revision
254503
Author
calva...@igalia.com
Date
2020-01-14 02:16:19 -0800 (Tue, 14 Jan 2020)

Log Message

[GStreamer] Rework WebKitWebSrc to improve robustness
https://bugs.webkit.org/show_bug.cgi?id=206003

Reviewed by Philippe Normand.

Reworked how the web source handles data. It is now more eager to
push data downstream. We no longer use the GstAdapter methods
marked as fast because they were sometimes slower: we could be
waiting for more "fast" buffers (those retrievable with the _fast
API) to become available even when the queue was not empty. The
remaining buffers can be retrieved with the non-_fast API.
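
To illustrate the difference between the two size queries, here is a minimal sketch using a toy stand-in for GstAdapter. ToyAdapter, oldLoopWouldWait and newLoopWouldWait are hypothetical names invented for this example and are not GStreamer or WebKit code. The sketch assumes that the fast query only reports what is left of the first queued chunk, while the regular query reports every queued byte.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <numeric>

// Toy stand-in for GstAdapter: a queue of separately pushed chunk sizes.
// This is NOT GStreamer code; it only models the two size queries.
class ToyAdapter {
public:
    void push(std::size_t chunkSize)
    {
        assert(chunkSize > 0);
        m_chunks.push_back(chunkSize);
    }

    // Models gst_adapter_available(): every queued byte counts.
    std::size_t available() const
    {
        return std::accumulate(m_chunks.begin(), m_chunks.end(), std::size_t { 0 });
    }

    // Models gst_adapter_available_fast() under the assumption stated above:
    // only the bytes that can be handed out without merging chunks.
    std::size_t availableFast() const
    {
        return m_chunks.empty() ? 0 : m_chunks.front();
    }

private:
    std::deque<std::size_t> m_chunks;
};

// Old wait condition: wait while the fast bytes do not cover the blocksize.
inline bool oldLoopWouldWait(const ToyAdapter& adapter, std::size_t blocksize)
{
    return adapter.availableFast() < blocksize;
}

// New wait condition: wait only when the queue is really empty.
inline bool newLoopWouldWait(const ToyAdapter& adapter)
{
    return !adapter.available();
}
```

With five chunks of 1000 bytes queued and a blocksize of 4096, the old condition would still wait although 5000 bytes are ready, whereas the new condition lets the streaming thread push immediately.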

The streaming thread now blocks when it has no data to push
downstream and restarts the download if needed.
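
The stop/restart decision works as a hysteresis between a high and a low watermark. The following is a rough sketch of only the queue-size part of that decision. DownloadGate, kHighFactor and kLowFactor are hypothetical names, and the factor values are placeholders because the real HIGH_QUEUE_FACTOR_THRESHOLD and LOW_QUEUE_FACTOR_THRESHOLD values are not shown in this diff. The other preconditions (known size, seekable, not a small resource, element state) are left out.

```cpp
#include <cassert>
#include <cstdint>

// Placeholder factors for the example only; the real threshold values are
// not part of this diff.
constexpr double kHighFactor = 0.5;
constexpr double kLowFactor = 0.5;

struct DownloadGate {
    explicit DownloadGate(std::uint64_t size)
        : resourceSize(size)
    {
        assert(size > 0);
    }

    double highWatermark() const { return resourceSize * kHighFactor; }
    double lowWatermark() const { return highWatermark() * kLowFactor; }

    // Mirrors the queue check in stopLoaderIfNeeded(): suspend the download
    // only when the queue grows above the high watermark.
    void stopIfNeeded(std::uint64_t queued)
    {
        if (!suspended && queued > highWatermark())
            suspended = true;
    }

    // Mirrors the queue check in restartLoaderIfNeeded(): resume only when
    // the queue has drained below the low watermark.
    void restartIfNeeded(std::uint64_t queued)
    {
        if (suspended && queued < lowWatermark())
            suspended = false;
    }

    std::uint64_t resourceSize;
    bool suspended { false };
};
```

Keeping the two thresholds apart avoids stopping and restarting the loader on every small change of the queue size.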

This patch also fixes a possible race condition when a flush
arrives while the streaming thread is waiting.
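
The shape of the fix can be sketched with the standard library: the flushing flag is part of the wait predicate, it is checked again after the wait, and the unlock path notifies the condition. ToySource and Flow are hypothetical names for this example. Unlike the patch, the sketch sets the flag while holding the mutex, and it makes the wait step and retry count configurable so that it can be exercised quickly.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

enum class Flow { Ok, Flushing, Eos };

class ToySource {
public:
    ToySource(std::chrono::milliseconds waitStep, unsigned maxRetries)
        : m_waitStep(waitStep)
        , m_maxRetries(maxRetries)
    {
        assert(maxRetries > 0);
    }

    // Producer side: data arrived from the network.
    void push(std::size_t bytes)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queued += bytes;
        }
        m_condition.notify_one();
    }

    // Flush request: raise the flag, then wake the waiting thread.
    void unlock()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_flushing = true;
        }
        m_condition.notify_one();
    }

    // Streaming thread side: wait for data, a flush, or give up with EOS.
    Flow create()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        unsigned retries = 0;
        while (!m_queued && !m_flushing) {
            m_condition.wait_for(lock, m_waitStep, [this] { return m_flushing || m_queued; });
            if (!m_queued && !m_flushing && ++retries >= m_maxRetries)
                return Flow::Eos;
        }
        // The flush check comes after the wait, so a flush that arrives
        // while waiting is never missed.
        if (m_flushing)
            return Flow::Flushing;
        m_queued = 0;
        return Flow::Ok;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::size_t m_queued { 0 };
    bool m_flushing { false };
    std::chrono::milliseconds m_waitStep;
    unsigned m_maxRetries;
};
```

In the patch the corresponding wait step is one second and the retry limit is ten, after which the source assumes EOS.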

No new tests, just a rework.

* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
(WebCore::MediaPlayerPrivateGStreamer::updateStates): Added FALLTHROUGH.
* platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
(restartLoaderIfNeeded):
(stopLoaderIfNeeded): Refactored.
(webKitWebSrcCreate): Avoid adapter methods marked as fast,
otherwise we might wait for data we already have. The streaming
thread now blocks while waiting for data and is more eager to
push data downstream.
(webKitWebSrcStop): No more queueSize.
(webKitWebSrcDoSeek):
(webKitWebSrcUnLock):
(webKitWebSrcChangeState): Notify streaming thread.
(CachedResourceStreamingClient::checkUpdateBlocksize): Improved
blocksize adjustment. With the former values the blocksize grew too
fast and could not be reduced easily. It should now adjust more
quickly to the real network values.
(CachedResourceStreamingClient::dataReceived): Added rudimentary
bandwidth calculation and use stopLoaderIfNeeded.
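
The last two entries can be sketched in isolation as follows. BandwidthMeter and BlocksizeTuner are hypothetical names. The meter follows the logic visible in the diff, with timestamps passed in as plain seconds instead of WallTime. The tuner uses the new constant values from the diff, but most of checkUpdateBlocksize is not shown here, so the counting and clamping logic below is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// Rough bandwidth estimate. The first packet only starts the clock, because
// its delivery time also includes the time spent sending the request.
class BandwidthMeter {
public:
    // now: timestamp in seconds. Returns bytes/second, or 0 if unknown.
    double dataReceived(std::uint64_t length, double now)
    {
        if (std::isnan(m_startTime)) {
            m_startTime = now;
            m_totalBytes = 0;
            return 0;
        }
        m_totalBytes += length;
        double elapsed = now - m_startTime;
        return elapsed > 0 ? m_totalBytes / elapsed : 0;
    }

private:
    double m_startTime { std::nan("") };
    std::uint64_t m_totalBytes { 0 };
};

// Constant values taken from the new side of the diff.
constexpr int kGrowLimit = 1;
constexpr int kGrowCount = 2;
constexpr int kGrowFactor = 2;
constexpr double kReduceLimit = 0.5;
constexpr int kReduceCount = 2;
constexpr double kReduceFactor = 0.5;

// Assumed adjustment logic: grow after kGrowCount consecutive large reads,
// shrink after kReduceCount consecutive small reads, never below the minimum.
class BlocksizeTuner {
public:
    explicit BlocksizeTuner(unsigned minimum)
        : m_minimum(minimum)
        , m_blocksize(minimum)
    {
        assert(minimum > 0);
    }

    unsigned blocksize() const { return m_blocksize; }

    void update(unsigned bytesRead)
    {
        if (bytesRead > m_blocksize * kGrowLimit) {
            m_reduceCount = 0;
            if (++m_growCount >= kGrowCount) {
                m_blocksize *= kGrowFactor;
                m_growCount = 0;
            }
        } else if (bytesRead < m_blocksize * kReduceLimit) {
            m_growCount = 0;
            if (++m_reduceCount >= kReduceCount) {
                unsigned reduced = static_cast<unsigned>(m_blocksize * kReduceFactor);
                m_blocksize = std::max(reduced, m_minimum);
                m_reduceCount = 0;
            }
        } else
            m_growCount = m_reduceCount = 0;
    }

private:
    unsigned m_minimum;
    unsigned m_blocksize;
    int m_growCount { 0 };
    int m_reduceCount { 0 };
};
```

Requiring two consecutive large reads before growing, and treating reads below half the blocksize as small, is what makes the sketch grow more slowly and shrink more readily than it would with the former values.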

Diff

Modified: trunk/Source/WebCore/ChangeLog (254502 => 254503)


--- trunk/Source/WebCore/ChangeLog	2020-01-14 10:01:10 UTC (rev 254502)
+++ trunk/Source/WebCore/ChangeLog	2020-01-14 10:16:19 UTC (rev 254503)
@@ -1,3 +1,46 @@
+2020-01-14  Xabier Rodriguez Calvar  <calva...@igalia.com>
+
+        [GStreamer] Rework WebKitWebSrc to improve robustness
+        https://bugs.webkit.org/show_bug.cgi?id=206003
+
+        Reviewed by Philippe Normand.
+
+        Reworked how the web source deals with data. It's more eager now
+        in pushing data downstream.  We don't use the GstAdapter methods
+        marked as fast anymore because sometimes it was slower. The reason
+        why this was slower is that we can be waiting for more "fast"
+        (that could be retrieved with the _fast API) buffers to be
+        available even in cases where the queue is not empty. These other
+        buffers can be retrieved with the "non _fast" API.
+
+        The streaming thread locks now when it has no data to push
+        downstream and restarts the download if needed.
+
+        In this patch we also fixed the possible race condition of
+        receiving a flush during the streaming thread wait.
+
+        No new tests, just a rework.
+
+        * platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
+        (WebCore::MediaPlayerPrivateGStreamer::updateStates): Added FALLTHROUGH.
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
+        (restartLoaderIfNeeded):
+        (stopLoaderIfNeeded): Refactored.
+        (webKitWebSrcCreate): Avoid adapter methods marked as fast,
+        otherwise we might be waiting for data we already have. Streaming
+        thread is now going to lock waiting for data and is more eager in
+        pushing data downstream.
+        (webKitWebSrcStop): No more queueSize.
+        (webKitWebSrcDoSeek):
+        (webKitWebSrcUnLock):
+        (webKitWebSrcChangeState): Notify streaming thread.
+        (CachedResourceStreamingClient::checkUpdateBlocksize): Blocksize
+        adjustment improved. With former values blocksize grew too fast
+        and couldn't be reduced so easily. I think now it adjusts more
+        quickly to the real network values.
+        (CachedResourceStreamingClient::dataReceived): Added rudimentary
+        bandwith calculation and use stopLoaderIfNeeded.
+
 2020-01-14  Tomoki Imai  <tomoki.i...@sony.com>
 
         Always Use CAIRO_OPERATOR_SOURCE to copyRectFromOneSurfaceToAnother

Modified: trunk/Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp (254502 => 254503)


--- trunk/Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp	2020-01-14 10:01:10 UTC (rev 254502)
+++ trunk/Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp	2020-01-14 10:16:19 UTC (rev 254503)
@@ -2574,6 +2574,7 @@
             m_networkState = MediaPlayer::NetworkState::Empty;
             break;
         case GST_STATE_PAUSED:
+            FALLTHROUGH;
         case GST_STATE_PLAYING:
             if (m_isBuffering) {
                 if (m_bufferingPercentage == 100) {
@@ -2614,7 +2615,7 @@
         } else if (m_currentState == GST_STATE_PLAYING) {
             m_isPaused = false;
 
-            if ((m_isBuffering && m_isLiveStream) || !m_playbackRate) {
+            if ((m_isBuffering && !m_isLiveStream) || !m_playbackRate) {
                 GST_DEBUG_OBJECT(pipeline(), "[Buffering] Pausing stream for buffering.");
                 changePipelineState(GST_STATE_PAUSED);
             }

Modified: trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp (254502 => 254503)


--- trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp	2020-01-14 10:01:10 UTC (rev 254502)
+++ trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp	2020-01-14 10:16:19 UTC (rev 254503)
@@ -73,9 +73,9 @@
     void loadFinished(PlatformMediaResource&) override;
 
     static constexpr int s_growBlocksizeLimit { 1 };
-    static constexpr int s_growBlocksizeCount { 1 };
+    static constexpr int s_growBlocksizeCount { 2 };
     static constexpr int s_growBlocksizeFactor { 2 };
-    static constexpr float s_reduceBlocksizeLimit { 0.20 };
+    static constexpr float s_reduceBlocksizeLimit { 0.5 };
     static constexpr int s_reduceBlocksizeCount { 2 };
     static constexpr float s_reduceBlocksizeFactor { 0.5 };
     int m_reduceBlocksizeCount { 0 };
@@ -141,11 +141,12 @@
     unsigned minimumBlocksize;
     Lock adapterLock;
     Condition adapterCondition;
-    uint64_t queueSize { 0 };
     bool isDownloadSuspended { false };
     GRefPtr<GstAdapter> adapter;
     GRefPtr<GstEvent> httpHeadersEvent;
     GUniquePtr<GstStructure> httpHeaders;
+    WallTime downloadStartTime { WallTime::nan() };
+    uint64_t totalDownloadedBytes { 0 };
 };
 
 enum {
@@ -180,6 +181,8 @@
 static gboolean webKitWebSrcUnLock(GstBaseSrc*);
 static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
 static void webKitWebSrcSetContext(GstElement*, GstContext*);
+static void restartLoaderIfNeeded(WebKitWebSrc*);
+static void stopLoaderIfNeeded(WebKitWebSrc*);
 
 #define webkit_web_src_parent_class parent_class
 WEBKIT_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_PUSH_SRC,
@@ -345,6 +348,72 @@
     GST_ELEMENT_CLASS(parent_class)->set_context(element, context);
 }
 
+static void restartLoaderIfNeeded(WebKitWebSrc* src)
+{
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    if (!priv->isDownloadSuspended) {
+        GST_TRACE_OBJECT(src, "download already active");
+        return;
+    }
+
+    GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
+        " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
+        , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
+    if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+        return;
+    }
+    GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
+    if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
+        GST_TRACE_OBJECT(src, "can't restart download");
+        return;
+    }
+
+    size_t queueSize = gst_adapter_available(priv->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (min %1.0f)", queueSize
+        , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
+
+    if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
+        GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
+        return;
+    }
+
+    GST_DEBUG_OBJECT(src, "restarting download");
+    priv->isDownloadSuspended = false;
+    webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
+}
+
+
+static void stopLoaderIfNeeded(WebKitWebSrc* src)
+{
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    if (priv->isDownloadSuspended) {
+        GST_TRACE_OBJECT(src, "download already suspended");
+        return;
+    }
+
+    GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
+        , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
+        , SMALL_MEDIA_RESOURCE_MAX_SIZE);
+    if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+        GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+        return;
+    }
+
+    size_t queueSize = gst_adapter_available(priv->adapter.get());
+    GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
+    if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
+        GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
+        return;
+    }
+
+    GST_DEBUG_OBJECT(src, "stopping download");
+    priv->isDownloadSuspended = true;
+    priv->resource->stop();
+}
+
 static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
 {
     GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
@@ -357,9 +426,7 @@
         {
             LockHolder adapterLocker(priv->adapterLock);
             GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
-            // Discard all the buffers coming before the requested seek position.
-            gst_adapter_flush(priv->adapter.get(), priv->queueSize);
-            priv->queueSize = 0;
+            gst_adapter_clear(priv->adapter.get());
         }
         uint64_t requestedPosition = priv->requestedPosition;
         webKitWebSrcStop(baseSrc);
@@ -379,15 +446,12 @@
         });
     }
 
-    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, queueSize: %" G_GUINT64_FORMAT ", isDownloadSuspended: %s",
-        boolForPrinting(priv->isFlushing), boolForPrinting(priv->doesHaveEOS), priv->queueSize,
-        boolForPrinting(priv->isDownloadSuspended));
+    // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
+    // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
+    // other buffers can be retrieved with the "non _fast" API.
+    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
+        , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
 
-    if (priv->isFlushing) {
-        GST_DEBUG_OBJECT(src, "Flushing");
-        return GST_FLOW_FLUSHING;
-    }
-
     if (priv->doesHaveEOS) {
         GST_DEBUG_OBJECT(src, "EOS");
         return GST_FLOW_EOS;
@@ -394,28 +458,43 @@
     }
 
     unsigned size = gst_base_src_get_blocksize(baseSrc);
-    bool isAdapterDrained = false;
+    size_t queueSize;
     {
         LockHolder adapterLocker(priv->adapterLock);
         unsigned retries = 0;
-        size_t available = gst_adapter_available_fast(priv->adapter.get());
-        while (available < size && !isAdapterDrained) {
-            priv->adapterCondition.waitFor(priv->adapterLock, 100_ms, [&] {
-                return gst_adapter_available_fast(priv->adapter.get()) >= size;
+        queueSize = gst_adapter_available(priv->adapter.get());
+        GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
+        while (!queueSize && !priv->isFlushing) {
+            GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
+            priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
+                restartLoaderIfNeeded(src);
+                return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
             });
-            retries++;
-            available = gst_adapter_available_fast(priv->adapter.get());
-            if (available && available < size) {
-                GST_TRACE_OBJECT(src, "did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, available);
-                size = available;
-            } else if (retries > 3)
-                isAdapterDrained = true;
+            queueSize = gst_adapter_available(priv->adapter.get());
+            GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
+            if (queueSize || priv->isFlushing) {
+                // We have data or we're flushing. We can break the loop here.
+                break;
+            }
+
+            // We should keep waiting but we could be in EOS. Let's check the two possibilities:
+            // 1. We are at the end of the file with a known size.
+            // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
+            if (priv->haveSize && priv->readPosition >= priv->size) {
+                GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
+                return GST_FLOW_EOS;
+            }
+            GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
+            if (!priv->isDownloadSuspended && ++retries >= 10) {
+                GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
+                return GST_FLOW_EOS;
+            }
         }
     }
 
-    if (isAdapterDrained) {
-        GST_DEBUG_OBJECT(src, "Adapter still empty after 400 milli-seconds of waiting, assuming EOS");
-        return GST_FLOW_EOS;
+    if (priv->isFlushing) {
+        GST_DEBUG_OBJECT(src, "Flushing");
+        return GST_FLOW_FLUSHING;
     }
 
     if (priv->haveSize && !priv->isDurationSet) {
@@ -429,14 +508,17 @@
         gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
 
     {
-        GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
         LockHolder adapterLocker(priv->adapterLock);
+        queueSize = gst_adapter_available(priv->adapter.get());
+        if (queueSize < size) {
+            GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
+            size = queueSize;
+        } else
+            GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
         if (size) {
-            *buffer = gst_adapter_take_buffer_fast(priv->adapter.get(), size);
+            *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
             RELEASE_ASSERT(*buffer);
 
-            priv->queueSize -= size;
-
             GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
             GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
             GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
@@ -449,18 +531,11 @@
             } else if (priv->wasSeeking)
                 priv->wasSeeking = false;
 
-            if (!priv->doesHaveEOS && priv->haveSize && priv->isSeekable
-                && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && priv->readPosition
-                && (priv->readPosition != priv->size)
-                && (priv->queueSize < (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD))
-                && GST_STATE(src) >= GST_STATE_PAUSED && priv->isDownloadSuspended) {
-                GST_DEBUG_OBJECT(src, "[Buffering] Adapter running out of data, restarting download");
-                priv->isDownloadSuspended = false;
-                webKitWebSrcMakeRequest(baseSrc, false);
-            }
-
-        } else
-            GST_ERROR_OBJECT(src, "Empty adapter?");
+            restartLoaderIfNeeded(src);
+        } else {
+            GST_ERROR_OBJECT(src, "Empty adapter!");
+            ASSERT_NOT_REACHED();
+        }
     }
 
     return GST_FLOW_OK;
@@ -548,6 +623,7 @@
     priv->isDurationSet = false;
     priv->doesHaveEOS = false;
     priv->isFlushing = false;
+    priv->downloadStartTime = WallTime::nan();
 
     priv->didPassAccessControlCheck = false;
 
@@ -671,7 +747,6 @@
     {
         LockHolder adapterLocker(priv->adapterLock);
         gst_adapter_clear(priv->adapter.get());
-        priv->queueSize = 0;
     }
 
     webkitWebSrcReset(src);
@@ -729,6 +804,7 @@
     priv->isSeeking = true;
     priv->requestedPosition = segment->start;
     priv->stopPosition = segment->stop;
+    priv->adapterCondition.notifyOne();
     return TRUE;
 }
 
@@ -767,6 +843,7 @@
     GST_DEBUG_OBJECT(src, "Unlock");
     src->priv->isFlushing = true;
     src->priv->responseCondition.notifyOne();
+    src->priv->adapterCondition.notifyOne();
     return TRUE;
 }
 
@@ -796,8 +873,10 @@
         GST_DEBUG_OBJECT(src, "PAUSED->READY cancelling network requests");
         src->priv->isFlushing = true;
         src->priv->responseCondition.notifyOne();
+        src->priv->adapterCondition.notifyOne();
         break;
-    } default:
+    }
+    default:
         break;
     }
 
@@ -918,7 +997,7 @@
     unsigned blocksize = gst_base_src_get_blocksize(baseSrc);
     GST_LOG_OBJECT(src, "Checking to update blocksize. Read: %u, current blocksize: %u", bytesRead, blocksize);
 
-    if (bytesRead >= blocksize * s_growBlocksizeLimit) {
+    if (bytesRead > blocksize * s_growBlocksizeLimit) {
         m_reduceBlocksizeCount = 0;
         m_increaseBlocksizeCount++;
 
@@ -1070,6 +1149,18 @@
 
     GST_LOG_OBJECT(src, "Have %d bytes of data", length);
     LockHolder locker(priv->responseLock);
+    // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
+    // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
+    // sending time from the receiving time, it is better to ignore it.
+    if (!isnan(priv->downloadStartTime)) {
+        priv->totalDownloadedBytes += length;
+        double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
+        GST_TRACE_OBJECT(src, "downloaded %u bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
+            , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
+    } else {
+        priv->downloadStartTime = WallTime::now();
+        priv->totalDownloadedBytes = 0;
+    }
 
     uint64_t newPosition = priv->readPosition + length;
     if (LIKELY (priv->requestedPosition == priv->readPosition))
@@ -1104,18 +1195,8 @@
     {
         LockHolder adapterLocker(priv->adapterLock);
         GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
-        priv->queueSize += length;
         gst_adapter_push(priv->adapter.get(), buffer);
-        GST_TRACE_OBJECT(src, "[Buffering] isDownloadSuspended: %s", boolForPrinting(priv->isDownloadSuspended));
-        if (priv->haveSize && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && (priv->queueSize > (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD))
-            && !priv->isDownloadSuspended && priv->isSeekable) {
-            GST_TRACE_OBJECT(src, "[Buffering] queueSize: %" G_GUINT64_FORMAT ", threshold: %f", priv->queueSize,
-                priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
-            GST_DEBUG_OBJECT(src, "[Buffering] Stopping resource loader");
-            priv->isDownloadSuspended = true;
-            priv->resource->stop();
-            return;
-        }
+        stopLoaderIfNeeded(src);
         priv->adapterCondition.notifyOne();
     }
 }