Author: arnaudsimon
Date: Wed Feb 6 07:14:42 2008
New Revision: 619012
URL: http://svn.apache.org/viewvc?rev=619012&view=rev
Log:
Changed to use window flow mode; see QPID-778.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Feb 6 07:14:42 2008
@@ -375,7 +375,7 @@
consumer.isNoLocal() ?
Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ?
Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
if(consumer.isStrated())
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Wed Feb 6 07:14:42 2008
@@ -46,10 +46,6 @@
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[],
ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * Number of received message so far
- */
- private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -118,7 +114,6 @@
*/
public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
- _messagesReceived.incrementAndGet();
boolean messageOk = false;
try
{
@@ -143,20 +138,7 @@
}
}
- /**
- * Require more credit for this consumer
- */
- private void requireMoreCreditIfNecessary()
- {
- if (_isStarted && _messagesReceived.get() >=
AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
-
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
- }
+
/**
* This method is invoked by the transport layer when a message is
delivered for this
@@ -239,14 +221,6 @@
{
// notify the session
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
- if (isMessageListenerSet())
- {
- requireMoreCreditIfNecessary();
- }
- else if (_synchronousQueue.isEmpty())
- {
- requireMoreCreditIfNecessary();
- }
//if (!Boolean.getBoolean("noAck"))
//{
super.postDeliver(msg);
@@ -458,7 +432,6 @@
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);
}
}
}