Author: arnaudsimon
Date: Fri Oct 19 04:28:00 2007
New Revision: 586382

URL: http://svn.apache.org/viewvc?rev=586382&view=rev
Log:
changed to handle async pre-fetch

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Oct 19 04:28:00 2007
@@ -207,7 +207,7 @@
         return _acknowledgeMode;
     }
 
-    private boolean isMessageListenerSet()
+    protected boolean isMessageListenerSet()
     {
         return _messageListener.get() != null;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 19 04:28:00 2007
@@ -53,6 +53,12 @@
      * A counter for keeping the number of available messages for this consumer
      */
     private final AtomicLong _messageCounter = new AtomicLong(0);
+
+    /**
+     * Number of received message so far
+     */
+      private final AtomicLong _messagesReceived = new AtomicLong(0);
+
     /**
      * This class logger
      */
@@ -135,6 +141,18 @@
 
     public void onMessage(Message message)
     {
+        if( isMessageListenerSet())
+        {
+            _messagesReceived.incrementAndGet();
+            if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+            {
+                // require more credit
+                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                                          AMQSession_0_10.MAX_PREFETCH);
+                _messagesReceived.set(0);
+            }
+        }
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         String consumerTag = getConsumerTag().toString();
@@ -417,6 +435,7 @@
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
+                _messagesReceived.set(0);;
             }
         }
     }


Reply via email to