https://github.com/JDevlieghere created https://github.com/llvm/llvm-project/pull/144610
Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily lead to reading partial JSON RPC packets. >From 696ad6138af43aabcb74d3d55121420ae4bca08a Mon Sep 17 00:00:00 2001 From: Jonas Devlieghere <jo...@devlieghere.com> Date: Tue, 17 Jun 2025 14:38:44 -0500 Subject: [PATCH] [lldb] Support non-blocking reads in JSONRPCTransport Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily lead to reading partial JSON RPC packets. --- lldb/include/lldb/Host/JSONTransport.h | 19 +++++++--- lldb/source/Host/common/JSONTransport.cpp | 42 +++++++++++++--------- lldb/unittests/Host/JSONTransportTest.cpp | 43 ++++++++++++++++++++--- 3 files changed, 79 insertions(+), 25 deletions(-) diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h index 4087cdf2b42f7..36a67c929a1c6 100644 --- a/lldb/include/lldb/Host/JSONTransport.h +++ b/lldb/include/lldb/Host/JSONTransport.h @@ -85,7 +85,8 @@ class JSONTransport { /// Reads the next message from the input stream. 
template <typename T> - llvm::Expected<T> Read(const std::chrono::microseconds &timeout) { + llvm::Expected<T> + Read(std::optional<std::chrono::microseconds> timeout = std::nullopt) { llvm::Expected<std::string> message = ReadImpl(timeout); if (!message) return message.takeError(); @@ -97,10 +98,20 @@ class JSONTransport { virtual llvm::Error WriteImpl(const std::string &message) = 0; virtual llvm::Expected<std::string> - ReadImpl(const std::chrono::microseconds &timeout) = 0; + ReadImpl(std::optional<std::chrono::microseconds> timeout) = 0; + + llvm::Expected<std::string> + ReadFull(IOObject &descriptor, size_t length, + std::optional<std::chrono::microseconds> timeout) const; + + llvm::Expected<std::string> + ReadUntil(IOObject &descriptor, llvm::StringRef delimiter, + std::optional<std::chrono::microseconds> timeout); lldb::IOObjectSP m_input; lldb::IOObjectSP m_output; + + std::string m_buffer; }; /// A transport class for JSON with a HTTP header. @@ -113,7 +124,7 @@ class HTTPDelimitedJSONTransport : public JSONTransport { protected: virtual llvm::Error WriteImpl(const std::string &message) override; virtual llvm::Expected<std::string> - ReadImpl(const std::chrono::microseconds &timeout) override; + ReadImpl(std::optional<std::chrono::microseconds> timeout) override; // FIXME: Support any header. 
static constexpr llvm::StringLiteral kHeaderContentLength = @@ -131,7 +142,7 @@ class JSONRPCTransport : public JSONTransport { protected: virtual llvm::Error WriteImpl(const std::string &message) override; virtual llvm::Expected<std::string> - ReadImpl(const std::chrono::microseconds &timeout) override; + ReadImpl(std::optional<std::chrono::microseconds> timeout) override; static constexpr llvm::StringLiteral kMessageSeparator = "\n"; }; diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp index 1a0851d5c4365..0fae74fb87b68 100644 --- a/lldb/source/Host/common/JSONTransport.cpp +++ b/lldb/source/Host/common/JSONTransport.cpp @@ -27,9 +27,9 @@ using namespace lldb_private; /// ReadFull attempts to read the specified number of bytes. If EOF is /// encountered, an empty string is returned. -static Expected<std::string> -ReadFull(IOObject &descriptor, size_t length, - std::optional<std::chrono::microseconds> timeout = std::nullopt) { +Expected<std::string> JSONTransport::ReadFull( + IOObject &descriptor, size_t length, + std::optional<std::chrono::microseconds> timeout) const { if (!descriptor.IsValid()) return llvm::make_error<TransportInvalidError>(); @@ -67,19 +67,22 @@ ReadFull(IOObject &descriptor, size_t length, return data.substr(0, length); } -static Expected<std::string> -ReadUntil(IOObject &descriptor, StringRef delimiter, - std::optional<std::chrono::microseconds> timeout = std::nullopt) { - std::string buffer; - buffer.reserve(delimiter.size() + 1); - while (!llvm::StringRef(buffer).ends_with(delimiter)) { +Expected<std::string> +JSONTransport::ReadUntil(IOObject &descriptor, StringRef delimiter, + std::optional<std::chrono::microseconds> timeout) { + if (!timeout || *timeout != std::chrono::microseconds::zero()) { + m_buffer.clear(); + m_buffer.reserve(delimiter.size() + 1); + } + + while (!llvm::StringRef(m_buffer).ends_with(delimiter)) { Expected<std::string> next = - ReadFull(descriptor, buffer.empty() ? 
delimiter.size() : 1, timeout); + ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout); if (auto Err = next.takeError()) return std::move(Err); - buffer += *next; + m_buffer += *next; } - return buffer.substr(0, buffer.size() - delimiter.size()); + return m_buffer.substr(0, m_buffer.size() - delimiter.size()); } JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output) @@ -89,11 +92,15 @@ void JSONTransport::Log(llvm::StringRef message) { LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message); } -Expected<std::string> -HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { +Expected<std::string> HTTPDelimitedJSONTransport::ReadImpl( + std::optional<std::chrono::microseconds> timeout) { if (!m_input || !m_input->IsValid()) return llvm::make_error<TransportInvalidError>(); + if (timeout && *timeout == std::chrono::microseconds::zero()) + return llvm::createStringError( + "HTTPDelimitedJSONTransport does not support non-blocking reads"); + IOObject *input = m_input.get(); Expected<std::string> message_header = ReadFull(*input, kHeaderContentLength.size(), timeout); @@ -104,7 +111,8 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { kHeaderContentLength, *message_header) .str()); - Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator); + Expected<std::string> raw_length = + ReadUntil(*input, kHeaderSeparator, timeout); if (!raw_length) return handleErrors(raw_length.takeError(), [&](const TransportEOFError &E) -> llvm::Error { @@ -117,7 +125,7 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { return createStringError( formatv("invalid content length {0}", *raw_length).str()); - Expected<std::string> raw_json = ReadFull(*input, length); + Expected<std::string> raw_json = ReadFull(*input, length, timeout); if (!raw_json) return handleErrors( raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error { @@ -143,7 +151,7 @@ Error 
HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) { } Expected<std::string> -JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) { +JSONRPCTransport::ReadImpl(std::optional<std::chrono::microseconds> timeout) { if (!m_input || !m_input->IsValid()) return make_error<TransportInvalidError>(); diff --git a/lldb/unittests/Host/JSONTransportTest.cpp b/lldb/unittests/Host/JSONTransportTest.cpp index 4621869887ac8..cc43d7d851cb1 100644 --- a/lldb/unittests/Host/JSONTransportTest.cpp +++ b/lldb/unittests/Host/JSONTransportTest.cpp @@ -16,7 +16,7 @@ using namespace lldb_private; namespace { template <typename T> class JSONTransportTest : public PipeTest { protected: - std::unique_ptr<JSONTransport> transport; + std::unique_ptr<T> transport; void SetUp() override { PipeTest::SetUp(); @@ -36,7 +36,13 @@ class HTTPDelimitedJSONTransportTest using JSONTransportTest::JSONTransportTest; }; -class JSONRPCTransportTest : public JSONTransportTest<JSONRPCTransport> { +class TestJSONRPCTransport : public JSONRPCTransport { +public: + using JSONRPCTransport::JSONRPCTransport; + using JSONRPCTransport::WriteImpl; // For partial writes. 
+}; + +class JSONRPCTransportTest : public JSONTransportTest<TestJSONRPCTransport> { public: using JSONTransportTest::JSONTransportTest; }; @@ -84,7 +90,6 @@ TEST_F(HTTPDelimitedJSONTransportTest, ReadWithEOF) { Failed<TransportEOFError>()); } - TEST_F(HTTPDelimitedJSONTransportTest, InvalidTransport) { transport = std::make_unique<HTTPDelimitedJSONTransport>(nullptr, nullptr); ASSERT_THAT_EXPECTED( @@ -142,13 +147,43 @@ TEST_F(JSONRPCTransportTest, Write) { } TEST_F(JSONRPCTransportTest, InvalidTransport) { - transport = std::make_unique<JSONRPCTransport>(nullptr, nullptr); + transport = std::make_unique<TestJSONRPCTransport>(nullptr, nullptr); ASSERT_THAT_EXPECTED( transport->Read<JSONTestType>(std::chrono::milliseconds(1)), Failed<TransportInvalidError>()); } #ifndef _WIN32 +TEST_F(HTTPDelimitedJSONTransportTest, NonBlockingRead) { + ASSERT_THAT_EXPECTED( + transport->Read<JSONTestType>(std::chrono::microseconds::zero()), + llvm::FailedWithMessage( + "HTTPDelimitedJSONTransport does not support non-blocking reads")); +} + +TEST_F(JSONRPCTransportTest, NonBlockingRead) { + llvm::StringRef head = R"({"str")"; + llvm::StringRef tail = R"(: "foo"})" + "\n"; + + ASSERT_THAT_EXPECTED(input.Write(head.data(), head.size()), Succeeded()); + ASSERT_THAT_EXPECTED( + transport->Read<JSONTestType>(std::chrono::microseconds::zero()), + Failed<TransportTimeoutError>()); + + ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded()); + while (true) { + llvm::Expected<JSONTestType> result = + transport->Read<JSONTestType>(std::chrono::microseconds::zero()); + if (result.errorIsA<TransportTimeoutError>()) { + llvm::consumeError(result.takeError()); + continue; + } + ASSERT_THAT_EXPECTED(result, HasValue(testing::FieldsAre(/*str=*/"foo"))); + break; + } +} + TEST_F(HTTPDelimitedJSONTransportTest, ReadWithTimeout) { ASSERT_THAT_EXPECTED( transport->Read<JSONTestType>(std::chrono::milliseconds(1)), _______________________________________________ lldb-commits mailing 
list lldb-commits@lists.llvm.org https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits