Author: aconway
Date: Tue May 27 11:38:06 2008
New Revision: 660643

URL: http://svn.apache.org/viewvc?rev=660643&view=rev
Log:
Tighten up sync-correctness in SubscriptionManager & Dispatcher.
Add a flush to SessionBase_0_10::sync() so it syncs in both directions.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=660643&r1=660642&r2=660643&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Tue May 27 11:38:06 2008
@@ -89,7 +89,7 @@
                 }
             }
         }
-        sync(session).sync(); // Make sure all our acks are received before returning.
+        session.sync(); // Make sure all our acks are received before returning.
     }
     catch (const ClosedException&) {} //ignore it and return
     catch (const std::exception& e) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp?rev=660643&r1=660642&r2=660643&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp Tue May 27 11:38:06 2008
@@ -42,6 +42,7 @@
 
 void SessionBase_0_10::sync()
 {
+    impl->sendFlush();          // Let the peer know our state.
     ExecutionSyncBody b;
     b.setSync(true);
     impl->send(b).wait(*impl);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h?rev=660643&r1=660642&r2=660643&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h Tue May 27 11:38:06 2008
@@ -75,9 +75,9 @@
      */
     void close();
     
-    /** Synchronize the session: sync() waits until all commands
-     * issued on this session have been completed. It is equivalent to
-     * calling Session::executionSync()
+    /**
+     * Synchronize the session: sync() waits until all commands issued
+     * on this session so far have been completed by the broker.
      *
      * Note sync() is always synchronous, even on an AsyncSession object
      * because that's almost always what you want. You can call

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=660643&r1=660642&r2=660643&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Tue May 27 11:38:06 2008
@@ -42,7 +42,7 @@
 void SubscriptionManager::subscribeInternal(
     const std::string& q, const std::string& dest)
 {
-    async(session).messageSubscribe( // setFlowControl will sync.
+    session.messageSubscribe( 
         arg::queue=q, arg::destination=dest,
         arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
     setFlowControl(dest, messages, bytes, window); 
@@ -68,9 +68,10 @@
 void SubscriptionManager::setFlowControl(
     const std::string& dest, uint32_t messages,  uint32_t bytes, bool window)
 {
-    async(session).messageSetFlowMode(dest, window); 
-    async(session).messageFlow(dest, 0, messages); 
-    session.messageFlow(dest, 1, bytes); // Only need one sync
+    session.messageSetFlowMode(dest, window); 
+    session.messageFlow(dest, 0, messages); 
+    session.messageFlow(dest, 1, bytes);
+    session.sync();
 }
 
 void SubscriptionManager::setFlowControl(
@@ -91,8 +92,8 @@
 
 void SubscriptionManager::cancel(const std::string dest)
 {
+    sync(session).messageCancel(dest);
     dispatcher.cancel(dest);
-    session.messageCancel(dest);
 }
 
 void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=660643&r1=660642&r2=660643&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Tue May 27 11:38:06 2008
@@ -48,7 +48,7 @@
     void subscribeInternal(const std::string& q, const std::string& dest);
     
     qpid::client::Dispatcher dispatcher;
-    qpid::client::Session session;
+    qpid::client::AsyncSession session;
     uint32_t messages;
     uint32_t bytes;
     bool window;


Reply via email to