Title: [256728] trunk/Source/WebCore
Revision
256728
Author
[email protected]
Date
2020-02-17 04:07:08 -0800 (Mon, 17 Feb 2020)

Log Message

[Curl] Use shared single thread for WebSocket connections
https://bugs.webkit.org/show_bug.cgi?id=187984

Patch by Takashi Komori <[email protected]> on 2020-02-17
Reviewed by Fujii Hironori.

This patch suppresses invoking worker threads for websocket connections.
CurlStreamScheduler starts up to one worker thread.

No new tests. Covered by existing WebSocket tests.

* platform/Curl.cmake:
* platform/network/curl/CurlContext.cpp:
(WebCore::CurlContext::streamScheduler):
(WebCore::CurlHandle::getActiveSocket):
(WebCore::CurlHandle::send):
(WebCore::CurlHandle::receive):
(WebCore::CurlSocketHandle::CurlSocketHandle): Deleted.
(WebCore::CurlSocketHandle::connect): Deleted.
(WebCore::CurlSocketHandle::send): Deleted.
(WebCore::CurlSocketHandle::receive): Deleted.
(WebCore::CurlSocketHandle::wait): Deleted.
* platform/network/curl/CurlContext.h:
* platform/network/curl/CurlStream.cpp: Added.
(WebCore::CurlStream::CurlStream):
(WebCore::CurlStream::~CurlStream):
(WebCore::CurlStream::destroyHandle):
(WebCore::CurlStream::send):
(WebCore::CurlStream::appendMonitoringFd):
(WebCore::CurlStream::tryToTransfer):
(WebCore::CurlStream::tryToReceive):
(WebCore::CurlStream::tryToSend):
(WebCore::CurlStream::notifyFailure):
* platform/network/curl/CurlStream.h: Added.
(WebCore::CurlStream::create):
* platform/network/curl/CurlStreamScheduler.cpp: Added.
(WebCore::CurlStreamScheduler::CurlStreamScheduler):
(WebCore::CurlStreamScheduler::~CurlStreamScheduler):
(WebCore::CurlStreamScheduler::createStream):
(WebCore::CurlStreamScheduler::destroyStream):
(WebCore::CurlStreamScheduler::send):
(WebCore::CurlStreamScheduler::callOnWorkerThread):
(WebCore::CurlStreamScheduler::callClientOnMainThread):
(WebCore::CurlStreamScheduler::startThreadIfNeeded):
(WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning):
(WebCore::CurlStreamScheduler::executeTasks):
(WebCore::CurlStreamScheduler::workerThread):
* platform/network/curl/CurlStreamScheduler.h: Added.
* platform/network/curl/SocketStreamHandleImpl.h:
(WebCore::SocketStreamHandleImpl::isStreamInvalidated):
* platform/network/curl/SocketStreamHandleImplCurl.cpp:
(WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl):
(WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl):
(WebCore::SocketStreamHandleImpl::platformSendInternal):
(WebCore::SocketStreamHandleImpl::platformClose):
(WebCore::SocketStreamHandleImpl::didOpen):
(WebCore::SocketStreamHandleImpl::didSendData):
(WebCore::SocketStreamHandleImpl::didReceiveData):
(WebCore::SocketStreamHandleImpl::didFail):
(WebCore::SocketStreamHandleImpl::destructStream):
(WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted.
(WebCore::SocketStreamHandleImpl::handleError): Deleted.
(WebCore::SocketStreamHandleImpl::stopThread): Deleted.
(WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted.
(WebCore::SocketStreamHandleImpl::executeTasks): Deleted.

Modified Paths

Added Paths

Diff

Modified: trunk/Source/WebCore/ChangeLog (256727 => 256728)


--- trunk/Source/WebCore/ChangeLog	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/ChangeLog	2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,3 +1,70 @@
+2020-02-17  Takashi Komori  <[email protected]>
+
+        [Curl] Use shared single thread for WebSocket connections
+        https://bugs.webkit.org/show_bug.cgi?id=187984
+
+        Reviewed by Fujii Hironori.
+
+        This patch suppresses invoking worker threads for websocket connections.
+        CurlStreamScheduler starts up to one worker thread.
+
+        No new tests. Covered by existing WebSocket tests.
+
+        * platform/Curl.cmake:
+        * platform/network/curl/CurlContext.cpp:
+        (WebCore::CurlContext::streamScheduler):
+        (WebCore::CurlHandle::getActiveSocket):
+        (WebCore::CurlHandle::send):
+        (WebCore::CurlHandle::receive):
+        (WebCore::CurlSocketHandle::CurlSocketHandle): Deleted.
+        (WebCore::CurlSocketHandle::connect): Deleted.
+        (WebCore::CurlSocketHandle::send): Deleted.
+        (WebCore::CurlSocketHandle::receive): Deleted.
+        (WebCore::CurlSocketHandle::wait): Deleted.
+        * platform/network/curl/CurlContext.h:
+        * platform/network/curl/CurlStream.cpp: Added.
+        (WebCore::CurlStream::CurlStream):
+        (WebCore::CurlStream::~CurlStream):
+        (WebCore::CurlStream::destroyHandle):
+        (WebCore::CurlStream::send):
+        (WebCore::CurlStream::appendMonitoringFd):
+        (WebCore::CurlStream::tryToTransfer):
+        (WebCore::CurlStream::tryToReceive):
+        (WebCore::CurlStream::tryToSend):
+        (WebCore::CurlStream::notifyFailure):
+        * platform/network/curl/CurlStream.h: Added.
+        (WebCore::CurlStream::create):
+        * platform/network/curl/CurlStreamScheduler.cpp: Added.
+        (WebCore::CurlStreamScheduler::CurlStreamScheduler):
+        (WebCore::CurlStreamScheduler::~CurlStreamScheduler):
+        (WebCore::CurlStreamScheduler::createStream):
+        (WebCore::CurlStreamScheduler::destroyStream):
+        (WebCore::CurlStreamScheduler::send):
+        (WebCore::CurlStreamScheduler::callOnWorkerThread):
+        (WebCore::CurlStreamScheduler::callClientOnMainThread):
+        (WebCore::CurlStreamScheduler::startThreadIfNeeded):
+        (WebCore::CurlStreamScheduler::stopThreadIfNoMoreJobRunning):
+        (WebCore::CurlStreamScheduler::executeTasks):
+        (WebCore::CurlStreamScheduler::workerThread):
+        * platform/network/curl/CurlStreamScheduler.h: Added.
+        * platform/network/curl/SocketStreamHandleImpl.h:
+        (WebCore::SocketStreamHandleImpl::isStreamInvalidated):
+        * platform/network/curl/SocketStreamHandleImplCurl.cpp:
+        (WebCore::SocketStreamHandleImpl::SocketStreamHandleImpl):
+        (WebCore::SocketStreamHandleImpl::~SocketStreamHandleImpl):
+        (WebCore::SocketStreamHandleImpl::platformSendInternal):
+        (WebCore::SocketStreamHandleImpl::platformClose):
+        (WebCore::SocketStreamHandleImpl::didOpen):
+        (WebCore::SocketStreamHandleImpl::didSendData):
+        (WebCore::SocketStreamHandleImpl::didReceiveData):
+        (WebCore::SocketStreamHandleImpl::didFail):
+        (WebCore::SocketStreamHandleImpl::destructStream):
+        (WebCore::SocketStreamHandleImpl::threadEntryPoint): Deleted.
+        (WebCore::SocketStreamHandleImpl::handleError): Deleted.
+        (WebCore::SocketStreamHandleImpl::stopThread): Deleted.
+        (WebCore::SocketStreamHandleImpl::callOnWorkerThread): Deleted.
+        (WebCore::SocketStreamHandleImpl::executeTasks): Deleted.
+
 2020-02-17  Carlos Garcia Campos  <[email protected]>
 
         [WPE] Use custom theme style to render buttons

Modified: trunk/Source/WebCore/platform/Curl.cmake (256727 => 256728)


--- trunk/Source/WebCore/platform/Curl.cmake	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/Curl.cmake	2020-02-17 12:07:08 UTC (rev 256728)
@@ -22,6 +22,8 @@
     platform/network/curl/CurlResourceHandleDelegate.cpp
     platform/network/curl/CurlSSLHandle.cpp
     platform/network/curl/CurlSSLVerifier.cpp
+    platform/network/curl/CurlStream.cpp
+    platform/network/curl/CurlStreamScheduler.cpp
     platform/network/curl/DNSResolveQueueCurl.cpp
     platform/network/curl/NetworkStorageSessionCurl.cpp
     platform/network/curl/OpenSSLHelper.cpp
@@ -58,6 +60,8 @@
     platform/network/curl/CurlResponse.h
     platform/network/curl/CurlSSLHandle.h
     platform/network/curl/CurlSSLVerifier.h
+    platform/network/curl/CurlStream.h
+    platform/network/curl/CurlStreamScheduler.h
     platform/network/curl/DNSResolveQueueCurl.h
     platform/network/curl/DownloadBundle.h
     platform/network/curl/OpenSSLHelper.h

Modified: trunk/Source/WebCore/platform/network/curl/CurlContext.cpp (256727 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlContext.cpp	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/CurlContext.cpp	2020-02-17 12:07:08 UTC (rev 256728)
@@ -32,6 +32,7 @@
 #include "CurlRequestScheduler.h"
 #include "CurlSSLHandle.h"
 #include "CurlSSLVerifier.h"
+#include "CurlStreamScheduler.h"
 #include "HTTPHeaderMap.h"
 #include <NetworkLoadMetrics.h>
 #include <mutex>
@@ -146,6 +147,12 @@
     curl_easy_cleanup(curl);
 }
 
+CurlStreamScheduler& CurlContext::streamScheduler()
+{
+    static NeverDestroyed<CurlStreamScheduler> sharedInstance;
+    return sharedInstance;
+}
+
 bool CurlContext::isHttp2Enabled() const
 {
     curl_version_info_data* data = ""
@@ -888,116 +895,27 @@
 
 #endif
 
-// CurlSocketHandle
-
-CurlSocketHandle::CurlSocketHandle(const URL& url, Function<void(CURLcode)>&& errorHandler)
-    : m_errorHandler(WTFMove(errorHandler))
+Expected<curl_socket_t, CURLcode> CurlHandle::getActiveSocket()
 {
-    // Libcurl is not responsible for the protocol handling. It just handles connection.
-    // Only scheme, host and port is required.
-    URL urlForConnection;
-    urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http");
-    urlForConnection.setHostAndPort(url.hostAndPort());
-    setUrl(urlForConnection);
+    curl_socket_t socket;
 
-    enableConnectionOnly();
-}
+    CURLcode errorCode = curl_easy_getinfo(m_handle, CURLINFO_ACTIVESOCKET, &socket);
+    if (errorCode != CURLE_OK)
+        return makeUnexpected(errorCode);
 
-bool CurlSocketHandle::connect()
-{
-    CURLcode errorCode = perform();
-    if (errorCode != CURLE_OK) {
-        m_errorHandler(errorCode);
-        return false;
-    }
-
-    return true;
+    return socket;
 }
 
-size_t CurlSocketHandle::send(const uint8_t* buffer, size_t size)
+CURLcode CurlHandle::send(const uint8_t* buffer, size_t bufferSize, size_t& bytesSent)
 {
-    size_t totalBytesSent = 0;
-
-    while (totalBytesSent < size) {
-        size_t bytesSent = 0;
-        CURLcode errorCode = curl_easy_send(handle(), buffer + totalBytesSent, size - totalBytesSent, &bytesSent);
-        if (errorCode != CURLE_OK) {
-            if (errorCode != CURLE_AGAIN)
-                m_errorHandler(errorCode);
-            break;
-        }
-
-        totalBytesSent += bytesSent;
-    }
-
-    return totalBytesSent;
+    return curl_easy_send(m_handle, buffer, bufferSize, &bytesSent);
 }
 
-Optional<size_t> CurlSocketHandle::receive(uint8_t* buffer, size_t bufferSize)
+CURLcode CurlHandle::receive(uint8_t* buffer, size_t bufferSize, size_t& bytesRead)
 {
-    size_t bytesRead = 0;
-
-    CURLcode errorCode = curl_easy_recv(handle(), buffer, bufferSize, &bytesRead);
-    if (errorCode != CURLE_OK) {
-        if (errorCode != CURLE_AGAIN)
-            m_errorHandler(errorCode);
-
-        return WTF::nullopt;
-    }
-
-    return bytesRead;
+    return curl_easy_recv(m_handle, buffer, bufferSize, &bytesRead);
 }
 
-Optional<CurlSocketHandle::WaitResult> CurlSocketHandle::wait(const Seconds& timeout, bool alsoWaitForWrite)
-{
-    curl_socket_t socket;
-    CURLcode errorCode = curl_easy_getinfo(handle(), CURLINFO_ACTIVESOCKET, &socket);
-    if (errorCode != CURLE_OK) {
-        m_errorHandler(errorCode);
-        return WTF::nullopt;
-    }
-
-    int64_t usec = timeout.microsecondsAs<int64_t>();
-
-    struct timeval selectTimeout;
-    if (usec <= 0) {
-        selectTimeout.tv_sec = 0;
-        selectTimeout.tv_usec = 0;
-    } else {
-        selectTimeout.tv_sec = usec / 1000000;
-        selectTimeout.tv_usec = usec % 1000000;
-    }
-
-    int rc = 0;
-    int maxfd = static_cast<int>(socket) + 1;
-    fd_set fdread;
-    fd_set fdwrite;
-    fd_set fderr;
-
-    // Retry 'select' if it was interrupted by a process signal.
-    do {
-        FD_ZERO(&fdread);
-        FD_SET(socket, &fdread);
-
-        FD_ZERO(&fdwrite);
-        if (alsoWaitForWrite)
-            FD_SET(socket, &fdwrite);
-
-        FD_ZERO(&fderr);
-        FD_SET(socket, &fderr);
-
-        rc = ::select(maxfd, &fdread, &fdwrite, &fderr, &selectTimeout);
-    } while (rc == -1 && errno == EINTR);
-
-    if (rc <= 0)
-        return WTF::nullopt;
-
-    WaitResult result;
-    result.readable = FD_ISSET(socket, &fdread) || FD_ISSET(socket, &fderr);
-    result.writable = FD_ISSET(socket, &fdwrite);
-    return result;
 }
 
-}
-
 #endif

Modified: trunk/Source/WebCore/platform/network/curl/CurlContext.h (256727 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlContext.h	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/CurlContext.h	2020-02-17 12:07:08 UTC (rev 256728)
@@ -89,6 +89,7 @@
 // CurlContext --------------------------------------------
 
 class CurlRequestScheduler;
+class CurlStreamScheduler;
 
 class CurlContext : public CurlGlobal {
     WTF_MAKE_NONCOPYABLE(CurlContext);
@@ -101,6 +102,7 @@
     const CurlShareHandle& shareHandle() { return m_shareHandle; }
 
     CurlRequestScheduler& scheduler() { return *m_scheduler; }
+    CurlStreamScheduler& streamScheduler();
 
     // Proxy
     const CurlProxySettings& proxySettings() const { return m_proxySettings; }
@@ -293,6 +295,11 @@
 
     static long long maxCurlOffT();
 
+    // socket
+    Expected<curl_socket_t, CURLcode> getActiveSocket();
+    CURLcode send(const uint8_t*, size_t, size_t&);
+    CURLcode receive(uint8_t*, size_t, size_t&);
+
 #ifndef NDEBUG
     void enableVerboseIfUsed();
     void enableStdErrIfUsed();
@@ -313,24 +320,4 @@
     std::unique_ptr<CurlSSLVerifier> m_sslVerifier;
 };
 
-class CurlSocketHandle : public CurlHandle {
-    WTF_MAKE_NONCOPYABLE(CurlSocketHandle);
-
-public:
-    struct WaitResult {
-        bool readable { false };
-        bool writable { false };
-    };
-
-    CurlSocketHandle(const URL&, Function<void(CURLcode)>&& errorHandler);
-
-    bool connect();
-    size_t send(const uint8_t*, size_t);
-    Optional<size_t> receive(uint8_t*, size_t);
-    Optional<WaitResult> wait(const Seconds& timeout, bool alsoWaitForWrite);
-
-private:
-    Function<void(CURLcode)> m_errorHandler;
-};
-
 } // namespace WebCore

Added: trunk/Source/WebCore/platform/network/curl/CurlStream.cpp (0 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlStream.cpp	                        (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStream.cpp	2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,193 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "CurlStream.h"
+
+#include "CurlStreamScheduler.h"
+#include "SocketStreamError.h"
+
+#if USE(CURL)
+
+namespace WebCore {
+
+CurlStream::CurlStream(CurlStreamScheduler& scheduler, CurlStreamID streamID, URL&& url)
+    : m_scheduler(scheduler)
+    , m_streamID(streamID)
+{
+    ASSERT(!isMainThread());
+
+    m_curlHandle = WTF::makeUnique<CurlHandle>();
+
+    // Libcurl is not responsible for the protocol handling. It just handles connection.
+    // Only scheme, host and port is required.
+    URL urlForConnection;
+    urlForConnection.setProtocol(url.protocolIs("wss") ? "https" : "http");
+    urlForConnection.setHostAndPort(url.hostAndPort());
+    m_curlHandle->setUrl(urlForConnection);
+
+    m_curlHandle->enableConnectionOnly();
+
+    auto errorCode = m_curlHandle->perform();
+    if (errorCode != CURLE_OK) {
+        notifyFailure(errorCode);
+        return;
+    }
+
+    m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID](Client& client) {
+        client.didOpen(streamID);
+    });
+}
+
+CurlStream::~CurlStream()
+{
+    ASSERT(!isMainThread());
+    destroyHandle();
+}
+
+void CurlStream::destroyHandle()
+{
+    if (!m_curlHandle)
+        return;
+
+    m_curlHandle = nullptr;
+}
+
+void CurlStream::send(UniqueArray<uint8_t>&& buffer, size_t length)
+{
+    ASSERT(!isMainThread());
+
+    if (!m_curlHandle)
+        return;
+
+    m_sendBuffers.append(std::make_pair(WTFMove(buffer), length));
+}
+
+void CurlStream::appendMonitoringFd(fd_set& readfds, fd_set& writefds, fd_set& exceptfds, int& maxfd)
+{
+    ASSERT(!isMainThread());
+
+    if (!m_curlHandle)
+        return;
+
+    auto socket = m_curlHandle->getActiveSocket();
+    if (!socket.has_value()) {
+        notifyFailure(socket.error());
+        return;
+    }
+
+    FD_SET(*socket, &readfds);
+    FD_SET(*socket, &exceptfds);
+
+    if (m_sendBuffers.size())
+        FD_SET(*socket, &writefds);
+
+    if (maxfd < *socket)
+        maxfd = *socket;
+}
+
+void CurlStream::tryToTransfer(const fd_set& readfds, const fd_set& writefds, const fd_set& exceptfds)
+{
+    ASSERT(!isMainThread());
+
+    if (!m_curlHandle)
+        return;
+
+    auto socket = m_curlHandle->getActiveSocket();
+    if (!socket.has_value()) {
+        notifyFailure(socket.error());
+        return;
+    }
+
+    if (FD_ISSET(*socket, &readfds) || FD_ISSET(*socket, &exceptfds))
+        tryToReceive();
+
+    if (FD_ISSET(*socket, &writefds))
+        tryToSend();
+}
+
+void CurlStream::tryToReceive()
+{
+    if (!m_curlHandle)
+        return;
+
+    auto receiveBuffer = makeUniqueArray<uint8_t>(kReceiveBufferSize);
+    size_t bytesReceived = 0;
+
+    auto errorCode = m_curlHandle->receive(receiveBuffer.get(), kReceiveBufferSize, bytesReceived);
+    if (errorCode != CURLE_OK) {
+        if (errorCode != CURLE_AGAIN)
+            notifyFailure(errorCode);
+        return;
+    }
+
+    // 0 bytes indicates a closed connection.
+    if (!bytesReceived)
+        destroyHandle();
+
+    m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, buffer = WTFMove(receiveBuffer), length = bytesReceived](Client& client) mutable {
+        client.didReceiveData(streamID, reinterpret_cast<const char*>(buffer.get()), length);
+    });
+}
+
+void CurlStream::tryToSend()
+{
+    if (!m_curlHandle || !m_sendBuffers.size())
+        return;
+
+    auto& [buffer, length] = m_sendBuffers.first();
+    size_t bytesSent = 0;
+
+    auto errorCode = m_curlHandle->send(buffer.get() + m_sendBufferOffset, length - m_sendBufferOffset, bytesSent);
+    if (errorCode != CURLE_OK) {
+        if (errorCode != CURLE_AGAIN)
+            notifyFailure(errorCode);
+        return;
+    }
+
+    m_sendBufferOffset += bytesSent;
+
+    if (m_sendBufferOffset >= length) {
+        m_sendBuffers.remove(0);
+        m_sendBufferOffset = 0;
+    }
+
+    m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, length = bytesSent](Client& client) {
+        client.didSendData(streamID, length);
+    });
+}
+
+void CurlStream::notifyFailure(CURLcode errorCode)
+{
+    destroyHandle();
+
+    m_scheduler.callClientOnMainThread(m_streamID, [streamID = m_streamID, errorCode](Client& client) mutable {
+        client.didFail(streamID, errorCode);
+    });
+}
+
+}
+
+#endif

Added: trunk/Source/WebCore/platform/network/curl/CurlStream.h (0 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlStream.h	                        (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStream.h	2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "CurlContext.h"
+#include <wtf/URL.h>
+#include <wtf/UniqueArray.h>
+#include <wtf/Vector.h>
+
+namespace WebCore {
+
+class CurlStreamScheduler;
+class SocketStreamError;
+
+using CurlStreamID = uint16_t;
+const CurlStreamID invalidCurlStreamID = 0;
+
+class CurlStream {
+    WTF_MAKE_FAST_ALLOCATED;
+    WTF_MAKE_NONCOPYABLE(CurlStream);
+public:
+    class Client {
+    public:
+        virtual void didOpen(CurlStreamID) = 0;
+        virtual void didSendData(CurlStreamID, size_t) = 0;
+        virtual void didReceiveData(CurlStreamID, const char*, size_t) = 0;
+        virtual void didFail(CurlStreamID, CURLcode) = 0;
+    };
+
+    static std::unique_ptr<CurlStream> create(CurlStreamScheduler& scheduler, CurlStreamID streamID, URL&& url)
+    {
+        return WTF::makeUnique<CurlStream>(scheduler, streamID, WTFMove(url));
+    }
+
+    CurlStream(CurlStreamScheduler&, CurlStreamID, URL&&);
+    virtual ~CurlStream();
+
+    void send(UniqueArray<uint8_t>&&, size_t);
+
+    void appendMonitoringFd(fd_set&, fd_set&, fd_set&, int&);
+    void tryToTransfer(const fd_set&, const fd_set&, const fd_set&);
+
+private:
+    void destroyHandle();
+
+    void tryToReceive();
+    void tryToSend();
+
+    void notifyFailure(CURLcode);
+
+    static const size_t kReceiveBufferSize = 16 * 1024;
+
+    CurlStreamScheduler& m_scheduler;
+    CurlStreamID m_streamID;
+
+    std::unique_ptr<CurlHandle> m_curlHandle;
+
+    WTF::Vector<std::pair<UniqueArray<uint8_t>, size_t>> m_sendBuffers;
+    size_t m_sendBufferOffset { 0 };
+};
+
+} // namespace WebCore

Added: trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp (0 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp	                        (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.cpp	2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,190 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "CurlStreamScheduler.h"
+
+#if USE(CURL)
+
+namespace WebCore {
+
+CurlStreamScheduler::CurlStreamScheduler()
+{
+    ASSERT(isMainThread());
+}
+
+CurlStreamScheduler::~CurlStreamScheduler()
+{
+    ASSERT(isMainThread());
+}
+
+CurlStreamID CurlStreamScheduler::createStream(const URL& url, CurlStream::Client& client)
+{
+    ASSERT(isMainThread());
+
+    do {
+        m_currentStreamID = (m_currentStreamID + 1 != invalidCurlStreamID) ? m_currentStreamID + 1 : 1;
+    } while (m_clientList.contains(m_currentStreamID));
+
+    auto streamID = m_currentStreamID;
+    m_clientList.add(streamID, &client);
+
+    callOnWorkerThread([this, streamID, url = "" mutable {
+        m_streamList.add(streamID, CurlStream::create(*this, streamID, WTFMove(url)));
+    });
+
+    return streamID;
+}
+
+void CurlStreamScheduler::destroyStream(CurlStreamID streamID)
+{
+    ASSERT(isMainThread());
+
+    if (m_clientList.contains(streamID))
+        m_clientList.remove(streamID);
+
+    callOnWorkerThread([this, streamID]() {
+        if (m_streamList.contains(streamID))
+            m_streamList.remove(streamID);
+    });
+}
+
+void CurlStreamScheduler::send(CurlStreamID streamID, UniqueArray<uint8_t>&& data, size_t length)
+{
+    ASSERT(isMainThread());
+
+    callOnWorkerThread([this, streamID, data = "" length]() mutable {
+        if (auto stream = m_streamList.get(streamID))
+            stream->send(WTFMove(data), length);
+    });
+}
+
+void CurlStreamScheduler::callOnWorkerThread(WTF::Function<void()>&& task)
+{
+    ASSERT(isMainThread());
+
+    {
+        auto locker = holdLock(m_mutex);
+        m_taskQueue.append(WTFMove(task));
+    }
+
+    startThreadIfNeeded();
+}
+
+void CurlStreamScheduler::callClientOnMainThread(CurlStreamID streamID, WTF::Function<void(CurlStream::Client&)>&& task)
+{
+    ASSERT(!isMainThread());
+
+    callOnMainThread([this, streamID, task = WTFMove(task)]() {
+        if (auto client = m_clientList.get(streamID))
+            task(*client);
+    });
+}
+
+void CurlStreamScheduler::startThreadIfNeeded()
+{
+    {
+        auto locker = holdLock(m_mutex);
+        if (m_runThread)
+            return;
+    }
+
+    if (m_thread)
+        m_thread->waitForCompletion();
+
+    m_runThread = true;
+
+    m_thread = Thread::create("curlStreamThread", [this] {
+        workerThread();
+    });
+}
+
+void CurlStreamScheduler::stopThreadIfNoMoreJobRunning()
+{
+    ASSERT(!isMainThread());
+
+    if (m_streamList.size())
+        return;
+
+    auto locker = holdLock(m_mutex);
+    if (m_taskQueue.size())
+        return;
+
+    m_runThread = false;
+}
+
+void CurlStreamScheduler::executeTasks()
+{
+    ASSERT(!isMainThread());
+
+    Vector<WTF::Function<void()>> taskQueue;
+
+    {
+        auto locker = holdLock(m_mutex);
+        taskQueue = WTFMove(m_taskQueue);
+    }
+
+    for (auto& task : taskQueue)
+        task();
+}
+
+void CurlStreamScheduler::workerThread()
+{
+    ASSERT(!isMainThread());
+    static const int selectTimeoutMS = 20;
+    struct timeval timeout { 0, selectTimeoutMS * 1000};
+
+    while (m_runThread) {
+        executeTasks();
+
+        int rc = 0;
+        fd_set readfds;
+        fd_set writefds;
+        fd_set exceptfds;
+
+        do {
+            int maxfd = -1;
+
+            FD_ZERO(&readfds);
+            FD_ZERO(&writefds);
+            FD_ZERO(&exceptfds);
+
+            for (auto& stream : m_streamList.values())
+                stream->appendMonitoringFd(readfds, writefds, exceptfds, maxfd);
+
+            if (maxfd >= 0)
+                rc = ::select(maxfd + 1, &readfds, &writefds, &exceptfds, &timeout);
+        } while (rc == -1 && errno == EINTR);
+
+        for (auto& stream : m_streamList.values())
+            stream->tryToTransfer(readfds, writefds, exceptfds);
+
+        stopThreadIfNoMoreJobRunning();
+    }
+}
+
+}
+
+#endif

Added: trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h (0 => 256728)


--- trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h	                        (rev 0)
+++ trunk/Source/WebCore/platform/network/curl/CurlStreamScheduler.h	2020-02-17 12:07:08 UTC (rev 256728)
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+#include "CurlStream.h"
+#include <wtf/Function.h>
+#include <wtf/HashMap.h>
+
+namespace WebCore {
+
+class CurlStreamScheduler {
+    WTF_MAKE_FAST_ALLOCATED;
+    WTF_MAKE_NONCOPYABLE(CurlStreamScheduler);
+public:
+    CurlStreamScheduler();
+    virtual ~CurlStreamScheduler();
+
+    CurlStreamID createStream(const URL&, CurlStream::Client&);
+    void destroyStream(CurlStreamID);
+    void send(CurlStreamID, UniqueArray<uint8_t>&&, size_t);
+
+    void callOnWorkerThread(WTF::Function<void()>&&);
+    void callClientOnMainThread(CurlStreamID, WTF::Function<void(CurlStream::Client&)>&&);
+
+private:
+    void startThreadIfNeeded();
+    void stopThreadIfNoMoreJobRunning();
+
+    void executeTasks();
+
+    void workerThread();
+
+    Lock m_mutex;
+    RefPtr<Thread> m_thread;
+    bool m_runThread { false };
+
+    CurlStreamID m_currentStreamID = 1;
+
+    Vector<Function<void()>> m_taskQueue;
+    HashMap<CurlStreamID, CurlStream::Client*> m_clientList;
+    HashMap<CurlStreamID, std::unique_ptr<CurlStream>> m_streamList;
+};
+
+} // namespace WebCore

Modified: trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h (256727 => 256728)


--- trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImpl.h	2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2009-2018 Apple Inc. All rights reserved.
  * Copyright (C) 2009 Google Inc.  All rights reserved.
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are
@@ -31,16 +32,10 @@
 
 #pragma once
 
-#include "CurlContext.h"
+#include "CurlStream.h"
 #include "SocketStreamHandle.h"
 #include <pal/SessionID.h>
-#include <wtf/Function.h>
-#include <wtf/Lock.h>
-#include <wtf/MessageQueue.h>
-#include <wtf/RefCounted.h>
 #include <wtf/StreamBuffer.h>
-#include <wtf/Threading.h>
-#include <wtf/UniqueArray.h>
 
 namespace WebCore {
 
@@ -47,7 +42,7 @@
 class SocketStreamHandleClient;
 class StorageSessionProvider;
 
-class SocketStreamHandleImpl : public SocketStreamHandle {
+class SocketStreamHandleImpl : public SocketStreamHandle, public CurlStream::Client {
 public:
     static Ref<SocketStreamHandleImpl> create(const URL& url, SocketStreamHandleClient& client, PAL::SessionID, const String&, SourceApplicationAuditToken&&, const StorageSessionProvider* provider) { return adoptRef(*new SocketStreamHandleImpl(url, client, provider)); }
 
@@ -64,28 +59,22 @@
     Optional<size_t> platformSendInternal(const uint8_t*, size_t);
     bool sendPendingData();
 
-    void threadEntryPoint(const URL&);
-    void handleError(CURLcode);
-    void stopThread();
+    void didOpen(CurlStreamID) final;
+    void didSendData(CurlStreamID, size_t) final;
+    void didReceiveData(CurlStreamID, const char*, size_t) final;
+    void didFail(CurlStreamID, CURLcode) final;
 
-    void callOnWorkerThread(Function<void()>&&);
-    void executeTasks();
+    bool isStreamInvalidated() { return m_streamID == invalidCurlStreamID; }
+    void destructStream();
 
-    static const size_t kReadBufferSize = 4 * 1024;
-
     RefPtr<const StorageSessionProvider> m_storageSessionProvider;
-    RefPtr<Thread> m_workerThread;
-    std::atomic<bool> m_running { true };
 
-    MessageQueue<Function<void()>> m_taskQueue;
-
-    bool m_hasPendingWriteData { false };
-    size_t m_writeBufferSize { 0 };
-    size_t m_writeBufferOffset { 0 };
-    UniqueArray<uint8_t> m_writeBuffer;
-
     StreamBuffer<uint8_t, 1024 * 1024> m_buffer;
     static const unsigned maxBufferSize = 100 * 1024 * 1024;
+
+    CurlStreamScheduler& m_scheduler;
+    CurlStreamID m_streamID { invalidCurlStreamID };
+    unsigned m_totalSendDataSize { 0 };
 };
 
 } // namespace WebCore

Modified: trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp (256727 => 256728)


--- trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp	2020-02-17 10:54:46 UTC (rev 256727)
+++ trunk/Source/WebCore/platform/network/curl/SocketStreamHandleImplCurl.cpp	2020-02-17 12:07:08 UTC (rev 256728)
@@ -1,7 +1,7 @@
 /*
  * Copyright (C) 2009 Brent Fulgham.  All rights reserved.
  * Copyright (C) 2009 Google Inc.  All rights reserved.
- * Copyright (C) 2018 Sony Interactive Entertainment Inc.
+ * Copyright (C) 2020 Sony Interactive Entertainment Inc.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are
@@ -35,14 +35,11 @@
 
 #if USE(CURL)
 
+#include "CurlStreamScheduler.h"
 #include "DeprecatedGlobalSettings.h"
-#include "Logging.h"
 #include "SocketStreamError.h"
 #include "SocketStreamHandleClient.h"
 #include "StorageSessionProvider.h"
-#include <wtf/MainThread.h>
-#include <wtf/URL.h>
-#include <wtf/text/CString.h>
 
 namespace WebCore {
 
@@ -49,169 +46,92 @@
 SocketStreamHandleImpl::SocketStreamHandleImpl(const URL& url, SocketStreamHandleClient& client, const StorageSessionProvider* provider)
     : SocketStreamHandle(url, client)
     , m_storageSessionProvider(provider)
+    , m_scheduler(CurlContext::singleton().streamScheduler())
 {
-    LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
-    ASSERT(isMainThread());
-
     // FIXME: Using DeprecatedGlobalSettings from here is a layering violation.
     if (m_url.protocolIs("wss") && DeprecatedGlobalSettings::allowsAnySSLCertificate())
         CurlContext::singleton().sslHandle().setIgnoreSSLErrors(true);
 
-    m_workerThread = Thread::create("WebSocket thread", [this, protectedThis = makeRef(*this), url = "" {
-        threadEntryPoint(url);
-    });
+    m_streamID = m_scheduler.createStream(m_url, *this);
 }
 
 SocketStreamHandleImpl::~SocketStreamHandleImpl()
 {
-    LOG(Network, "SocketStreamHandle %p delete", this);
-    stopThread();
+    destructStream();
 }
 
 Optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length)
 {
-    LOG(Network, "SocketStreamHandle %p platformSend", this);
-    ASSERT(isMainThread());
+    if (isStreamInvalidated())
+        return WTF::nullopt;
 
-    if (m_hasPendingWriteData)
+    if (m_totalSendDataSize + length > maxBufferSize)
         return 0;
+    m_totalSendDataSize += length;
 
-    m_hasPendingWriteData = true;
+    auto buffer = makeUniqueArray<uint8_t>(length);
+    memcpy(buffer.get(), data, length);
 
-    auto writeBuffer = makeUniqueArray<uint8_t>(length);
-    memcpy(writeBuffer.get(), data, length);
-
-    callOnWorkerThread([this, writeBuffer = WTFMove(writeBuffer), writeBufferSize = length]() mutable {
-        ASSERT(!isMainThread());
-        m_writeBuffer = WTFMove(writeBuffer);
-        m_writeBufferSize = writeBufferSize;
-        m_writeBufferOffset = 0;
-    });
-
+    m_scheduler.send(m_streamID, WTFMove(buffer), length);
     return length;
 }
 
 void SocketStreamHandleImpl::platformClose()
 {
-    LOG(Network, "SocketStreamHandle %p platformClose", this);
-    ASSERT(isMainThread());
+    destructStream();
 
     if (m_state == Closed)
         return;
     m_state = Closed;
 
-    stopThread();
     m_client.didCloseSocketStream(*this);
 }
 
-void SocketStreamHandleImpl::threadEntryPoint(const URL& url)
+void SocketStreamHandleImpl::didOpen(CurlStreamID)
 {
-    ASSERT(!isMainThread());
-
-    CurlSocketHandle socket { url, [this](CURLcode errorCode) {
-        handleError(errorCode);
-    }};
-
-    // Connect to host
-    if (!socket.connect())
+    if (m_state != Connecting)
         return;
+    m_state = Open;
 
-    callOnMainThread([this, protectedThis = makeRef(*this)] {
-        if (m_state == Connecting) {
-            m_state = Open;
-            m_client.didOpenSocketStream(*this);
-        }
-    });
+    m_client.didOpenSocketStream(*this);
+}
 
-    while (m_running) {
-        executeTasks();
+void SocketStreamHandleImpl::didSendData(CurlStreamID, size_t length)
+{
+    ASSERT(m_totalSendDataSize - length >= 0);
 
-        auto result = socket.wait(20_ms, m_writeBuffer.get());
-        if (!result)
-            continue;
-
-        // These logic only run when there's data waiting.
-        if (result->writable && m_running) {
-            auto bytesSent = socket.send(m_writeBuffer.get() + m_writeBufferOffset, m_writeBufferSize - m_writeBufferOffset);
-            m_writeBufferOffset += bytesSent;
-
-            if (m_writeBufferSize <= m_writeBufferOffset) {
-                m_writeBuffer = nullptr;
-                m_writeBufferSize = 0;
-                m_writeBufferOffset = 0;
-
-                callOnMainThread([this, protectedThis = makeRef(*this)] {
-                    m_hasPendingWriteData = false;
-                    sendPendingData();
-                });
-            }
-        }
-
-        if (result->readable && m_running) {
-            auto readBuffer = makeUniqueArray<uint8_t>(kReadBufferSize);
-            auto bytesRead = socket.receive(readBuffer.get(), kReadBufferSize);
-            // `nullopt` result means nothing to handle at this moment.
-            if (!bytesRead)
-                continue;
-
-            // 0 bytes indicates a closed connection.
-            if (!*bytesRead) {
-                m_running = false;
-                callOnMainThread([this, protectedThis = makeRef(*this)] {
-                    close();
-                });
-                break;
-            }
-
-            callOnMainThread([this, protectedThis = makeRef(*this), buffer = WTFMove(readBuffer), size = *bytesRead ] {
-                if (m_state == Open)
-                    m_client.didReceiveSocketStreamData(*this, reinterpret_cast<const char*>(buffer.get()), size);
-            });
-        }
-    }
-
-    m_writeBuffer = nullptr;
+    m_totalSendDataSize -= length;
+    sendPendingData();
 }
 
-void SocketStreamHandleImpl::handleError(CURLcode errorCode)
+void SocketStreamHandleImpl::didReceiveData(CurlStreamID, const char* data, size_t length)
 {
-    m_running = false;
-    callOnMainThread([this, protectedThis = makeRef(*this), errorCode, localizedDescription = CurlHandle::errorDescription(errorCode).isolatedCopy()] {
-        if (m_state == Closed)
-            return;
+    if (m_state != Open)
+        return;
 
-        if (errorCode == CURLE_RECV_ERROR)
-            m_client.didFailToReceiveSocketStreamData(*this);
-        else
-            m_client.didFailSocketStream(*this, SocketStreamError(static_cast<int>(errorCode), { }, localizedDescription));
-    });
+    m_client.didReceiveSocketStreamData(*this, data, length);
 }
 
-void SocketStreamHandleImpl::stopThread()
+void SocketStreamHandleImpl::didFail(CurlStreamID, CURLcode errorCode)
 {
-    ASSERT(isMainThread());
+    destructStream();
 
-    m_running = false;
+    if (m_state == Closed)
+        return;
 
-    if (m_workerThread) {
-        m_workerThread->waitForCompletion();
-        m_workerThread = nullptr;
-    }
+    if (errorCode == CURLE_RECV_ERROR)
+        m_client.didFailToReceiveSocketStreamData(*this);
+    else
+        m_client.didFailSocketStream(*this, SocketStreamError(errorCode, m_url, CurlHandle::errorDescription(errorCode)));
 }
 
-void SocketStreamHandleImpl::callOnWorkerThread(Function<void()>&& task)
+void SocketStreamHandleImpl::destructStream()
 {
-    ASSERT(isMainThread());
-    m_taskQueue.append(makeUnique<Function<void()>>(WTFMove(task)));
-}
+    if (isStreamInvalidated())
+        return;
 
-void SocketStreamHandleImpl::executeTasks()
-{
-    ASSERT(!isMainThread());
-
-    auto tasks = m_taskQueue.takeAllMessages();
-    for (auto& task : tasks)
-        (*task)();
+    m_scheduler.destroyStream(m_streamID);
+    m_streamID = invalidCurlStreamID;
 }
 
 } // namespace WebCore
_______________________________________________
webkit-changes mailing list
[email protected]
https://lists.webkit.org/mailman/listinfo/webkit-changes

Reply via email to