Author: Augusto Noronha
Date: 2023-08-15T15:43:36-07:00
New Revision: 5d16957207ce1bd1a2091f3677e176012009c59a

URL: 
https://github.com/llvm/llvm-project/commit/5d16957207ce1bd1a2091f3677e176012009c59a
DIFF: 
https://github.com/llvm/llvm-project/commit/5d16957207ce1bd1a2091f3677e176012009c59a.diff

LOG: [lldb] Properly protect the Communication class with reader/writer lock

This patch picks up where https://reviews.llvm.org/D157159 left off, but
allows for concurrent reads/writes while still protecting setting up and
tearing down the underlying Connection object.

Differential Revision: https://reviews.llvm.org/D157760

Added: 
    

Modified: 
    lldb/include/lldb/Core/Communication.h
    lldb/include/lldb/Core/ThreadedCommunication.h
    lldb/source/Core/Communication.cpp
    lldb/source/Core/ThreadedCommunication.cpp
    lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp

Removed: 
    


################################################################################
diff --git a/lldb/include/lldb/Core/Communication.h b/lldb/include/lldb/Core/Communication.h
index f5f636816cb4f9..7e4ce362acf9a6 100644
--- a/lldb/include/lldb/Core/Communication.h
+++ b/lldb/include/lldb/Core/Communication.h
@@ -16,6 +16,7 @@
 #include "lldb/lldb-types.h"
 
 #include <mutex>
+#include <shared_mutex>
 #include <string>
 
 namespace lldb_private {
@@ -46,8 +47,6 @@ class Communication {
   /// The destructor is virtual since this class gets subclassed.
   virtual ~Communication();
 
-  virtual void Clear();
-
   /// Connect using the current connection by passing \a url to its connect
   /// function. string.
   ///
@@ -84,7 +83,10 @@ class Communication {
 
   bool HasConnection() const;
 
-  lldb_private::Connection *GetConnection() { return m_connection_sp.get(); }
+  lldb_private::Connection *GetConnection() {
+    std::shared_lock guard(m_connection_mutex);
+    return m_connection_sp.get();
+  }
 
   /// Read bytes from the current connection.
   ///
@@ -169,13 +171,24 @@ class Communication {
                                       ///by this communications class.
   std::mutex
       m_write_mutex; ///< Don't let multiple threads write at the same time...
+  mutable std::shared_mutex m_connection_mutex;
   bool m_close_on_eof;
 
+  /// Same as read but with m_connection_mutex unlocked.
+  size_t ReadUnlocked(void *dst, size_t dst_len,
+                      const Timeout<std::micro> &timeout,
+                      lldb::ConnectionStatus &status, Status *error_ptr);
+
   size_t ReadFromConnection(void *dst, size_t dst_len,
                             const Timeout<std::micro> &timeout,
                             lldb::ConnectionStatus &status, Status *error_ptr);
 
 private:
+  /// Same as Disconnect but with m_connection_mutex unlocked.
+  lldb::ConnectionStatus DisconnectUnlocked(Status *error_ptr = nullptr);
+  /// Same as Write but with both m_write_mutex and m_connection_mutex unlocked.
+  size_t WriteUnlocked(const void *src, size_t src_len,
+                       lldb::ConnectionStatus &status, Status *error_ptr);
   Communication(const Communication &) = delete;
   const Communication &operator=(const Communication &) = delete;
 };

diff --git a/lldb/include/lldb/Core/ThreadedCommunication.h b/lldb/include/lldb/Core/ThreadedCommunication.h
index 7ebb77beb77f3d..170fd2dfcb555d 100644
--- a/lldb/include/lldb/Core/ThreadedCommunication.h
+++ b/lldb/include/lldb/Core/ThreadedCommunication.h
@@ -97,8 +97,6 @@ class ThreadedCommunication : public Communication, public Broadcaster {
   /// The destructor is virtual since this class gets subclassed.
   ~ThreadedCommunication() override;
 
-  void Clear() override;
-
   /// Disconnect the communications connection if one is currently connected.
   ///
   /// \return

diff --git a/lldb/source/Core/Communication.cpp b/lldb/source/Core/Communication.cpp
index 5d890632ccc6a9..e2ba461a02329a 100644
--- a/lldb/source/Core/Communication.cpp
+++ b/lldb/source/Core/Communication.cpp
@@ -27,110 +27,110 @@ using namespace lldb;
 using namespace lldb_private;
 
 Communication::Communication()
-    : m_connection_sp(), m_write_mutex(), m_close_on_eof(true) {
-}
-
-Communication::~Communication() {
-  Clear();
-}
+    : m_connection_sp(), m_connection_mutex(), m_close_on_eof(true) {}
 
-void Communication::Clear() {
-  Disconnect(nullptr);
-}
+Communication::~Communication() { Disconnect(nullptr); }
 
 ConnectionStatus Communication::Connect(const char *url, Status *error_ptr) {
-  Clear();
+  std::unique_lock guard(m_connection_mutex);
 
   LLDB_LOG(GetLog(LLDBLog::Communication),
            "{0} Communication::Connect (url = {1})", this, url);
 
-  lldb::ConnectionSP connection_sp(m_connection_sp);
-  if (connection_sp)
-    return connection_sp->Connect(url, error_ptr);
+  DisconnectUnlocked();
+
+  if (m_connection_sp)
+    return m_connection_sp->Connect(url, error_ptr);
   if (error_ptr)
     error_ptr->SetErrorString("Invalid connection.");
   return eConnectionStatusNoConnection;
 }
 
 ConnectionStatus Communication::Disconnect(Status *error_ptr) {
+  std::unique_lock guard(m_connection_mutex);
+  return DisconnectUnlocked(error_ptr);
+}
+
+ConnectionStatus Communication::DisconnectUnlocked(Status *error_ptr) {
   LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()",
            this);
 
-  lldb::ConnectionSP connection_sp(m_connection_sp);
-  if (connection_sp) {
-    ConnectionStatus status = connection_sp->Disconnect(error_ptr);
-    // We currently don't protect connection_sp with any mutex for multi-
-    // threaded environments. So lets not nuke our connection class without
-    // putting some multi-threaded protections in. We also probably don't want
-    // to pay for the overhead it might cause if every time we access the
-    // connection we have to take a lock.
-    //
-    // This unique pointer will cleanup after itself when this object goes
-    // away, so there is no need to currently have it destroy itself
-    // immediately upon disconnect.
-    // connection_sp.reset();
+  if (m_connection_sp) {
+    ConnectionStatus status = m_connection_sp->Disconnect(error_ptr);
     return status;
   }
   return eConnectionStatusNoConnection;
 }
 
 bool Communication::IsConnected() const {
-  lldb::ConnectionSP connection_sp(m_connection_sp);
-  return (connection_sp ? connection_sp->IsConnected() : false);
+  std::shared_lock guard(m_connection_mutex);
+  return (m_connection_sp ? m_connection_sp->IsConnected() : false);
 }
 
 bool Communication::HasConnection() const {
+  std::shared_lock guard(m_connection_mutex);
   return m_connection_sp.get() != nullptr;
 }
 
 size_t Communication::Read(void *dst, size_t dst_len,
                            const Timeout<std::micro> &timeout,
                            ConnectionStatus &status, Status *error_ptr) {
-  Log *log = GetLog(LLDBLog::Communication);
-  LLDB_LOG(
-      log,
-      "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
-      this, dst, dst_len, timeout, m_connection_sp.get());
-
-  return ReadFromConnection(dst, dst_len, timeout, status, error_ptr);
+  std::shared_lock guard(m_connection_mutex);
+  return ReadUnlocked(dst, dst_len, timeout, status, error_ptr);
 }
 
 size_t Communication::Write(const void *src, size_t src_len,
                             ConnectionStatus &status, Status *error_ptr) {
-  lldb::ConnectionSP connection_sp(m_connection_sp);
+  // We need to lock the write mutex so no concurrent writes happen, but also
+  // lock the connection mutex so it's not reset mid write. We need both mutexes
+  // because reads and writes from the connection can happen concurrently.
+  std::shared_lock guard(m_connection_mutex);
+  std::lock_guard<std::mutex> guard_write(m_write_mutex);
+  return WriteUnlocked(src, src_len, status, error_ptr);
+}
+
+size_t Communication::WriteUnlocked(const void *src, size_t src_len,
+                                    ConnectionStatus &status,
+                                    Status *error_ptr) {
+  if (!m_connection_sp) {
+    if (error_ptr)
+      error_ptr->SetErrorString("Invalid connection.");
+    status = eConnectionStatusNoConnection;
+    return 0;
+  }
 
-  std::lock_guard<std::mutex> guard(m_write_mutex);
   LLDB_LOG(GetLog(LLDBLog::Communication),
            "{0} Communication::Write (src = {1}, src_len = {2}"
            ") connection = {3}",
-           this, src, (uint64_t)src_len, connection_sp.get());
-
-  if (connection_sp)
-    return connection_sp->Write(src, src_len, status, error_ptr);
+           this, src, (uint64_t)src_len, m_connection_sp.get());
 
-  if (error_ptr)
-    error_ptr->SetErrorString("Invalid connection.");
-  status = eConnectionStatusNoConnection;
-  return 0;
+  return m_connection_sp->Write(src, src_len, status, error_ptr);
 }
 
 size_t Communication::WriteAll(const void *src, size_t src_len,
                                ConnectionStatus &status, Status *error_ptr) {
+  std::shared_lock guard(m_connection_mutex);
+  std::lock_guard<std::mutex> guard_write(m_write_mutex);
   size_t total_written = 0;
   do
-    total_written += Write(static_cast<const char *>(src) + total_written,
-                           src_len - total_written, status, error_ptr);
+    total_written +=
+        WriteUnlocked(static_cast<const char *>(src) + total_written,
+                      src_len - total_written, status, error_ptr);
   while (status == eConnectionStatusSuccess && total_written < src_len);
   return total_written;
 }
 
-size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
-                                         const Timeout<std::micro> &timeout,
-                                         ConnectionStatus &status,
-                                         Status *error_ptr) {
-  lldb::ConnectionSP connection_sp(m_connection_sp);
-  if (connection_sp)
-    return connection_sp->Read(dst, dst_len, timeout, status, error_ptr);
+size_t Communication::ReadUnlocked(void *dst, size_t dst_len,
+                                   const Timeout<std::micro> &timeout,
+                                   ConnectionStatus &status,
+                                   Status *error_ptr) {
+  Log *log = GetLog(LLDBLog::Communication);
+  LLDB_LOG(
+      log,
+      "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
+      this, dst, dst_len, timeout, m_connection_sp.get());
+  if (m_connection_sp)
+    return m_connection_sp->Read(dst, dst_len, timeout, status, error_ptr);
 
   if (error_ptr)
     error_ptr->SetErrorString("Invalid connection.");
@@ -139,7 +139,8 @@ size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
 }
 
 void Communication::SetConnection(std::unique_ptr<Connection> connection) {
-  Disconnect(nullptr);
+  std::unique_lock guard(m_connection_mutex);
+  DisconnectUnlocked(nullptr);
   m_connection_sp = std::move(connection);
 }
 

diff --git a/lldb/source/Core/ThreadedCommunication.cpp b/lldb/source/Core/ThreadedCommunication.cpp
index 7d8aae5d8ff689..8d3515652d8b85 100644
--- a/lldb/source/Core/ThreadedCommunication.cpp
+++ b/lldb/source/Core/ThreadedCommunication.cpp
@@ -61,11 +61,6 @@ ThreadedCommunication::~ThreadedCommunication() {
            this, GetBroadcasterName());
 }
 
-void ThreadedCommunication::Clear() {
-  SetReadThreadBytesReceivedCallback(nullptr, nullptr);
-  StopReadThread(nullptr);
-  Communication::Clear();
-}
 
 ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
   assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
@@ -77,6 +72,7 @@ size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
                                    const Timeout<std::micro> &timeout,
                                    ConnectionStatus &status,
                                    Status *error_ptr) {
+  std::shared_lock guard(m_connection_mutex);
   Log *log = GetLog(LLDBLog::Communication);
   LLDB_LOG(
       log,
@@ -152,7 +148,7 @@ size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
 
   // We aren't using a read thread, just read the data synchronously in this
   // thread.
-  return Communication::Read(dst, dst_len, timeout, status, error_ptr);
+  return Communication::ReadUnlocked(dst, dst_len, timeout, status, error_ptr);
 }
 
 bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
@@ -273,46 +269,50 @@ lldb::thread_result_t ThreadedCommunication::ReadThread() {
   ConnectionStatus status = eConnectionStatusSuccess;
   bool done = false;
   bool disconnect = false;
-  while (!done && m_read_thread_enabled) {
-    size_t bytes_read = ReadFromConnection(
-        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
-    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
-      AppendBytesToCache(buf, bytes_read, true, status);
-
-    switch (status) {
-    case eConnectionStatusSuccess:
-      break;
-
-    case eConnectionStatusEndOfFile:
-      done = true;
-      disconnect = GetCloseOnEOF();
-      break;
-    case eConnectionStatusError: // Check GetError() for details
-      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
-        // EIO on a pipe is usually caused by remote shutdown
+
+  {
+    std::shared_lock guard(m_connection_mutex);
+    while (!done && m_read_thread_enabled) {
+      size_t bytes_read = ReadUnlocked(buf, sizeof(buf),
+                                       std::chrono::seconds(5), status, &error);
+      if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+        AppendBytesToCache(buf, bytes_read, true, status);
+
+      switch (status) {
+      case eConnectionStatusSuccess:
+        break;
+
+      case eConnectionStatusEndOfFile:
+        done = true;
         disconnect = GetCloseOnEOF();
+        break;
+      case eConnectionStatusError: // Check GetError() for details
+        if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+          // EIO on a pipe is usually caused by remote shutdown
+          disconnect = GetCloseOnEOF();
+          done = true;
+        }
+        if (error.Fail())
+          LLDB_LOG(log, "error: {0}, status = {1}", error,
+                   ThreadedCommunication::ConnectionStatusAsString(status));
+        break;
+      case eConnectionStatusInterrupted: // Synchronization signal from
+                                         // SynchronizeWithReadThread()
+        // The connection returns eConnectionStatusInterrupted only when there
+        // is no input pending to be read, so we can signal that.
+        BroadcastEvent(eBroadcastBitNoMorePendingInput);
+        break;
+      case eConnectionStatusNoConnection:   // No connection
+      case eConnectionStatusLostConnection: // Lost connection while connected
+                                            // to a valid connection
         done = true;
+        [[fallthrough]];
+      case eConnectionStatusTimedOut: // Request timed out
+        if (error.Fail())
+          LLDB_LOG(log, "error: {0}, status = {1}", error,
+                   ThreadedCommunication::ConnectionStatusAsString(status));
+        break;
       }
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 ThreadedCommunication::ConnectionStatusAsString(status));
-      break;
-    case eConnectionStatusInterrupted: // Synchronization signal from
-                                       // SynchronizeWithReadThread()
-      // The connection returns eConnectionStatusInterrupted only when there is
-      // no input pending to be read, so we can signal that.
-      BroadcastEvent(eBroadcastBitNoMorePendingInput);
-      break;
-    case eConnectionStatusNoConnection:   // No connection
-    case eConnectionStatusLostConnection: // Lost connection while connected to
-                                          // a valid connection
-      done = true;
-      [[fallthrough]];
-    case eConnectionStatusTimedOut: // Request timed out
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 ThreadedCommunication::ConnectionStatusAsString(status));
-      break;
     }
   }
   m_pass_status = status;
@@ -361,8 +361,12 @@ void ThreadedCommunication::SynchronizeWithReadThread() {
   if (!m_read_thread_enabled || m_read_thread_did_exit)
     return;
 
-  // Notify the read thread.
-  m_connection_sp->InterruptRead();
+  {
+    // Notify the read thread.
+    std::shared_lock guard(m_connection_mutex);
+    if (m_connection_sp)
+    m_connection_sp->InterruptRead();
+  }
 
   // Wait for the synchronization event.
   EventSP event_sp;

diff --git a/lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp b/lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
index b17701ea4a15cf..6862e43b6d5d35 100644
--- a/lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
+++ b/lldb/source/Plugins/Process/MacOSX-Kernel/ProcessKDP.cpp
@@ -557,7 +557,6 @@ Status ProcessKDP::DoDetach(bool keep_stopped) {
     }
   }
   StopAsyncThread();
-  m_comm.Clear();
 
   SetPrivateState(eStateDetached);
   ResumePrivateStateThread();


        
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to