Modified: trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp (254502 => 254503)
--- trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp 2020-01-14 10:01:10 UTC (rev 254502)
+++ trunk/Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp 2020-01-14 10:16:19 UTC (rev 254503)
@@ -73,9 +73,9 @@
void loadFinished(PlatformMediaResource&) override;
static constexpr int s_growBlocksizeLimit { 1 };
- static constexpr int s_growBlocksizeCount { 1 };
+ static constexpr int s_growBlocksizeCount { 2 };
static constexpr int s_growBlocksizeFactor { 2 };
- static constexpr float s_reduceBlocksizeLimit { 0.20 };
+ static constexpr float s_reduceBlocksizeLimit { 0.5 };
static constexpr int s_reduceBlocksizeCount { 2 };
static constexpr float s_reduceBlocksizeFactor { 0.5 };
int m_reduceBlocksizeCount { 0 };
@@ -141,11 +141,12 @@
unsigned minimumBlocksize;
Lock adapterLock;
Condition adapterCondition;
- uint64_t queueSize { 0 };
bool isDownloadSuspended { false };
GRefPtr<GstAdapter> adapter;
GRefPtr<GstEvent> httpHeadersEvent;
GUniquePtr<GstStructure> httpHeaders;
+ WallTime downloadStartTime { WallTime::nan() };
+ uint64_t totalDownloadedBytes { 0 };
};
enum {
@@ -180,6 +181,8 @@
static gboolean webKitWebSrcUnLock(GstBaseSrc*);
static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
static void webKitWebSrcSetContext(GstElement*, GstContext*);
+static void restartLoaderIfNeeded(WebKitWebSrc*);
+static void stopLoaderIfNeeded(WebKitWebSrc*);
#define webkit_web_src_parent_class parent_class
WEBKIT_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_PUSH_SRC,
@@ -345,6 +348,72 @@
GST_ELEMENT_CLASS(parent_class)->set_context(element, context);
}
+static void restartLoaderIfNeeded(WebKitWebSrc* src)
+{
+ WebKitWebSrcPrivate* priv = src->priv;
+
+ if (!priv->isDownloadSuspended) {
+ GST_TRACE_OBJECT(src, "download already active");
+ return;
+ }
+
+ GST_TRACE_OBJECT(src, "is download suspended %s, does have EOS %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT
+ " (min %u)", boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->haveSize)
+ , boolForPrinting(priv->isSeekable), priv->size, SMALL_MEDIA_RESOURCE_MAX_SIZE);
+ if (priv->doesHaveEOS || !priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+ GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+ return;
+ }
+ GST_TRACE_OBJECT(src, "read position %" G_GUINT64_FORMAT ", state %s", priv->readPosition, gst_element_state_get_name(GST_STATE(src)));
+ if (!priv->readPosition || priv->readPosition == priv->size || GST_STATE(src) < GST_STATE_PAUSED) {
+ GST_TRACE_OBJECT(src, "can't restart download");
+ return;
+ }
+
+ size_t queueSize = gst_adapter_available(priv->adapter.get());
+ GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (min %1.0f)", queueSize
+ , priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD);
+
+ if (queueSize >= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD) {
+ GST_TRACE_OBJECT(src, "queue size above low watermark, not restarting download");
+ return;
+ }
+
+ GST_DEBUG_OBJECT(src, "restarting download");
+ priv->isDownloadSuspended = false;
+ webKitWebSrcMakeRequest(GST_BASE_SRC_CAST(src), false);
+}
+
+
+static void stopLoaderIfNeeded(WebKitWebSrc* src)
+{
+ WebKitWebSrcPrivate* priv = src->priv;
+
+ if (priv->isDownloadSuspended) {
+ GST_TRACE_OBJECT(src, "download already suspended");
+ return;
+ }
+
+ GST_TRACE_OBJECT(src, "is download suspended %s, does have size %s, is seekable %s, size %" G_GUINT64_FORMAT " (min %u)"
+ , boolForPrinting(priv->isDownloadSuspended), boolForPrinting(priv->haveSize), boolForPrinting(priv->isSeekable), priv->size
+ , SMALL_MEDIA_RESOURCE_MAX_SIZE);
+ if (!priv->haveSize || !priv->isSeekable || priv->size <= SMALL_MEDIA_RESOURCE_MAX_SIZE) {
+ GST_TRACE_OBJECT(src, "download cannot be stopped/restarted");
+ return;
+ }
+
+ size_t queueSize = gst_adapter_available(priv->adapter.get());
+ GST_TRACE_OBJECT(src, "queue size %" G_GUINT64_FORMAT " (max %1.0f)", queueSize, priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
+ if (queueSize <= priv->size * HIGH_QUEUE_FACTOR_THRESHOLD) {
+ GST_TRACE_OBJECT(src, "queue size under high watermark, not stopping download");
+ return;
+ }
+
+ GST_DEBUG_OBJECT(src, "stopping download");
+ priv->isDownloadSuspended = true;
+ priv->resource->stop();
+}
+
static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
{
GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
@@ -357,9 +426,7 @@
{
LockHolder adapterLocker(priv->adapterLock);
GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
- // Discard all the buffers coming before the requested seek position.
- gst_adapter_flush(priv->adapter.get(), priv->queueSize);
- priv->queueSize = 0;
+ gst_adapter_clear(priv->adapter.get());
}
uint64_t requestedPosition = priv->requestedPosition;
webKitWebSrcStop(baseSrc);
@@ -379,15 +446,12 @@
});
}
- GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, queueSize: %" G_GUINT64_FORMAT ", isDownloadSuspended: %s",
- boolForPrinting(priv->isFlushing), boolForPrinting(priv->doesHaveEOS), priv->queueSize,
- boolForPrinting(priv->isDownloadSuspended));
+ // We don't use the GstAdapter methods marked as fast anymore because sometimes it was slower. The reason why this was slower is that we can be
+ // waiting for more "fast" (that could be retrieved with the _fast API) buffers to be available even in cases where the queue is not empty. These
+ // other buffers can be retrieved with the "non _fast" API.
+ GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, isDownloadSuspended: %s", boolForPrinting(priv->isFlushing)
+ , boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->isDownloadSuspended));
- if (priv->isFlushing) {
- GST_DEBUG_OBJECT(src, "Flushing");
- return GST_FLOW_FLUSHING;
- }
-
if (priv->doesHaveEOS) {
GST_DEBUG_OBJECT(src, "EOS");
return GST_FLOW_EOS;
@@ -394,28 +458,43 @@
}
unsigned size = gst_base_src_get_blocksize(baseSrc);
- bool isAdapterDrained = false;
+ size_t queueSize;
{
LockHolder adapterLocker(priv->adapterLock);
unsigned retries = 0;
- size_t available = gst_adapter_available_fast(priv->adapter.get());
- while (available < size && !isAdapterDrained) {
- priv->adapterCondition.waitFor(priv->adapterLock, 100_ms, [&] {
- return gst_adapter_available_fast(priv->adapter.get()) >= size;
+ queueSize = gst_adapter_available(priv->adapter.get());
+ GST_TRACE_OBJECT(src, "available bytes %" G_GSIZE_FORMAT ", block size %u", queueSize, size);
+ while (!queueSize && !priv->isFlushing) {
+ GST_TRACE_OBJECT(src, "let's try to restart the download if possible and wait a bit if no data");
+ priv->adapterCondition.waitFor(priv->adapterLock, 1_s, [&] {
+ restartLoaderIfNeeded(src);
+ return priv->isFlushing || (!priv->isDownloadSuspended && gst_adapter_available(priv->adapter.get()));
});
- retries++;
- available = gst_adapter_available_fast(priv->adapter.get());
- if (available && available < size) {
- GST_TRACE_OBJECT(src, "did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, available);
- size = available;
- } else if (retries > 3)
- isAdapterDrained = true;
+ queueSize = gst_adapter_available(priv->adapter.get());
+ GST_TRACE_OBJECT(src, "available %" G_GSIZE_FORMAT, queueSize);
+ if (queueSize || priv->isFlushing) {
+ // We have data or we're flushing. We can break the loop here.
+ break;
+ }
+
+ // We should keep waiting but we could be in EOS. Let's check the two possibilities:
+ // 1. We are at the end of the file with a known size.
+ // 2. The download is not suspended and no more data are arriving. We cannot wait forever, 10x1s seems safe and sensible.
+ if (priv->haveSize && priv->readPosition >= priv->size) {
+ GST_DEBUG_OBJECT(src, "Waiting for data beyond the end, signalling EOS");
+ return GST_FLOW_EOS;
+ }
+ GST_TRACE_OBJECT(src, "is download suspended? %s, num retries %u", boolForPrinting(priv->isDownloadSuspended), retries + 1);
+ if (!priv->isDownloadSuspended && ++retries >= 10) {
+ GST_DEBUG_OBJECT(src, "Adapter still empty after 10s of waiting, assuming EOS");
+ return GST_FLOW_EOS;
+ }
}
}
- if (isAdapterDrained) {
- GST_DEBUG_OBJECT(src, "Adapter still empty after 400 milli-seconds of waiting, assuming EOS");
- return GST_FLOW_EOS;
+ if (priv->isFlushing) {
+ GST_DEBUG_OBJECT(src, "Flushing");
+ return GST_FLOW_FLUSHING;
}
if (priv->haveSize && !priv->isDurationSet) {
@@ -429,14 +508,17 @@
gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
{
- GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
LockHolder adapterLocker(priv->adapterLock);
+ queueSize = gst_adapter_available(priv->adapter.get());
+ if (queueSize < size) {
+ GST_TRACE_OBJECT(src, "Did not get the %u blocksize bytes, let's push the %" G_GSIZE_FORMAT " bytes we got", size, queueSize);
+ size = queueSize;
+ } else
+ GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
if (size) {
- *buffer = gst_adapter_take_buffer_fast(priv->adapter.get(), size);
+ *buffer = gst_adapter_take_buffer(priv->adapter.get(), size);
RELEASE_ASSERT(*buffer);
- priv->queueSize -= size;
-
GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
@@ -449,18 +531,11 @@
} else if (priv->wasSeeking)
priv->wasSeeking = false;
- if (!priv->doesHaveEOS && priv->haveSize && priv->isSeekable
- && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && priv->readPosition
- && (priv->readPosition != priv->size)
- && (priv->queueSize < (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD * LOW_QUEUE_FACTOR_THRESHOLD))
- && GST_STATE(src) >= GST_STATE_PAUSED && priv->isDownloadSuspended) {
- GST_DEBUG_OBJECT(src, "[Buffering] Adapter running out of data, restarting download");
- priv->isDownloadSuspended = false;
- webKitWebSrcMakeRequest(baseSrc, false);
- }
-
- } else
- GST_ERROR_OBJECT(src, "Empty adapter?");
+ restartLoaderIfNeeded(src);
+ } else {
+ GST_ERROR_OBJECT(src, "Empty adapter!");
+ ASSERT_NOT_REACHED();
+ }
}
return GST_FLOW_OK;
@@ -548,6 +623,7 @@
priv->isDurationSet = false;
priv->doesHaveEOS = false;
priv->isFlushing = false;
+ priv->downloadStartTime = WallTime::nan();
priv->didPassAccessControlCheck = false;
@@ -671,7 +747,6 @@
{
LockHolder adapterLocker(priv->adapterLock);
gst_adapter_clear(priv->adapter.get());
- priv->queueSize = 0;
}
webkitWebSrcReset(src);
@@ -729,6 +804,7 @@
priv->isSeeking = true;
priv->requestedPosition = segment->start;
priv->stopPosition = segment->stop;
+ priv->adapterCondition.notifyOne();
return TRUE;
}
@@ -767,6 +843,7 @@
GST_DEBUG_OBJECT(src, "Unlock");
src->priv->isFlushing = true;
src->priv->responseCondition.notifyOne();
+ src->priv->adapterCondition.notifyOne();
return TRUE;
}
@@ -796,8 +873,10 @@
GST_DEBUG_OBJECT(src, "PAUSED->READY cancelling network requests");
src->priv->isFlushing = true;
src->priv->responseCondition.notifyOne();
+ src->priv->adapterCondition.notifyOne();
break;
- } default:
+ }
+ default:
break;
}
@@ -918,7 +997,7 @@
unsigned blocksize = gst_base_src_get_blocksize(baseSrc);
GST_LOG_OBJECT(src, "Checking to update blocksize. Read: %u, current blocksize: %u", bytesRead, blocksize);
- if (bytesRead >= blocksize * s_growBlocksizeLimit) {
+ if (bytesRead > blocksize * s_growBlocksizeLimit) {
m_reduceBlocksizeCount = 0;
m_increaseBlocksizeCount++;
@@ -1070,6 +1149,18 @@
GST_LOG_OBJECT(src, "Have %d bytes of data", length);
LockHolder locker(priv->responseLock);
+ // Rough bandwidth calculation. We ignore here the first data package because we would have to reset the counters when we issue the request and
+ // that first package delivery would include the time of sending out the request and getting the data back. Since we can't distinguish the
+ // sending time from the receiving time, it is better to ignore it.
+ if (!isnan(priv->downloadStartTime)) {
+ priv->totalDownloadedBytes += length;
+ double timeSinceStart = (WallTime::now() - priv->downloadStartTime).seconds();
+ GST_TRACE_OBJECT(src, "downloaded %u bytes in %f seconds =~ %1.0f bytes/second", priv->totalDownloadedBytes, timeSinceStart
+ , timeSinceStart ? priv->totalDownloadedBytes / timeSinceStart : 0);
+ } else {
+ priv->downloadStartTime = WallTime::now();
+ priv->totalDownloadedBytes = 0;
+ }
uint64_t newPosition = priv->readPosition + length;
if (LIKELY (priv->requestedPosition == priv->readPosition))
@@ -1104,18 +1195,8 @@
{
LockHolder adapterLocker(priv->adapterLock);
GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
- priv->queueSize += length;
gst_adapter_push(priv->adapter.get(), buffer);
- GST_TRACE_OBJECT(src, "[Buffering] isDownloadSuspended: %s", boolForPrinting(priv->isDownloadSuspended));
- if (priv->haveSize && (priv->size > SMALL_MEDIA_RESOURCE_MAX_SIZE) && (priv->queueSize > (priv->size * HIGH_QUEUE_FACTOR_THRESHOLD))
- && !priv->isDownloadSuspended && priv->isSeekable) {
- GST_TRACE_OBJECT(src, "[Buffering] queueSize: %" G_GUINT64_FORMAT ", threshold: %f", priv->queueSize,
- priv->size * HIGH_QUEUE_FACTOR_THRESHOLD);
- GST_DEBUG_OBJECT(src, "[Buffering] Stopping resource loader");
- priv->isDownloadSuspended = true;
- priv->resource->stop();
- return;
- }
+ stopLoaderIfNeeded(src);
priv->adapterCondition.notifyOne();
}
}