This is an automated email from the ASF dual-hosted git repository. jfisher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomee.git
The following commit(s) were added to refs/heads/master by this push: new f56b2b2 Fix several JMS/JMS2.0 bugs f56b2b2 is described below commit f56b2b25a65b13472c6e6a02757cc2dc978c9953 Author: Jonathan S. Fisher <jonathan.fis...@emoneyusa.com> AuthorDate: Mon Aug 26 20:09:10 2019 -0500 Fix several JMS/JMS2.0 bugs * Fix TOMEE-2229: JMSContext Injected by TomEE does not participate in JTA * Fix TOMEE-2650: TomEE Creating non-JTA Sessions * Fix TOMEE-2651: TomEE doesn't return JMS Connection to pool after a Transaction Timeout * Fix TOMEE-2652: TransactionSupport parameter not honored on JMS Connection Factory resources --- .../openejb/resource/AutoConnectionTracker.java | 26 ++-- .../openejb/resource/activemq/jms2/JMS2.java | 13 ++ .../resource/activemq/jms2/JMSContextImpl.java | 7 +- .../resource/activemq/jms2/JMSProducerImpl.java | 1 + .../activemq/jms2/TomEEConnectionFactory.java | 42 +++++-- .../activemq/jms2/TomEEManagedConnection.java | 10 +- .../jms2/TomEEManagedConnectionFactory.java | 45 ++++++- .../activemq/jms2/TomEEManagedConnectionProxy.java | 132 ++++++++++++++++++++- .../activemq/jms2/TomEERAConnectionFactory.java | 108 ++++++++++++++++- .../activemq/jms2/cdi/JMS2CDIExtension.java | 18 ++- 10 files changed, 360 insertions(+), 42 deletions(-) diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java index 49135c8..b5080ab 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java @@ -22,6 +22,7 @@ import org.apache.geronimo.connector.outbound.ConnectionReturnAction; import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor; import org.apache.geronimo.connector.outbound.ManagedConnectionInfo; import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker; +import org.apache.geronimo.transaction.manager.TransactionImpl; import org.apache.openejb.dyni.DynamicSubclass; import org.apache.openejb.loader.SystemInstance; import org.apache.openejb.util.LogCategory; @@ -32,7 +33,6 @@ import javax.resource.ResourceException; import javax.resource.spi.DissociatableManagedConnection; import javax.transaction.Synchronization; import javax.transaction.SystemException; -import javax.transaction.Transaction; import javax.transaction.TransactionManager; import javax.transaction.TransactionSynchronizationRegistry; import java.lang.ref.PhantomReference; @@ -58,6 +58,7 @@ public class AutoConnectionTracker implements ConnectionTracker { private final TransactionManager txMgr; private final Logger logger = Logger.getInstance(LogCategory.OPENEJB_CONNECTOR, "org.apache.openejb.resource"); private final ConcurrentMap<ManagedConnectionInfo, ProxyPhantomReference> references = new ConcurrentHashMap<>(); + @SuppressWarnings("rawtypes") private final ReferenceQueue referenceQueue = new ReferenceQueue(); private final ConcurrentMap<Class<?>, Class<?>> proxies = new ConcurrentHashMap<>(); private final ConcurrentMap<Class<?>, Class<?>[]> interfaces = new ConcurrentHashMap<>(); @@ -80,6 +81,7 @@ public class AutoConnectionTracker implements ConnectionTracker { * @param connectionInfo the connection to be obtained * @param key the unique id of the connection manager */ + @Override public void setEnvironment(final ConnectionInfo connectionInfo, final String key) { ProxyPhantomReference reference = (ProxyPhantomReference) referenceQueue.poll(); while (reference != null) { @@ -103,12 +105,14 @@ public class AutoConnectionTracker implements ConnectionTracker { * @param connectionInfo the connection that was obtained * @param reassociate should always be false */ + @SuppressWarnings("unchecked") + @Override public void handleObtained(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final boolean reassociate) throws ResourceException { if (txMgr != null && registry != null) { - Transaction currentTx = null; + TransactionImpl currentTx = null; try { - currentTx = txMgr.getTransaction(); - } catch (SystemException e) { + currentTx = (TransactionImpl) txMgr.getTransaction(); + } catch (SystemException | ClassCastException e) { //ignore } @@ -162,16 +166,18 @@ public class AutoConnectionTracker implements ConnectionTracker { * @param connectionInfo the connection that was released * @param action ignored */ + @Override + @SuppressWarnings("unchecked") public void handleReleased(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final ConnectionReturnAction action) { - Transaction currentTx = null; + TransactionImpl currentTx = null; try { - currentTx = txMgr.getTransaction(); - } catch (SystemException e) { + currentTx = (TransactionImpl) txMgr.getTransaction(); + } catch (SystemException | ClassCastException e) { //ignore } if (currentTx != null) { - Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) registry.getResource(KEY); + Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) currentTx.getResource(KEY); if (txConnections == null) { txConnections = new HashMap<>(); registry.putResource(KEY, txConnections); @@ -185,6 +191,7 @@ public class AutoConnectionTracker implements ConnectionTracker { } } + @SuppressWarnings("rawtypes") final PhantomReference phantomReference = references.remove(connectionInfo.getManagedConnectionInfo()); if (phantomReference != null) { phantomReference.clear(); @@ -284,6 +291,7 @@ public class AutoConnectionTracker implements ConnectionTracker { this.handle = handle; } + @Override public Object invoke(final Object object, final Method method, final Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { if (method.getName().equals("finalize")) { @@ -322,7 +330,7 @@ public class AutoConnectionTracker implements ConnectionTracker { public ProxyPhantomReference(final ConnectionTrackingInterceptor interceptor, final ManagedConnectionInfo managedConnectionInfo, final ConnectionInvocationHandler handler, - final ReferenceQueue referenceQueue) { + @SuppressWarnings("rawtypes") final ReferenceQueue referenceQueue) { super(handler, referenceQueue); this.interceptor = interceptor; this.managedConnectionInfo = managedConnectionInfo; diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java index 1f7e78a..29c0ebf 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java @@ -43,6 +43,9 @@ import javax.jms.TransactionInProgressException; import javax.jms.TransactionInProgressRuntimeException; import javax.jms.TransactionRolledBackException; import javax.jms.TransactionRolledBackRuntimeException; +import javax.transaction.SystemException; + +import org.apache.openejb.OpenEJB; public final class JMS2 { private JMS2() { @@ -83,6 +86,7 @@ public final class JMS2 { return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e); } + @SuppressWarnings("unchecked") public static <T extends Message> T wrap(final T message10) { if (message10 == null) { return null; @@ -112,4 +116,13 @@ public final class JMS2 { } return (T) new DelegateMessage(message10); } + + + public static boolean inTx() { + try { + return OpenEJB.getTransactionManager().getTransaction() != null; + } catch (SystemException | NullPointerException e) { + return false; + } + } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java index db975be..872b1c9 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java @@ -42,6 +42,7 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.XAConnection; + import java.io.Serializable; import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException; @@ -80,7 +81,6 @@ public class JMSContextImpl implements JMSContext { if (connection == null) { try { connection = username != null ? factory.createConnection(username, password) : factory.createConnection(); - xa = XAConnection.class.isInstance(connection); } catch (final JMSException e) { throw toRuntimeException(e); } @@ -96,10 +96,11 @@ public class JMSContextImpl implements JMSContext { } if (session == null) { try { + Connection connection = connection(); if (xa) { - session = XAConnection.class.cast(connection()).createXASession(); + session = XAConnection.class.cast(connection).createXASession(); } else { - session = connection().createSession(sessionMode); + session = connection.createSession(sessionMode); } } catch (final JMSException e) { throw toRuntimeException(e); diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java index 4d9a504..89841ca 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java @@ -40,6 +40,7 @@ import java.util.Set; import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException; import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap; +@SuppressWarnings("deprecation") class JMSProducerImpl implements JMSProducer { private final JMSContextImpl context; private final MessageProducer producer; diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java index 7bb6a52..9af3d0d 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java @@ -17,35 +17,63 @@ package org.apache.openejb.resource.activemq.jms2; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQSslConnectionFactory; +import org.apache.activemq.ActiveMQXASslConnectionFactory; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.transport.Transport; import javax.jms.JMSContext; -public class TomEEConnectionFactory extends ActiveMQSslConnectionFactory { +public class TomEEConnectionFactory extends ActiveMQXASslConnectionFactory { @Override protected ActiveMQConnection createActiveMQConnection(final Transport transport, final JMSStatsImpl stats) throws Exception { - return new TomEEConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats); + return new TomEEXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats); } @Override public JMSContext createContext() { - return new JMSContextImpl(this, -1, null, null, false); + boolean inTx = JMS2.inTx(); + int mode; + if (inTx) { + mode = -1; + } else { + mode = JMSContext.AUTO_ACKNOWLEDGE; + } + return new JMSContextImpl(this, mode, null, null, inTx); } @Override public JMSContext createContext(final int sessionMode) { - return new JMSContextImpl(this, sessionMode, null, null, false); + boolean inTx = JMS2.inTx(); + int mode; + if (inTx) { + mode = -1; + } else { + mode = sessionMode; + } + return new JMSContextImpl(this, mode, null, null, inTx); } @Override public JMSContext createContext(final String userName, final String password) { - return new JMSContextImpl(this, -1, userName, password, false); + boolean inTx = JMS2.inTx(); + int mode; + if (inTx) { + mode = -1; + } else { + mode = JMSContext.AUTO_ACKNOWLEDGE; + } + return new JMSContextImpl(this, mode, userName, password, inTx); } @Override public JMSContext createContext(final String userName, final String password, final int sessionMode) { - return new JMSContextImpl(this, sessionMode, userName, password, false); + boolean inTx = JMS2.inTx(); + int mode; + if (inTx) { + mode = -1; + } else { + mode = sessionMode; + } + return new JMSContextImpl(this, mode, userName, password, inTx); } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java index 4d7bb29..fe8e2b7 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java @@ -23,12 +23,14 @@ import org.apache.activemq.ra.ManagedConnectionProxy; import javax.resource.ResourceException; import javax.resource.spi.ConnectionRequestInfo; +import javax.resource.spi.TransactionSupport.TransactionSupportLevel; import javax.security.auth.Subject; import java.lang.reflect.Field; import java.util.Collection; public class TomEEManagedConnection extends ActiveMQManagedConnection { private static final Field PROXY_CONNECTIONS_FIELD; + private TransactionSupportLevel transactionSupportLevel; static { try { @@ -41,14 +43,16 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection { private final Collection<ManagedConnectionProxy> proxyConnections; + @SuppressWarnings("unchecked") public TomEEManagedConnection(final Subject subject, final ActiveMQConnection physicalConnection, - final ActiveMQConnectionRequestInfo info) throws ResourceException { + final ActiveMQConnectionRequestInfo info, TransactionSupportLevel transactionSupportLevel) throws ResourceException { super(subject, physicalConnection, info); try { proxyConnections = Collection.class.cast(PROXY_CONNECTIONS_FIELD.get(this)); } catch (final IllegalAccessException e) { throw new IllegalStateException("Incompatible AMQ", e); } + this.transactionSupportLevel = transactionSupportLevel; } @Override @@ -57,4 +61,8 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection { proxyConnections.add(proxy); return proxy; } + + public TransactionSupportLevel getTransactionSupportLevel() { + return transactionSupportLevel; + } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java index 44ea157..22c44d0 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java @@ -22,17 +22,25 @@ import org.apache.activemq.ra.ActiveMQManagedConnectionFactory; import org.apache.activemq.ra.MessageActivationSpec; import org.apache.activemq.ra.SimpleConnectionManager; +import java.util.Locale; + import javax.jms.JMSException; import javax.resource.ResourceException; import javax.resource.spi.ConnectionManager; import javax.resource.spi.ConnectionRequestInfo; import javax.resource.spi.ManagedConnection; +import javax.resource.spi.TransactionSupport.TransactionSupportLevel; import javax.security.auth.Subject; public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFactory { + private static final long serialVersionUID = 1L; + private TransactionSupportLevel transactionSupportLevel; + @Override public Object createConnectionFactory(final ConnectionManager manager) throws ResourceException { - return new TomEERAConnectionFactory(this, manager, getInfo()); + TomEERAConnectionFactory factory = new TomEERAConnectionFactory(this, manager, getInfo()); + factory.setTransactionSupport(transactionSupportLevel); + return factory; } @Override @@ -56,7 +64,7 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact amqInfo = getInfo(); } try { - return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo); + return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo, transactionSupportLevel); } catch (final JMSException e) { throw new ResourceException("Could not create connection.", e); } @@ -67,4 +75,37 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact return !(object == null || !getClass().isInstance(object)) && ((ActiveMQManagedConnectionFactory) object).getInfo().equals(getInfo()); } + + public String getTransactionSupport() { + switch (transactionSupportLevel) { + case XATransaction: + return "xa"; + case LocalTransaction: + return "local"; + case NoTransaction: + return "none"; + default: + return null; + } + } + + public void setTransactionSupport(String transactionSupport) { + if (transactionSupport == null) { + throw new IllegalArgumentException("transactionSupport cannot be not null"); + } else { + switch (transactionSupport.toLowerCase(Locale.ENGLISH)) { + case "xa": + transactionSupportLevel = TransactionSupportLevel.XATransaction; + break; + case "local": + transactionSupportLevel = TransactionSupportLevel.LocalTransaction; + break; + case "none": + transactionSupportLevel = TransactionSupportLevel.NoTransaction; + break; + default: + throw new IllegalArgumentException("transactionSupport must be xa, local, or none:" + transactionSupport); + } + } + } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java index f07b7a2..aefe4f9 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java @@ -18,21 +18,28 @@ package org.apache.openejb.resource.activemq.jms2; import org.apache.activemq.ra.ActiveMQManagedConnection; import org.apache.activemq.ra.ManagedConnectionProxy; +import org.apache.openejb.OpenEJB; import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ExceptionListener; +import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; +import javax.jms.XAConnection; +import javax.jms.XASession; import javax.resource.spi.ConnectionRequestInfo; +import javax.resource.spi.TransactionSupport.TransactionSupportLevel; +import javax.transaction.RollbackException; +import javax.transaction.SystemException; public class TomEEManagedConnectionProxy extends ManagedConnectionProxy // cause org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses getInterfaces() - implements Connection, QueueConnection, TopicConnection, ExceptionListener { + implements Connection, QueueConnection, TopicConnection, ExceptionListener, XAConnection { private volatile ActiveMQManagedConnection connection; @@ -48,13 +55,119 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy } @Override - public Session createSession(final int sessionMode) throws JMSException { - return connection.getPhysicalConnection().createSession(sessionMode); + public Session createSession(final int acknowledgeMode) throws JMSException { + // For the next three methods, we ignore the requested session mode per the + // spec: + // https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#createSession-int- + // + // But we also allow the user to override this behavior. If they set + // transactionSupport on the connection factory + // we will not return to them a xa session, even though the underlying physical + // connection may support XA. + + int mode; + boolean xa; + TransactionSupportLevel transactionSupportLevel; + if (connection instanceof TomEEManagedConnection) { + transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel(); + } else { + transactionSupportLevel = TransactionSupportLevel.XATransaction; + } + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = acknowledgeMode; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + if (xa) { + return createXASession(); + } else { + return connection.getPhysicalConnection().createSession(mode); + } + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + int mode; + boolean xa; + TransactionSupportLevel transactionSupportLevel; + if (connection instanceof TomEEManagedConnection) { + transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel(); + } else if (!transacted) { + transactionSupportLevel = TransactionSupportLevel.NoTransaction; + } else { + transactionSupportLevel = TransactionSupportLevel.XATransaction; + } + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = acknowledgeMode; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + if (xa) { + return createXASession(); + } else { + return connection.getPhysicalConnection().createSession(mode); + } } @Override public Session createSession() throws JMSException { - return connection.getPhysicalConnection().createSession(); + int mode; + boolean xa; + TransactionSupportLevel transactionSupportLevel; + if (connection instanceof TomEEManagedConnection) { + transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel(); + } else { + transactionSupportLevel = TransactionSupportLevel.XATransaction; + } + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = JMSContext.AUTO_ACKNOWLEDGE; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + if (xa) { + return createXASession(); + } else { + return connection.getPhysicalConnection().createSession(mode); + } } @Override @@ -69,4 +182,15 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy final ServerSessionPool sessionPool, final int maxMessages) throws JMSException { return connection.getPhysicalConnection().createSharedConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages); } + + @Override + public XASession createXASession() throws JMSException { + XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession(); + try { + OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource()); + } catch (IllegalStateException | SystemException | RollbackException e) { + throw new RuntimeException(e); + } + return session; + } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java index 75f3582..1a76f9f 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java @@ -16,15 +16,18 @@ */ package org.apache.openejb.resource.activemq.jms2; +import javax.resource.spi.TransactionSupport.TransactionSupportLevel; import org.apache.activemq.ra.ActiveMQConnectionFactory; import org.apache.activemq.ra.ActiveMQConnectionRequestInfo; import org.apache.activemq.ra.ActiveMQManagedConnectionFactory; import javax.jms.JMSContext; -import javax.jms.Session; import javax.resource.spi.ConnectionManager; public class TomEERAConnectionFactory extends ActiveMQConnectionFactory { + private static final long serialVersionUID = 1L; + private TransactionSupportLevel transactionSupportLevel = TransactionSupportLevel.XATransaction; + public TomEERAConnectionFactory(final ActiveMQManagedConnectionFactory factory, final ConnectionManager manager, final ActiveMQConnectionRequestInfo connectionRequestInfo) { super(factory, manager, connectionRequestInfo); @@ -32,21 +35,116 @@ public class TomEERAConnectionFactory extends ActiveMQConnectionFactory { @Override public JMSContext createContext() { - return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, null, null, false); + // See notes here. We _do_ allow the user to override session mode at the + // connectionFactory level, otherwise we follow the spec. + // https://docs.oracle.com/javaee/7/api/javax/jms/ConnectionFactory.html#createContext-int- + int mode; + boolean xa; + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = JMSContext.AUTO_ACKNOWLEDGE; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + return new JMSContextImpl(this, mode, null, null, xa); } @Override public JMSContext createContext(final int sessionMode) { - return new JMSContextImpl(this, sessionMode, null, null, false); + int mode; + boolean xa; + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = sessionMode; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + return new JMSContextImpl(this, mode, null, null, xa); } @Override public JMSContext createContext(final String userName, final String password) { - return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, userName, password, false); + int mode; + boolean xa; + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = JMSContext.AUTO_ACKNOWLEDGE; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + return new JMSContextImpl(this, mode, userName, password, xa); } @Override public JMSContext createContext(final String userName, final String password, final int sessionMode) { - return new JMSContextImpl(this, sessionMode, userName, password, false); + int mode; + boolean xa; + switch (transactionSupportLevel) { + case XATransaction: + if (JMS2.inTx()) { + mode = -1; + xa = true; + break; + } + case NoTransaction: + mode = sessionMode; + xa = false; + break; + case LocalTransaction: + mode = JMSContext.SESSION_TRANSACTED; + xa = false; + break; + default: + throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel); + } + return new JMSContextImpl(this, mode, userName, password, xa); + } + + public TransactionSupportLevel getTransactionSupport() { + return transactionSupportLevel; + } + + public void setTransactionSupport(TransactionSupportLevel transactionSupportLevel) { + if (transactionSupportLevel == null) { + throw new IllegalArgumentException("transactionSupportLevel cannot be null"); + } else { + this.transactionSupportLevel = transactionSupportLevel; + } } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java index 1fe4f2a..042b39a 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java @@ -16,10 +16,10 @@ */ package org.apache.openejb.resource.activemq.jms2.cdi; -import org.apache.openejb.OpenEJB; import org.apache.openejb.assembler.classic.OpenEjbConfiguration; import org.apache.openejb.assembler.classic.ResourceInfo; import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.resource.activemq.jms2.JMS2; import org.apache.openejb.spi.ContainerSystem; import javax.annotation.PreDestroy; @@ -56,7 +56,6 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.NamingException; -import javax.transaction.SystemException; import javax.transaction.TransactionScoped; import java.io.Serializable; import java.util.Map; @@ -140,6 +139,7 @@ public class JMS2CDIExtension implements Extension { } public abstract static class AutoContextDestruction implements Serializable { + private static final long serialVersionUID = 1L; private transient Map<Key, JMSContext> contexts = new ConcurrentHashMap<>(); public void push(final Key key, final JMSContext c) { @@ -170,13 +170,16 @@ public class JMS2CDIExtension implements Extension { @RequestScoped public static class RequestAutoContextDestruction extends AutoContextDestruction { + private static final long serialVersionUID = 1L; } @TransactionScoped public static class TransactionAutoContextDestruction extends AutoContextDestruction { + private static final long serialVersionUID = 1L; } public static class Key implements Serializable { + private static final long serialVersionUID = 1L; private volatile ConnectionFactory connectionFactoryInstance; private final String connectionFactory; private final String username; @@ -250,6 +253,7 @@ public class JMS2CDIExtension implements Extension { } public static class InternalJMSContext implements JMSContext, Serializable { + private static final long serialVersionUID = 1L; private final Key key; private final RequestAutoContextDestruction requestStorage; private final TransactionAutoContextDestruction transactionStorage; @@ -261,7 +265,7 @@ public class JMS2CDIExtension implements Extension { } private synchronized JMSContext context() { - if (inTx()) { + if (JMS2.inTx()) { return findOrCreateContext(transactionStorage); } return findOrCreateContext(requestStorage); @@ -276,14 +280,6 @@ public class JMS2CDIExtension implements Extension { return jmsContext; } - private boolean inTx() { - try { - return OpenEJB.getTransactionManager().getTransaction() != null; - } catch (SystemException e) { - return false; - } - } - // plain delegation now @Override