Author: arnaudsimon
Date: Fri Oct 19 04:28:00 2007
New Revision: 586382
URL: http://svn.apache.org/viewvc?rev=586382&view=rev
Log:
changed to handle async pre-fetch
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Fri Oct 19 04:28:00 2007
@@ -207,7 +207,7 @@
return _acknowledgeMode;
}
- private boolean isMessageListenerSet()
+ protected boolean isMessageListenerSet()
{
return _messageListener.get() != null;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Fri Oct 19 04:28:00 2007
@@ -53,6 +53,12 @@
* A counter for keeping the number of available messages for this consumer
*/
private final AtomicLong _messageCounter = new AtomicLong(0);
+
+ /**
+ * Number of messages received so far
+ */
+ private final AtomicLong _messagesReceived = new AtomicLong(0);
+
/**
* This class logger
*/
@@ -135,6 +141,18 @@
public void onMessage(Message message)
{
+ if( isMessageListenerSet())
+ {
+ _messagesReceived.incrementAndGet();
+ if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+ {
+ // request more credit
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+
AMQSession_0_10.MAX_PREFETCH);
+ _messagesReceived.set(0);
+ }
+ }
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -417,6 +435,7 @@
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ _messagesReceived.set(0);;
}
}
}