Repository: activemq Updated Branches: refs/heads/master 20522394c -> 85181d630
[AMQ-6548] ensure any pending xa transaction is marked rollback only on delivery failure exception from on message, before delegating to potential clientInternalExceptionListener. Variant of patch applied with additional test - thanks to Andrey Dyachikhin for the patch inspiration Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85181d63 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85181d63 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85181d63 Branch: refs/heads/master Commit: 85181d630c94c7ca7a47cbb40fc55e3f9e27b574 Parents: 2052239 Author: gtully <gary.tu...@gmail.com> Authored: Tue Jan 17 16:51:27 2017 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Jan 17 16:51:27 2017 +0000 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQSession.java | 5 ++ .../java/org/apache/activemq/ra/MDBTest.java | 79 ++++++++++++++++++++ 2 files changed, 84 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 6603a2f..1e82af8 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1043,6 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } catch (Throwable e) { LOG.error("error dispatching message: ", e); + if (getTransactionContext().isInXATransaction()) { + LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext()); + getTransactionContext().setRollbackOnly(true); + } + // A problem while invoking the MessageListener does not // in general indicate a problem with the connection to the broker, i.e. // it will usually be sufficient to let the afterDelivery() method either http://git-wip-us.apache.org/repos/asf/activemq/blob/85181d63/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java index a078a91..93b631c 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java @@ -906,6 +906,85 @@ public class MDBTest { adapter.stop(); } + @Test(timeout = 90000) + public void testXaOnMessageExceptionRollback() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("vm://localhost?broker.persistent=false"); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(1); + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + @Override + public void onMessage(Message message) { + super.onMessage(message); + messageDelivered.countDown(); + throw new RuntimeException("Failure"); + }; + + @Override + public void afterDelivery() throws ResourceException { + try { + xaresource.end(xid, XAResource.TMSUCCESS); + xaresource.commit(xid, true); + } catch (Throwable e) { + throw new ResourceException(e); + } + } + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + activationSpec.validate(); + + MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + @Override + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + @Override + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + // Give endpoint a chance to setup and register its listeners + try { + Thread.sleep(1000); + } catch (Exception e) { + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + producer.send(session.createTextMessage("Hello!")); + + // Wait for the message to be delivered twice. + assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. + adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + // assert message still available + MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST")); + assertNotNull("got the message", messageConsumer.receive(5000)); + connection.close(); + + } + public Xid createXid() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(baos);