Author: ritchiem
Date: Thu Oct 11 10:02:28 2007
New Revision: 583882
URL: http://svn.apache.org/viewvc?rev=583882&view=rev
Log:
QPID-637 Patch submitted by Aidan Skinner to address receive not waiting for
full timeout.
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=583882&r1=583881&r2=583882&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Oct 11 10:02:28 2007
@@ -357,27 +357,48 @@
Object o = null;
if (l > 0)
{
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ long endtime = System.currentTimeMillis() + l;
+ while (System.currentTimeMillis() < endtime && o == null)
+ {
+ try
+ {
+ o = _synchronousQueue.poll(endtime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
+ }
}
else
{
- o = _synchronousQueue.take();
+ while (o == null)
+ {
+ try
+ {
+ o = _synchronousQueue.take();
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
+ }
}
-
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
preApplicationProcessing(m);
postDeliver(m);
}
-
return m;
- }
- catch (InterruptedException e)
- {
- _logger.warn("Interrupted: " + e);
-
- return null;
}
finally
{