mgorny updated this revision to Diff 455295.
mgorny added a comment.
Update to the version with error support.
CHANGES SINCE LAST ACTION
https://reviews.llvm.org/D132578/new/
https://reviews.llvm.org/D132578
Files:
lldb/include/lldb/Core/Communication.h
lldb/source/Core/Communication.cpp
lldb/unittests/Core/CommunicationTest.cpp
Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -118,6 +118,103 @@
CommunicationReadTest(/*use_thread=*/true);
}
+static void CommunicationWriteTest(bool use_read_thread) { // Checks Write()/WriteAll() over a pipe plus failure cases, with or without the read thread.
+ Pipe pipe;
+ ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+ llvm::Succeeded());
+
+ Communication comm("test");
+ comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+ pipe.ReleaseWriteFileDescriptor(), /*owns_fd=*/true)); // comm is given the pipe's write end
+ comm.SetCloseOnEOF(true);
+
+ if (use_read_thread)
+ ASSERT_TRUE(comm.StartReadThread());
+
+ // A short (4-byte) Write() is expected to be atomic in this test, so the
+ lldb::ConnectionStatus status = lldb::eConnectionStatusSuccess;
+ Status error;
+ std::string test_str{"test"};
+ EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 4U); // full length must come back from a single call
+ EXPECT_EQ(status, lldb::eConnectionStatusSuccess);
+ EXPECT_THAT_ERROR(error.ToError(), llvm::Succeeded());
+
+ char buf[5]; // 4 payload bytes plus the terminator written below
+ size_t bytes_read; // NOTE(review): never compared against 4U after the reads below
+ ASSERT_THAT_ERROR(
+ pipe.ReadWithTimeout(buf, 4U, std::chrono::seconds(0), bytes_read)
+ .ToError(),
+ llvm::Succeeded());
+ buf[4] = 0;
+ EXPECT_EQ(test_str, buf);
+
+ // Repeat the round trip through WriteAll() with a different payload ("tes2").
+ test_str[3] = '2';
+ error.Clear();
+ EXPECT_EQ(comm.WriteAll(test_str.data(), test_str.size(), status, &error),
+ 4U);
+ EXPECT_EQ(status, lldb::eConnectionStatusSuccess);
+ EXPECT_THAT_ERROR(error.ToError(), llvm::Succeeded());
+
+ ASSERT_THAT_ERROR(
+ pipe.ReadWithTimeout(buf, 4U, std::chrono::seconds(0), bytes_read)
+ .ToError(),
+ llvm::Succeeded());
+ buf[4] = 0;
+ EXPECT_EQ(test_str, buf);
+
+ EXPECT_TRUE(comm.StopReadThread()); // called unconditionally, even when use_read_thread is false
+
+ // Write() with no connection set: expects 0 bytes, NoConnection and an error.
+ comm.SetConnection(nullptr);
+ pipe.CloseReadFileDescriptor();
+ if (use_read_thread)
+ ASSERT_TRUE(comm.StartReadThread());
+ error.Clear();
+ EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 0U);
+ EXPECT_EQ(status, lldb::eConnectionStatusNoConnection);
+ EXPECT_THAT_ERROR(error.ToError(), llvm::Failed());
+ EXPECT_TRUE(comm.StopReadThread());
+
+ // Write() after an explicit Disconnect(): same expectations as with no connection.
+ ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+ llvm::Succeeded());
+ comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+ pipe.ReleaseWriteFileDescriptor(), /*owns_fd=*/true));
+ comm.SetCloseOnEOF(true);
+ ASSERT_EQ(comm.Disconnect(), lldb::eConnectionStatusSuccess);
+ if (use_read_thread)
+ ASSERT_TRUE(comm.StartReadThread());
+ error.Clear();
+ EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 0U);
+ EXPECT_EQ(status, lldb::eConnectionStatusNoConnection);
+ EXPECT_THAT_ERROR(error.ToError(), llvm::Failed());
+ EXPECT_TRUE(comm.StopReadThread());
+
+ // Write() into the read end of a pipe: expects 0 bytes, eConnectionStatusError and an error.
+ pipe.CloseReadFileDescriptor();
+ ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+ llvm::Succeeded());
+ comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+ pipe.ReleaseReadFileDescriptor(), /*owns_fd=*/true)); // deliberately the read end this time
+ comm.SetCloseOnEOF(true);
+ if (use_read_thread)
+ ASSERT_TRUE(comm.StartReadThread());
+ error.Clear();
+ EXPECT_EQ(comm.Write(test_str.data(), test_str.size(), status, &error), 0U);
+ EXPECT_EQ(status, lldb::eConnectionStatusError);
+ EXPECT_THAT_ERROR(error.ToError(), llvm::Failed());
+ EXPECT_TRUE(comm.StopReadThread());
+}
+
+TEST(CommunicationTest, Write) {
+ CommunicationWriteTest(/*use_read_thread=*/false);
+}
+
+TEST(CommunicationTest, WriteThread) {
+ CommunicationWriteTest(/*use_read_thread=*/true);
+}
+
TEST(CommunicationTest, StopReadThread) {
std::condition_variable finished;
std::mutex finished_mutex;
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -186,21 +186,55 @@
size_t Communication::Write(const void *src, size_t src_len,
ConnectionStatus &status, Status *error_ptr) {
- lldb::ConnectionSP connection_sp(m_connection_sp);
-
std::lock_guard<std::mutex> guard(m_write_mutex);
LLDB_LOG(GetLog(LLDBLog::Communication),
"{0} Communication::Write (src = {1}, src_len = {2}"
") connection = {3}",
- this, src, (uint64_t)src_len, connection_sp.get());
+ this, src, (uint64_t)src_len, m_connection_sp.get());
- if (connection_sp)
- return connection_sp->Write(src, src_len, status, error_ptr);
+ if (m_read_thread_enabled) { // read thread enabled: queue the write on m_io_loop and wait for the result
+ assert(m_write_bytes.empty()); // no earlier request may still be pending
- if (error_ptr)
- error_ptr->SetErrorString("Invalid connection.");
- status = eConnectionStatusNoConnection;
- return 0;
+ if (!m_connection_sp) {
+ if (error_ptr)
+ error_ptr->SetErrorString("Invalid connection.");
+ status = eConnectionStatusNoConnection;
+ return 0;
+ }
+
+ ListenerSP listener_sp(Listener::MakeListener("Communication::Write")); // set up before the request is queued (presumably so the event cannot be missed)
+ listener_sp->StartListeningForEvents(
+ this, eBroadcastBitWriteDone | eBroadcastBitReadThreadDidExit);
+
+ m_write_bytes.assign(static_cast<const char *>(src), src_len); // NOTE(review): src_len == 0 leaves this empty, which trips the assert in WriteThread()
+ m_io_loop->AddPendingCallback(
+ [this](MainLoopBase &loop) { WriteThread(); }); // NOTE(review): 'loop' is unused
+ m_io_loop->TriggerPendingCallbacks();
+
+ EventSP event_sp;
+ while (m_read_thread_enabled) {
+ if (listener_sp->GetEvent(event_sp, std::chrono::seconds(5))) { // on timeout, re-check m_read_thread_enabled and wait again
+ const uint32_t event_type = event_sp->GetType();
+ if (event_type & eBroadcastBitWriteDone) {
+ size_t ret = src_len - m_write_bytes.size(); // WriteThread() erases what it wrote, so the difference is the byte count sent
+ status = m_pass_status;
+ if (error_ptr)
+ *error_ptr = std::move(m_pass_error);
+ m_write_bytes.clear();
+ return ret;
+ }
+
+ if (event_type & eBroadcastBitReadThreadDidExit)
+ break;
+ }
+ }
+
+ // If read thread exited before performing the write, fall back
+ // to writing directly.
+ m_write_bytes.clear();
+ }
+
+ return WriteToConnection(src, src_len, status, error_ptr); // read thread not enabled, or it exited: write from the calling thread
}
size_t Communication::WriteAll(const void *src, size_t src_len,
@@ -331,6 +365,19 @@
return 0;
}
+size_t Communication::WriteToConnection(const void *src, size_t src_len,
+ ConnectionStatus &status,
+ Status *error_ptr) { // Writes src directly through the current connection, if there is one.
+ lldb::ConnectionSP connection_sp(m_connection_sp); // local copy used for both the null check and the call
+ if (connection_sp)
+ return connection_sp->Write(src, src_len, status, error_ptr);
+
+ if (error_ptr) // error_ptr is optional
+ error_ptr->SetErrorString("Invalid connection.");
+ status = eConnectionStatusNoConnection; // no connection: nothing is written
+ return 0;
+}
+
bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; }
lldb::thread_result_t Communication::ReadThread() {
@@ -430,6 +477,24 @@
return {};
}
+void Communication::WriteThread() { // Sends the bytes queued in m_write_bytes, then broadcasts eBroadcastBitWriteDone.
+ // There should be only one pending request queued.
+ assert(!m_write_bytes.empty());
+
+ ConnectionStatus status = eConnectionStatusSuccess;
+ Status error;
+ do {
+ size_t bytes_written = WriteToConnection(
+ m_write_bytes.data(), m_write_bytes.size(), status, &error);
+ if (bytes_written > 0)
+ m_write_bytes.erase(0, bytes_written); // drop what was sent; the loop retries the remainder
+ } while (!m_write_bytes.empty() && status == eConnectionStatusSuccess); // NOTE(review): would spin if a write returned 0 bytes with eConnectionStatusSuccess - confirm that cannot happen
+
+ m_pass_status = status; // read back by Write() once it sees the WriteDone event
+ m_pass_error = std::move(error);
+ BroadcastEvent(eBroadcastBitWriteDone);
+}
+
void Communication::SetReadThreadBytesReceivedCallback(
ReadThreadBytesReceived callback, void *callback_baton) {
m_callback = callback;
Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -99,6 +99,9 @@
eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
///to indicate all pending
///input has been processed.
+ eBroadcastBitWriteDone = (1u << 6), ///< Sent by the read thread
+ ///to indicate that a write request
+ ///has been processed.
kLoUserBroadcastBit =
(1u << 16), ///< Subclasses can used bits 31:16 for any needed events.
kHiUserBroadcastBit = (1u << 31),
@@ -325,6 +328,7 @@
std::atomic<bool> m_read_thread_did_exit;
std::string
m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
+ std::string m_write_bytes; ///< A buffer used to pass write bytes.
std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
///access to the cached bytes.
lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
@@ -340,6 +344,8 @@
size_t ReadFromConnection(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);
+ size_t WriteToConnection(const void *src, size_t src_len,
+ lldb::ConnectionStatus &status, Status *error_ptr); ///< Writes via the current connection; with none set, returns 0 and reports eConnectionStatusNoConnection.
/// Append new bytes that get read from the read thread into the internal
/// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
@@ -380,6 +386,8 @@
/// The number of bytes extracted from the data cache.
size_t GetCachedBytes(void *dst, size_t dst_len);
+ void WriteThread(); ///< Writes out m_write_bytes and broadcasts eBroadcastBitWriteDone when finished.
+
private:
Communication(const Communication &) = delete;
const Communication &operator=(const Communication &) = delete;
_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits