Author: aconway
Date: Mon Oct 29 14:14:44 2007
New Revision: 589857

URL: http://svn.apache.org/viewvc?rev=589857&view=rev
Log:

client/BlockingQueue.h, sys/ConcurrentQueue.h: merged to sys/BlockingQueue.h
 - updated all users      

qpid/Exception.h: Removed unimplemented clone() function.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 29 14:14:44 2007
@@ -294,7 +294,6 @@
   qpid/broker/TxOp.h \
   qpid/broker/TxPublish.h \
   qpid/client/AckMode.h \
-  qpid/client/BlockingQueue.h \
   qpid/client/ChainableFrameHandler.h  \
   qpid/client/Channel.h \
   qpid/client/Exchange.h \
@@ -378,7 +377,7 @@
   qpid/sys/Acceptor.h \
   qpid/sys/AsynchIO.h \
   qpid/sys/AtomicCount.h \
-  qpid/sys/ConcurrentQueue.h \
+  qpid/sys/BlockingQueue.h \
   qpid/sys/Condition.h \
   qpid/sys/ConnectionInputHandler.h \
   qpid/sys/ConnectionInputHandlerFactory.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Mon Oct 29 14:14:44 
2007
@@ -46,8 +46,4 @@
 
 const char* Exception::what() const throw() { return str().c_str(); }
 
-std::auto_ptr<Exception> Exception::clone() const throw() {
-    return std::auto_ptr<Exception>(new Exception(*this));
-}
-
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Mon Oct 29 14:14:44 2007
@@ -44,7 +44,6 @@
     virtual ~Exception() throw();
     
     virtual const char *what() const throw();
-    virtual std::auto_ptr<Exception> clone() const throw();
     virtual std::string str() const throw();
   private:
     std::string msg;
@@ -61,14 +60,6 @@
     ConnectionException(framing::ReplyCode code_, const std::string& message)
         : Exception(message), code(code_) {}
 };
-
-/** Clone an exception.
- * For qpid::Exception this calls the clone member function.
- * For standard exceptions, uses the copy constructor.
- * For unknown exception types creates a std::exception
- * with the same what() string.
- */
-std::auto_ptr<std::exception> clone(const std::exception&);
 
 } // namespace qpid
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Mon Oct 29 
14:14:44 2007
@@ -187,13 +187,14 @@
     status.sync();
     session.messageCancel(tag);
 
-    if (incoming.empty()) {
-        return false;
-    } else {
-        msg.populate(*(incoming.pop()));
+    FrameSet::shared_ptr p;
+    if (incoming.tryPop(p)) {
+        msg.populate(*p);
         if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
         return true;
     }
+    else
+        return false;
 }
 
 void Channel::publish(Message& msg, const Exchange& exchange,
@@ -263,7 +264,7 @@
                 QPID_LOG(warning, "Dropping unsupported message type: " << 
content->getMethod());                        
             }
         }
-    } catch (const QueueClosed&) {}
+    } catch (const sys::QueueClosed&) {}
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.h Mon Oct 29 14:14:44 
2007
@@ -80,7 +80,7 @@
     ConsumerMap consumers;
     Session_0_10 session;
     framing::ChannelId channelId;
-    BlockingQueue<framing::FrameSet::shared_ptr> gets;
+    sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
     framing::Uuid uniqueId;
     uint32_t nameCounter;
     bool active;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Mon Oct 29 14:14:44 
2007
@@ -24,7 +24,7 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/FrameSet.h"
 #include "qpid/sys/Mutex.h"
-#include "BlockingQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 
 #ifndef _Demux_
 #define _Demux_
@@ -44,7 +44,7 @@
 {
 public:
     typedef boost::function<bool(const framing::FrameSet&)> Condition;
-    typedef BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+    typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
 
     void handle(framing::FrameSet::shared_ptr);
     void close();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Oct 29 
14:14:44 2007
@@ -24,7 +24,7 @@
 #include "qpid/framing/FrameSet.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
-#include "BlockingQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 #include "Message.h"
 
 using qpid::framing::FrameSet;
@@ -62,7 +62,7 @@
 
 void Dispatcher::run()
 {    
-    BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? 
+    sys::BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? 
         session.execution().getDemux().getDefault() : 
         session.execution().getDemux().get(queue); 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h Mon Oct 29 
14:14:44 2007
@@ -22,28 +22,29 @@
 #ifndef _MessageQueue_
 #define _MessageQueue_
 #include <iostream>
-#include "BlockingQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 #include "MessageListener.h"
 
 namespace qpid {
 namespace client {
 
-    /**
-     * A MessageListener implementation that simply queues up
-     * messages.
-     *
-     * \ingroup clientapi
-     */
-    class MessageQueue : public MessageListener, public BlockingQueue<Message>
+/**
+ * A MessageListener implementation that simply queues up
+ * messages.
+ *
+ * \ingroup clientapi
+ */
+class MessageQueue : public MessageListener,
+                     public sys::BlockingQueue<Message>
+{
+    std::queue<Message> messages;
+  public:
+    void received(Message& msg)
     {
-        std::queue<Message> messages;
-    public:
-        void received(Message& msg)
-        {
-            std::cout << "Adding message to queue: " << msg.getData() << 
std::endl;
-            push(msg);
-        }
-    };
+        std::cout << "Adding message to queue: " << msg.getData() << std::endl;
+        push(msg);
+    }
+};
 
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Mon Oct 29 
14:14:44 2007
@@ -120,7 +120,7 @@
     Lock l(state);
     invariant();
     detach(COMMAND_INVALID, "Session deleted");
-    state.waitAll();
+    state.waitWaiters();
 }
 
 void SessionCore::detach(int c, const std::string& t) {

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=589857&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Mon Oct 29 
14:14:44 2007
@@ -0,0 +1,127 @@
+#ifndef QPID_SYS_BLOCKINGQUEUE_H
+#define QPID_SYS_BLOCKINGQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Waitable.h"
+
+#include <queue>
+
+namespace qpid {
+namespace sys {
+
+struct QueueClosed {};
+
+template <class T>
+class BlockingQueue
+{
+    sys::Waitable lock;
+    std::queue<T> queue;
+    bool closed;
+
+public:
+    BlockingQueue() : closed(false) {}
+    ~BlockingQueue() { close(); }
+
+    /** Block until there is a value to pop */
+    T pop()
+    {
+        Waitable::ScopedLock l(lock);
+        if (!queueWait()) throw QueueClosed();
+        return popInternal();
+    }
+
+    /** Non-blocking pop. If there is a value set outValue and return
+     * true, else return false;
+     */
+    bool tryPop(T& outValue) {
+        Waitable::ScopedLock l(lock);
+        if (queue.empty()) return false;
+        outValue = popInternal();
+        return true;
+    }
+
+    /** Non-blocking pop. If there is a value return it, else return
+     * valueIfEmpty.
+     */
+    T tryPop(const T& valueIfEmpty=T()) {
+        T result=valueIfEmpty;
+        tryPop(result);
+        return result;
+    }
+
+    /** Push a value onto the queue */
+    void push(const T& t)
+    {
+        Waitable::ScopedLock l(lock);
+        queue.push(t);
+        queueNotify(0);
+    }
+
+    /**
+     * Close the queue. Throws QueueClosed in threads waiting in pop().
+     * Blocks till all waiting threads have been notified.
+     */ 
+    void close()
+    {
+        Waitable::ScopedLock l(lock);
+        if (!closed) {
+            closed = true;
+            lock.notifyAll();
+            lock.waitWaiters(); // Ensure no threads are still waiting.
+        }
+    }
+
+    /** Open a closed queue. */
+    void open() {
+        Waitable::ScopedLock l(lock);
+        closed=false;
+    }
+
+  private:
+
+    void queueNotify(size_t ignore) {
+        if (!queue.empty() && lock.hasWaiters()>ignore)
+            lock.notify();      // Notify another waiter.
+    }
+
+    bool queueWait() {
+        Waitable::ScopedWait w(lock);
+        while (!closed && queue.empty())
+            lock.wait();
+        return !queue.empty();
+    }
+
+    T popInternal() {
+        T t=queue.front();
+        queue.pop();
+        queueNotify(1);
+        return t;
+    }
+    
+};
+
+}}
+
+
+
+#endif  /*!QPID_SYS_BLOCKINGQUEUE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h Mon Oct 29 14:14:44 
2007
@@ -29,9 +29,9 @@
 namespace sys {
 
 /**
- * A monitor that keeps track of waiting threads.
- * Threads that use a WaitLock are counted as waiters, threads that
- * use a normal ScopedLock are not considered waiters.
+ * A monitor that keeps track of waiting threads.  Threads declare a
+ * ScopedWait around wait() inside a ScopedLock to be considered
+ * waiters.
  */
 class Waitable : public Monitor {
   public:
@@ -43,24 +43,22 @@
     struct ScopedWait {
         Waitable& w;
         ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; }
-        ~ScopedWait() { --w.waiters; w.notifyAll(); }
+        ~ScopedWait() { if (--w.waiters==0) w.notifyAll(); }
     };
 
-    /** Block till all waiters have finished waiting.
-     * The calling thread does not count as a waiter.
+    /** Block till there are no more ScopedWaits.
      [EMAIL PROTECTED] Must be called inside a ScopedLock but NOT a ScopedWait.
      */
-    bool waitAll(Duration timeout=TIME_INFINITE) {
-        AbsTime deadline(now(), timeout);
-        while (waiters > 0) {
-            if (!wait(deadline)) {
-                assert(timeout != TIME_INFINITE);
-                return false;
-            }
-        }
-        return true;
+    void waitWaiters() {
+        while (waiters != 0) 
+            wait();
     }
 
+    /** Returns the number of outstanding ScopedWaits.
+     * Must be called with the lock held.
+     */
+    size_t hasWaiters() { return waiters; }
+    
   private:
   friend struct ScopedWait;
     size_t waiters;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Mon Oct 29 14:14:44 2007
@@ -36,8 +36,7 @@
     TestCluster cluster("clusterOne", "amqp:one:1");
     AMQFrame send(1, SessionOpenBody(VER));
     cluster.handle(send);
-    AMQFrame received;
-    BOOST_REQUIRE(cluster.received.waitPop(received));
+    AMQFrame received = cluster.received.pop();
     BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody());
     BOOST_CHECK_EQUAL(1u, cluster.size());
     Cluster::MemberList members = cluster.getMembers();
@@ -62,11 +61,10 @@
         // Exchange frames with child.
         AMQFrame send(1, SessionOpenBody(VER));
         cluster.handle(send);
-        AMQFrame received;
-        BOOST_REQUIRE(cluster.received.waitPop(received));
+        AMQFrame received = cluster.received.pop();
         BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *received.getBody());
         
-        BOOST_REQUIRE(cluster.received.waitPop(received));
+        received=cluster.received.pop();
         BOOST_CHECK_TYPEID_EQUAL(SessionAttachedBody, *received.getBody());
 
         if (!nofork) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Mon Oct 29 14:14:44 2007
@@ -20,7 +20,7 @@
  */
 
 #include "qpid/cluster/Cluster.h"
-#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 #include "qpid/framing/AMQFrame.h"
 
 #include <boost/bind.hpp>
@@ -45,12 +45,10 @@
 void null_deleter(void*) {}
 
 template <class T>
-class TestHandler : public Handler<T&>, public ConcurrentQueue<T>
+class TestHandler : public Handler<T&>, public BlockingQueue<T>
 {
   public:
     void handle(T& frame) { push(frame); }
-    bool waitPop(T& x) { return waitPop(x, TIME_SEC); }
-    using ConcurrentQueue<T>::waitPop;
 };
 
 typedef TestHandler<AMQFrame> TestFrameHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp Mon Oct 29 
14:14:44 2007
@@ -35,8 +35,7 @@
 /** Child part of Cluster::clusterTwo test */
 void clusterTwo() {
     TestCluster cluster("clusterTwo", "amqp:child:2");
-    AMQFrame frame;
-    BOOST_REQUIRE(cluster.received.waitPop(frame)); // Frame from parent.
+    AMQFrame frame = cluster.received.pop(frame); // Frame from parent.
     BOOST_CHECK_TYPEID_EQUAL(SessionOpenBody, *frame.getBody());
     BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp Mon Oct 29 
14:14:44 2007
@@ -20,10 +20,10 @@
  */
 
 /[EMAIL PROTECTED]
- * Compare alternative implementations for ConcurrentQueue.
+ * Compare alternative implementations for BlockingQueue.
  */
 
-#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
@@ -83,7 +83,7 @@
     typename std::vector<T>::iterator popIter;
 };
 
-template <class T> struct LockedDequeQueue : public ConcurrentQueue<T> {
+template <class T> struct LockedDequeQueue : public BlockingQueue<T> {
     /** size_t ignored, can't pre-allocate space in a dequeue */
     LockedDequeQueue(size_t=0) {};
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=589857&r1=589856&r2=589857&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Mon Oct 29 
14:14:44 2007
@@ -26,7 +26,7 @@
 #include "qpid/client/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Thread.h"
-#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/sys/BlockingQueue.h"
 #include "qpid/shared_ptr.h"
 
 #include <vector>
@@ -65,26 +65,29 @@
         }
 
         ~NetworkQueue() { 
-            queue.shutdown();
+            queue.close();
             thread.join();
         }
 
         void push(AMQFrame& f) { queue.push(f); }
 
         void run() {
-            AMQFrame f;
-            while (queue.waitPop(f)) {
-                Lock l(lock);
-                if (inputHandler) { 
-                    QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
-                    inputHandler->handle(f);
-                }
-                else {
-                    QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
+            try {
+                while(true) {
+                    AMQFrame f = queue.pop();
+                    if (inputHandler) { 
+                        QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
+                        inputHandler->handle(f);
+                    }
+                    else 
+                        QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
                 }
             }
+            catch (const sys::QueueClosed&) {
+                return;
+            }
         }
-
+        
         void setInputHandler(FrameHandler* h) {
             Lock l(lock);
             inputHandler = h;
@@ -92,7 +95,7 @@
         
       private:
         sys::Mutex lock;
-        sys::ConcurrentQueue<AMQFrame> queue;
+        sys::BlockingQueue<AMQFrame> queue;
         sys::Thread thread;
         FrameHandler* inputHandler;
         const char* const receiver;


Reply via email to