Author: ritchiem
Date: Wed Apr 18 08:11:22 2007
New Revision: 530049

URL: http://svn.apache.org/viewvc?view=rev&rev=530049
Log:
QPID-455 Prefetched messages can cause problems with client tools.
Removed the changes as this was causing problems. Guarded with a check for now 
but solution is till not correct.

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=530049&r1=530048&r2=530049
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Apr 18 08:11:22 2007
@@ -1932,6 +1932,24 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
+        if 
(Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED",
 "false")))
+        {
+//            if (!connectionStopped)
+            {
+                if (isSuspended() && _firstDispatcher.getAndSet(false))
+                {
+                    try
+                    {
+                        suspendChannel(false);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.info("Suspending channel threw an exception:" 
+ e);
+                    }
+                }
+            }
+        }
+
         startDistpatcherIfNecessary(false);
     }
 
@@ -1948,24 +1966,6 @@
         {
             _dispatcher.setConnectionStopped(initiallyStopped);
         }
-
-        if (!AMQSession.this._closed.get()
-            && AMQSession.this._startedAtLeastOnce.get()
-            && _firstDispatcher.getAndSet(false))
-        {
-            if (isSuspended())
-            {
-                try
-                {
-                    suspendChannel(false);
-                }
-                catch (AMQException e)
-                {
-                    _logger.info("Suspending channel threw an exception:" + e);
-                }
-            }
-        }
-
     }
 
     void stop() throws AMQException
@@ -1998,17 +1998,23 @@
 
         bindQueue(amqd, queueName, protocolHandler, 
consumer.getRawSelectorFieldTable());
 
-        if (_dispatcher == null)
+        // The dispatcher will be null if we have just created this session
+        // so suspend the channel before we register our consumer so that we 
don't
+        // start prefetching until a receive/mListener is set.
+        if 
(Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED",
 "false")))
         {
-            if (!isSuspended())
+            if (_dispatcher == null)
             {
-                try
-                {
-                    suspendChannel(true);
-                }
-                catch (AMQException e)
+                if (!isSuspended())
                 {
-                    _logger.info("Suspending channel threw an exception:" + e);
+                    try
+                    {
+                        suspendChannel(true);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.info("Suspending channel threw an exception:" 
+ e);
+                    }
                 }
             }
         }


Reply via email to