mgorny updated this revision to Diff 455951.
mgorny added a comment.
Rebase, move `StopReadThread()` test here.
CHANGES SINCE LAST ACTION
https://reviews.llvm.org/D132283/new/
https://reviews.llvm.org/D132283
Files:
lldb/include/lldb/Core/Communication.h
lldb/source/Core/Communication.cpp
lldb/unittests/Core/CommunicationTest.cpp
Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -116,6 +116,30 @@
CommunicationReadTest(/*use_thread=*/true);
}
+TEST_F(CommunicationTest, StopReadThread) {
+ std::condition_variable finished;
+ std::mutex finished_mutex;
+
+ std::thread test_thread{[&]() {
+ std::unique_ptr<TCPSocket> a, b;
+ ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
+
+ Communication comm("test");
+ comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
+ comm.SetCloseOnEOF(true);
+
+ ASSERT_TRUE(comm.StartReadThread());
+ ASSERT_TRUE(comm.StopReadThread());
+ finished.notify_all();
+ }};
+
+ // StopReadThread() can hang, so force an external timeout.
+ std::unique_lock<std::mutex> lock{finished_mutex};
+ ASSERT_EQ(finished.wait_for(lock, std::chrono::seconds(3)),
+ std::cv_status::no_timeout);
+ test_thread.join();
+}
+
TEST_F(CommunicationTest, SynchronizeWhileClosing) {
std::unique_ptr<TCPSocket> a, b;
ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -228,6 +228,8 @@
m_read_thread_enabled = true;
m_read_thread_did_exit = false;
+ // Allocate the I/O loop in main thread to avoid races.
+ m_io_loop.reset(new MainLoop());
auto maybe_thread = ThreadLauncher::LaunchThread(
thread_name, [this] { return ReadThread(); });
if (maybe_thread) {
@@ -258,9 +260,13 @@
BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
- // error = m_read_thread.Cancel();
+ m_io_loop->AddPendingCallback(
+ [](MainLoopBase &loop) { loop.RequestTermination(); });
+ m_io_loop->TriggerPendingCallbacks();
Status error = m_read_thread.Join(nullptr);
+ if (error.Success())
+ m_io_loop.reset(nullptr);
return error.Success();
}
@@ -332,56 +338,76 @@
LLDB_LOG(log, "Communication({0}) thread starting...", this);
- uint8_t buf[1024];
-
- Status error;
- ConnectionStatus status = eConnectionStatusSuccess;
- bool done = false;
bool disconnect = false;
- while (!done && m_read_thread_enabled) {
- size_t bytes_read = ReadFromConnection(
- buf, sizeof(buf), std::chrono::seconds(5), status, &error);
- if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
- AppendBytesToCache(buf, bytes_read, true, status);
-
- switch (status) {
- case eConnectionStatusSuccess:
- break;
-
- case eConnectionStatusEndOfFile:
- done = true;
- disconnect = GetCloseOnEOF();
- break;
- case eConnectionStatusError: // Check GetError() for details
- if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
- // EIO on a pipe is usually caused by remote shutdown
- disconnect = GetCloseOnEOF();
- done = true;
- }
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- Communication::ConnectionStatusAsString(status));
- break;
- case eConnectionStatusInterrupted: // Synchronization signal from
- // SynchronizeWithReadThread()
- // The connection returns eConnectionStatusInterrupted only when there is
- // no input pending to be read, so we can signal that.
- BroadcastEvent(eBroadcastBitNoMorePendingInput);
- break;
- case eConnectionStatusNoConnection: // No connection
- case eConnectionStatusLostConnection: // Lost connection while connected to
- // a valid connection
- done = true;
- [[fallthrough]];
- case eConnectionStatusTimedOut: // Request timed out
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- Communication::ConnectionStatusAsString(status));
- break;
+ ConnectionStatus status = eConnectionStatusSuccess;
+ Status error;
+ if (IsConnected()) {
+ Status loop_error;
+ auto handle = m_io_loop->RegisterReadObject(
+ m_connection_sp->GetReadObject(),
+ [this, &disconnect, &status, &error](MainLoopBase &loop) {
+ Log *log = GetLog(LLDBLog::Communication);
+
+ if (!m_read_thread_enabled) {
+ loop.RequestTermination();
+ return;
+ }
+
+ uint8_t buf[1024];
+ size_t bytes_read = ReadFromConnection(
+ buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+ if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+ AppendBytesToCache(buf, bytes_read, true, status);
+
+ switch (status) {
+ case eConnectionStatusSuccess:
+ case eConnectionStatusInterrupted:
+ break;
+
+ case eConnectionStatusEndOfFile:
+ loop.RequestTermination();
+ disconnect = GetCloseOnEOF();
+ break;
+ case eConnectionStatusError: // Check GetError() for details
+ if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+ // EIO on a pipe is usually caused by remote shutdown
+ loop.RequestTermination();
+ disconnect = GetCloseOnEOF();
+ }
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ Communication::ConnectionStatusAsString(status));
+ break;
+ case eConnectionStatusNoConnection: // No connection
+ case eConnectionStatusLostConnection: // Lost connection while
+ // connected to a valid
+ // connection
+ loop.RequestTermination();
+ [[fallthrough]];
+ case eConnectionStatusTimedOut: // Request timed out
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ Communication::ConnectionStatusAsString(status));
+ break;
+ }
+ },
+ loop_error);
+ if (loop_error.Success())
+ loop_error = m_io_loop->Run();
+ if (!loop_error.Success()) {
+ error = std::move(loop_error);
+ status = lldb::eConnectionStatusError;
}
+ } else {
+ if (m_connection_sp)
+ status = lldb::eConnectionStatusLostConnection;
+ else
+ status = lldb::eConnectionStatusNoConnection;
+ error.SetErrorString(ConnectionStatusAsString(status));
}
m_pass_status = status;
m_pass_error = std::move(error);
+
LLDB_LOG(log, "Communication({0}) thread exiting...", this);
// Handle threads wishing to synchronize with us.
@@ -424,7 +450,10 @@
return;
// Notify the read thread.
- m_connection_sp->InterruptRead();
+ m_io_loop->AddPendingCallback([this](MainLoopBase &loop) {
+ BroadcastEvent(eBroadcastBitNoMorePendingInput);
+ });
+ m_io_loop->TriggerPendingCallbacks();
// Wait for the synchronization event.
EventSP event_sp;
Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -10,6 +10,7 @@
#define LLDB_CORE_COMMUNICATION_H
#include "lldb/Host/HostThread.h"
+#include "lldb/Host/MainLoop.h"
#include "lldb/Utility/Broadcaster.h"
#include "lldb/Utility/Timeout.h"
#include "lldb/lldb-defines.h"
@@ -318,6 +319,8 @@
///by this communications class.
HostThread m_read_thread; ///< The read thread handle in case we need to
///cancel the thread.
+ std::unique_ptr<MainLoop> m_io_loop; ///< The loop instance used by
+ ///the read thread.
std::atomic<bool> m_read_thread_enabled;
std::atomic<bool> m_read_thread_did_exit;
std::string
_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits