http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java
new file mode 100644
index 0000000..d1cad59
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java
@@ -0,0 +1,1903 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionInProgressException;
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
+import javax.resource.ResourceException;
+import javax.resource.spi.ConnectionEvent;
+import javax.resource.spi.ManagedConnection;
+import javax.transaction.xa.XAResource;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.jms.client.ActiveMQSession;
+
+
+/**
+ * A joint interface for JMS sessions
+ *
+ * @author <a href="mailto:adr...@jboss.com";>Adrian Brock</a>
+ * @author <a href="mailto:jesper.peder...@jboss.org";>Jesper Pedersen</a>
+ * @author <a href="mailto:mtay...@redhat.com";>Martyn Taylor</a>
+ */
+public final class ActiveMQRASession implements QueueSession, TopicSession, 
XAQueueSession, XATopicSession
+{
+   /**
+    * Trace enabled
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * The managed connection
+    */
+   private ActiveMQRAManagedConnection mc;
+
+   /**
+    * The connection request info
+    */
+   private final ActiveMQRAConnectionRequestInfo cri;
+
+   /**
+    * The session factory
+    */
+   private ActiveMQRASessionFactory sf;
+
+   /**
+    * The message consumers
+    */
+   private final Set<MessageConsumer> consumers;
+
+   /**
+    * The message producers
+    */
+   private final Set<MessageProducer> producers;
+
+   /**
+    * Constructor
+    *
+    * @param mc  The managed connection
+    * @param cri The connection request info
+    */
+   public ActiveMQRASession(final ActiveMQRAManagedConnection mc, final 
ActiveMQRAConnectionRequestInfo cri)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + mc + ", " + cri + ")");
+      }
+
+      this.mc = mc;
+      this.cri = cri;
+      sf = null;
+      consumers = new HashSet<MessageConsumer>();
+      producers = new HashSet<MessageProducer>();
+   }
+
+   /**
+    * Set the session factory
+    *
+    * @param sf The session factory
+    */
+   public void setActiveMQSessionFactory(final ActiveMQRASessionFactory sf)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setActiveMQSessionFactory(" + sf + 
")");
+      }
+
+      this.sf = sf;
+   }
+
+   /**
+    * Lock
+    *
+    * @throws JMSException          Thrown if an error occurs
+    * @throws IllegalStateException The session is closed
+    */
+   protected void lock() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("lock()");
+      }
+
+      final ActiveMQRAManagedConnection mcLocal = this.mc;
+      if (mcLocal != null)
+      {
+         mcLocal.tryLock();
+      }
+      else
+      {
+         throw new IllegalStateException("Connection is not associated with a 
managed connection. " + this);
+      }
+   }
+
+   /**
+    * Unlock
+    */
+   protected void unlock()
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("unlock()");
+      }
+
+      final ActiveMQRAManagedConnection mcLocal = this.mc;
+      if (mcLocal != null)
+      {
+         mcLocal.unlock();
+      }
+
+      // We recreate the lock when returned to the pool
+      // so missing the unlock after disassociation is not important
+   }
+
+   /**
+    * Create a bytes message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public BytesMessage createBytesMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createBytesMessage" + session);
+      }
+
+      return session.createBytesMessage();
+   }
+
+   /**
+    * Create a map message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MapMessage createMapMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createMapMessage(), " + session);
+      }
+
+      return session.createMapMessage();
+   }
+
+   /**
+    * Create a message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public Message createMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createMessage" + session);
+      }
+
+      return session.createMessage();
+   }
+
+   /**
+    * Create an object message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public ObjectMessage createObjectMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createObjectMessage" + session);
+      }
+
+      return session.createObjectMessage();
+   }
+
+   /**
+    * Create an object message
+    *
+    * @param object The object
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public ObjectMessage createObjectMessage(final Serializable object) throws 
JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createObjectMessage(" + object + ")" + 
session);
+      }
+
+      return session.createObjectMessage(object);
+   }
+
+   /**
+    * Create a stream message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public StreamMessage createStreamMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createStreamMessage" + session);
+      }
+
+      return session.createStreamMessage();
+   }
+
+   /**
+    * Create a text message
+    *
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TextMessage createTextMessage() throws JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createTextMessage" + session);
+      }
+
+      return session.createTextMessage();
+   }
+
+   /**
+    * Create a text message
+    *
+    * @param string The text
+    * @return The message
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TextMessage createTextMessage(final String string) throws 
JMSException
+   {
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createTextMessage(" + string + ")" + 
session);
+      }
+
+      return session.createTextMessage(string);
+   }
+
+   /**
+    * Get transacted
+    *
+    * @return True if transacted; otherwise false
+    * @throws JMSException Thrown if an error occurs
+    */
+   public boolean getTransacted() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTransacted()");
+      }
+
+      getSessionInternal();
+      return cri.isTransacted();
+   }
+
+   /**
+    * Get the message listener -- throws IllegalStateException
+    *
+    * @return The message listener
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MessageListener getMessageListener() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageListener()");
+      }
+
+      throw new IllegalStateException("Method not allowed");
+   }
+
+   /**
+    * Set the message listener -- Throws IllegalStateException
+    *
+    * @param listener The message listener
+    * @throws JMSException Thrown if an error occurs
+    */
+   public void setMessageListener(final MessageListener listener) throws 
JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setMessageListener(" + listener + ")");
+      }
+
+      throw new IllegalStateException("Method not allowed");
+   }
+
+   /**
+    * Always throws an Error.
+    *
+    * @throws Error Method not allowed.
+    */
+   public void run()
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("run()");
+      }
+
+      throw new Error("Method not allowed");
+   }
+
+   /**
+    * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
+    * managed connection.
+    *
+    * @throws JMSException Failed to close session.
+    */
+   public void close() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("close()");
+      }
+
+      sf.closeSession(this);
+      closeSession();
+   }
+
+   /**
+    * Commit
+    *
+    * @throws JMSException Failed to close session.
+    */
+   public void commit() throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.XA_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new TransactionInProgressException("XA connection");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (cri.isTransacted() == false)
+         {
+            throw new IllegalStateException("Session is not transacted");
+         }
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("Commit session " + this);
+         }
+
+         session.commit();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Rollback
+    *
+    * @throws JMSException Failed to close session.
+    */
+   public void rollback() throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.XA_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new TransactionInProgressException("XA connection");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (cri.isTransacted() == false)
+         {
+            throw new IllegalStateException("Session is not transacted");
+         }
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("Rollback session " + this);
+         }
+
+         session.rollback();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Recover
+    *
+    * @throws JMSException Failed to close session.
+    */
+   public void recover() throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (cri.isTransacted())
+         {
+            throw new IllegalStateException("Session is transacted");
+         }
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("Recover session " + this);
+         }
+
+         session.recover();
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic
+    *
+    * @param topicName The topic name
+    * @return The topic
+    * @throws JMSException Thrown if an error occurs
+    */
+   public Topic createTopic(final String topicName) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create topic for 
javax.jms.QueueSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createTopic " + session + " 
topicName=" + topicName);
+      }
+
+      Topic result = session.createTopic(topicName);
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createdTopic " + session + " topic=" + 
result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a topic subscriber
+    *
+    * @param topic The topic
+    * @return The subscriber
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createSubscriber(final Topic topic) throws 
JMSException
+   {
+      lock();
+      try
+      {
+         TopicSession session = getTopicSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSubscriber " + session + " 
topic=" + topic);
+         }
+
+         TopicSubscriber result = session.createSubscriber(topic);
+         result = new ActiveMQRATopicSubscriber(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdSubscriber " + session + " 
ActiveMQTopicSubscriber=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic subscriber
+    *
+    * @param topic           The topic
+    * @param messageSelector The message selector
+    * @param noLocal         If true inhibits the delivery of messages 
published by its own connection
+    * @return The subscriber
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createSubscriber(final Topic topic, final String 
messageSelector, final boolean noLocal) throws JMSException
+   {
+      lock();
+      try
+      {
+         TopicSession session = getTopicSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSubscriber " + session +
+                                            " topic=" +
+                                            topic +
+                                            " selector=" +
+                                            messageSelector +
+                                            " noLocal=" +
+                                            noLocal);
+         }
+
+         TopicSubscriber result = session.createSubscriber(topic, 
messageSelector, noLocal);
+         result = new ActiveMQRATopicSubscriber(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdSubscriber " + session + " 
ActiveMQTopicSubscriber=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a durable topic subscriber
+    *
+    * @param topic The topic
+    * @param name  The name
+    * @return The subscriber
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic, final 
String name) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create durable subscriber 
from javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createDurableSubscriber " + session 
+ " topic=" + topic + " name=" + name);
+         }
+
+         TopicSubscriber result = session.createDurableSubscriber(topic, name);
+         result = new ActiveMQRATopicSubscriber(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdDurableSubscriber " + 
session + " ActiveMQTopicSubscriber=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic subscriber
+    *
+    * @param topic           The topic
+    * @param name            The name
+    * @param messageSelector The message selector
+    * @param noLocal         If true inhibits the delivery of messages 
published by its own connection
+    * @return The subscriber
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSubscriber createDurableSubscriber(final Topic topic,
+                                                  final String name,
+                                                  final String messageSelector,
+                                                  final boolean noLocal) 
throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createDurableSubscriber " + session 
+
+                                            " topic=" +
+                                            topic +
+                                            " name=" +
+                                            name +
+                                            " selector=" +
+                                            messageSelector +
+                                            " noLocal=" +
+                                            noLocal);
+         }
+
+         TopicSubscriber result = session.createDurableSubscriber(topic, name, 
messageSelector, noLocal);
+         result = new ActiveMQRATopicSubscriber(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdDurableSubscriber " + 
session + " ActiveMQTopicSubscriber=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a topic publisher
+    *
+    * @param topic The topic
+    * @return The publisher
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicPublisher createPublisher(final Topic topic) throws JMSException
+   {
+      lock();
+      try
+      {
+         TopicSession session = getTopicSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createPublisher " + session + " 
topic=" + topic);
+         }
+
+         TopicPublisher result = session.createPublisher(topic);
+         result = new ActiveMQRATopicPublisher(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdPublisher " + session + " 
publisher=" + result);
+         }
+
+         addProducer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a temporary topic
+    *
+    * @return The temporary topic
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TemporaryTopic createTemporaryTopic() throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create temporary topic for 
javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createTemporaryTopic " + session);
+         }
+
+         TemporaryTopic temp = session.createTemporaryTopic();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdTemporaryTopic " + session + 
" temp=" + temp);
+         }
+
+         sf.addTemporaryTopic(temp);
+
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Unsubscribe
+    *
+    * @param name The name
+    * @throws JMSException Thrown if an error occurs
+    */
+   public void unsubscribe(final String name) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot unsubscribe for 
javax.jms.QueueSession");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("unsubscribe " + session + " name=" 
+ name);
+         }
+
+         session.unsubscribe(name);
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a browser
+    *
+    * @param queue The queue
+    * @return The browser
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueBrowser createBrowser(final Queue queue) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create browser for 
javax.jms.TopicSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createBrowser " + session + " queue=" 
+ queue);
+      }
+
+      QueueBrowser result = session.createBrowser(queue);
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createdBrowser " + session + " 
browser=" + result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a browser
+    *
+    * @param queue           The queue
+    * @param messageSelector The message selector
+    * @return The browser
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueBrowser createBrowser(final Queue queue, final String 
messageSelector) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create browser for 
javax.jms.TopicSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createBrowser " + session + " queue=" 
+ queue + " selector=" + messageSelector);
+      }
+
+      QueueBrowser result = session.createBrowser(queue, messageSelector);
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createdBrowser " + session + " 
browser=" + result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a queue
+    *
+    * @param queueName The queue name
+    * @return The queue
+    * @throws JMSException Thrown if an error occurs
+    */
+   public Queue createQueue(final String queueName) throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create browser or 
javax.jms.TopicSession");
+      }
+
+      Session session = getSessionInternal();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createQueue " + session + " 
queueName=" + queueName);
+      }
+
+      Queue result = session.createQueue(queueName);
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createdQueue " + session + " queue=" + 
result);
+      }
+
+      return result;
+   }
+
+   /**
+    * Create a queue receiver
+    *
+    * @param queue The queue
+    * @return The queue receiver
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueReceiver createReceiver(final Queue queue) throws JMSException
+   {
+      lock();
+      try
+      {
+         QueueSession session = getQueueSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " 
queue=" + queue);
+         }
+
+         QueueReceiver result = session.createReceiver(queue);
+         result = new ActiveMQRAQueueReceiver(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " 
receiver=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a queue receiver
+    *
+    * @param queue           The queue
+    * @param messageSelector
+    * @return The queue receiver
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueReceiver createReceiver(final Queue queue, final String 
messageSelector) throws JMSException
+   {
+      lock();
+      try
+      {
+         QueueSession session = getQueueSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " 
queue=" + queue + " selector=" + messageSelector);
+         }
+
+         QueueReceiver result = session.createReceiver(queue, messageSelector);
+         result = new ActiveMQRAQueueReceiver(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " 
receiver=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a queue sender
+    *
+    * @param queue The queue
+    * @return The queue sender
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueSender createSender(final Queue queue) throws JMSException
+   {
+      lock();
+      try
+      {
+         QueueSession session = getQueueSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSender " + session + " 
queue=" + queue);
+         }
+
+         QueueSender result = session.createSender(queue);
+         result = new ActiveMQRAQueueSender(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdSender " + session + " 
sender=" + result);
+         }
+
+         addProducer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a temporary queue
+    *
+    * @return The temporary queue
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TemporaryQueue createTemporaryQueue() throws JMSException
+   {
+      if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Cannot create temporary queue for 
javax.jms.TopicSession");
+      }
+
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createTemporaryQueue " + session);
+         }
+
+         TemporaryQueue temp = session.createTemporaryQueue();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdTemporaryQueue " + session + 
" temp=" + temp);
+         }
+
+         sf.addTemporaryQueue(temp);
+
+         return temp;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message consumer
+    *
+    * @param destination The destination
+    * @return The message consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MessageConsumer createConsumer(final Destination destination) throws 
JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createConsumer " + session + " 
dest=" + destination);
+         }
+
+         MessageConsumer result = session.createConsumer(destination);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message consumer
+    *
+    * @param destination     The destination
+    * @param messageSelector The message selector
+    * @return The message consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MessageConsumer createConsumer(final Destination destination, final 
String messageSelector) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createConsumer " + session +
+                                            " dest=" +
+                                            destination +
+                                            " messageSelector=" +
+                                            messageSelector);
+         }
+
+         MessageConsumer result = session.createConsumer(destination, 
messageSelector);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message consumer
+    *
+    * @param destination     The destination
+    * @param messageSelector The message selector
+    * @param noLocal         If true inhibits the delivery of messages 
published by its own connection
+    * @return The message consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MessageConsumer createConsumer(final Destination destination,
+                                         final String messageSelector,
+                                         final boolean noLocal) throws 
JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createConsumer " + session +
+                                            " dest=" +
+                                            destination +
+                                            " messageSelector=" +
+                                            messageSelector +
+                                            " noLocal=" +
+                                            noLocal);
+         }
+
+         MessageConsumer result = session.createConsumer(destination, 
messageSelector, noLocal);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Create a message producer
+    *
+    * @param destination The destination
+    * @return The message producer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public MessageProducer createProducer(final Destination destination) throws 
JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createProducer " + session + " 
dest=" + destination);
+         }
+
+         MessageProducer result = session.createProducer(destination);
+         result = new ActiveMQRAMessageProducer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdProducer " + session + " 
producer=" + result);
+         }
+
+         addProducer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Get the acknowledge mode
+    *
+    * @return The mode
+    * @throws JMSException Thrown if an error occurs
+    */
+   public int getAcknowledgeMode() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getAcknowledgeMode()");
+      }
+
+      getSessionInternal();
+      return cri.getAcknowledgeMode();
+   }
+
+   /**
+    * Get the XA resource
+    *
+    * @return The XA resource
+    * @throws IllegalStateException If non XA connection
+    */
+   public XAResource getXAResource()
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getXAResource()");
+      }
+
+      if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION)
+      {
+         return null;
+      }
+
+      try
+      {
+         lock();
+
+         return getXAResourceInternal();
+      }
+      catch (Throwable t)
+      {
+         return null;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Returns the ID of the Node that this session is associated with.
+    *
+    * @return Node ID
+    */
+   public String getNodeId() throws JMSException
+   {
+      ActiveMQSession session = (ActiveMQSession) getSessionInternal();
+      ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) 
session.getCoreSession().getSessionFactory();
+      return factory.getLiveNodeId();
+   }
+
+   /**
+    * Get the session
+    *
+    * @return The session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public Session getSession() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getNonXAsession()");
+      }
+
+      if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Get the queue session
+    *
+    * @return The queue session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueSession getQueueSession() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getQueueSession()");
+      }
+
+      if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Get the topic session
+    *
+    * @return The topic session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSession getTopicSession() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTopicSession()");
+      }
+
+      if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || 
cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION ||
+         cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Non XA connection");
+      }
+
+      lock();
+      try
+      {
+         return this;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createSharedConsumer(final Topic topic, final String 
sharedSubscriptionName) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + 
" topic=" + topic + ", sharedSubscriptionName=" + sharedSubscriptionName);
+         }
+
+         MessageConsumer result = session.createSharedConsumer(topic, 
sharedSubscriptionName);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createSharedConsumer(final Topic topic, final String 
sharedSubscriptionName,
+                                               final String messageSelector) 
throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + 
" topic=" + topic +
+                                            ", sharedSubscriptionName=" + 
sharedSubscriptionName + ", messageSelector=" + messageSelector);
+         }
+
+         MessageConsumer result = session.createSharedConsumer(topic, 
sharedSubscriptionName, messageSelector);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createDurableConsumer(final Topic topic, final 
String name) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + 
" topic=" + topic + ", name=" + name);
+         }
+
+         MessageConsumer result = session.createDurableConsumer(topic, name);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createDurableConsumer(Topic topic, String name, 
String messageSelector, boolean noLocal) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createDurableConsumer " + session + 
" topic=" + topic + ", name=" + name +
+                                            ", messageSelector=" + 
messageSelector + ", noLocal=" + noLocal);
+         }
+
+         MessageConsumer result = session.createDurableConsumer(topic, name, 
messageSelector, noLocal);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createSharedDurableConsumer(Topic topic, String 
name) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSharedDurableConsumer " + 
session + " topic=" + topic + ", name=" +
+                                            name);
+         }
+
+         MessageConsumer result = session.createSharedDurableConsumer(topic, 
name);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   @Override
+   public MessageConsumer createSharedDurableConsumer(Topic topic, String 
name, String messageSelector) throws JMSException
+   {
+      lock();
+      try
+      {
+         Session session = getSessionInternal();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createSharedDurableConsumer " + 
session + " topic=" + topic + ", name=" +
+                                            name + ", messageSelector=" + 
messageSelector);
+         }
+
+         MessageConsumer result = session.createSharedDurableConsumer(topic, 
name, messageSelector);
+         result = new ActiveMQRAMessageConsumer(result, this);
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " 
consumer=" + result);
+         }
+
+         addConsumer(result);
+
+         return result;
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
+   /**
+    * Set the managed connection
+    *
+    * @param managedConnection The managed connection
+    */
+   void setManagedConnection(final ActiveMQRAManagedConnection 
managedConnection)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setManagedConnection(" + 
managedConnection + ")");
+      }
+
+      if (mc != null)
+      {
+         mc.removeHandle(this);
+      }
+
+      mc = managedConnection;
+   }
+
+   /**
+    * for tests only
+    */
+   public ManagedConnection getManagedConnection()
+   {
+      return mc;
+   }
+
+   /**
+    * Destroy
+    */
+   void destroy()
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("destroy()");
+      }
+
+      mc = null;
+   }
+
+   /**
+    * Start
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   void start() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("start()");
+      }
+
+      if (mc != null)
+      {
+         mc.start();
+      }
+   }
+
+   /**
+    * Stop
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   void stop() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("stop()");
+      }
+
+      if (mc != null)
+      {
+         mc.stop();
+      }
+   }
+
+   /**
+    * Check strict
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   void checkStrict() throws JMSException
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("checkStrict()");
+      }
+
+      if (mc != null)
+      {
+         throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+      }
+   }
+
+   /**
+    * Close session
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   void closeSession() throws JMSException
+   {
+      if (mc != null)
+      {
+         ActiveMQRALogger.LOGGER.trace("Closing session");
+
+         try
+         {
+            mc.stop();
+         }
+         catch (Throwable t)
+         {
+            ActiveMQRALogger.LOGGER.trace("Error stopping managed connection", 
t);
+         }
+
+         synchronized (consumers)
+         {
+            for (Iterator<MessageConsumer> i = consumers.iterator(); 
i.hasNext(); )
+            {
+               ActiveMQRAMessageConsumer consumer = 
(ActiveMQRAMessageConsumer) i.next();
+               try
+               {
+                  consumer.closeConsumer();
+               }
+               catch (Throwable t)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Error closing consumer", t);
+               }
+               i.remove();
+            }
+         }
+
+         synchronized (producers)
+         {
+            for (Iterator<MessageProducer> i = producers.iterator(); 
i.hasNext(); )
+            {
+               ActiveMQRAMessageProducer producer = 
(ActiveMQRAMessageProducer) i.next();
+               try
+               {
+                  producer.closeProducer();
+               }
+               catch (Throwable t)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Error closing producer", t);
+               }
+               i.remove();
+            }
+         }
+
+         mc.removeHandle(this);
+         ConnectionEvent ev = new ConnectionEvent(mc, 
ConnectionEvent.CONNECTION_CLOSED);
+         ev.setConnectionHandle(this);
+         mc.sendEvent(ev);
+         mc = null;
+      }
+   }
+
+   /**
+    * Add consumer
+    *
+    * @param consumer The consumer
+    */
+   void addConsumer(final MessageConsumer consumer)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("addConsumer(" + consumer + ")");
+      }
+
+      synchronized (consumers)
+      {
+         consumers.add(consumer);
+      }
+   }
+
+   /**
+    * Remove consumer
+    *
+    * @param consumer The consumer
+    */
+   void removeConsumer(final MessageConsumer consumer)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("removeConsumer(" + consumer + ")");
+      }
+
+      synchronized (consumers)
+      {
+         consumers.remove(consumer);
+      }
+   }
+
+   /**
+    * Add producer
+    *
+    * @param producer The producer
+    */
+   void addProducer(final MessageProducer producer)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("addProducer(" + producer + ")");
+      }
+
+      synchronized (producers)
+      {
+         producers.add(producer);
+      }
+   }
+
+   /**
+    * Remove producer
+    *
+    * @param producer The producer
+    */
+   void removeProducer(final MessageProducer producer)
+   {
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("removeProducer(" + producer + ")");
+      }
+
+      synchronized (producers)
+      {
+         producers.remove(producer);
+      }
+   }
+
+   /**
+    * Get the session and ensure that it is open
+    *
+    * @return The session
+    * @throws JMSException          Thrown if an error occurs
+    * @throws IllegalStateException The session is closed
+    */
+   Session getSessionInternal() throws JMSException
+   {
+      if (mc == null)
+      {
+         throw new IllegalStateException("The session is closed");
+      }
+
+      Session session = mc.getSession();
+
+      if (ActiveMQRASession.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getSessionInternal " + session + " for 
" + this);
+      }
+
+      return session;
+   }
+
+   /**
+    * Get the XA resource and ensure that it is open
+    *
+    * @return The XA Resource
+    * @throws JMSException          Thrown if an error occurs
+    * @throws IllegalStateException The session is closed
+    */
+   XAResource getXAResourceInternal() throws JMSException
+   {
+      if (mc == null)
+      {
+         throw new IllegalStateException("The session is closed");
+      }
+
+      try
+      {
+         XAResource xares = mc.getXAResource();
+
+         if (ActiveMQRASession.trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("getXAResourceInternal " + xares + " 
for " + this);
+         }
+
+         return xares;
+      }
+      catch (ResourceException e)
+      {
+         JMSException jmse = new JMSException("Unable to get XA Resource");
+         jmse.initCause(e);
+         throw jmse;
+      }
+   }
+
+   /**
+    * Get the queue session
+    *
+    * @return The queue session
+    * @throws JMSException          Thrown if an error occurs
+    * @throws IllegalStateException The session is closed
+    */
+   QueueSession getQueueSessionInternal() throws JMSException
+   {
+      Session s = getSessionInternal();
+      if (!(s instanceof QueueSession))
+      {
+         throw new InvalidDestinationException("Attempting to use QueueSession 
methods on: " + this);
+      }
+      return (QueueSession) s;
+   }
+
+   /**
+    * Get the topic session
+    *
+    * @return The topic session
+    * @throws JMSException          Thrown if an error occurs
+    * @throws IllegalStateException The session is closed
+    */
+   TopicSession getTopicSessionInternal() throws JMSException
+   {
+      Session s = getSessionInternal();
+      if (!(s instanceof TopicSession))
+      {
+         throw new InvalidDestinationException("Attempting to use TopicSession 
methods on: " + this);
+      }
+      return (TopicSession) s;
+   }
+
+   /**
+    * @throws SystemException
+    * @throws RollbackException
+    */
+   public void checkState() throws JMSException
+   {
+      if (mc != null)
+      {
+         mc.checkTransactionActive();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java
new file mode 100644
index 0000000..eba16f5
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.XAQueueConnection;
+import javax.jms.XATopicConnection;
+
+/**
+ * A joint interface for all connection types
+ *
+ * @author <a href="mailto:adr...@jboss.com";>Adrian Brock</a>
+ * @author <a href="mailto:jesper.peder...@jboss.org";>Jesper Pedersen</a>
+ * @version $Revision: 71554 $
+ */
+public interface ActiveMQRASessionFactory extends XATopicConnection, 
XAQueueConnection
+{
+   /** Error message for strict behaviour */
+   String ISE = "This method is not applicable inside the application server. 
See the J2EE spec, e.g. J2EE1.4 Section 6.6";
+
+   /**
+    * Add a temporary queue
+    * @param temp The temporary queue
+    */
+   void addTemporaryQueue(TemporaryQueue temp);
+
+   /**
+    * Add a temporary topic
+    * @param temp The temporary topic
+    */
+   void addTemporaryTopic(TemporaryTopic temp);
+
+   /**
+    * Notification that a session is closed
+    * @param session The session
+    * @throws JMSException for any error
+    */
+   void closeSession(ActiveMQRASession session) throws JMSException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java
new file mode 100644
index 0000000..ef743ac
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java
@@ -0,0 +1,1046 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.XAJMSContext;
+import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicSession;
+import javax.naming.Reference;
+import javax.resource.Referenceable;
+import javax.resource.spi.ConnectionManager;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.jms.client.ActiveMQConnectionForContext;
+import org.apache.activemq.jms.client.ActiveMQConnectionForContextImpl;
+
+/**
+ * Implements the JMS Connection API and produces {@link ActiveMQRASession} 
objects.
+ *
+ * @author <a href="mailto:adr...@jboss.com";>Adrian Brock</a>
+ * @author <a href="mailto:jesper.peder...@jboss.org";>Jesper Pedersen</a>
+ */
+public final class ActiveMQRASessionFactoryImpl extends 
ActiveMQConnectionForContextImpl implements
+   ActiveMQRASessionFactory, ActiveMQConnectionForContext, Referenceable
+{
+   /**
+    * Trace enabled
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Are we closed?
+    */
+   private boolean closed = false;
+
+   /**
+    * The naming reference
+    */
+   private Reference reference;
+
+   /**
+    * The user name
+    */
+   private String userName;
+
+   /**
+    * The password
+    */
+   private String password;
+
+   /**
+    * The client ID
+    */
+   private String clientID;
+
+   /**
+    * The connection type
+    */
+   private final int type;
+
+   /**
+    * Whether we are started
+    */
+   private boolean started = false;
+
+   /**
+    * The managed connection factory
+    */
+   private final ActiveMQRAManagedConnectionFactory mcf;
+   private TransactionManager tm;
+
+   /**
+    * The connection manager
+    */
+   private ConnectionManager cm;
+
+   /**
+    * The sessions
+    */
+   private final Set<ActiveMQRASession> sessions = new 
HashSet<ActiveMQRASession>();
+
+   /**
+    * The temporary queues
+    */
+   private final Set<TemporaryQueue> tempQueues = new 
HashSet<TemporaryQueue>();
+
+   /**
+    * The temporary topics
+    */
+   private final Set<TemporaryTopic> tempTopics = new 
HashSet<TemporaryTopic>();
+
+   /**
+    * Constructor
+    *
+    * @param mcf  The managed connection factory
+    * @param cm   The connection manager
+    * @param type The connection type
+    */
+   public ActiveMQRASessionFactoryImpl(final 
ActiveMQRAManagedConnectionFactory mcf,
+                                       final ConnectionManager cm,
+                                       final TransactionManager tm,
+                                       final int type)
+   {
+      this.mcf = mcf;
+
+      this.tm = tm;
+
+      if (cm == null)
+      {
+         this.cm = new ActiveMQRAConnectionManager();
+      }
+      else
+      {
+         this.cm = cm;
+      }
+
+      this.type = type;
+
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + mcf + ", " + cm + ", " 
+ type);
+      }
+   }
+
+   public JMSContext createContext(int sessionMode)
+   {
+      boolean inJtaTx = inJtaTransaction();
+      int sessionModeToUse;
+      switch (sessionMode)
+      {
+         case Session.AUTO_ACKNOWLEDGE:
+         case Session.DUPS_OK_ACKNOWLEDGE:
+         case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
+         case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
+            sessionModeToUse = sessionMode;
+            break;
+         //these are prohibited in JEE unless not in a JTA tx where they 
should be ignored and auto_ack used
+         case Session.CLIENT_ACKNOWLEDGE:
+            if (!inJtaTx)
+            {
+               throw 
ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
+            }
+            sessionModeToUse = Session.AUTO_ACKNOWLEDGE;
+            break;
+         case Session.SESSION_TRANSACTED:
+            if (!inJtaTx)
+            {
+               throw 
ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
+            }
+            sessionModeToUse = Session.AUTO_ACKNOWLEDGE;
+            break;
+         default:
+            throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(sessionMode);
+      }
+      incrementRefCounter();
+
+      return new ActiveMQRAJMSContext(this, sessionModeToUse, 
threadAwareContext);
+   }
+
+   public XAJMSContext createXAContext()
+   {
+      incrementRefCounter();
+
+      return new ActiveMQRAXAJMSContext(this, threadAwareContext);
+   }
+
+   /**
+    * Set the naming reference
+    *
+    * @param reference The reference
+    */
+   public void setReference(final Reference reference)
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setReference(" + reference + ")");
+      }
+
+      this.reference = reference;
+   }
+
+   /**
+    * Get the naming reference
+    *
+    * @return The reference
+    */
+   public Reference getReference()
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getReference()");
+      }
+
+      return reference;
+   }
+
+   /**
+    * Set the user name
+    *
+    * @param name The user name
+    */
+   public void setUserName(final String name)
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setUserName(" + name + ")");
+      }
+
+      userName = name;
+   }
+
+   /**
+    * Set the password
+    *
+    * @param password The password
+    */
+   public void setPassword(final String password)
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setPassword(****)");
+      }
+
+      this.password = password;
+   }
+
+   /**
+    * Get the client ID
+    *
+    * @return The client ID
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public String getClientID() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getClientID()");
+      }
+
+      checkClosed();
+
+      if (clientID == null)
+      {
+         return ((ActiveMQResourceAdapter) 
mcf.getResourceAdapter()).getProperties().getClientID();
+      }
+
+      return clientID;
+   }
+
+   /**
+    * Set the client ID -- throws IllegalStateException
+    *
+    * @param cID The client ID
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public void setClientID(final String cID) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setClientID(" + cID + ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a queue session
+    *
+    * @param transacted      Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The queue session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public QueueSession createQueueSession(final boolean transacted, final int 
acknowledgeMode) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createQueueSession(" + transacted + ", 
" + acknowledgeMode + ")");
+      }
+
+      checkClosed();
+
+      if (type == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || type == 
ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Can not get a queue session from a 
topic connection");
+      }
+
+      return allocateConnection(transacted, acknowledgeMode, type);
+   }
+
+   /**
+    * Create a XA queue session
+    *
+    * @return The XA queue session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public XAQueueSession createXAQueueSession() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createXAQueueSession()");
+      }
+
+      checkClosed();
+
+      if (type == ActiveMQRAConnectionFactory.CONNECTION || type == 
ActiveMQRAConnectionFactory.TOPIC_CONNECTION ||
+         type == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION)
+      {
+         throw new IllegalStateException("Can not get a topic session from a 
queue connection");
+      }
+
+      return allocateConnection(type);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    *
+    * @param queue           The queue
+    * @param messageSelector The message selector
+    * @param sessionPool     The session pool
+    * @param maxMessages     The number of max messages
+    * @return The connection consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Queue queue,
+                                                      final String 
messageSelector,
+                                                      final ServerSessionPool 
sessionPool,
+                                                      final int maxMessages) 
throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + queue +
+                                         ", " +
+                                         messageSelector +
+                                         ", " +
+                                         sessionPool +
+                                         ", " +
+                                         maxMessages +
+                                         ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a topic session
+    *
+    * @param transacted      Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The topic session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public TopicSession createTopicSession(final boolean transacted, final int 
acknowledgeMode) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createTopicSession(" + transacted + ", 
" + acknowledgeMode + ")");
+      }
+
+      checkClosed();
+
+      if (type == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || type == 
ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Can not get a topic session from a 
queue connection");
+      }
+
+      return allocateConnection(transacted, acknowledgeMode, type);
+   }
+
+   /**
+    * Create a XA topic session
+    *
+    * @return The XA topic session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public XATopicSession createXATopicSession() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createXATopicSession()");
+      }
+
+      checkClosed();
+
+      if (type == ActiveMQRAConnectionFactory.CONNECTION || type == 
ActiveMQRAConnectionFactory.QUEUE_CONNECTION ||
+         type == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION)
+      {
+         throw new IllegalStateException("Can not get a topic session from a 
queue connection");
+      }
+
+      return allocateConnection(type);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    *
+    * @param topic           The topic
+    * @param messageSelector The message selector
+    * @param sessionPool     The session pool
+    * @param maxMessages     The number of max messages
+    * @return The connection consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Topic topic,
+                                                      final String 
messageSelector,
+                                                      final ServerSessionPool 
sessionPool,
+                                                      final int maxMessages) 
throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + topic +
+                                         ", " +
+                                         messageSelector +
+                                         ", " +
+                                         sessionPool +
+                                         ", " +
+                                         maxMessages +
+                                         ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a durable connection consumer -- throws IllegalStateException
+    *
+    * @param topic            The topic
+    * @param subscriptionName The subscription name
+    * @param messageSelector  The message selector
+    * @param sessionPool      The session pool
+    * @param maxMessages      The number of max messages
+    * @return The connection consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
+                                                             final String 
subscriptionName,
+                                                             final String 
messageSelector,
+                                                             final 
ServerSessionPool sessionPool,
+                                                             final int 
maxMessages) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + topic +
+                                         ", " +
+                                         subscriptionName +
+                                         ", " +
+                                         messageSelector +
+                                         ", " +
+                                         sessionPool +
+                                         ", " +
+                                         maxMessages +
+                                         ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    *
+    * @param destination The destination
+    * @param pool        The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   public ConnectionConsumer createConnectionConsumer(final Destination 
destination,
+                                                      final ServerSessionPool 
pool,
+                                                      final int maxMessages) 
throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + 
destination +
+                                         ", " +
+                                         pool +
+                                         ", " +
+                                         maxMessages +
+                                         ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a connection consumer -- throws IllegalStateException
+    *
+    * @param destination The destination
+    * @param name        The name
+    * @param pool        The session pool
+    * @param maxMessages The number of max messages
+    * @return The connection consumer
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public ConnectionConsumer createConnectionConsumer(final Destination 
destination,
+                                                      final String name,
+                                                      final ServerSessionPool 
pool,
+                                                      final int maxMessages) 
throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + 
destination +
+                                         ", " +
+                                         name +
+                                         ", " +
+                                         pool +
+                                         ", " +
+                                         maxMessages +
+                                         ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Create a session
+    *
+    * @param transacted      Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @return The session
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public Session createSession(final boolean transacted, final int 
acknowledgeMode) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createSession(" + transacted + ", " + 
acknowledgeMode + ")");
+      }
+
+      checkClosed();
+      return allocateConnection(transacted, acknowledgeMode, type);
+   }
+
+   /**
+    * Create a XA session
+    *
+    * @return The XA session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public XASession createXASession() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createXASession()");
+      }
+
+      checkClosed();
+      return allocateConnection(type);
+   }
+
+   /**
+    * Get the connection metadata
+    *
+    * @return The connection metadata
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public ConnectionMetaData getMetaData() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMetaData()");
+      }
+
+      checkClosed();
+      return mcf.getMetaData();
+   }
+
+   /**
+    * Get the exception listener -- throws IllegalStateException
+    *
+    * @return The exception listener
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public ExceptionListener getExceptionListener() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getExceptionListener()");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Set the exception listener -- throws IllegalStateException
+    *
+    * @param listener The exception listener
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public void setExceptionListener(final ExceptionListener listener) throws 
JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setExceptionListener(" + listener + 
")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Start
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public void start() throws JMSException
+   {
+      checkClosed();
+
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("start() " + this);
+      }
+
+      synchronized (sessions)
+      {
+         if (started)
+         {
+            return;
+         }
+         started = true;
+         for (ActiveMQRASession session : sessions)
+         {
+            session.start();
+         }
+      }
+   }
+
+   /**
+    * Stop
+    *
+    * @throws IllegalStateException
+    * @throws JMSException          Thrown if an error occurs
+    */
+   @Override
+   public void stop() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("stop() " + this);
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Close
+    *
+    * @throws JMSException Thrown if an error occurs
+    */
+   @Override
+   public void close() throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("close() " + this);
+      }
+
+      if (closed)
+      {
+         return;
+      }
+
+      closed = true;
+
+      synchronized (sessions)
+      {
+         for (Iterator<ActiveMQRASession> i = sessions.iterator(); 
i.hasNext(); )
+         {
+            ActiveMQRASession session = i.next();
+            try
+            {
+               session.closeSession();
+            }
+            catch (Throwable t)
+            {
+               ActiveMQRALogger.LOGGER.trace("Error closing session", t);
+            }
+            i.remove();
+         }
+      }
+
+      synchronized (tempQueues)
+      {
+         for (Iterator<TemporaryQueue> i = tempQueues.iterator(); i.hasNext(); 
)
+         {
+            TemporaryQueue temp = i.next();
+            try
+            {
+               if (ActiveMQRASessionFactoryImpl.trace)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Closing temporary queue " + 
temp + " for " + this);
+               }
+               temp.delete();
+            }
+            catch (Throwable t)
+            {
+               ActiveMQRALogger.LOGGER.trace("Error deleting temporary queue", 
t);
+            }
+            i.remove();
+         }
+      }
+
+      synchronized (tempTopics)
+      {
+         for (Iterator<TemporaryTopic> i = tempTopics.iterator(); i.hasNext(); 
)
+         {
+            TemporaryTopic temp = i.next();
+            try
+            {
+               if (ActiveMQRASessionFactoryImpl.trace)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Closing temporary topic " + 
temp + " for " + this);
+               }
+               temp.delete();
+            }
+            catch (Throwable t)
+            {
+               ActiveMQRALogger.LOGGER.trace("Error deleting temporary queue", 
t);
+            }
+            i.remove();
+         }
+      }
+   }
+
+   /**
+    * Close session
+    *
+    * @param session The session
+    * @throws JMSException Thrown if an error occurs
+    */
+   public void closeSession(final ActiveMQRASession session) throws 
JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("closeSession(" + session + ")");
+      }
+
+      synchronized (sessions)
+      {
+         sessions.remove(session);
+      }
+   }
+
+   /**
+    * Add temporary queue
+    *
+    * @param temp The temporary queue
+    */
+   public void addTemporaryQueue(final TemporaryQueue temp)
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("addTemporaryQueue(" + temp + ")");
+      }
+
+      synchronized (tempQueues)
+      {
+         tempQueues.add(temp);
+      }
+   }
+
+   /**
+    * Add temporary topic
+    *
+    * @param temp The temporary topic
+    */
+   public void addTemporaryTopic(final TemporaryTopic temp)
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("addTemporaryTopic(" + temp + ")");
+      }
+
+      synchronized (tempTopics)
+      {
+         tempTopics.add(temp);
+      }
+   }
+
+   @Override
+   public Session createSession(int sessionMode) throws JMSException
+   {
+      return createSession(sessionMode == Session.SESSION_TRANSACTED, 
sessionMode);
+   }
+
+   @Override
+   public Session createSession() throws JMSException
+   {
+      return createSession(Session.AUTO_ACKNOWLEDGE);
+   }
+
+   @Override
+   public ConnectionConsumer createSharedConnectionConsumer(Topic topic, 
String subscriptionName, String messageSelector, ServerSessionPool sessionPool, 
int maxMessages) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("createSharedConnectionConsumer(" + 
topic + ", " + subscriptionName + ", " +
+                                         messageSelector + ", " + sessionPool 
+ ", " + maxMessages + ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   @Override
+   public ConnectionConsumer createSharedDurableConnectionConsumer(Topic 
topic, String subscriptionName, String messageSelector, ServerSessionPool 
sessionPool, int maxMessages) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         
ActiveMQRALogger.LOGGER.trace("createSharedDurableConnectionConsumer(" + topic 
+ ", " + subscriptionName +
+                                         ", " + messageSelector + ", " + 
sessionPool + ", " + maxMessages + ")");
+      }
+
+      throw new IllegalStateException(ActiveMQRASessionFactory.ISE);
+   }
+
+   /**
+    * Allocation a connection
+    *
+    * @param sessionType The session type
+    * @return The session
+    * @throws JMSException Thrown if an error occurs
+    */
+   protected ActiveMQRASession allocateConnection(final int sessionType) 
throws JMSException
+   {
+      return allocateConnection(false, Session.AUTO_ACKNOWLEDGE, sessionType);
+   }
+
+   /**
+    * Allocate a connection
+    *
+    * @param transacted      Use transactions
+    * @param acknowledgeMode The acknowledge mode
+    * @param sessionType     The session type
+    * @return The session
+    * @throws JMSException Thrown if an error occurs
+    */
+   protected ActiveMQRASession allocateConnection(boolean transacted, int 
acknowledgeMode, final int sessionType) throws JMSException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("allocateConnection(" + transacted +
+                                         ", " +
+                                         acknowledgeMode +
+                                         ", " +
+                                         sessionType +
+                                         ")");
+      }
+
+      try
+      {
+         synchronized (sessions)
+         {
+            if (sessions.isEmpty() == false)
+            {
+               throw new IllegalStateException("Only allowed one session per 
connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
+            }
+            //from createSession
+            // In a Java EE web or EJB container, when there is an active JTA 
transaction in progress:
+            //Both arguments {@code transacted} and {@code acknowledgeMode} 
are ignored.
+            if (inJtaTransaction())
+            {
+               transacted = true;
+               //from getAcknowledgeMode
+               // If the session is not transacted, returns the
+               // current acknowledgement mode for the session.
+               // If the session
+               // is transacted, returns SESSION_TRANSACTED.
+               acknowledgeMode = Session.SESSION_TRANSACTED;
+            }
+            //In the Java EE web or EJB container, when there is no active JTA 
transaction in progress
+            // The argument {@code transacted} is ignored.
+            else
+            {
+               //The session will always be non-transacted,
+               transacted = false;
+               switch (acknowledgeMode)
+               {
+                  //using one of the two acknowledgement modes 
AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
+                  case Session.AUTO_ACKNOWLEDGE:
+                  case Session.DUPS_OK_ACKNOWLEDGE:
+                     //plus our own
+                  case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
+                  case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
+                     break;
+                  //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be 
used.
+                  case Session.CLIENT_ACKNOWLEDGE:
+                     throw 
ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
+                     //same with this although the spec doesn't explicitly say
+                  case Session.SESSION_TRANSACTED:
+                     throw 
ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
+                  default:
+                     throw 
ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
+               }
+            }
+
+            ActiveMQRAConnectionRequestInfo info = new 
ActiveMQRAConnectionRequestInfo(transacted,
+                                                                               
      acknowledgeMode,
+                                                                               
      sessionType);
+            info.setUserName(userName);
+            info.setPassword(password);
+            info.setClientID(clientID);
+            info.setDefaults(((ActiveMQResourceAdapter) 
mcf.getResourceAdapter()).getProperties());
+
+            if (ActiveMQRASessionFactoryImpl.trace)
+            {
+               ActiveMQRALogger.LOGGER.trace("Allocating session for " + this 
+ " with request info=" + info);
+            }
+
+            ActiveMQRASession session = (ActiveMQRASession) 
cm.allocateConnection(mcf, info);
+
+            try
+            {
+               if (ActiveMQRASessionFactoryImpl.trace)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Allocated  " + this + " 
session=" + session);
+               }
+
+               session.setActiveMQSessionFactory(this);
+
+               if (started)
+               {
+                  session.start();
+               }
+
+               sessions.add(session);
+
+               return session;
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Throwable ignored)
+               {
+               }
+               if (t instanceof Exception)
+               {
+                  throw (Exception) t;
+               }
+               else
+               {
+                  throw new RuntimeException("Unexpected error: ", t);
+               }
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         Throwable current = e;
+         while (current != null && !(current instanceof JMSException))
+         {
+            current = current.getCause();
+         }
+
+         if (current != null && current instanceof JMSException)
+         {
+            throw (JMSException) current;
+         }
+         else
+         {
+            JMSException je = new JMSException("Could not create a session: " 
+ e.getMessage());
+            je.setLinkedException(e);
+            je.initCause(e);
+            throw je;
+         }
+      }
+   }
+
+   /**
+    * Check if we are closed
+    *
+    * @throws IllegalStateException Thrown if closed
+    */
+   protected void checkClosed() throws IllegalStateException
+   {
+      if (ActiveMQRASessionFactoryImpl.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("checkClosed()" + this);
+      }
+
+      if (closed)
+      {
+         throw new IllegalStateException("The connection is closed");
+      }
+   }
+
+   private boolean inJtaTransaction()
+   {
+      boolean inJtaTx = false;
+      if (tm != null)
+      {
+         Transaction tx = null;
+         try
+         {
+            tx = tm.getTransaction();
+         }
+         catch (SystemException e)
+         {
+            //assume false
+         }
+         inJtaTx = tx != null;
+      }
+      return inJtaTx;
+   }
+}

Reply via email to