Author: arnaudsimon
Date: Mon Oct 8 07:14:56 2007
New Revision: 582832
URL: http://svn.apache.org/viewvc?rev=582832&view=rev
Log:
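Fixed issue with transactions: sendRollback() is now abstract in AMQSession and implemented per protocol version (AMQSession_0_8, AMQSession_0_10); the 0-10 consumer no longer rejects received messages itself on rollback.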
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Oct 8 07:14:56 2007
@@ -1370,12 +1370,8 @@
}
}
- public void sendRollback() throws AMQException, FailoverException
- {
- _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ public abstract void sendRollback() throws AMQException, FailoverException ;
- }
public void run()
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Mon Oct 8 07:14:56 2007
@@ -452,6 +452,14 @@
}
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ getQpidSession().txRollback();
+ // We need to sync so that we get notified of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
+
//------ Private methods
/**
* Access to the underlying Qpid Session
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Mon Oct 8 07:14:56 2007
@@ -31,32 +31,7 @@
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -343,5 +318,11 @@
return new BasicMessageProducer_0_8(_connection, (AMQDestination)
destination, _transacted, _channelId,
this, getProtocolHandler(), producerId,
immediate, mandatory, waitUntilSent);
+ }
+
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Oct 8 07:14:56 2007
@@ -869,35 +869,7 @@
_logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
- // rollback received but not committed messages
- while (!_receivedDeliveryTags.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
- .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
- }
-
- Long tag = _receivedDeliveryTags.poll();
-
- if (tag != null)
- {
- if (_logger.isTraceEnabled())
- {
- _logger.trace("Rejecting tag from _receivedDTs:" + tag);
- }
-
- _session.rejectMessage(tag, true);
- }
- }
-
- if (!_receivedDeliveryTags.isEmpty())
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
- }
- }
+ rollbackReceivedMessages();
// rollback pending messages
if (_synchronousQueue.size() > 0)
@@ -941,6 +913,39 @@
}
clearReceiveQueue();
+ }
+ }
+
+ protected void rollbackReceivedMessages()
+ {
+ // rollback received but not committed messages
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+ .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
+ }
+
+ Long tag = _receivedDeliveryTags.poll();
+
+ if (tag != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ }
+
+ _session.rejectMessage(tag, true);
+ }
+ }
+
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
+ }
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Mon Oct 8 07:14:56 2007
@@ -297,6 +297,11 @@
}
}
+ protected void rollbackReceivedMessages()
+ {
+ // do nothing as the rollback operation will do the job.
+ }
+
/**
* Acquire a message
*
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Mon Oct 8 07:14:56 2007
@@ -27,8 +27,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
import org.apache.qpid.testutil.QpidTestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -247,13 +245,13 @@
{
try
{
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
Session consumerSession = con.createSession(true,
Session.SESSION_TRANSACTED);
AMQQueue queue3 = new
AMQQueue(consumerSession.getDefaultQueueExchangeName(), new
AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
- AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
Session producerSession = con2.createSession(true,
Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue3);