Author: arnaudsimon
Date: Mon Oct 15 10:21:44 2007
New Revision: 584826
URL: http://svn.apache.org/viewvc?rev=584826&view=rev
Log:
increased number of runs
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=584826&r1=584825&r2=584826&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Oct 15 10:21:44 2007
@@ -266,7 +266,7 @@
}
}
- private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws
JMSException
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws
JMSException
{
if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=584826&r1=584825&r2=584826&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Mon Oct 15 10:21:44 2007
@@ -40,6 +40,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This is a 0.10 message consumer.
@@ -48,6 +49,10 @@
implements org.apache.qpidity.nclient.util.MessageListener
{
/**
+ * A counter for keeping the number of available messages for this consumer
+ */
+ private final AtomicLong _messageCounter = new AtomicLong(0);
+ /**
* This class logger
*/
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -78,7 +83,7 @@
super(channelId, connection, destination, messageSelector, noLocal,
messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && messageSelector != "")
+ if (messageSelector != null && ! messageSelector.equals("") )
{
try
{
@@ -161,6 +166,8 @@
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
+ // increase the counter of messages
+ _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -348,6 +355,11 @@
return result;
}
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg)
throws JMSException
+ {
+ _messageCounter.decrementAndGet();
+ super.preApplicationProcessing(jmsMsg);
+ }
public void setMessageListener(final MessageListener messageListener)
throws JMSException
{
@@ -393,7 +405,10 @@
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
- o = _synchronousQueue.poll();
+ if( _messageCounter.get() > 0 )
+ {
+ o = _synchronousQueue.take();
+ }
}
}
else