Diff
Modified: trunk/Source/WebCore/ChangeLog (256727 => 256728)
--- trunk/Source/WebCore/ChangeLog 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/ChangeLog 2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,3 +1,70 @@
+2020-02-17 Takashi Komori <[email protected]>
+
+ [Curl] Use shared single thread for WebSocket connections
+ https://bugs.webkit.org/show_bug.cgi?id=187984
+
+ Reviewed by Fujii Hironori.
+
+ This patch suppresses invoking worker threads for websocket connections.
+ CurlStreamScheduler starts up to one worker thread.
+
+ No new tests. Covered by existing WebSocket tests.
+
+ * platform/Curl.cmake:
+ * platform/network/curl/CurlContext.cpp:
+ (WebCore::CurlContext::streamScheduler):
+ (WebCore::CurlHandle::getActiveSocket):
+ (WebCore::CurlHandle::send):
+ (WebCore::CurlHandle::receive):
+ (WebCore::CurlSocketHandle::CurlSocketHandle): Deleted.
+ (WebCore::CurlSocketHandle::connect): Deleted.
+ (WebCore::CurlSocketHandle::send): Deleted.
+ (WebCore::CurlSocketHandle::receive): Deleted.
+ (WebCore::CurlSocketHandle::wait): Deleted.
+ * platform/network/curl/CurlContext.h:
+ * platform/network/curl/CurlStream.cpp: Added.
+ (WebCore::CurlStream::CurlStream):
+ (WebCore::CurlStream::~CurlStream):
+ (WebCore::CurlStream::destroyHandle):
+ (WebCore::CurlStream::send):
+ (WebCore::CurlStream::appendMonitoringFd):
+ (WebCore::CurlStream::tryToTransfer):
+ (WebCore::CurlStream::tryToReceive):
+ (WebCore::CurlStream::tryToSend):
+ (WebCore::CurlStream::notifyFailure):
+ * platform/network/curl/CurlStream.h: Added.
+ (WebCore::CurlStream::create):
+ * platform/network/curl/CurlStreamScheduler.cpp: Added.
+ (WebCore::CurlStreamScheduler::CurlStreamScheduler):
+ (WebCore::CurlStreamScheduler::~CurlStreamScheduler):
+ (WebCore::CurlStreamScheduler::createStream):
+ (WebCore::CurlStreamScheduler::destroyStream):
+ (WebCore::CurlStreamScheduler::send):
+ (WebCore::CurlStreamScheduler::callOnWorkerThread):
+ (WebCore::CurlStreamScheduler::callClientOnMainThread):
+ (WebCore::CurlStreamScheduler::startThreadIfNeeded):
+ (WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning):
+ (WebCore::CurlStreamScheduler::executeTasks):
+ (WebCore::CurlStreamScheduler::workerThread):
+ * platform/network/curl/CurlStreamScheduler.h: Added.
+ * platform/network/curl/SocketStreamHandleImpl.h:
+ (WebCore::SocketStreamHandleImpl::isStreamInvalidated):
+ * platform/network/curl/SocketStreamHandleImplCurl.cpp:
+ (WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl):
+ (WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl):
+ (WebCore::SocketStreamHandleImpl::platformSendInternal):
+ (WebCore::SocketStreamHandleImpl::platformClose):
+ (WebCore::SocketStreamHandleImpl::didOpen):
+ (WebCore::SocketStreamHandleImpl::didSendData):
+ (WebCore::SocketStreamHandleImpl::didReceiveData):
+ (WebCore::SocketStreamHandleImpl::didFail):
+ (WebCore::SocketStreamHandleImpl::destructStream):
+ (WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted.
+ (WebCore::SocketStreamHandleImpl::handleError): Deleted.
+ (WebCore::SocketStreamHandleImpl::stopThread): Deleted.
+ (WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted.
+ (WebCore::SocketStreamHandleImpl::executeTasks): Deleted.
+
2020-02-17 Carlos Garcia Campos <[email protected]>
[WPE] Use custom theme style to render buttons
Modified: trunk/Source/WebCore/platform/Curl.cmake (256727 => 256728)
--- trunk/Source/WebCore/platform/Curl.cmake 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/Curl.cmake 2020-02-17 12:07:08 UTC (rev 256728)
@@ -22,6 +22,8 @@
platform/network/curl/CurlResourceHandleDelegate.cpp
platform/network/curl/CurlSSLHandle.cpp
platform/network/curl/CurlSSLVerifier.cpp
+ platform/network/curl/CurlStream.cpp
+ platform/network/curl/CurlStreamScheduler.cpp
platform/network/curl/DNSResolveQueueCurl.cpp
platform/network/curl/NetworkStorageSessionCurl.cpp
platform/network/curl/OpenSSLHelper.cpp
@@ -58,6 +60,8 @@
platform/network/curl/CurlResponse.h
platform/network/curl/CurlSSLHandle.h
platform/network/curl/CurlSSLVerifier.h
+ platform/network/curl/CurlStream.h
+ platform/network/curl/CurlStreamScheduler.h
platform/network/curl/DNSResolveQueueCurl.h
platform/network/curl/DownloadBundle.h
platform/network/curl/OpenSSLHelper.h
Modified: trunk/Source/WebCore/platform/network/curl/CurlContext.cpp (256727 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlContext.cpp 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/CurlContext.cpp 2020-02-17 12:07:08 UTC (rev 256728)
@@ -32,6 +32,7 @@
#include "CurlRequestScheduler.h"
#include "CurlSSLHandle.h"
#include "CurlSSLVerifier.h"
+#include "CurlStreamScheduler.h"
#include "HTTPHeaderMap.h"
#include <NetworkLoadMetrics.h>
#include <mutex>
@@ -146,6 +147,12 @@
curl_easy_cleanup(curl);
}
+CurlStreamScheduler& CurlContext::streamScheduler()
+{
+ static NeverDestroyed<CurlStreamScheduler> sharedInstance;
+ return sharedInstance;
+}
+
bool CurlContext::isHttp2Enabled() const
{
curl_version_info_data* data = ""
@@ -888,116 +895,27 @@
#endif
-// CurlSocketHandle
-
-CurlSocketHandle::CurlSocketHandle(const URL& url, Function<void(CURLcode)>&& errorHandler)
- : m_errorHandler(WTFMove(errorHandler))
+Expected<curl_socket_t, CURLcode> CurlHandle::getActiveSocket()
{
- // Libcurl is not responsible for the protocol handling. It just handles connection.
- // Only scheme, host and port is required.
- URL urlForConnection;
- urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http");
- urlForConnection.setHostAndPort(url.hostAndPort());
- setUrl(urlForConnection);
+ curl_socket_t socket;
- enableConnectionOnly();
-}
+ CURLcode errorCode = curl_easy_getinfo(m_handle, CURLINFO_ACTIVESOCKET, &socket);
+ if (errorCode != CURLE_OK)
+ return makeUnexpected(errorCode);
-bool CurlSocketHandle::connect()
-{
- CURLcode errorCode = perform();
- if (errorCode != CURLE_OK) {
- m_errorHandler(errorCode);
- return false;
- }
-
- return true;
+ return socket;
}
-size_t CurlSocketHandle::send(const uint8_t* buffer, size_t size)
+CURLcode CurlHandle::send(const uint8_t* buffer, size_t bufferSize, size_t& bytesSent)
{
- size_t totalBytesSent = 0;
-
- while (totalBytesSent < size) {
- size_t bytesSent = 0;
- CURLcode errorCode = curl_easy_send(handle(), buffer + totalBytesSent, size - totalBytesSent, &bytesSent);
- if (errorCode != CURLE_OK) {
- if (errorCode != CURLE_AGAIN)
- m_errorHandler(errorCode);
- break;
- }
-
- totalBytesSent += bytesSent;
- }
-
- return totalBytesSent;
+ return curl_easy_send(m_handle, buffer, bufferSize, &bytesSent);
}
-Optional<size_t> CurlSocketHandle::receive(uint8_t* buffer, size_t bufferSize)
+CURLcode CurlHandle::receive(uint8_t* buffer, size_t bufferSize, size_t& bytesRead)
{
- size_t bytesRead = 0;
-
- CURLcode errorCode = curl_easy_recv(handle(), buffer, bufferSize, &bytesRead);
- if (errorCode != CURLE_OK) {
- if (errorCode != CURLE_AGAIN)
- m_errorHandler(errorCode);
-
- return WTF::nullopt;
- }
-
- return bytesRead;
+ return curl_easy_recv(m_handle, buffer, bufferSize, &bytesRead);
}
-Optional<CurlSocketHandle::WaitResult> CurlSocketHandle::wait(const Seconds& timeout, bool alsoWaitForWrite)
-{
- curl_socket_t socket;
- CURLcode errorCode = curl_easy_getinfo(handle(), CURLINFO_ACTIVESOCKET, &socket);
- if (errorCode != CURLE_OK) {
- m_errorHandler(errorCode);
- return WTF::nullopt;
- }
-
- int64_t usec = timeout.microsecondsAs<int64_t>();
-
- struct timeval selectTimeout;
- if (usec <= 0) {
- selectTimeout.tv_sec = 0;
- selectTimeout.tv_usec = 0;
- } else {
- selectTimeout.tv_sec = usec / 1000000;
- selectTimeout.tv_usec = usec % 1000000;
- }
-
- int rc = 0;
- int maxfd = static_cast<int>(socket) + 1;
- fd_set fdread;
- fd_set fdwrite;
- fd_set fderr;
-
- // Retry 'select' if it was interrupted by a process signal.
- do {
- FD_ZERO(&fdread);
- FD_SET(socket, &fdread);
-
- FD_ZERO(&fdwrite);
- if (alsoWaitForWrite)
- FD_SET(socket, &fdwrite);
-
- FD_ZERO(&fderr);
- FD_SET(socket, &fderr);
-
- rc = ::select(maxfd, &fdread, &fdwrite, &fderr, &selectTimeout);
- } while (rc == -1 && errno == EINTR);
-
- if (rc <= 0)
- return WTF::nullopt;
-
- WaitResult result;
- result.readable = FD_ISSET(socket, &fdread) || FD_ISSET(socket, &fderr);
- result.writable = FD_ISSET(socket, &fdwrite);
- return result;
}
-}
-
#endif
Modified: trunk/Source/WebCore/platform/network/curl/CurlContext.h (256727 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlContext.h 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/CurlContext.h 2020-02-17 12:07:08 UTC (rev 256728)
@@ -89,6 +89,7 @@
// CurlContext --------------------------------------------
class CurlRequestScheduler;
+class CurlStreamScheduler;
class CurlContext : public CurlGlobal {
WTF_MAKE_NONCOPYABLE(CurlContext);
@@ -101,6 +102,7 @@
const CurlShareHandle& shareHandle() { return m_shareHandle; }
CurlRequestScheduler& scheduler() { return *m_scheduler; }
+ CurlStreamScheduler& streamScheduler();
// Proxy
const CurlProxySettings& proxySettings() const { return m_proxySettings; }
@@ -293,6 +295,11 @@
static long long maxCurlOffT();
+ // socket
+ Expected<curl_socket_t, CURLcode> getActiveSocket();
+ CURLcode send(const uint8_t*, size_t, size_t&);
+ CURLcode receive(uint8_t*, size_t, size_t&);
+
#ifndef NDEBUG
void enableVerboseIfUsed();
void enableStdErrIfUsed();
@@ -313,24 +320,4 @@
std::unique_ptr<CurlSSLVerifier> m_sslVerifier;
};
-class CurlSocketHandle : public CurlHandle {
- WTF_MAKE_NONCOPYABLE(CurlSocketHandle);
-
-public:
- struct WaitResult {
- bool readable { false };
- bool writable { false };
- };
-
- CurlSocketHandle(const URL&, Function<void(CURLcode)>&& errorHandler);
-
- bool connect();
- size_t send(const uint8_t*, size_t);
- Optional<size_t> receive(uint8_t*, size_t);
- Optional<WaitResult> wait(const Seconds& timeout, bool alsoWaitForWrite);
-
-private:
- Function<void(CURLcode)> m_errorHandler;
-};
-
} // namespace WebCore
Added: trunk/Source/WebCore/platform/network/curl/CurlStream.cpp (0 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlStream.cpp (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStream.cpp 2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,193 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "CurlStream.h"
+
+#include "CurlStreamScheduler.h"
+#include "SocketStreamError.h"
+
+#if USE(CURL)
+
+namespace WebCore {
+
+CurlStream::CurlStream(CurlStreamScheduler& scheduler, CurlStreamID streamID, URL&& url)
+ : m_scheduler(scheduler)
+ , m_streamID(streamID)
+{
+ ASSERT(!isMainThread());
+
+ m_curlHandle = WTF::makeUnique<CurlHandle>();
+
+ // Libcurl is not responsible for the protocol handling. It just handles connection.
+ // Only scheme, host and port is required.
+ URL urlForConnection;
+ urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http");
+ urlForConnection.setHostAndPort(url.hostAndPort());
+ m_curlHandle->setUrl(urlForConnection);
+
+ m_curlHandle->enableConnectionOnly();
+
+ auto errorCode = m_curlHandle->perform();
+ if (errorCode != CURLE_OK) {
+ notifyFailure(errorCode);
+ return;
+ }
+
+ m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID](Client& client) {
+ client.didOpen(streamID);
+ });
+}
+
+CurlStream::~CurlStream()
+{
+ ASSERT(!isMainThread());
+ destroyHandle();
+}
+
+void CurlStream::destroyHandle()
+{
+ if (!m_curlHandle)
+ return;
+
+ m_curlHandle = nullptr;
+}
+
+void CurlStream::send(UniqueArray<uint8_t>&& buffer, size_t length)
+{
+ ASSERT(!isMainThread());
+
+ if (!m_curlHandle)
+ return;
+
+ m_sendBuffers.append(std::make_pair(WTFMove(buffer), length));
+}
+
+void CurlStream::appendMonitoringFd(fd_set& readfds, fd_set& writefds, fd_set& exceptfds, int& maxfd)
+{
+ ASSERT(!isMainThread());
+
+ if (!m_curlHandle)
+ return;
+
+ auto socket = m_curlHandle->getActiveSocket();
+ if (!socket.has_value()) {
+ notifyFailure(socket.error());
+ return;
+ }
+
+ FD_SET(*socket, &readfds);
+ FD_SET(*socket, &exceptfds);
+
+ if (m_sendBuffers.size())
+ FD_SET(*socket, &writefds);
+
+ if (maxfd < *socket)
+ maxfd = *socket;
+}
+
+void CurlStream::tryToTransfer(const fd_set& readfds, const fd_set& writefds, const fd_set& exceptfds)
+{
+ ASSERT(!isMainThread());
+
+ if (!m_curlHandle)
+ return;
+
+ auto socket = m_curlHandle->getActiveSocket();
+ if (!socket.has_value()) {
+ notifyFailure(socket.error());
+ return;
+ }
+
+ if (FD_ISSET(*socket, &readfds) || FD_ISSET(*socket, &exceptfds))
+ tryToReceive();
+
+ if (FD_ISSET(*socket, &writefds))
+ tryToSend();
+}
+
+void CurlStream::tryToReceive()
+{
+ if (!m_curlHandle)
+ return;
+
+ auto receiveBuffer = makeUniqueArray<uint8_t>(kReceiveBufferSize);
+ size_t bytesReceived = 0;
+
+ auto errorCode = m_curlHandle->receive(receiveBuffer.get(), kReceiveBufferSize, bytesReceived);
+ if (errorCode != CURLE_OK) {
+ if (errorCode != CURLE_AGAIN)
+ notifyFailure(errorCode);
+ return;
+ }
+
+ // 0 bytes indicates a closed connection.
+ if (!bytesReceived)
+ destroyHandle();
+
+ m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, buffer = WTFMove(receiveBuffer), length = bytesReceived](Client& client) mutable {
+ client.didReceiveData(streamID, reinterpret_cast<const char*>(buffer.get()), length);
+ });
+}
+
+void CurlStream::tryToSend()
+{
+ if (!m_curlHandle || !m_sendBuffers.size())
+ return;
+
+ auto& [buffer, length] = m_sendBuffers.first();
+ size_t bytesSent = 0;
+
+ auto errorCode = m_curlHandle->send(buffer.get() + m_sendBufferOffset, length - m_sendBufferOffset, bytesSent);
+ if (errorCode != CURLE_OK) {
+ if (errorCode != CURLE_AGAIN)
+ notifyFailure(errorCode);
+ return;
+ }
+
+ m_sendBufferOffset += bytesSent;
+
+ if (m_sendBufferOffset >= length) {
+ m_sendBuffers.remove(0);
+ m_sendBufferOffset = 0;
+ }
+
+ m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, length = bytesSent](Client& client) {
+ client.didSendData(streamID, length);
+ });
+}
+
+void CurlStream::notifyFailure(CURLcode errorCode)
+{
+ destroyHandle();
+
+ m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, errorCode](Client& client) mutable {
+ client.didFail(streamID, errorCode);
+ });
+}
+
+}
+
+#endif
Added: trunk/Source/WebCore/platform/network/curl/CurlStream.h (0 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlStream.h (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStream.h 2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "CurlContext.h"
+#include <wtf/URL.h>
+#include <wtf/UniqueArray.h>
+#include <wtf/Vector.h>
+
+namespace WebCore {
+
+class CurlStreamScheduler;
+class SocketStreamError;
+
+using CurlStreamID = uint16_t;
+const CurlStreamID invalidCurlStreamID = 0;
+
+class CurlStream {
+ WTF_MAKE_FAST_ALLOCATED;
+ WTF_MAKE_NONCOPYABLE(CurlStream);
+public:
+ class Client {
+ public:
+ virtual void didOpen(CurlStreamID) = 0;
+ virtual void didSendData(CurlStreamID, size_t) = 0;
+ virtual void didReceiveData(CurlStreamID, const char*, size_t) = 0;
+ virtual void didFail(CurlStreamID, CURLcode) = 0;
+ };
+
+ static std::unique_ptr<CurlStream> create(CurlStreamScheduler& scheduler, CurlStreamID streamID, URL&& url)
+ {
+ return WTF::makeUnique<CurlStream>(scheduler, streamID, WTFMove(url));
+ }
+
+ CurlStream(CurlStreamScheduler&, CurlStreamID, URL&&);
+ virtual ~CurlStream();
+
+ void send(UniqueArray<uint8_t>&&, size_t);
+
+ void appendMonitoringFd(fd_set&, fd_set&, fd_set&, int&);
+ void tryToTransfer(const fd_set&, const fd_set&, const fd_set&);
+
+private:
+ void destroyHandle();
+
+ void tryToReceive();
+ void tryToSend();
+
+ void notifyFailure(CURLcode);
+
+ static const size_t kReceiveBufferSize = 16 * 1024;
+
+ CurlStreamScheduler& m_scheduler;
+ CurlStreamID m_streamID;
+
+ std::unique_ptr<CurlHandle> m_curlHandle;
+
+ WTF::Vector<std::pair<UniqueArray<uint8_t>, size_t>> m_sendBuffers;
+ size_t m_sendBufferOffset { 0 };
+};
+
+} // namespace WebCore
Added: trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp (0 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp 2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,190 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "CurlStreamScheduler.h"
+
+#if USE(CURL)
+
+namespace WebCore {
+
+CurlStreamScheduler::CurlStreamScheduler()
+{
+ ASSERT(isMainThread());
+}
+
+CurlStreamScheduler::~CurlStreamScheduler()
+{
+ ASSERT(isMainThread());
+}
+
+CurlStreamID CurlStreamScheduler::createStream(const URL& url, CurlStream::Client& client)
+{
+ ASSERT(isMainThread());
+
+ do {
+ m_currentStreamID = (m_currentStreamID + 1 != invalidCurlStreamID) ? m_currentStreamID + 1 : 1;
+ } while (m_clientList.contains(m_currentStreamID));
+
+ auto streamID = m_currentStreamID;
+ m_clientList.add(streamID, &client);
+
+ callOnWorkerThread([this, streamID, url = "" mutable {
+ m_streamList.add(streamID, CurlStream::create(*this, streamID, WTFMove(url)));
+ });
+
+ return streamID;
+}
+
+void CurlStreamScheduler::destroyStream(CurlStreamID streamID)
+{
+ ASSERT(isMainThread());
+
+ if (m_clientList.contains(streamID))
+ m_clientList.remove(streamID);
+
+ callOnWorkerThread([this, streamID]() {
+ if (m_streamList.contains(streamID))
+ m_streamList.remove(streamID);
+ });
+}
+
+void CurlStreamScheduler::send(CurlStreamID streamID, UniqueArray<uint8_t>&& data, size_t length)
+{
+ ASSERT(isMainThread());
+
+ callOnWorkerThread([this, streamID, data = "" length]() mutable {
+ if (auto stream = m_streamList.get(streamID))
+ stream->send(WTFMove(data), length);
+ });
+}
+
+void CurlStreamScheduler::callOnWorkerThread(WTF::Function<void()>&& task)
+{
+ ASSERT(isMainThread());
+
+ {
+ auto locker = holdLock(m_mutex);
+ m_taskQueue.append(WTFMove(task));
+ }
+
+ startThreadIfNeeded();
+}
+
+void CurlStreamScheduler::callClientOnMainThread(CurlStreamID streamID, WTF::Function<void(CurlStream::Client&)>&& task)
+{
+ ASSERT(!isMainThread());
+
+ callOnMainThread([this, streamID, task = WTFMove(task)]() {
+ if (auto client = m_clientList.get(streamID))
+ task(*client);
+ });
+}
+
+void CurlStreamScheduler::startThreadIfNeeded()
+{
+ {
+ auto locker = holdLock(m_mutex);
+ if (m_runThread)
+ return;
+ }
+
+ if (m_thread)
+ m_thread->waitForCompletion();
+
+ m_runThread = true;
+
+ m_thread = Thread::create("curlStreamThread", [this] {
+ workerThread();
+ });
+}
+
+void CurlStreamScheduler::stopThreadIfNoMoreJobRunning()
+{
+ ASSERT(!isMainThread());
+
+ if (m_streamList.size())
+ return;
+
+ auto locker = holdLock(m_mutex);
+ if (m_taskQueue.size())
+ return;
+
+ m_runThread = false;
+}
+
+void CurlStreamScheduler::executeTasks()
+{
+ ASSERT(!isMainThread());
+
+ Vector<WTF::Function<void()>> taskQueue;
+
+ {
+ auto locker = holdLock(m_mutex);
+ taskQueue = WTFMove(m_taskQueue);
+ }
+
+ for (auto& task : taskQueue)
+ task();
+}
+
+void CurlStreamScheduler::workerThread()
+{
+ ASSERT(!isMainThread());
+ static const int selectTimeoutMS = 20;
+ struct timeval timeout { 0, selectTimeoutMS * 1000};
+
+ while (m_runThread) {
+ executeTasks();
+
+ int rc = 0;
+ fd_set readfds;
+ fd_set writefds;
+ fd_set exceptfds;
+
+ do {
+ int maxfd = -1;
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ FD_ZERO(&exceptfds);
+
+ for (auto& stream : m_streamList.values())
+ stream->appendMonitoringFd(readfds, writefds, exceptfds, maxfd);
+
+ if (maxfd >= 0)
+ rc = ::select(maxfd + 1, &readfds, &writefds, &exceptfds, &timeout);
+ } while (rc == -1 && errno == EINTR);
+
+ for (auto& stream : m_streamList.values())
+ stream->tryToTransfer(readfds, writefds, exceptfds);
+
+ stopThreadIfNoMoreJobRunning();
+ }
+}
+
+}
+
+#endif
Added: trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h (0 => 256728)
--- trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h 2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "CurlStream.h"
+#include <wtf/Function.h>
+#include <wtf/HashMap.h>
+
+namespace WebCore {
+
+class CurlStreamScheduler {
+ WTF_MAKE_FAST_ALLOCATED;
+ WTF_MAKE_NONCOPYABLE(CurlStreamScheduler);
+public:
+ CurlStreamScheduler();
+ virtual ~CurlStreamScheduler();
+
+ CurlStreamID createStream(const URL&, CurlStream::Client&);
+ void destroyStream(CurlStreamID);
+ void send(CurlStreamID, UniqueArray<uint8_t>&&, size_t);
+
+ void callOnWorkerThread(WTF::Function<void()>&&);
+ void callClientOnMainThread(CurlStreamID, WTF::Function<void(CurlStream::Client&)>&&);
+
+private:
+ void startThreadIfNeeded();
+ void stopThreadIfNoMoreJobRunning();
+
+ void executeTasks();
+
+ void workerThread();
+
+ Lock m_mutex;
+ RefPtr<Thread> m_thread;
+ bool m_runThread { false };
+
+ CurlStreamID m_currentStreamID = 1;
+
+ Vector<Function<void()>> m_taskQueue;
+ HashMap<CurlStreamID, CurlStream::Client*> m_clientList;
+ HashMap<CurlStreamID, std::unique_ptr<CurlStream>> m_streamList;
+};
+
+} // namespace WebCore
Modified: trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h (256727 => 256728)
--- trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h 2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2009-2018 Apple Inc. All rights reserved.
* Copyright (C) 2009 Google Inc. All rights reserved.
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
@@ -31,16 +32,10 @@
#pragma once
-#include "CurlContext.h"
+#include "CurlStream.h"
#include "SocketStreamHandle.h"
#include <pal/SessionID.h>
-#include <wtf/Function.h>
-#include <wtf/Lock.h>
-#include <wtf/MessageQueue.h>
-#include <wtf/RefCounted.h>
#include <wtf/StreamBuffer.h>
-#include <wtf/Threading.h>
-#include <wtf/UniqueArray.h>
namespace WebCore {
@@ -47,7 +42,7 @@
class SocketStreamHandleClient;
class StorageSessionProvider;
-class SocketStreamHandleImpl : public SocketStreamHandle {
+class SocketStreamHandleImpl : public SocketStreamHandle, public CurlStream::Client {
public:
static Ref<SocketStreamHandleImpl> create(const URL& url, SocketStreamHandleClient& client, PAL::SessionID, const String&, SourceApplicationAuditToken&&, const StorageSessionProvider* provider) { return adoptRef(*new SocketStreamHandleImpl(url, client, provider)); }
@@ -64,28 +59,22 @@
Optional<size_t> platformSendInternal(const uint8_t*, size_t);
bool sendPendingData();
- void threadEntryPoint(const URL&);
- void handleError(CURLcode);
- void stopThread();
+ void didOpen(CurlStreamID) final;
+ void didSendData(CurlStreamID, size_t) final;
+ void didReceiveData(CurlStreamID, const char*, size_t) final;
+ void didFail(CurlStreamID, CURLcode) final;
- void callOnWorkerThread(Function<void()>&&);
- void executeTasks();
+ bool isStreamInvalidated() { return m_streamID == invalidCurlStreamID; }
+ void destructStream();
- static const size_t kReadBufferSize = 4 * 1024;
-
RefPtr<const StorageSessionProvider> m_storageSessionProvider;
- RefPtr<Thread> m_workerThread;
- std::atomic<bool> m_running { true };
- MessageQueue<Function<void()>> m_taskQueue;
-
- bool m_hasPendingWriteData { false };
- size_t m_writeBufferSize { 0 };
- size_t m_writeBufferOffset { 0 };
- UniqueArray<uint8_t> m_writeBuffer;
-
StreamBuffer<uint8_t, 1024 * 1024> m_buffer;
static const unsigned maxBufferSize = 100 * 1024 * 1024;
+
+ CurlStreamScheduler& m_scheduler;
+ CurlStreamID m_streamID { invalidCurlStreamID };
+ unsigned m_totalSendDataSize { 0 };
};
} // namespace WebCore
Modified: trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp (256727 => 256728)
--- trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp 2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp 2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,7 +1,7 @@
/*
* Copyright (C) 2009 Brent Fulgham. All rights reserved.
* Copyright (C) 2009 Google Inc. All rights reserved.
- * Copyright (C) 2018 Sony Interactive Entertainment Inc.
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
@@ -35,14 +35,11 @@
#if USE(CURL)
+#include "CurlStreamScheduler.h"
#include "DeprecatedGlobalSettings.h"
-#include "Logging.h"
#include "SocketStreamError.h"
#include "SocketStreamHandleClient.h"
#include "StorageSessionProvider.h"
-#include <wtf/MainThread.h>
-#include <wtf/URL.h>
-#include <wtf/text/CString.h>
namespace WebCore {
@@ -49,169 +46,92 @@
SocketStreamHandleImpl::SocketStreamHandleImpl(const URL& url, SocketStreamHandleClient& client, const StorageSessionProvider* provider)
: SocketStreamHandle(url, client)
, m_storageSessionProvider(provider)
+ , m_scheduler(CurlContext::singleton().streamScheduler())
{
- LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
- ASSERT(isMainThread());
-
// FIXME: Using DeprecatedGlobalSettings from here is a layering violation.
if (m_url.protocolIs("wss") && DeprecatedGlobalSettings::allowsAnySSLCertificate())
CurlContext::singleton().sslHandle().setIgnoreSSLErrors(true);
- m_workerThread = Thread::create("WebSocket thread", [this, protectedThis = makeRef(*this), url = "" {
- threadEntryPoint(url);
- });
+ m_streamID = m_scheduler.createStream(m_url, *this);
}
SocketStreamHandleImpl::~SocketStreamHandleImpl()
{
- LOG(Network, "SocketStreamHandle %p delete", this);
- stopThread();
+ destructStream();
}
Optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length)
{
- LOG(Network, "SocketStreamHandle %p platformSend", this);
- ASSERT(isMainThread());
+ if (isStreamInvalidated())
+ return WTF::nullopt;
- if (m_hasPendingWriteData)
+ if (m_totalSendDataSize + length > maxBufferSize)
return 0;
+ m_totalSendDataSize += length;
- m_hasPendingWriteData = true;
+ auto buffer = makeUniqueArray<uint8_t>(length);
+ memcpy(buffer.get(), data, length);
- auto writeBuffer = makeUniqueArray<uint8_t>(length);
- memcpy(writeBuffer.get(), data, length);
-
- callOnWorkerThread([this, writeBuffer = WTFMove(writeBuffer), writeBufferSize = length]() mutable {
- ASSERT(!isMainThread());
- m_writeBuffer = WTFMove(writeBuffer);
- m_writeBufferSize = writeBufferSize;
- m_writeBufferOffset = 0;
- });
-
+ m_scheduler.send(m_streamID, WTFMove(buffer), length);
return length;
}
void SocketStreamHandleImpl::platformClose()
{
- LOG(Network, "SocketStreamHandle %p platformClose", this);
- ASSERT(isMainThread());
+ destructStream();
if (m_state == Closed)
return;
m_state = Closed;
- stopThread();
m_client.didCloseSocketStream(*this);
}
-void SocketStreamHandleImpl::threadEntryPoint(const URL& url)
+void SocketStreamHandleImpl::didOpen(CurlStreamID)
{
- ASSERT(!isMainThread());
-
- CurlSocketHandle socket { url, [this](CURLcode errorCode) {
- handleError(errorCode);
- }};
-
- // Connect to host
- if (!socket.connect())
+ if (m_state != Connecting)
return;
+ m_state = Open;
- callOnMainThread([this, protectedThis = makeRef(*this)] {
- if (m_state == Connecting) {
- m_state = Open;
- m_client.didOpenSocketStream(*this);
- }
- });
+ m_client.didOpenSocketStream(*this);
+}
- while (m_running) {
- executeTasks();
+void SocketStreamHandleImpl::didSendData(CurlStreamID, size_t length)
+{
+ ASSERT(m_totalSendDataSize - length >= 0);
- auto result = socket.wait(20_ms, m_writeBuffer.get());
- if (!result)
- continue;
-
- // These logic only run when there's data waiting.
- if (result->writable && m_running) {
- auto bytesSent = socket.send(m_writeBuffer.get() + m_writeBufferOffset, m_writeBufferSize - m_writeBufferOffset);
- m_writeBufferOffset += bytesSent;
-
- if (m_writeBufferSize <= m_writeBufferOffset) {
- m_writeBuffer = nullptr;
- m_writeBufferSize = 0;
- m_writeBufferOffset = 0;
-
- callOnMainThread([this, protectedThis = makeRef(*this)] {
- m_hasPendingWriteData = false;
- sendPendingData();
- });
- }
- }
-
- if (result->readable && m_running) {
- auto readBuffer = makeUniqueArray<uint8_t>(kReadBufferSize);
- auto bytesRead = socket.receive(readBuffer.get(), kReadBufferSize);
- // `nullopt` result means nothing to handle at this moment.
- if (!bytesRead)
- continue;
-
- // 0 bytes indicates a closed connection.
- if (!*bytesRead) {
- m_running = false;
- callOnMainThread([this, protectedThis = makeRef(*this)] {
- close();
- });
- break;
- }
-
- callOnMainThread([this, protectedThis = makeRef(*this), buffer = WTFMove(readBuffer), size = *bytesRead ] {
- if (m_state == Open)
- m_client.didReceiveSocketStreamData(*this, reinterpret_cast<const char*>(buffer.get()), size);
- });
- }
- }
-
- m_writeBuffer = nullptr;
+ m_totalSendDataSize -= length;
+ sendPendingData();
}
-void SocketStreamHandleImpl::handleError(CURLcode errorCode)
+void SocketStreamHandleImpl::didReceiveData(CurlStreamID, const char* data, size_t length)
{
- m_running = false;
- callOnMainThread([this, protectedThis = makeRef(*this), errorCode, localizedDescription = CurlHandle::errorDescription(errorCode).isolatedCopy()] {
- if (m_state == Closed)
- return;
+ if (m_state != Open)
+ return;
- if (errorCode == CURLE_RECV_ERROR)
- m_client.didFailToReceiveSocketStreamData(*this);
- else
- m_client.didFailSocketStream(*this, SocketStreamError(static_cast<int>(errorCode), { }, localizedDescription));
- });
+ m_client.didReceiveSocketStreamData(*this, data, length);
}
-void SocketStreamHandleImpl::stopThread()
+void SocketStreamHandleImpl::didFail(CurlStreamID, CURLcode errorCode)
{
- ASSERT(isMainThread());
+ destructStream();
- m_running = false;
+ if (m_state == Closed)
+ return;
- if (m_workerThread) {
- m_workerThread->waitForCompletion();
- m_workerThread = nullptr;
- }
+ if (errorCode == CURLE_RECV_ERROR)
+ m_client.didFailToReceiveSocketStreamData(*this);
+ else
+ m_client.didFailSocketStream(*this, SocketStreamError(errorCode, m_url, CurlHandle::errorDescription(errorCode)));
}
-void SocketStreamHandleImpl::callOnWorkerThread(Function<void()>&& task)
+void SocketStreamHandleImpl::destructStream()
{
- ASSERT(isMainThread());
- m_taskQueue.append(makeUnique<Function<void()>>(WTFMove(task)));
-}
+ if (isStreamInvalidated())
+ return;
-void SocketStreamHandleImpl::executeTasks()
-{
- ASSERT(!isMainThread());
-
- auto tasks = m_taskQueue.takeAllMessages();
- for (auto& task : tasks)
- (*task)();
+ m_scheduler.destroyStream(m_streamID);
+ m_streamID = invalidCurlStreamID;
}
} // namespace WebCore