Author: astitcher
Date: Mon Mar 2 18:42:02 2009
New Revision: 749406
URL: http://svn.apache.org/viewvc?rev=749406&view=rev
Log:
- Reworked the DispatchHandle state machine to eliminate race conditions, particularly when deleting a DispatchHandle
- Reworked the Poller interrupt mechanism to eliminate locking problems and to support the DispatchHandle changes
- Beefed up the DispatchHandle test program so that it's a fair torture test of the DispatchHandle code
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp qpid/trunk/qpid/cpp/src/tests/Makefile.am Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Mon Mar 2 18:42:02 2009 @@ -220,6 +220,7 @@ bool ret = !closed; if (!closed) { closed = true; + aio->queueForDeletion(); poller->shutdown(); } if (!joined && receiver.id() != Thread::current().id()) { @@ -384,14 +385,13 @@ assert(protect); try { Dispatcher d(poller); - + for (int i = 0; i < 32; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - + aio->start(poller); d.run(); - aio->queueForDeletion(); socket.close(); } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Mon Mar 2 18:42:02 2009 @@ -221,6 +221,7 @@ bool ret = !closed; if (!closed) { closed = true; + aio->queueForDeletion(); poller->shutdown(); } if (!joined && receiver.id() != Thread::current().id()) { @@ -386,7 +387,6 @@ aio->start(poller); d.run(); - aio->queueForDeletion(); 
socket.close(); } catch (const std::exception& e) { QPID_LOG(error, e.what()); Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon Mar 2 18:42:02 2009 @@ -21,6 +21,8 @@ #include "DispatchHandle.h" +#include <algorithm> + #include <boost/cast.hpp> #include <assert.h> @@ -29,7 +31,6 @@ namespace sys { DispatchHandle::~DispatchHandle() { - stopWatch(); } void DispatchHandle::startWatch(Poller::shared_ptr poller0) { @@ -37,13 +38,21 @@ bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE); + assert(state == IDLE || state == DELAYED_IDLE); // If no callbacks set then do nothing (that is what we were asked to do!) // TODO: Maybe this should be an assert instead if (!r && !w) { - state = INACTIVE; - return; + switch (state) { + case IDLE: + state = INACTIVE; + return; + case DELAYED_IDLE: + state = DELAYED_INACTIVE; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } Poller::Direction d = r ? @@ -53,9 +62,20 @@ poller = poller0; poller->addFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; + switch (state) { + case IDLE: + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; + return; + case DELAYED_IDLE: + state = r ? + (w ? 
DELAYED_RW : DELAYED_R) : + DELAYED_W; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } void DispatchHandle::rewatch() { @@ -93,6 +113,8 @@ case ACTIVE_RW: // Don't need to do anything already waiting for readable/writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -130,6 +152,8 @@ poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -167,6 +191,8 @@ case ACTIVE_RW: // Nothing to do: already waiting for writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -203,6 +229,8 @@ case ACTIVE_W: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -239,6 +267,8 @@ case ACTIVE_R: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -261,6 +291,8 @@ poller->modFd(*this, Poller::NONE); state = INACTIVE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -280,47 +312,72 @@ default: state = IDLE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } assert(poller); poller->delFd(*this); poller.reset(); } +// If we are already in the IDLE state we can't do the callback as we might +// race to delete and callback at the same time +// TODO: might be able to fix this by adding a new state, but would make +// the state machine even more complex void DispatchHandle::call(Callback iCb) { assert(iCb); ScopedLock<Mutex> lock(stateLock); - interruptedCallbacks.push(iCb); - - (void) poller->interrupt(*this); + switch (state) { + case IDLE: + case ACTIVE_DELETE: + assert(false); + return; + default: + interruptedCallbacks.push(iCb); + assert(poller); + (void) poller->interrupt(*this); + } } // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete void DispatchHandle::doDelete() { - // Ensure that we're no longer watching anything - stopWatch(); - - // If we're in the middle of a callback defer 
the delete { ScopedLock<Mutex> lock(stateLock); + // Ensure that we're no longer watching anything switch (state) { + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + assert(poller); + poller->delFd(*this); + poller.reset(); + // Fallthrough case DELAYED_IDLE: - case DELAYED_DELETE: state = DELAYED_DELETE; + // Fallthrough + case DELAYED_DELETE: + case ACTIVE_DELETE: return; case IDLE: break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states - assert(false); + state = ACTIVE_DELETE; + assert(poller); + (void) poller->interrupt(*this); + poller->delFd(*this); + return; } } - // If we're not then do it right away + // If we're IDLE we can do this right away delete this; } void DispatchHandle::processEvent(Poller::EventType type) { + CallbackQueue callbacks; + // Note that we are now doing the callbacks { ScopedLock<Mutex> lock(stateLock); @@ -336,6 +393,16 @@ case ACTIVE_RW: state = DELAYED_RW; break; + case ACTIVE_DELETE: + // Need to make sure we clean up any pending callbacks in this case + std::swap(callbacks, interruptedCallbacks); + goto saybyebye; + // Can get here in idle if we are stopped in a different thread + // just after we return with this handle in Poller::wait + case IDLE: + // Can get here in INACTIVE if a non connection thread unwatches + // whilst we were stuck in the above lock + case INACTIVE: // Can only get here in a DELAYED_* state in the rare case // that we're already here for reading and we get activated for // writing and we can write (it might be possible the other way @@ -348,9 +415,9 @@ case DELAYED_IDLE: case DELAYED_DELETE: return; - default: - assert(false); } + + std::swap(callbacks, interruptedCallbacks); } // Do callbacks - whilst we are doing the callbacks we are prevented from processing @@ -378,8 +445,8 @@ break; case Poller::INTERRUPTED: { - ScopedLock<Mutex> lock(stateLock); - assert(interruptedCallbacks.size() > 0); + // We could only be interrupted if we 
also had a callback to do + assert(callbacks.size() > 0); // We'll actually do the interrupt below } break; @@ -387,16 +454,18 @@ assert(false); } - { - ScopedLock<Mutex> lock(stateLock); - // If we've got a pending interrupt do it now - while (interruptedCallbacks.size() > 0) { - Callback cb = interruptedCallbacks.front(); + // If we have any callbacks do them now - + // (because we use a copy from before the previous callbacks we won't + // do anything yet that was just added) + while (callbacks.size() > 0) { + Callback cb = callbacks.front(); assert(cb); cb(*this); - interruptedCallbacks.pop(); + callbacks.pop(); } + { + ScopedLock<Mutex> lock(stateLock); // If any of the callbacks re-enabled reading/writing then actually // do it now switch (state) { @@ -425,7 +494,9 @@ case DELAYED_DELETE: break; } - } + } + +saybyebye: delete this; } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Mon Mar 2 18:42:02 2009 @@ -65,6 +65,7 @@ Mutex stateLock; enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, + ACTIVE_DELETE, DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE } state; Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon Mar 2 18:42:02 2009 @@ -86,8 +86,9 @@ // with the handle and the INTERRUPTED event type // if it returns false then the handle is not being monitored by the poller // - This can either be 
because it has just received an event which has been - // reported and has not been reenabled since. Or because it was removed - // from the monitoring set + // reported and has not been reenabled since. + // - Because it was removed from the monitoring set + // - Or because it is already being interrupted bool interrupt(PollerHandle& handle); // Poller run loop Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Mar 2 18:42:02 2009 @@ -54,17 +54,20 @@ INACTIVE, HUNGUP, MONITORED_HUNGUP, + INTERRUPTED, DELETED }; int fd; ::__uint32_t events; + PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f) : + PollerHandlePrivate(int f, PollerHandle* p) : fd(f), events(0), + pollerHandle(p), stat(ABSENT) { } @@ -101,6 +104,14 @@ stat = HUNGUP; } + bool isInterrupted() const { + return stat == INTERRUPTED; + } + + void setInterrupted() { + stat = INTERRUPTED; + } + bool isDeleted() const { return stat == DELETED; } @@ -111,7 +122,7 @@ }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl))) + impl(new PollerHandlePrivate(toFd(h.impl), this)) {} PollerHandle::~PollerHandle() { @@ -120,6 +131,10 @@ if (impl->isDeleted()) { return; } + if (impl->isInterrupted()) { + impl->setDeleted(); + return; + } if (impl->isActive()) { impl->setDeleted(); } @@ -243,23 +258,21 @@ ::close(epollFd); } - void interrupt(bool all=false) { + void interrupt() { ::epoll_event epe; - if (all) { - // Not EPOLLONESHOT, so we eventually get all threads - epe.events = ::EPOLLIN; - epe.data.u64 = 0; // Keep valgrind happy - } else { - // Use EPOLLONESHOT so we only wake a single thread - 
epe.events = ::EPOLLIN | ::EPOLLONESHOT; - epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); - } + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); } void interruptAll() { - interrupt(true); + ::epoll_event epe; + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 0; // Keep valgrind happy + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); } }; @@ -281,7 +294,7 @@ epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); } epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); @@ -312,7 +325,7 @@ ::epoll_event epe; epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -329,7 +342,7 @@ ::epoll_event epe; epe.events = eh.events; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &handle; + epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); @@ -355,15 +368,14 @@ { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - if (eh.isInactive()) { + if (!eh.isActive()) { return false; } ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy - epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - eh.setInactive(); + eh.setInterrupted(); } PollerPrivate::InterruptHandle& ih = impl->interruptHandle; @@ -422,37 +434,54 @@ #else int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask); #endif - 
// Check for shutdown - if (impl->isShutdown) { - PollerHandleDeletionManager.markAllUnusedInThisThread(); - return Event(0, SHUTDOWN); - } if (rc ==-1 && errno != EINTR) { QPID_POSIX_CHECK(rc); } else if (rc > 0) { assert(rc == 1); - PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr); + void* dataPtr = epe.data.ptr; - PollerHandlePrivate& eh = *handle->impl; + // Check if this is an interrupt + PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle; + if (dataPtr == &interruptHandle) { + PollerHandle* wrappedHandle = 0; + { + ScopedLock<Mutex> l(interruptHandle.impl->lock); + if (interruptHandle.impl->isActive()) { + wrappedHandle = interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (interruptHandle.queuedHandles()) { + impl->interrupt(); + interruptHandle.impl->setActive(); + } else { + interruptHandle.impl->setInactive(); + } + } + } + if (wrappedHandle) { + ScopedLock<Mutex> l(wrappedHandle->impl->lock); + if (!wrappedHandle->impl->isDeleted()) { + wrappedHandle->impl->setInactive(); + return Event(wrappedHandle, INTERRUPTED); + } + PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl); + } + continue; + } + + // Check for shutdown + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } + + PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr); ScopedLock<Mutex> l(eh.lock); // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { - - // Check if this is an interrupt - if (handle == &impl->interruptHandle) { - PollerHandle* wrappedHandle = impl->interruptHandle.getHandle(); - // If there is an interrupt queued behind this one we need to arm it - // We do it this way so that another thread can pick it up - if (impl->interruptHandle.queuedHandles()) { - impl->interrupt(); - eh.setActive(); - } else { - 
eh.setInactive(); - } - return Event(wrappedHandle, INTERRUPTED); - } + PollerHandle* handle = eh.pollerHandle; // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again Modified: qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Mon Mar 2 18:42:02 2009 @@ -78,9 +78,6 @@ h.rewatch(); } -DispatchHandle* rh = 0; -DispatchHandle* wh = 0; - void rInterrupt(DispatchHandle&) { cerr << "R"; } @@ -92,9 +89,28 @@ DispatchHandle::Callback rcb = rInterrupt; DispatchHandle::Callback wcb = wInterrupt; +DispatchHandleRef *volatile rh = 0; +DispatchHandleRef *volatile wh = 0; + +volatile bool stopWait = false; +volatile bool phase1finished = false; + +timer_t timer; + +void stop_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + stopWait = true; +} + void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { - rh->call(rcb); - wh->call(wcb); + static int count = 0; + if (count++ < 10) { + rh->call(rcb); + wh->call(wcb); + } else { + phase1finished = true; + int rc = ::timer_delete(timer); + assert(rc == 0); + } } int main(int /*argc*/, char** /*argv*/) @@ -132,8 +148,8 @@ PosixIOHandle f0(sv[0]); PosixIOHandle f1(sv[1]); - rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0); - wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); rh->startWatch(poller); wh->startWatch(poller); @@ -154,11 +170,10 @@ rc = ::sigaction(SIGRTMIN, &sa,0); assert(rc == 0); - ::sigevent se; + 
::sigevent se={}; se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = SIGRTMIN; se.sigev_value.sival_ptr = 0; - timer_t timer; rc = ::timer_create(CLOCK_REALTIME, &se, &timer); assert(rc == 0); @@ -169,11 +184,52 @@ rc = ::timer_settime(timer, 0, &ts, 0); assert(rc == 0); - // wait 2 minutes then shutdown - ::sleep(60); + // wait + while (!phase1finished) { + ::sleep(1); + } + + // Now test deleting/creating DispatchHandles in tight loop, so that we are likely to still be using the + // attached PollerHandles after deleting the DispatchHandle + DispatchHandleRef* t = wh; + wh = 0; + delete t; + t = rh; + rh = 0; + delete t; + + sa.sa_sigaction = stop_handler; + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + itimerspec nts = { + /*.it_value = */ {30, 0}, // s, ns + /*.it_interval = */ {30, 0}}; // s, ns + + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + rc = ::timer_settime(timer, 0, &nts, 0); + assert(rc == 0); + + DispatchHandleRef* rh1; + DispatchHandleRef* wh1; + + struct timespec w = {0, 1000000}; + while (!stopWait) { + rh1 = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh1 = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + rh1->startWatch(poller); + wh1->startWatch(poller); + + ::nanosleep(&w, 0); + + delete wh1; + delete rh1; + } rc = ::timer_delete(timer); assert(rc == 0); + poller->shutdown(); dt.join(); dt1.join(); Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=749406&r1=749405&r2=749406&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Mar 2 18:42:02 2009 @@ -201,6 +201,7 @@ check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp DispatcherTest_LDADD=$(lib_common) +DispatcherTest_CXXFLAGS=$(AM_CXXFLAGS) 
-Wno-missing-field-initializers TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org