Author: ritchiem
Date: Thu Apr 19 08:08:04 2007
New Revision: 530442

URL: http://svn.apache.org/viewvc?view=rev&rev=530442
Log:
QPID-455 Pre-fetched messages can cause problems with client tools. Set 
IMMEDIATE_PREFETCH="true" for previous behaviour.
Inverted check: setting the System property IMMEDIATE_PREFETCH="true" will now cause 
existing messages to be immediately pre-fetched to the newly registered 
consumer.

Solved outstanding broker issues; see QPID-458 and QPID-459.

Modified:
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=530442&r1=530441&r2=530442
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Apr 19 08:08:04 2007
@@ -199,11 +199,18 @@
 
     private final Object _suspensionLock = new Object();
 
-    /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */
+    /** Boolean to control immediate prefetch . Records the first call to the 
dispatcher to prevent further flow(true) */
+    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+    /** System property to enable immediate message prefetching */
+    private static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+    /** Immediate message prefetch default */
+    private static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
 
     private static final Logger _dispatcherLogger = 
Logger.getLogger(Dispatcher.class);
-    private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
+    /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */
     private class Dispatcher extends Thread
     {
 
@@ -1932,20 +1939,19 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
-        if 
(Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED",
 "false")))
+        // If IMMEDIATE_PREFETCH is not set then we need to start fetching     
     
+        if 
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT)))
         {
-//            if (!connectionStopped)
+            // We do this now if this is the first call on a started connection
+            if (isSuspended() && _startedAtLeastOnce.get() && 
_firstDispatcher.getAndSet(false))
             {
-                if (isSuspended() && _firstDispatcher.getAndSet(false))
+                try
                 {
-                    try
-                    {
-                        suspendChannel(false);
-                    }
-                    catch (AMQException e)
-                    {
-                        _logger.info("Suspending channel threw an exception:" 
+ e);
-                    }
+                    suspendChannel(false);
+                }
+                catch (AMQException e)
+                {
+                    _logger.info("Suspending channel threw an exception:" + e);
                 }
             }
         }
@@ -1998,11 +2004,12 @@
 
         bindQueue(amqd, queueName, protocolHandler, 
consumer.getRawSelectorFieldTable());
 
-        // The dispatcher will be null if we have just created this session
-        // so suspend the channel before we register our consumer so that we 
don't
-        // start prefetching until a receive/mListener is set.
-        if 
(Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED",
 "false")))
+        // If IMMEDIATE_PREFETCH is not required then suspsend the channel to 
delay prefetch
+        if 
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, 
IMMEDIATE_PREFETCH_DEFAULT)))
         {
+            // The dispatcher will be null if we have just created this session
+            // so suspend the channel before we register our consumer so that 
we don't
+            // start prefetching until a receive/mListener is set.
             if (_dispatcher == null)
             {
                 if (!isSuspended())
@@ -2010,6 +2017,7 @@
                     try
                     {
                         suspendChannel(true);
+                        _logger.info("Prefetching delayed existing messages 
will not flow until requested via receive*() or setML().");
                     }
                     catch (AMQException e)
                     {
@@ -2017,6 +2025,10 @@
                     }
                 }
             }
+        }
+        else
+        {
+            _logger.info("Immediately prefetching existing messages to new 
consumer.");
         }
 
         try


Reply via email to