Repository: thrift Updated Branches: refs/heads/master 5038466e5 -> 36d1b0dea
THRIFT-3891 TNonblockingServer configured with more than one IO threads does not always return from serve() upon stop() Client: C++ Patch: additional changes by jk...@apache.org to improve the test and stop clean in all cases This closes #1080 This closes #1196 Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/36d1b0de Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/36d1b0de Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/36d1b0de Branch: refs/heads/master Commit: 36d1b0dea566c0dea06e321421e32a6cad0abb32 Parents: 5038466 Author: BuÄra Gedik <bge...@gmail.com> Authored: Sun Sep 4 17:18:15 2016 +0900 Committer: James E. King, III <jk...@apache.org> Committed: Mon Feb 27 23:44:35 2017 -0500 ---------------------------------------------------------------------- .../src/thrift/server/TNonblockingServer.cpp | 54 ++++++++++---------- lib/cpp/test/TNonblockingServerTest.cpp | 37 +++++++++++++- 2 files changed, 61 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/src/thrift/server/TNonblockingServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index 649910f..d4418bd 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -510,7 +510,7 @@ void TNonblockingServer::TConnection::workSocket() { // If there is no data to send, then let us move on if (writeBufferPos_ == writeBufferSize_) { - GlobalOutput("WARNING: Send state with no data to send\n"); + GlobalOutput("WARNING: Send state with no data to send"); transition(); return; } @@ -765,11 +765,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { } // Delete a previously existing event - if (eventFlags_ != 0) { - if (event_del(&event_) == -1) { - GlobalOutput("TConnection::setFlags event_del"); - return; - } + if (eventFlags_ && event_del(&event_) == -1) { + GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR); + return; } // Update in memory structure @@ -812,7 +810,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { // Add the event if (event_add(&event_, 0) == -1) { - GlobalOutput("TConnection::setFlags(): could not event_add"); + GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR); } } @@ -820,9 +818,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { * Closes a connection */ void TNonblockingServer::TConnection::close() { - // Delete the registered libevent - if (event_del(&event_) == -1) { + if (eventFlags_ && event_del(&event_) == -1) { GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR); + return; } if (serverEventHandler_) { @@ -1066,7 +1064,7 @@ void TNonblockingServer::createAndListenOnSocket() { if (res->ai_family == AF_INET6) { int zero = 0; if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) { - GlobalOutput("TServerSocket::listen() IPV6_V6ONLY"); + GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR); } } #endif // #ifdef IPV6_V6ONLY @@ -1486,6 +1484,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* if (nBytes == kSize) { if (connection == NULL) { // this is the command to stop our thread, exit the handler! + ioThread->breakLoop(false); return; } connection->transition(); @@ -1496,6 +1495,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* return; } else if (nBytes == 0) { GlobalOutput.printf("notifyHandler: Notify socket closed!"); + ioThread->breakLoop(false); // exit the loop break; } else { // nBytes < 0 @@ -1520,19 +1520,15 @@ void TNonblockingIOThread::breakLoop(bool error) { ::abort(); } - // sets a flag so that the loop exits on the next event - event_base_loopbreak(eventBase_); - - // event_base_loopbreak() only causes the loop to exit the next time - // it wakes up. We need to force it to wake up, in case there are - // no real events it needs to process. - // // If we're running in the same thread, we can't use the notify(0) // mechanism to stop the thread, but happily if we're running in the // same thread, this means the thread can't be blocking in the event // loop either. if (!Thread::is_current(threadId_)) { notify(NULL); + } else { + // cause the loop to stop ASAP - even if it has things to do in it + event_base_loopbreak(eventBase_); } } @@ -1566,24 +1562,26 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { } void TNonblockingIOThread::run() { - if (eventBase_ == NULL) + if (eventBase_ == NULL) { registerEvents(); - - GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_); - + } if (useHighPriority_) { setCurrentThreadHighPriority(true); } - // Run libevent engine, never returns, invokes calls to eventHandler - event_base_loop(eventBase_, 0); + if (eventBase_ != NULL) + { + GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_); + // Run libevent engine, never returns, invokes calls to eventHandler + event_base_loop(eventBase_, 0); - if (useHighPriority_) { - setCurrentThreadHighPriority(false); - } + if (useHighPriority_) { + setCurrentThreadHighPriority(false); + } - // cleans up our registered events - cleanupEvents(); + // cleans up our registered events + cleanupEvents(); + } GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_); } http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/test/TNonblockingServerTest.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp index 48ea913..e933d6b 100644 --- a/lib/cpp/test/TNonblockingServerTest.cpp +++ b/lib/cpp/test/TNonblockingServerTest.cpp @@ -21,6 +21,7 @@ #include <boost/test/unit_test.hpp> #include <boost/smart_ptr.hpp> +#include "thrift/concurrency/Monitor.h" #include "thrift/concurrency/Thread.h" #include "thrift/server/TNonblockingServer.h" @@ -29,6 +30,10 @@ #include <event.h> using namespace apache::thrift; +using apache::thrift::concurrency::Guard; +using apache::thrift::concurrency::Monitor; +using apache::thrift::concurrency::Mutex; +using apache::thrift::server::TServerEventHandler; struct Handler : public test::ParentServiceIf { void addString(const std::string& s) { strings_.push_back(s); } @@ -46,11 +51,31 @@ struct Handler : public test::ParentServiceIf { class Fixture { private: + struct ListenEventHandler : public TServerEventHandler { + public: + ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {} + + void preServe() /* override */ { + Guard g(listenMonitor_.mutex()); + ready_ = true; + listenMonitor_.notify(); + } + + Monitor listenMonitor_; + bool ready_; + }; + struct Runner : public apache::thrift::concurrency::Runnable { int port; boost::shared_ptr<event_base> userEventBase; boost::shared_ptr<TProcessor> processor; boost::shared_ptr<server::TNonblockingServer> server; + boost::shared_ptr<ListenEventHandler> listenHandler; + Mutex mutex_; + + Runner() { + listenHandler.reset(new ListenEventHandler(&mutex_)); + } virtual void run() { // When binding to explicit port, allow retrying to workaround bind failures on ports in use @@ -58,10 +83,18 @@ private: startServer(retryCount); } + void readyBarrier() { + // block until server is listening and ready to accept connections + Guard g(mutex_); + while (!listenHandler->ready_) { + listenHandler->listenMonitor_.wait(); + } + } private: void startServer(int retry_count) { try { server.reset(new server::TNonblockingServer(processor, port)); + server->setServerEventHandler(listenHandler); if (userEventBase) { server->registerEvents(userEventBase.get()); } @@ -112,8 +145,8 @@ protected: false)); thread = threadFactory->newThread(runner); thread->start(); - // wait 100 ms for the server to begin listening - THRIFT_SLEEP_USEC(100000); + runner->readyBarrier(); + server = runner->server; return runner->port; }