Author: gnodet
Date: Wed Aug 23 01:50:43 2006
New Revision: 433981
URL: http://svn.apache.org/viewvc?rev=433981&view=rev
Log:
Improve transaction support in servicemix-common
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Wed Aug 23 01:50:43 2006
@@ -338,14 +338,15 @@
} catch (Exception e) {
logger.error("Error processing exchange " + exchange, e);
try {
- // If we are transacted and this is a runtime exception
- // try to mark transaction as rollback
- if (tx != null && e instanceof RuntimeException) {
+ // If we are transacted, check whether this exception should
+ // roll back the transaction
+ if (transactionManager != null &&
+ transactionManager.getStatus() == Status.STATUS_ACTIVE &&
+ exceptionShouldRollbackTx(e)) {
transactionManager.setRollbackOnly();
- } else {
- exchange.setError(e);
- channel.send(exchange);
}
+ exchange.setError(e);
+ channel.send(exchange);
} catch (Exception inner) {
logger.error("Error setting exchange status to ERROR", inner);
}
@@ -370,6 +371,10 @@
logger.error("Error checking transaction status.", t);
}
}
+ }
+
+ protected boolean exceptionShouldRollbackTx(Exception e) {
+ return false;
}
public void processExchange(MessageExchange exchange) throws Exception {
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseLifeCycle.java
Wed Aug 23 01:50:43 2006
@@ -48,12 +48,11 @@
// try to mark transaction as rollback
if (transactionManager != null &&
transactionManager.getStatus() == Status.STATUS_ACTIVE &&
- e instanceof RuntimeException) {
+ exceptionShouldRollbackTx(e)) {
transactionManager.setRollbackOnly();
- } else {
- exchange.setError(e);
- channel.send(exchange);
}
+ exchange.setError(e);
+ channel.send(exchange);
} catch (Exception inner) {
logger.error("Error setting exchange status to ERROR", inner);
}
Modified:
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java?rev=433981&r1=433980&r2=433981&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/test/java/org/apache/servicemix/common/TransactionsTest.java
Wed Aug 23 01:50:43 2006
@@ -51,8 +51,13 @@
private TransactionManager txManager;
private Component component;
private ServiceMixClient client;
+ private Exception exceptionToThrow;
+ private boolean exceptionShouldRollback;
protected void setUp() throws Exception {
+ exceptionToThrow = null;
+ exceptionShouldRollback = false;
+
broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616");
@@ -102,12 +107,74 @@
InOnly me = client.createInOnlyExchange();
me.setService(new QName("service"));
me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
- client.sendSync(me, 1000);
+ boolean ok = client.sendSync(me, 1000);
+ assertTrue(ok);
assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
assertEquals(ExchangeStatus.DONE, me.getStatus());
txManager.commit();
}
+ public void testTxExceptionAsync() throws Exception {
+ exceptionToThrow = new Exception("Business exception");
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.send(me);
+ assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ txManager.commit();
+ me = (InOnly) client.receive(1000);
+ assertNotNull(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ }
+
+ public void testTxExceptionSync() throws Exception {
+ exceptionToThrow = new Exception("Business exception");
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ boolean ok = client.sendSync(me, 1000);
+ assertTrue(ok);
+ assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ txManager.commit();
+ }
+
+ public void testTxExceptionRollbackAsync() throws Exception {
+ exceptionToThrow = new Exception("Business exception");
+ exceptionShouldRollback = true;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ client.send(me);
+ assertEquals(Status.STATUS_ACTIVE, txManager.getStatus());
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ txManager.commit();
+ // Every processing attempt marks the transaction as
+ // rollback-only, so the JCA flow keeps redelivering the
+ // exchange until it is discarded; it is therefore never
+ // sent back to the client
+ me = (InOnly) client.receive(1000);
+ assertNull(me);
+ }
+
+ public void testTxExceptionRollbackSync() throws Exception {
+ exceptionToThrow = new RuntimeException("Runtime exception");
+ exceptionShouldRollback = true;
+ txManager.begin();
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName("service"));
+ me.getInMessage().setContent(new StringSource("<hello>world</hello>"));
+ boolean ok = client.sendSync(me, 1000);
+ assertTrue(ok);
+ assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus());
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ txManager.commit();
+ }
+
private class TestComponent extends BaseComponent {
public TestComponent() {
super();
@@ -140,6 +207,9 @@
super.doStop();
su.stop();
}
+ protected boolean exceptionShouldRollbackTx(Exception e) {
+ return exceptionShouldRollback;
+ }
}
protected class TestEndpoint extends Endpoint implements
ExchangeProcessor {
@@ -160,6 +230,9 @@
return Role.PROVIDER;
}
public void process(MessageExchange exchange) throws Exception {
+ if (exceptionToThrow != null) {
+ throw exceptionToThrow;
+ }
exchange.setStatus(ExchangeStatus.DONE);
getComponentContext().getDeliveryChannel().send(exchange);
}