Author: rhs
Date: Mon Jun 16 14:04:01 2008
New Revision: 668311
URL: http://svn.apache.org/viewvc?rev=668311&view=rev
Log:
QPID-1139: use RFC1982 comparisons for rollback mark and update rollback mark
to track dispatched messages
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Mon Jun 16 14:04:01 2008
@@ -63,6 +63,7 @@
}
catch (Exception e)
{
+ _logger.error("exception creating session:", e);
throw new JMSAMQException("cannot create session", e);
}
return session;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Jun 16 14:04:01 2008
@@ -2745,6 +2745,8 @@
{
while (!_closed.get() && ((message = (UnprocessedMessage)
_queue.take()) != null))
{
+ long deliveryTag = message.getDeliveryTag();
+
synchronized (_lock)
{
@@ -2753,27 +2755,24 @@
_lock.wait();
}
- if (message.getDeliveryTag() <= _rollbackMark.get())
+ if (tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
else
{
- if (message.getDeliveryTag() <=
_rollbackMark.get())
- {
- rejectMessage(message, true);
- }
- else
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
- {
- dispatchMessage(message);
- }
+ dispatchMessage(message);
}
}
-
}
+ long current = _rollbackMark.get();
+ if (updateRollbackMark(current, deliveryTag))
+ {
+ _rollbackMark.compareAndSet(current, deliveryTag);
+ }
}
}
catch (InterruptedException e)
@@ -2851,6 +2850,10 @@
}
}
+ abstract boolean tagLE(long tag1, long tag2);
+
+ abstract boolean updateRollbackMark(long current, long deliveryTag);
+
/*public void requestAccess(AMQShortString realm, boolean exclusive,
boolean passive, boolean active, boolean write,
boolean read) throws AMQException
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Mon Jun 16 14:04:01 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.util.Serial;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -785,4 +786,15 @@
throw new JMSAMQException("Fail-over interrupted commit. Status of
the commit is uncertain.", e);
}
}
+
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return Serial.le((int) tag1, (int) tag2);
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return Serial.lt((int) currentMark, (int) deliveryTag);
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=668311&r1=668310&r2=668311&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Mon Jun 16 14:04:01 2008
@@ -40,7 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession
{
/** Used for debugging. */
@@ -453,4 +453,14 @@
return okHandler._messageCount;
}
+ final boolean tagLE(long tag1, long tag2)
+ {
+ return tag1 <= tag2;
+ }
+
+ final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ {
+ return false;
+ }
+
}