Author: arnaudsimon
Date: Mon Oct  8 07:14:56 2007
New Revision: 582832

URL: http://svn.apache.org/viewvc?rev=582832&view=rev
Log:
fixed issue with transactions

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Oct  8 07:14:56 2007
@@ -1370,12 +1370,8 @@
         }
     }
 
-    public void sendRollback() throws AMQException, FailoverException
-    {
-        
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
-                getProtocolMajorVersion(), getProtocolMinorVersion()), 
TxRollbackOkBody.class);
+    public abstract void  sendRollback() throws AMQException, 
FailoverException ;
 
-    }
 
     public void run()
     {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Mon Oct  8 07:14:56 2007
@@ -452,6 +452,14 @@
     }
 
 
+    public void sendRollback() throws AMQException, FailoverException
+    {
+         getQpidSession().txRollback();
+       // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
     //------ Private methods
     /**
      * Access to the underlying Qpid Session

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Mon Oct  8 07:14:56 2007
@@ -31,32 +31,7 @@
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -343,5 +318,11 @@
 
        return new BasicMessageProducer_0_8(_connection, (AMQDestination) 
destination, _transacted, _channelId,
                                  this, getProtocolHandler(), producerId, 
immediate, mandatory, waitUntilSent);
+    }
+
+    public void sendRollback() throws AMQException, FailoverException
+    {
+        
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+            getProtocolMajorVersion(), getProtocolMinorVersion()), 
TxRollbackOkBody.class);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Mon Oct  8 07:14:56 2007
@@ -869,35 +869,7 @@
             _logger.debug("Rejecting received messages in _receivedDTs (RQ)");
         }
 
-        // rollback received but not committed messages
-        while (!_receivedDeliveryTags.isEmpty())
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
-                        .size() + ") in _receivedDTs (RQ)" + "for consumer 
with tag:" + _consumerTag);
-            }
-
-            Long tag = _receivedDeliveryTags.poll();
-
-            if (tag != null)
-            {
-                if (_logger.isTraceEnabled())
-                {
-                    _logger.trace("Rejecting tag from _receivedDTs:" + tag);
-                }
-
-                _session.rejectMessage(tag, true);
-            }
-        }
-
-        if (!_receivedDeliveryTags.isEmpty())
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Queue _receivedDTs (RQ) was not empty after 
rejection");
-            }
-        }
+        rollbackReceivedMessages();
 
         // rollback pending messages
         if (_synchronousQueue.size() > 0)
@@ -941,6 +913,39 @@
             }
 
             clearReceiveQueue();
+        }
+    }
+
+    protected void rollbackReceivedMessages()
+    {
+        // rollback received but not committed messages
+        while (!_receivedDeliveryTags.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+                        .size() + ") in _receivedDTs (RQ)" + "for consumer 
with tag:" + _consumerTag);
+            }
+
+            Long tag = _receivedDeliveryTags.poll();
+
+            if (tag != null)
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+                }
+
+                _session.rejectMessage(tag, true);
+            }
+        }
+
+        if (!_receivedDeliveryTags.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Queue _receivedDTs (RQ) was not empty after 
rejection");
+            }
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Mon Oct  8 07:14:56 2007
@@ -297,6 +297,11 @@
         }
     }
 
+      protected void rollbackReceivedMessages()
+      {
+          // do nothing as the rollback operation will do the job.
+      }
+    
     /**
      * Acquire a message
      *

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=582832&r1=582831&r2=582832&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
 Mon Oct  8 07:14:56 2007
@@ -27,8 +27,6 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.testutil.QpidTestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -247,13 +245,13 @@
     {
         try
         {
-            AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", 
"consumer1", "test");
+            AMQConnection con = (AMQConnection) getConnection("guest", 
"guest");
 
             Session consumerSession = con.createSession(true, 
Session.SESSION_TRANSACTED);
             AMQQueue queue3 = new 
AMQQueue(consumerSession.getDefaultQueueExchangeName(), new 
AMQShortString("Q3"), false);
             MessageConsumer consumer = consumerSession.createConsumer(queue3);
 
-            AMQConnection con2 = new AMQConnection("vm://:1", "guest", 
"guest", "producer1", "test");
+            AMQConnection con2 = (AMQConnection) getConnection("guest", 
"guest");
             Session producerSession = con2.createSession(true, 
Session.SESSION_TRANSACTED);
             MessageProducer producer = producerSession.createProducer(queue3);
 


Reply via email to