Author: gsim
Date: Wed Oct 31 11:47:28 2007
New Revision: 590786
URL: http://svn.apache.org/viewvc?rev=590786&view=rev
Log:
Simple fix to prevent concurrent disconnection and sending of frames causing
seg faults.
A more complete solution may follow.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=590786&r1=590785&r2=590786&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Oct 31
11:47:28 2007
@@ -81,12 +81,14 @@
}
void SessionHandler::handleOut(AMQFrame& f) {
- if (!session.get())
- throw InternalErrorException(
- QPID_MSG("attempt to send frame on detached channel."));
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.solicitAck();
+ ConditionalScopedLock<Semaphore> s(suspension);
+ if (s.lockAcquired() && session.get() && session->isAttached()) {
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
+ } else {
+ QPID_LOG(warning, "Dropping frame as session is no longer attached to
a channel: " << f);
+ }
}
void SessionHandler::assertAttached(const char* method) const {
@@ -150,7 +152,8 @@
}
void SessionHandler::localSuspend() {
- if (session.get()) {
+ ScopedLock<Semaphore> s(suspension);
+ if (session.get() && session->isAttached()) {
session->detach();
connection.broker.getSessionManager().suspend(session);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=590786&r1=590785&r2=590786&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Oct 31
11:47:28 2007
@@ -27,6 +27,8 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
#include <boost/noncopyable.hpp>
@@ -94,6 +96,7 @@
bool ignoring;
bool resuming;
std::auto_ptr<SessionState> session;
+ sys::Semaphore suspension;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h?rev=590786&r1=590785&r2=590786&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h Wed Oct 31 11:47:28 2007
@@ -66,6 +66,18 @@
L& mutex;
};
+template <class L>
+class ConditionalScopedLock
+{
+ public:
+ ConditionalScopedLock(L& l) : mutex(l) { acquired = l.trylock(); }
+ ~ConditionalScopedLock() { if (acquired) mutex.unlock(); }
+ bool lockAcquired() { return acquired; }
+ private:
+ L& mutex;
+ bool acquired;
+};
+
}}
#ifdef USE_APR_PLATFORM
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h?rev=590786&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h Wed Oct 31 11:47:28
2007
@@ -0,0 +1,67 @@
+#ifndef _sys_Semaphore_h
+#define _sys_Semaphore_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+class Semaphore
+{
+public:
+ Semaphore(uint c = 1) : count(c) {}
+
+ void lock() { acquire(); }
+ void unlock() { release(); }
+ bool trylock() { return tryAcquire(); }
+
+ bool tryAcquire()
+ {
+ Monitor::ScopedLock l(monitor);
+ if (count) {
+ count--;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void acquire()
+ {
+ Monitor::ScopedLock l(monitor);
+ while (count == 0) monitor.wait();
+ count--;
+ }
+
+ void release()
+ {
+ Monitor::ScopedLock l(monitor);
+ if (!count++) monitor.notifyAll();
+ }
+
+private:
+ Monitor monitor;
+ uint count;
+};
+
+}}
+
+#endif /*!_sys_Semaphore_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h
------------------------------------------------------------------------------
svn:keywords = Rev Date