Author: arnaudsimon
Date: Wed Oct 17 07:09:46 2007
New Revision: 585514

URL: http://svn.apache.org/viewvc?rev=585514&view=rev
Log:
Changed flow control for message selector

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=585514&r1=585513&r2=585514&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Wed Oct 17 07:09:46 2007
@@ -298,12 +298,24 @@
             // the current message received is not good, so we need to get a 
message.
             if (getMessageListener() == null)
             {
+                  int oldval = _messageCounter.intValue();
                 
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                     
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
                 
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                 _0_10session.getQpidSession().sync();
                 
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE,
 0xFFFFFFFF);
+                if( _messageCounter.intValue() <= oldval )
+                {
+                    // we haven't received a message so tell the receiver to 
return null                     
+                    _synchronousQueue.add(new NullTocken());
+                }
+                else
+                {
+                    _messageCounter.decrementAndGet();
+                }
             }
+            // we now need to check if we have received a message
+
         }
         catch(Exception e)
         {
@@ -378,11 +390,6 @@
         return result;
     }
 
-    void preDeliver(AbstractJMSMessage msg)
-    {
-        _messageCounter.decrementAndGet();
-         super.preDeliver(msg);
-    }
 
     public void setMessageListener(final MessageListener messageListener) 
throws JMSException
     {
@@ -443,12 +450,23 @@
                 {
                       o = _synchronousQueue.take();
                 }
-                else
-                {
-                    System.out.println("null");
-                }
             }
         }
+        if( o instanceof NullTocken )
+        {
+            o = null;
+        }
         return o;
+    }
+
+     protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws 
JMSException
+    {
+        _messageCounter.decrementAndGet();
+        super.preApplicationProcessing(jmsMsg);
+    }
+
+    private class NullTocken
+    {
+
     }
 }


Reply via email to