Author: gnodet
Date: Wed Aug 23 01:50:43 2006
New Revision: 433981

URL: http://svn.apache.org/viewvc?rev=433981&view=rev
Log:
Improve transaction support in servicemix-common

Modified:
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
    
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 Wed Aug 23 01:50:43 2006
@@ -338,14 +338,15 @@
         } catch (Exception e) {
             logger.error("Error processing exchange " + exchange, e);
             try {
-                // If we are transacted and this is a runtime exception
-                // try to mark transaction as rollback
-                if (tx != null && e instanceof RuntimeException) {
+                // If we are transacted, check if this exception should
+                // rollback the transaction
+                if (transactionManager != null && 
+                    transactionManager.getStatus() == Status.STATUS_ACTIVE && 
+                    exceptionShouldRollbackTx(e)) {
                     transactionManager.setRollbackOnly();
-                } else  {
-                    exchange.setError(e);
-                    channel.send(exchange);
                 }
+                exchange.setError(e);
+                channel.send(exchange);
             } catch (Exception inner) {
                 logger.error("Error setting exchange status to ERROR", inner);
             }
@@ -370,6 +371,10 @@
                 logger.error("Error checking transaction status.", t);
             }
         }
+    }
+    
+    protected boolean exceptionShouldRollbackTx(Exception e) {
+        return false;
     }
     
     public void processExchange(MessageExchange exchange) throws Exception {

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
 Wed Aug 23 01:50:43 2006
@@ -48,12 +48,11 @@
                 // try to mark transaction as rollback
                 if (transactionManager != null && 
                     transactionManager.getStatus() == Status.STATUS_ACTIVE && 
-                    e instanceof RuntimeException) {
+                    exceptionShouldRollbackTx(e)) {
                     transactionManager.setRollbackOnly();
-                } else  {
-                    exchange.setError(e);
-                    channel.send(exchange);
                 }
+                exchange.setError(e);
+                channel.send(exchange);
             } catch (Exception inner) {
                 logger.error("Error setting exchange status to ERROR", inner);
             }

Modified: 
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
 Wed Aug 23 01:50:43 2006
@@ -51,8 +51,13 @@
     private TransactionManager txManager;
     private Component component;
     private ServiceMixClient client;
+    private Exception exceptionToThrow;
+    private boolean exceptionShouldRollback;
     
     protected void setUp() throws Exception {
+        exceptionToThrow = null;
+        exceptionShouldRollback = false;
+        
         broker = new BrokerService();
         broker.setPersistent(false);
         broker.addConnector("tcp://localhost:61616");
@@ -102,12 +107,74 @@
         InOnly me = client.createInOnlyExchange();
         me.setService(new QName("service"));
         me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
-        client.sendSync(me, 1000);
+        boolean ok = client.sendSync(me, 1000);
+        assertTrue(ok);
         assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
         assertEquals(ExchangeStatus.DONE, me.getStatus());
         txManager.commit();
     }
     
+    public void testTxExceptionAsync() throws Exception {
+        exceptionToThrow = new Exception("Business exception");
+        txManager.begin();
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("service"));
+        me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.send(me);
+        assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        txManager.commit();
+        me = (InOnly) client.receive(1000);
+        assertNotNull(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+    }
+    
+    public void testTxExceptionSync() throws Exception {
+        exceptionToThrow = new Exception("Business exception");
+        txManager.begin();
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("service"));
+        me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        boolean ok = client.sendSync(me, 1000);
+        assertTrue(ok);
+        assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+        txManager.commit();
+    }
+    
+    public void testTxExceptionRollbackAsync() throws Exception {
+        exceptionToThrow = new Exception("Business exception");
+        exceptionShouldRollback = true;
+        txManager.begin();
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("service"));
+        me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        client.send(me);
+        assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        txManager.commit();
+        // if we always mark the transaction as rollback,
+        // the exchange will be redelivered by the JCA flow,
+        // until it is discarded, so we will never receive
+        // it back
+        me = (InOnly) client.receive(1000);
+        assertNull(me);
+    }
+    
+    public void testTxExceptionRollbackSync() throws Exception {
+        exceptionToThrow = new RuntimeException("Runtime exception");
+        exceptionShouldRollback = true;
+        txManager.begin();
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName("service"));
+        me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+        boolean ok = client.sendSync(me, 1000);
+        assertTrue(ok);
+        assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+        txManager.commit();
+    }
+    
     private class TestComponent extends BaseComponent {
         public TestComponent() {
             super();
@@ -140,6 +207,9 @@
                 super.doStop();
                 su.stop();
             }
+            protected boolean exceptionShouldRollbackTx(Exception e) {
+                return exceptionShouldRollback;
+            }
         }
         
         protected class TestEndpoint extends Endpoint implements 
ExchangeProcessor {
@@ -160,6 +230,9 @@
                 return Role.PROVIDER;
             }
             public void process(MessageExchange exchange) throws Exception {
+                if (exceptionToThrow != null) {
+                    throw exceptionToThrow;
+                }
                 exchange.setStatus(ExchangeStatus.DONE);
                 getComponentContext().getDeliveryChannel().send(exchange);
             }


Reply via email to