Author: rajith
Date: Tue Oct 16 16:43:55 2007
New Revision: 585289
URL: http://svn.apache.org/viewvc?rev=585289&view=rev
Log:
There was an issue with the receiveNoWait method.
I modified it to use the getMessageFromQueue(long l) method by passing a -1.
In this case it will use the same logic as the receive(long timeout) method
expect that it will not block on the queue when it does a poll
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=585289&r1=585288&r2=585289&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Tue Oct 16 16:43:55 2007
@@ -408,7 +408,7 @@
return null;
}
- Object o = _synchronousQueue.poll();
+ Object o = getMessageFromQueue(-1);
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
@@ -417,6 +417,12 @@
}
return m;
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+
+ return null;
}
finally
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=585289&r1=585288&r2=585289&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Oct 16 16:43:55 2007
@@ -253,6 +253,11 @@
{
throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when
evaluating message selector", e);
}
+
+
System.out.println("---------------------------------------------------------");
+ System.out.println("messageOk : " + messageOk + " pre-acquire mode : "
+ _preAcquire);
+
System.out.println("---------------------------------------------------------");
+
if (_logger.isDebugEnabled())
{
_logger.debug("messageOk " + messageOk);
@@ -396,9 +401,20 @@
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
- if (l > 0)
+ if (l == 0)
{
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ o = _synchronousQueue.take();
+ }
+ else
+ {
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ o = _synchronousQueue.poll();
+ }
if (o == null)
{
_logger.debug("Message Didn't arrive in time, checking if one
is inflight");
@@ -415,10 +431,6 @@
System.out.println("null");
}
}
- }
- else
- {
- o = _synchronousQueue.take();
}
return o;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=585289&r1=585288&r2=585289&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
Tue Oct 16 16:43:55 2007
@@ -94,6 +94,10 @@
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
}
+ if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
else
{
o = _synchronousQueue.take();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=585289&r1=585288&r2=585289&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Tue Oct 16 16:43:55 2007
@@ -67,6 +67,14 @@
message.prepareForSending();
org.apache.qpidity.api.Message qpidityMessage = new
ByteBufferMessage();
// set the payload
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Message Props: " + message.toString());
+ }
+
+ //System.out.println("Message Props" + message.toString());
+
try
{
if (message.getData() != null)