Author: arnaudsimon
Date: Wed Oct 17 07:09:46 2007
New Revision: 585514
URL: http://svn.apache.org/viewvc?rev=585514&view=rev
Log:
Changed flow control for message selector
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=585514&r1=585513&r2=585514&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Wed Oct 17 07:09:46 2007
@@ -298,12 +298,24 @@
// the current message received is not good, so we need to get a
message.
if (getMessageListener() == null)
{
+ int oldval = _messageCounter.intValue();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
+ if( _messageCounter.intValue() <= oldval )
+ {
+ // we haven't received a message so tell the receiver to
return null
+ _synchronousQueue.add(new NullTocken());
+ }
+ else
+ {
+ _messageCounter.decrementAndGet();
+ }
}
+ // we now need to check if we have received a message
+
}
catch(Exception e)
{
@@ -378,11 +390,6 @@
return result;
}
- void preDeliver(AbstractJMSMessage msg)
- {
- _messageCounter.decrementAndGet();
- super.preDeliver(msg);
- }
public void setMessageListener(final MessageListener messageListener)
throws JMSException
{
@@ -443,12 +450,23 @@
{
o = _synchronousQueue.take();
}
- else
- {
- System.out.println("null");
- }
}
}
+ if( o instanceof NullTocken )
+ {
+ o = null;
+ }
return o;
+ }
+
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws
JMSException
+ {
+ _messageCounter.decrementAndGet();
+ super.preApplicationProcessing(jmsMsg);
+ }
+
+ private class NullTocken
+ {
+
}
}