http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSClient.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSClient.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSClient.java deleted file mode 100644 index 974137c..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSClient.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.api.jms; - -import javax.jms.Queue; -import javax.jms.Topic; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.jms.client.HornetQConnectionFactory; -import org.apache.activemq.jms.client.HornetQDestination; -import org.apache.activemq.jms.client.HornetQJMSConnectionFactory; -import org.apache.activemq.jms.client.HornetQQueueConnectionFactory; -import org.apache.activemq.jms.client.HornetQTopicConnectionFactory; -import org.apache.activemq.jms.client.HornetQXAConnectionFactory; -import org.apache.activemq.jms.client.HornetQXAQueueConnectionFactory; -import org.apache.activemq.jms.client.HornetQXATopicConnectionFactory; - -/** - * A utility class for creating HornetQ client-side JMS managed resources. - * - * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> - */ -public class HornetQJMSClient -{ - - /** - * Creates a HornetQConnectionFactory that receives cluster topology updates from the cluster as - * servers leave or join and new backups are appointed or removed. - * <p> - * The discoveryAddress and discoveryPort parameters in this method are used to listen for UDP - * broadcasts which contain connection information for members of the cluster. The broadcasted - * connection information is simply used to make an initial connection to the cluster, once that - * connection is made, up to date cluster topology information is downloaded and automatically - * updated whenever the cluster topology changes. If the topology includes backup servers that - * information is also propagated to the client so that it can know which server to failover onto - * in case of live server failure. - * @param discoveryAddress The UDP group address to listen for updates - * @param discoveryPort the UDP port to listen for updates - * @return the HornetQConnectionFactory - */ - public static HornetQConnectionFactory createConnectionFactoryWithHA(final DiscoveryGroupConfiguration groupConfiguration, JMSFactoryType jmsFactoryType) - { - HornetQConnectionFactory factory = null; - if (jmsFactoryType.equals(JMSFactoryType.CF)) - { - factory = new HornetQJMSConnectionFactory(true, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF)) - { - factory = new HornetQQueueConnectionFactory(true, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF)) - { - factory = new HornetQTopicConnectionFactory(true, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.XA_CF)) - { - factory = new HornetQXAConnectionFactory(true, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF)) - { - factory = new HornetQXAQueueConnectionFactory(true, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF)) - { - factory = new HornetQXATopicConnectionFactory(true, groupConfiguration); - } - - return factory; - } - - /** - * Create a HornetQConnectionFactory which creates session factories from a set of live servers, no HA backup information is propagated to the client - * - * The UDP address and port are used to listen for live servers in the cluster - * - * @param discoveryAddress The UDP group address to listen for updates - * @param discoveryPort the UDP port to listen for updates - * @return the HornetQConnectionFactory - */ - public static HornetQConnectionFactory createConnectionFactoryWithoutHA(final DiscoveryGroupConfiguration groupConfiguration, JMSFactoryType jmsFactoryType) - { - HornetQConnectionFactory factory = null; - if (jmsFactoryType.equals(JMSFactoryType.CF)) - { - factory = new HornetQJMSConnectionFactory(false, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF)) - { - factory = new HornetQQueueConnectionFactory(false, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF)) - { - factory = new HornetQTopicConnectionFactory(false, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.XA_CF)) - { - factory = new HornetQXAConnectionFactory(false, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF)) - { - factory = new HornetQXAQueueConnectionFactory(false, groupConfiguration); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF)) - { - factory = new HornetQXATopicConnectionFactory(false, groupConfiguration); - } - - return factory; - } - - /** - * Create a HornetQConnectionFactory which will receive cluster topology updates from the cluster - * as servers leave or join and new backups are appointed or removed. - * <p> - * The initial list of servers supplied in this method is simply to make an initial connection to - * the cluster, once that connection is made, up to date cluster topology information is - * downloaded and automatically updated whenever the cluster topology changes. If the topology - * includes backup servers that information is also propagated to the client so that it can know - * which server to failover onto in case of live server failure. - * @param initialServers The initial set of servers used to make a connection to the cluster. - * Each one is tried in turn until a successful connection is made. Once a connection - * is made, the cluster topology is downloaded and the rest of the list is ignored. - * @return the HornetQConnectionFactory - */ - public static HornetQConnectionFactory createConnectionFactoryWithHA(JMSFactoryType jmsFactoryType, final TransportConfiguration... initialServers) - { - HornetQConnectionFactory factory = null; - if (jmsFactoryType.equals(JMSFactoryType.CF)) - { - factory = new HornetQJMSConnectionFactory(true, initialServers); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF)) - { - factory = new HornetQQueueConnectionFactory(true, initialServers); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF)) - { - factory = new HornetQTopicConnectionFactory(true, initialServers); - } - else if (jmsFactoryType.equals(JMSFactoryType.XA_CF)) - { - factory = new HornetQXAConnectionFactory(true, initialServers); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF)) - { - factory = new HornetQXAQueueConnectionFactory(true, initialServers); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF)) - { - factory = new HornetQXATopicConnectionFactory(true, initialServers); - } - - return factory; - } - - /** - * Create a HornetQConnectionFactory which creates session factories using a static list of - * transportConfigurations. - * <p> - * The HornetQConnectionFactory is not updated automatically as the cluster topology changes, and - * no HA backup information is propagated to the client - * @param transportConfigurations - * @return the HornetQConnectionFactory - */ - public static HornetQConnectionFactory createConnectionFactoryWithoutHA(JMSFactoryType jmsFactoryType, final TransportConfiguration... transportConfigurations) - { - HornetQConnectionFactory factory = null; - if (jmsFactoryType.equals(JMSFactoryType.CF)) - { - factory = new HornetQJMSConnectionFactory(false, transportConfigurations); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_CF)) - { - factory = new HornetQQueueConnectionFactory(false, transportConfigurations); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_CF)) - { - factory = new HornetQTopicConnectionFactory(false, transportConfigurations); - } - else if (jmsFactoryType.equals(JMSFactoryType.XA_CF)) - { - factory = new HornetQXAConnectionFactory(false, transportConfigurations); - } - else if (jmsFactoryType.equals(JMSFactoryType.QUEUE_XA_CF)) - { - factory = new HornetQXAQueueConnectionFactory(false, transportConfigurations); - } - else if (jmsFactoryType.equals(JMSFactoryType.TOPIC_XA_CF)) - { - factory = new HornetQXATopicConnectionFactory(false, transportConfigurations); - } - - return factory; - } - - /** - * Creates a client-side representation of a JMS Topic. - * - * @param name the name of the topic - * @return The Topic - */ - public static Topic createTopic(final String name) - { - return HornetQDestination.createTopic(name); - } - - /** - * Creates a client-side representation of a JMS Queue. - * - * @param name the name of the queue - * @return The Queue - */ - public static Queue createQueue(final String name) - { - return HornetQDestination.createQueue(name); - } - - private HornetQJMSClient() - { - // Utility class - } -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSConstants.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSConstants.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSConstants.java deleted file mode 100644 index 2a57bea..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/HornetQJMSConstants.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.api.jms; - -/** - * Constants for HornetQ for property keys used for HornetQ specific extensions to JMS. - * - * @author Tim Fox - * - * - */ -public class HornetQJMSConstants -{ - public static final String JMS_HORNETQ_INPUT_STREAM = "JMS_HQ_InputStream"; - - public static final String JMS_HORNETQ_OUTPUT_STREAM = "JMS_HQ_OutputStream"; - - public static final String JMS_HORNETQ_SAVE_STREAM = "JMS_HQ_SaveStream"; - - public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "HQ_BRIDGE_MSG_ID_LIST"; - - public static final int PRE_ACKNOWLEDGE = 100; - - public static final int INDIVIDUAL_ACKNOWLEDGE = 101; - - public static final String JMS_HORNETQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME = - "hq.jms.support-bytes-id"; -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/ConnectionFactoryControl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/ConnectionFactoryControl.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/ConnectionFactoryControl.java index 6b482ee..02767e9 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/ConnectionFactoryControl.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/ConnectionFactoryControl.java @@ -19,7 +19,7 @@ import org.apache.activemq.api.core.management.Parameter; /** * A ConnectionFactoryControl is used to manage a JMS ConnectionFactory. <br> - * HornetQ JMS ConnectionFactory uses an underlying ClientSessionFactory to connect to HornetQ + * ActiveMQ JMS ConnectionFactory uses an underlying ClientSessionFactory to connect to ActiveMQ * servers. Please refer to the ClientSessionFactory for a detailed description. * * @author <a href="mailto:jmes...@redhat.com">Jeff Mesnil</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/DestinationControl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/DestinationControl.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/DestinationControl.java index c775bcf..6964772 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/DestinationControl.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/DestinationControl.java @@ -32,7 +32,7 @@ public interface DestinationControl String getName(); /** - * Returns the HornetQ address corresponding to this destination. + * Returns the ActiveMQ address corresponding to this destination. */ String getAddress(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSManagementHelper.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSManagementHelper.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSManagementHelper.java index 4d25f9c..eb6c292 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSManagementHelper.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSManagementHelper.java @@ -16,10 +16,10 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.activemq.api.core.management.ManagementHelper; -import org.apache.activemq.jms.client.HornetQMessage; +import org.apache.activemq.jms.client.ActiveMQMessage; /** - * Helper class to use JMS messages to manage HornetQ server resources. + * Helper class to use JMS messages to manage ActiveMQ server resources. * @author <a href="mailto:jmes...@redhat.com">Jeff Mesnil</a> * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> */ @@ -27,13 +27,13 @@ public class JMSManagementHelper { private static org.apache.activemq.api.core.Message getCoreMessage(final Message jmsMessage) { - if (jmsMessage instanceof HornetQMessage == false) + if (jmsMessage instanceof ActiveMQMessage == false) { - throw new IllegalArgumentException("Cannot send a non HornetQ message as a management message " + jmsMessage.getClass() + throw new IllegalArgumentException("Cannot send a non ActiveMQ message as a management message " + jmsMessage.getClass() .getName()); } - return ((HornetQMessage)jmsMessage).getCoreMessage(); + return ((ActiveMQMessage)jmsMessage).getCoreMessage(); } /** http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSServerControl.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSServerControl.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSServerControl.java index dea580d..03b3383 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSServerControl.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSServerControl.java @@ -18,7 +18,7 @@ import org.apache.activemq.api.core.management.Operation; import org.apache.activemq.api.core.management.Parameter; /** - * A JMSSserverControl is used to manage HornetQ JMS server. + * A JMSSserverControl is used to manage ActiveMQ JMS server. * * @author <a href="mailto:jmes...@redhat.com">Jeff Mesnil</a> * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> @@ -270,8 +270,8 @@ public interface JMSServerControl /** * Closes all the connections on this server for consumers which are consuming from a queue associated with a particular address. */ - @Operation(desc = "Closes all the consumer connections for the given HornetQ address", impact = MBeanOperationInfo.INFO) - boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a HornetQ address", name = "address") String address) throws Exception; + @Operation(desc = "Closes all the consumer connections for the given ActiveMQ address", impact = MBeanOperationInfo.INFO) + boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a ActiveMQ address", name = "address") String address) throws Exception; /** * Closes all the connections on this server for sessions using a particular user name. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/SubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/SubscriptionInfo.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/SubscriptionInfo.java index b6102cb..e27c92b 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/SubscriptionInfo.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/SubscriptionInfo.java @@ -85,7 +85,7 @@ public class SubscriptionInfo // Public -------------------------------------------------------- /** - * Returns the name of the HornetQ core queue corresponding to this subscription. + * Returns the name of the ActiveMQ core queue corresponding to this subscription. */ public String getQueueName() { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/package-info.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/package-info.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/package-info.java index 72e8964..95a1d8b 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/package-info.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/package-info.java @@ -11,9 +11,9 @@ * permissions and limitations under the License. */ /** - * Management API for HornetQ JMS resources. + * Management API for ActiveMQ JMS resources. * <br> - * HornetQ JMS resources can be managed either using JMX or by sending JMS management messages to the + * ActiveMQ JMS resources can be managed either using JMX or by sending JMS management messages to the * server's special management address. Please refer to the user manual for more information. */ package org.apache.activemq.api.jms.management; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/package-info.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/package-info.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/package-info.java index e638e3e..20b574c 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/package-info.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/package-info.java @@ -11,10 +11,10 @@ * permissions and limitations under the License. */ /** - * API to create HornetQ JMS resources. + * API to create ActiveMQ JMS resources. * <br> * This package contains classes to create - * HornetQ JMS managed resources (ConnectionFactory, Queue and Topic). + * ActiveMQ JMS managed resources (ConnectionFactory, Queue and Topic). * */ package org.apache.activemq.api.jms; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQBytesMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQBytesMessage.java new file mode 100644 index 0000000..24b24df --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQBytesMessage.java @@ -0,0 +1,436 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.core.message.impl.MessageImpl; + +import static org.apache.activemq.reader.BytesMessageUtil.bytesMessageReset; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadBoolean; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadByte; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadBytes; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadChar; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadDouble; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadFloat; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadInt; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadLong; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadShort; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUTF; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUnsignedByte; +import static org.apache.activemq.reader.BytesMessageUtil.bytesReadUnsignedShort; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteBoolean; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteByte; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteBytes; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteChar; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteDouble; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteFloat; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteInt; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteLong; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteObject; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteShort; +import static org.apache.activemq.reader.BytesMessageUtil.bytesWriteUTF; + +/** + * ActiveMQ implementation of a JMS {@link BytesMessage}. + * + * @author Norbert Lataille (norbert.latai...@m4x.org) + * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + */ +public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage +{ + // Static ------------------------------------------------------- + public static final byte TYPE = Message.BYTES_TYPE; + + // Attributes ---------------------------------------------------- + + private int bodyLength; + + // Constructor --------------------------------------------------- + + /** + * This constructor is used to construct messages prior to sending + */ + protected ActiveMQBytesMessage(final ClientSession session) + { + super(ActiveMQBytesMessage.TYPE, session); + } + + /** + * Constructor on receipt at client side + */ + protected ActiveMQBytesMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + /** + * Foreign message constructor + */ + public ActiveMQBytesMessage(final BytesMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, ActiveMQBytesMessage.TYPE, session); + + foreign.reset(); + + byte[] buffer = new byte[1024]; + int n = foreign.readBytes(buffer); + while (n != -1) + { + writeBytes(buffer, 0, n); + n = foreign.readBytes(buffer); + } + } + + // BytesMessage implementation ----------------------------------- + + public boolean readBoolean() throws JMSException + { + checkRead(); + try + { + return bytesReadBoolean(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public byte readByte() throws JMSException + { + checkRead(); + try + { + return bytesReadByte(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readUnsignedByte() throws JMSException + { + checkRead(); + try + { + return bytesReadUnsignedByte(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public short readShort() throws JMSException + { + checkRead(); + try + { + return bytesReadShort(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readUnsignedShort() throws JMSException + { + checkRead(); + try + { + return bytesReadUnsignedShort(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public char readChar() throws JMSException + { + checkRead(); + try + { + return bytesReadChar(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readInt() throws JMSException + { + checkRead(); + try + { + return bytesReadInt(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public long readLong() throws JMSException + { + checkRead(); + try + { + return bytesReadLong(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public float readFloat() throws JMSException + { + checkRead(); + try + { + return bytesReadFloat(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public double readDouble() throws JMSException + { + checkRead(); + try + { + return bytesReadDouble(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public String readUTF() throws JMSException + { + checkRead(); + try + { + return bytesReadUTF(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + catch (Exception e) + { + JMSException je = new JMSException("Failed to get UTF"); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + } + + public int readBytes(final byte[] value) throws JMSException + { + checkRead(); + return bytesReadBytes(message, value); + } + + public int readBytes(final byte[] value, final int length) throws JMSException + { + checkRead(); + return bytesReadBytes(message, value, length); + + } + + public void writeBoolean(final boolean value) throws JMSException + { + checkWrite(); + bytesWriteBoolean(message, value); + } + + public void writeByte(final byte value) throws JMSException + { + checkWrite(); + bytesWriteByte(message, value); + } + + public void writeShort(final short value) throws JMSException + { + checkWrite(); + bytesWriteShort(message, value); + } + + public void writeChar(final char value) throws JMSException + { + checkWrite(); + bytesWriteChar(message, value); + } + + public void writeInt(final int value) throws JMSException + { + checkWrite(); + bytesWriteInt(message, value); + } + + public void writeLong(final long value) throws JMSException + { + checkWrite(); + bytesWriteLong(message, value); + } + + public void writeFloat(final float value) throws JMSException + { + checkWrite(); + bytesWriteFloat(message, value); + } + + public void writeDouble(final double value) throws JMSException + { + checkWrite(); + bytesWriteDouble(message, value); + } + + public void writeUTF(final String value) throws JMSException + { + checkWrite(); + try + { + bytesWriteUTF(message, value); + } + catch (Exception e) + { + JMSException je = new JMSException("Failed to write UTF"); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + + } + + public void writeBytes(final byte[] value) throws JMSException + { + checkWrite(); + bytesWriteBytes(message, value); + } + + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException + { + checkWrite(); + bytesWriteBytes(message, value, offset, length); + } + + public void writeObject(final Object value) throws JMSException + { + checkWrite(); + if (!bytesWriteObject(message, value)) + { + throw new MessageFormatException("Invalid object for properties"); + } + } + + public void reset() throws JMSException + { + if (!readOnly) + { + readOnly = true; + + bodyLength = message.getBodySize(); + } + + bytesMessageReset(message); + } + + @Override + public void doBeforeReceive() throws ActiveMQException + { + bodyLength = message.getBodySize(); + } + + // ActiveMQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + try + { + getBuffer().clear(); + } + catch (RuntimeException e) + { + JMSException e2 = new JMSException(e.getMessage()); + e2.initCause(e); + throw e2; + } + } + + public long getBodyLength() throws JMSException + { + checkRead(); + + return bodyLength; + } + + @Override + public void doBeforeSend() throws Exception + { + reset(); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return ActiveMQBytesMessage.TYPE; + } + + private ActiveMQBuffer getBuffer() + { + return message.getBodyBuffer(); + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") + Class c) + { + return c.isAssignableFrom(byte[].class); + } + + @Override + protected <T> T getBodyInternal(Class<T> c) + { + if (bodyLength == 0) + return null; + byte[] dst = new byte[bodyLength]; + message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst); + return (T)dst; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java new file mode 100644 index 0000000..83465ba --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java @@ -0,0 +1,862 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.InvalidClientIDException; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import java.lang.ref.WeakReference; +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.FailoverEventListener; +import org.apache.activemq.api.core.client.FailoverEventType; +import org.apache.activemq.api.core.client.SessionFailureListener; +import org.apache.activemq.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.core.client.impl.ClientSessionInternal; +import org.apache.activemq.core.version.Version; +import org.apache.activemq.reader.MessageUtil; +import org.apache.activemq.utils.ConcurrentHashSet; +import org.apache.activemq.utils.UUIDGenerator; +import org.apache.activemq.utils.VersionLoader; + +/** + * ActiveMQ implementation of a JMS Connection. + * <p> + * The flat implementation of {@link TopicConnection} and {@link QueueConnection} is per design, + * following the common usage of these as one flat API in JMS 1.1. + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> + */ +public class ActiveMQConnection extends ActiveMQConnectionForContextImpl implements TopicConnection, QueueConnection +{ + // Constants ------------------------------------------------------------------------------------ + public static final int TYPE_GENERIC_CONNECTION = 0; + + public static final int TYPE_QUEUE_CONNECTION = 1; + + public static final int TYPE_TOPIC_CONNECTION = 2; + + public static final String EXCEPTION_FAILOVER = "FAILOVER"; + + public static final String EXCEPTION_DISCONNECT = "DISCONNECT"; + + public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME; + + // Static --------------------------------------------------------------------------------------- + + // Attributes ----------------------------------------------------------------------------------- + + private final int connectionType; + + private final Set<ActiveMQSession> sessions = new org.apache.activemq.utils.ConcurrentHashSet<ActiveMQSession>(); + + private final Set<SimpleString> tempQueues = new org.apache.activemq.utils.ConcurrentHashSet<SimpleString>(); + + private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<SimpleString>(); + + private volatile boolean hasNoLocal; + + private volatile ExceptionListener exceptionListener; + + private volatile FailoverEventListener failoverEventListener; + + private volatile boolean justCreated = true; + + private volatile ConnectionMetaData metaData; + + private volatile boolean closed; + + private volatile boolean started; + + private String clientID; + + private final ClientSessionFactory sessionFactory; + + private final SimpleString uid; + + private final String username; + + private final String password; + + private final SessionFailureListener listener = new JMSFailureListener(this); + + private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this); + + private final Version thisVersion; + + private final int dupsOKBatchSize; + + private final int transactionBatchSize; + + private ClientSession initialSession; + + private final Exception creationStack; + + private ActiveMQConnectionFactory factoryReference; + + // Constructors --------------------------------------------------------------------------------- + + public ActiveMQConnection(final String username, final String password, final int connectionType, + final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final ClientSessionFactory sessionFactory) + { + this.username = username; + + this.password = password; + + this.connectionType = connectionType; + + this.clientID = clientID; + + this.sessionFactory = sessionFactory; + + uid = UUIDGenerator.getInstance().generateSimpleStringUUID(); + + thisVersion = VersionLoader.getVersion(); + + this.dupsOKBatchSize = dupsOKBatchSize; + + this.transactionBatchSize = transactionBatchSize; + + creationStack = new Exception(); + } + + /** + * This internal method serves basically the Resource Adapter. + * The resource adapter plays with an XASession and a non XASession. + * When there is no enlisted transaction, the EE specification mandates that the commit should + * be done as if it was a nonXA Session (i.e. SessionTransacted). + * For that reason we have this method to force that nonXASession, since the JMS Javadoc + * mandates createSession to return a XASession. + */ + public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + checkClosed(); + + return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_GENERIC_CONNECTION); + } + + /** + * This internal method serves basically the Resource Adapter. + * The resource adapter plays with an XASession and a non XASession. + * When there is no enlisted transaction, the EE specification mandates that the commit should + * be done as if it was a nonXA Session (i.e. SessionTransacted). + * For that reason we have this method to force that nonXASession, since the JMS Javadoc + * mandates createSession to return a XASession. + */ + public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + checkClosed(); + + return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_TOPIC_CONNECTION); + } + + /** + * This internal method serves basically the Resource Adapter. + * The resource adapter plays with an XASession and a non XASession. + * When there is no enlisted transaction, the EE specification mandates that the commit should + * be done as if it was a nonXA Session (i.e. SessionTransacted). + * For that reason we have this method to force that nonXASession, since the JMS Javadoc + * mandates createSession to return a XASession. + */ + public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + checkClosed(); + + return createSessionInternal(false, transacted, acknowledgeMode, ActiveMQConnection.TYPE_QUEUE_CONNECTION); + } + + + // Connection implementation -------------------------------------------------------------------- + + public synchronized Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + checkClosed(); + + return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQConnection.TYPE_GENERIC_CONNECTION); + } + + public String getClientID() throws JMSException + { + checkClosed(); + + return clientID; + } + + public void setClientID(final String clientID) throws JMSException + { + checkClosed(); + + if (this.clientID != null) + { + throw new IllegalStateException("Client id has already been set"); + } + + if (!justCreated) + { + throw new IllegalStateException("setClientID can only be called directly after the connection is created"); + } + + try + { + initialSession.addUniqueMetaData("jms-client-id", clientID); + } + catch (ActiveMQException e) + { + if (e.getType() == ActiveMQExceptionType.DUPLICATE_METADATA) + { + throw new InvalidClientIDException("clientID=" + clientID + " was already set into another connection"); + } + } + + this.clientID = clientID; + try + { + this.addSessionMetaData(initialSession); + } + catch (ActiveMQException e) + { + JMSException ex = new JMSException("Internal error setting metadata jms-client-id"); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; + } + + justCreated = false; + } + + public ConnectionMetaData getMetaData() throws JMSException + { + checkClosed(); + + justCreated = false; + + if (metaData == null) + { + metaData = new ActiveMQConnectionMetaData(thisVersion); + } + + return metaData; + } + + public ExceptionListener getExceptionListener() throws JMSException + { + checkClosed(); + + justCreated = false; + + return exceptionListener; + } + + public void setExceptionListener(final ExceptionListener listener) throws JMSException + { + checkClosed(); + + exceptionListener = listener; + justCreated = false; + } + + public synchronized void start() throws JMSException + { + checkClosed(); + + for (ActiveMQSession session : sessions) + { + session.start(); + } + + justCreated = false; + started = true; + } + + public synchronized void signalStopToAllSessions() + { + for (ActiveMQSession session : sessions) + { + ClientSession coreSession = session.getCoreSession(); + if (coreSession instanceof ClientSessionInternal) + { + ClientSessionInternal internalSession = (ClientSessionInternal) coreSession; + internalSession.setStopSignal(); + } + } + + } + + public synchronized void stop() throws JMSException + { + threadAwareContext.assertNotMessageListenerThread(); + + checkClosed(); + + for (ActiveMQSession session : sessions) + { + session.stop(); + } + + justCreated = false; + started = false; + } + + public final synchronized void close() throws JMSException + { + threadAwareContext.assertNotCompletionListenerThread(); + threadAwareContext.assertNotMessageListenerThread(); + + if (closed) + { + return; + } + + sessionFactory.close(); + + try + { + for (ActiveMQSession session : new HashSet<ActiveMQSession>(sessions)) + { + session.close(); + } + + try + { + if (!tempQueues.isEmpty()) + { + // Remove any temporary queues + + for (SimpleString queueName : tempQueues) + { + if (!initialSession.isClosed()) + { + try + { + initialSession.deleteQueue(queueName); + } + catch (ActiveMQException ignore) + { + // Exception on deleting queue shouldn't prevent close from completing + } + } + } + } + } + finally + { + if (initialSession != null) + { + initialSession.close(); + } + } + + closed = true; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + public ConnectionConsumer + createConnectionConsumer(final Destination destination, final String messageSelector, + final ServerSessionPool sessionPool, final int maxMessages) throws JMSException + { + checkClosed(); + + checkTempQueues(destination); + + // We offer a RA, so no need to implement this for MDBs + return null; + } + + private void checkTempQueues(Destination destination) throws JMSException + { + ActiveMQDestination jbdest = (ActiveMQDestination) destination; + + if (jbdest.isTemporary() && !containsTemporaryQueue(jbdest.getSimpleAddress())) + { + throw new JMSException("Can not create consumer for temporary destination " + destination + + " from another JMS connection"); + } + } + + public ConnectionConsumer + createDurableConnectionConsumer(final Topic topic, final String subscriptionName, + final String messageSelector, final ServerSessionPool sessionPool, + final int maxMessages) throws JMSException + { + checkClosed(); + // As spec. section 4.11 + if (connectionType == ActiveMQConnection.TYPE_QUEUE_CONNECTION) + { + String msg = "Cannot create a durable connection consumer on a QueueConnection"; + throw new javax.jms.IllegalStateException(msg); + } + checkTempQueues(topic); + // We offer RA, so no need for this + return null; + } + + @Override + public Session createSession(int sessionMode) throws JMSException + { + checkClosed(); + return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION); + + } + + @Override + public Session createSession() throws JMSException + { + checkClosed(); + return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, ActiveMQSession.TYPE_GENERIC_SESSION); + } + + // QueueConnection implementation --------------------------------------------------------------- + + public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException + { + checkClosed(); + return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION); + } + + /** + * I'm keeping this as static as the same check will be done within RA. + * This is to conform with TCK Tests where we must return ackMode exactly as they want if transacted=false + */ + public static int checkAck(boolean transacted, int acknowledgeMode) + { + if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) + { + return Session.AUTO_ACKNOWLEDGE; + } + + return acknowledgeMode; + } + + public ConnectionConsumer + createConnectionConsumer(final Queue queue, final String messageSelector, + final ServerSessionPool sessionPool, final int maxMessages) throws JMSException + { + checkClosed(); + checkTempQueues(queue); + return null; + } + + // TopicConnection implementation --------------------------------------------------------------- + + public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + checkClosed(); + return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION); + } + + public ConnectionConsumer + createConnectionConsumer(final Topic topic, final String messageSelector, + final ServerSessionPool sessionPool, final int maxMessages) throws JMSException + { + checkClosed(); + checkTempQueues(topic); + return null; + } + + @Override + public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + return null; // we offer RA + } + + @Override + public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + return null; // we offer RA + } + + // Public --------------------------------------------------------------------------------------- + + /** + * Sets a FailureListener for the session which is notified if a failure occurs on the session. + * + * @param listener the listener to add + * @throws JMSException + */ + public void setFailoverListener(final FailoverEventListener listener) throws JMSException + { + checkClosed(); + + justCreated = false; + + this.failoverEventListener = listener; + + } + + /** + * @return {@link FailoverEventListener} the current failover event listener for this connection + * @throws JMSException + */ + public FailoverEventListener getFailoverListener() throws JMSException + { + checkClosed(); + + justCreated = false; + + return failoverEventListener; + } + + public void addTemporaryQueue(final SimpleString queueAddress) + { + tempQueues.add(queueAddress); + knownDestinations.add(queueAddress); + } + + public void removeTemporaryQueue(final SimpleString queueAddress) + { + tempQueues.remove(queueAddress); + } + + public void addKnownDestination(final SimpleString address) + { + knownDestinations.add(address); + } + + public boolean containsKnownDestination(final SimpleString address) + { + return knownDestinations.contains(address); + } + + public boolean containsTemporaryQueue(final SimpleString queueAddress) + { + return tempQueues.contains(queueAddress); + } + + public boolean hasNoLocal() + { + return hasNoLocal; + } + + public void setHasNoLocal() + { + hasNoLocal = true; + } + + public SimpleString getUID() + { + return uid; + } + + public void removeSession(final ActiveMQSession session) + { + sessions.remove(session); + } + + public ClientSession getInitialSession() + { + return initialSession; + } + + // Package protected ---------------------------------------------------------------------------- + + // Protected ------------------------------------------------------------------------------------ + + // In case the user forgets to close the connection manually + + @Override + protected final void finalize() throws Throwable + { + if (!closed) + { + ActiveMQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack); + + close(); + } + } + + protected boolean isXA() + { + return false; + } + + protected final ActiveMQSession + createSessionInternal(final boolean isXA, final boolean transacted, int acknowledgeMode, final int type) throws JMSException + { + if (transacted) + { + acknowledgeMode = Session.SESSION_TRANSACTED; + } + + try + { + ClientSession session; + + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + session = + sessionFactory.createSession(username, password, isXA, false, false, + sessionFactory.getServerLocator().isPreAcknowledge(), + transactionBatchSize); + } + else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, true, + sessionFactory.getServerLocator().isPreAcknowledge(), 0); + } + else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, true, + sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize); + } + else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, false, + sessionFactory.getServerLocator().isPreAcknowledge(), + transactionBatchSize); + } + else if (acknowledgeMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize); + } + else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) + { + session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize); + } + else + { + throw new JMSRuntimeException("Invalid ackmode: " + acknowledgeMode); + } + + justCreated = false; + + // Setting multiple times on different sessions doesn't matter since RemotingConnection + // maintains + // a set (no duplicates) + session.addFailureListener(listener); + session.addFailoverListener(failoverListener); + + ActiveMQSession jbs = createAMQSession(isXA, transacted, acknowledgeMode, session, type); + + sessions.add(jbs); + + if (started) + { + session.start(); + } + + this.addSessionMetaData(session); + + return jbs; + } + catch (ActiveMQException e) + { + throw JMSExceptionHelper.convertFromActiveMQException(e); + } + } + + // Private -------------------------------------------------------------------------------------- + + /** + * @param transacted + * @param acknowledgeMode + * @param session + * @param type + * @return + */ + protected ActiveMQSession createAMQSession(boolean isXA, boolean transacted, int acknowledgeMode, ClientSession session, int type) + { + if (isXA) + { + return new ActiveMQXASession(this, transacted, true, acknowledgeMode, session, type); + } + else + { + return new ActiveMQSession(this, transacted, false, acknowledgeMode, session, type); + } + } + + protected final void checkClosed() throws JMSException + { + if (closed) + { + throw new IllegalStateException("Connection is closed"); + } + } + + public void authorize() throws JMSException + { + try + { + initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0); + + addSessionMetaData(initialSession); + + initialSession.addFailureListener(listener); + initialSession.addFailoverListener(failoverListener); + } + catch (ActiveMQException me) + { + throw JMSExceptionHelper.convertFromActiveMQException(me); + } + } + + private void addSessionMetaData(ClientSession session) throws ActiveMQException + { + session.addMetaData("jms-session", ""); + if (clientID != null) + { + session.addMetaData("jms-client-id", clientID); + } + } + + public void setReference(ActiveMQConnectionFactory factory) + { + this.factoryReference = factory; + } + + public boolean isStarted() + { + return started; + } + + + // Inner classes -------------------------------------------------------------------------------- + + private static class JMSFailureListener implements SessionFailureListener + { + private final WeakReference<ActiveMQConnection> connectionRef; + + JMSFailureListener(final ActiveMQConnection connection) + { + connectionRef = new WeakReference<ActiveMQConnection>(connection); + } + + @Override + public synchronized void connectionFailed(final ActiveMQException me, boolean failedOver) + { + if (me == null) + { + return; + } + + ActiveMQConnection conn = connectionRef.get(); + + if (conn != null) + { + try + { + final ExceptionListener exceptionListener = conn.getExceptionListener(); + + if (exceptionListener != null) + { + final JMSException je = + new JMSException(me.toString(), failedOver ? EXCEPTION_FAILOVER : EXCEPTION_DISCONNECT); + + je.initCause(me); + + new Thread(new Runnable() + { + public void run() + { + exceptionListener.onException(je); + } + }).start(); + } + } + catch (JMSException e) + { + if (!conn.closed) + { + ActiveMQJMSClientLogger.LOGGER.errorCallingExcListener(e); + } + } + } + } + + @Override + public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + public void beforeReconnect(final ActiveMQException me) + { + + } + + } + + private static class FailoverEventListenerImpl implements FailoverEventListener + { + private final WeakReference<ActiveMQConnection> connectionRef; + + FailoverEventListenerImpl(final ActiveMQConnection connection) + { + connectionRef = new WeakReference<ActiveMQConnection>(connection); + } + + @Override + public void failoverEvent(final FailoverEventType eventType) + { + ActiveMQConnection conn = connectionRef.get(); + + if (conn != null) + { + try + { + final FailoverEventListener failoverListener = conn.getFailoverListener(); + + if (failoverListener != null) + { + + new Thread(new Runnable() + { + public void run() + { + failoverListener.failoverEvent(eventType); + } + }).start(); + } + } + catch (JMSException e) + { + if (!conn.closed) + { + ActiveMQJMSClientLogger.LOGGER.errorCallingFailoverListener(e); + } + } + } + + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java new file mode 100644 index 0000000..85498fb --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionFactory.java @@ -0,0 +1,821 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.JMSSecurityException; +import javax.jms.JMSSecurityRuntimeException; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XAJMSContext; +import javax.jms.XAQueueConnection; +import javax.jms.XATopicConnection; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import java.io.Serializable; + +import org.apache.activemq.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.api.jms.JMSFactoryType; +import org.apache.activemq.jms.referenceable.ConnectionFactoryObjectFactory; +import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr; + +/** + * ActiveMQ implementation of a JMS ConnectionFactory. + * + * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> + * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> + */ +public class ActiveMQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory +{ + private static final long serialVersionUID = -2810634789345348326L; + + private final ServerLocator serverLocator; + + private String clientID; + + private int dupsOKBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE; + + private int transactionBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE; + + private boolean readOnly; + + public ActiveMQConnectionFactory() + { + serverLocator = null; + } + + public ActiveMQConnectionFactory(final ServerLocator serverLocator) + { + this.serverLocator = serverLocator; + + serverLocator.disableFinalizeCheck(); + } + + public ActiveMQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + if (ha) + { + serverLocator = ActiveMQClient.createServerLocatorWithHA(groupConfiguration); + } + else + { + serverLocator = ActiveMQClient.createServerLocatorWithoutHA(groupConfiguration); + } + + serverLocator.disableFinalizeCheck(); + } + + public ActiveMQConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + if (ha) + { + serverLocator = ActiveMQClient.createServerLocatorWithHA(initialConnectors); + } + else + { + serverLocator = ActiveMQClient.createServerLocatorWithoutHA(initialConnectors); + } + + serverLocator.disableFinalizeCheck(); + } + + // ConnectionFactory implementation ------------------------------------------------------------- + + public Connection createConnection() throws JMSException + { + return createConnection(null, null); + } + + public Connection createConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, ActiveMQConnection.TYPE_GENERIC_CONNECTION); + } + + @Override + public JMSContext createContext() + { + return createContext(null, null); + } + + @Override + public JMSContext createContext(final int sessionMode) + { + return createContext(null, null, sessionMode); + } + + @Override + public JMSContext createContext(final String userName, final String password) + { + return createContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE); + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) + { + validateSessionMode(sessionMode); + try + { + ActiveMQConnection connection = + createConnectionInternal(userName, password, false, ActiveMQConnection.TYPE_GENERIC_CONNECTION); + return connection.createContext(sessionMode); + } + catch (JMSSecurityException e) + { + throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + catch (JMSException e) + { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + } + + /** + * @param mode + */ + private static void validateSessionMode(int mode) + { + switch (mode) + { + case JMSContext.AUTO_ACKNOWLEDGE: + case JMSContext.CLIENT_ACKNOWLEDGE: + case JMSContext.DUPS_OK_ACKNOWLEDGE: + case JMSContext.SESSION_TRANSACTED: + { + return; + } + default: + throw new JMSRuntimeException("Invalid Session Mode: " + mode); + } + } + + // QueueConnectionFactory implementation -------------------------------------------------------- + + public QueueConnection createQueueConnection() throws JMSException + { + return createQueueConnection(null, null); + } + + public QueueConnection createQueueConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, ActiveMQConnection.TYPE_QUEUE_CONNECTION); + } + + // TopicConnectionFactory implementation -------------------------------------------------------- + + public TopicConnection createTopicConnection() throws JMSException + { + return createTopicConnection(null, null); + } + + public TopicConnection createTopicConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, ActiveMQConnection.TYPE_TOPIC_CONNECTION); + } + + // XAConnectionFactory implementation ----------------------------------------------------------- + + public XAConnection createXAConnection() throws JMSException + { + return createXAConnection(null, null); + } + + public XAConnection createXAConnection(final String username, final String password) throws JMSException + { + return (XAConnection) createConnectionInternal(username, password, true, ActiveMQConnection.TYPE_GENERIC_CONNECTION); + } + + @Override + public XAJMSContext createXAContext() + { + return createXAContext(null, null); + } + + @Override + public XAJMSContext createXAContext(String userName, String password) + { + try + { + ActiveMQConnection connection = + createConnectionInternal(userName, password, true, ActiveMQConnection.TYPE_GENERIC_CONNECTION); + return connection.createXAContext(); + } + catch (JMSSecurityException e) + { + throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + catch (JMSException e) + { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + } + + // XAQueueConnectionFactory implementation ------------------------------------------------------ + + public XAQueueConnection createXAQueueConnection() throws JMSException + { + return createXAQueueConnection(null, null); + } + + public XAQueueConnection createXAQueueConnection(final String username, final String password) throws JMSException + { + return (XAQueueConnection) createConnectionInternal(username, password, true, ActiveMQConnection.TYPE_QUEUE_CONNECTION); + } + + // XATopicConnectionFactory implementation ------------------------------------------------------ + + public XATopicConnection createXATopicConnection() throws JMSException + { + return createXATopicConnection(null, null); + } + + public XATopicConnection createXATopicConnection(final String username, final String password) throws JMSException + { + return (XATopicConnection) createConnectionInternal(username, password, true, ActiveMQConnection.TYPE_TOPIC_CONNECTION); + } + + @Override + public Reference getReference() throws NamingException + { + return new Reference(this.getClass().getCanonicalName(), + new SerializableObjectRefAddr("ActiveMQ-CF", this), + ConnectionFactoryObjectFactory.class.getCanonicalName(), + null); + } + + // Public --------------------------------------------------------------------------------------- + + public boolean isHA() + { + return serverLocator.isHA(); + } + + public synchronized String getConnectionLoadBalancingPolicyClassName() + { + return serverLocator.getConnectionLoadBalancingPolicyClassName(); + } + + public synchronized void setConnectionLoadBalancingPolicyClassName(final String connectionLoadBalancingPolicyClassName) + { + checkWrite(); + serverLocator.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName); + } + + public synchronized TransportConfiguration[] getStaticConnectors() + { + return serverLocator.getStaticTransportConfigurations(); + } + + public synchronized DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() + { + return serverLocator.getDiscoveryGroupConfiguration(); + } + + public synchronized String getClientID() + { + return clientID; + } + + public synchronized void setClientID(final String clientID) + { + checkWrite(); + this.clientID = clientID; + } + + public synchronized int getDupsOKBatchSize() + { + return dupsOKBatchSize; + } + + public synchronized void setDupsOKBatchSize(final int dupsOKBatchSize) + { + checkWrite(); + this.dupsOKBatchSize = dupsOKBatchSize; + } + + public synchronized int getTransactionBatchSize() + { + return transactionBatchSize; + } + + public synchronized void setTransactionBatchSize(final int transactionBatchSize) + { + checkWrite(); + this.transactionBatchSize = transactionBatchSize; + } + + public synchronized long getClientFailureCheckPeriod() + { + return serverLocator.getClientFailureCheckPeriod(); + } + + public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) + { + checkWrite(); + serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod); + } + + public synchronized long getConnectionTTL() + { + return serverLocator.getConnectionTTL(); + } + + public synchronized void setConnectionTTL(final long connectionTTL) + { + checkWrite(); + serverLocator.setConnectionTTL(connectionTTL); + } + + public synchronized long getCallTimeout() + { + return serverLocator.getCallTimeout(); + } + + public synchronized void setCallTimeout(final long callTimeout) + { + checkWrite(); + serverLocator.setCallTimeout(callTimeout); + } + + public synchronized long getCallFailoverTimeout() + { + return serverLocator.getCallFailoverTimeout(); + } + + public synchronized void setCallFailoverTimeout(final long callTimeout) + { + checkWrite(); + serverLocator.setCallFailoverTimeout(callTimeout); + } + + public synchronized int getConsumerWindowSize() + { + return serverLocator.getConsumerWindowSize(); + } + + public synchronized void setConsumerWindowSize(final int consumerWindowSize) + { + checkWrite(); + serverLocator.setConsumerWindowSize(consumerWindowSize); + } + + public synchronized int getConsumerMaxRate() + { + return serverLocator.getConsumerMaxRate(); + } + + public synchronized void setConsumerMaxRate(final int consumerMaxRate) + { + checkWrite(); + serverLocator.setConsumerMaxRate(consumerMaxRate); + } + + public synchronized int getConfirmationWindowSize() + { + return serverLocator.getConfirmationWindowSize(); + } + + public synchronized void setConfirmationWindowSize(final int confirmationWindowSize) + { + checkWrite(); + serverLocator.setConfirmationWindowSize(confirmationWindowSize); + } + + public synchronized int getProducerMaxRate() + { + return serverLocator.getProducerMaxRate(); + } + + public synchronized void setProducerMaxRate(final int producerMaxRate) + { + checkWrite(); + serverLocator.setProducerMaxRate(producerMaxRate); + } + + public synchronized int getProducerWindowSize() + { + return serverLocator.getProducerWindowSize(); + } + + public synchronized void setProducerWindowSize(final int producerWindowSize) + { + checkWrite(); + serverLocator.setProducerWindowSize(producerWindowSize); + } + + /** + * @param cacheLargeMessagesClient + */ + public synchronized void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) + { + checkWrite(); + serverLocator.setCacheLargeMessagesClient(cacheLargeMessagesClient); + } + + public synchronized boolean isCacheLargeMessagesClient() + { + return serverLocator.isCacheLargeMessagesClient(); + } + + public synchronized int getMinLargeMessageSize() + { + return serverLocator.getMinLargeMessageSize(); + } + + public synchronized void setMinLargeMessageSize(final int minLargeMessageSize) + { + checkWrite(); + serverLocator.setMinLargeMessageSize(minLargeMessageSize); + } + + public synchronized boolean isBlockOnAcknowledge() + { + return serverLocator.isBlockOnAcknowledge(); + } + + public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge) + { + checkWrite(); + serverLocator.setBlockOnAcknowledge(blockOnAcknowledge); + } + + public synchronized boolean isBlockOnNonDurableSend() + { + return serverLocator.isBlockOnNonDurableSend(); + } + + public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) + { + checkWrite(); + serverLocator.setBlockOnNonDurableSend(blockOnNonDurableSend); + } + + public synchronized boolean isBlockOnDurableSend() + { + return serverLocator.isBlockOnDurableSend(); + } + + public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend) + { + checkWrite(); + serverLocator.setBlockOnDurableSend(blockOnDurableSend); + } + + public synchronized boolean isAutoGroup() + { + return serverLocator.isAutoGroup(); + } + + public synchronized void setAutoGroup(final boolean autoGroup) + { + checkWrite(); + serverLocator.setAutoGroup(autoGroup); + } + + public synchronized boolean isPreAcknowledge() + { + return serverLocator.isPreAcknowledge(); + } + + public synchronized void setPreAcknowledge(final boolean preAcknowledge) + { + checkWrite(); + serverLocator.setPreAcknowledge(preAcknowledge); + } + + public synchronized long getRetryInterval() + { + return serverLocator.getRetryInterval(); + } + + public synchronized void setRetryInterval(final long retryInterval) + { + checkWrite(); + serverLocator.setRetryInterval(retryInterval); + } + + public synchronized long getMaxRetryInterval() + { + return serverLocator.getMaxRetryInterval(); + } + + public synchronized void setMaxRetryInterval(final long retryInterval) + { + checkWrite(); + serverLocator.setMaxRetryInterval(retryInterval); + } + + public synchronized double getRetryIntervalMultiplier() + { + return serverLocator.getRetryIntervalMultiplier(); + } + + public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier) + { + checkWrite(); + serverLocator.setRetryIntervalMultiplier(retryIntervalMultiplier); + } + + public synchronized int getReconnectAttempts() + { + return serverLocator.getReconnectAttempts(); + } + + public synchronized void setReconnectAttempts(final int reconnectAttempts) + { + checkWrite(); + serverLocator.setReconnectAttempts(reconnectAttempts); + } + + public synchronized void setInitialConnectAttempts(final int reconnectAttempts) + { + checkWrite(); + serverLocator.setInitialConnectAttempts(reconnectAttempts); + } + + public synchronized int getInitialConnectAttempts() + { + checkWrite(); + return serverLocator.getInitialConnectAttempts(); + } + + public synchronized boolean isFailoverOnInitialConnection() + { + return serverLocator.isFailoverOnInitialConnection(); + } + + public synchronized void setFailoverOnInitialConnection(final boolean failover) + { + checkWrite(); + serverLocator.setFailoverOnInitialConnection(failover); + } + + public synchronized boolean isUseGlobalPools() + { + return serverLocator.isUseGlobalPools(); + } + + public synchronized void setUseGlobalPools(final boolean useGlobalPools) + { + checkWrite(); + serverLocator.setUseGlobalPools(useGlobalPools); + } + + public synchronized int getScheduledThreadPoolMaxSize() + { + return serverLocator.getScheduledThreadPoolMaxSize(); + } + + public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) + { + checkWrite(); + serverLocator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); + } + + public synchronized int getThreadPoolMaxSize() + { + return serverLocator.getThreadPoolMaxSize(); + } + + public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize) + { + checkWrite(); + serverLocator.setThreadPoolMaxSize(threadPoolMaxSize); + } + + public synchronized int getInitialMessagePacketSize() + { + return serverLocator.getInitialMessagePacketSize(); + } + + public synchronized void setInitialMessagePacketSize(final int size) + { + checkWrite(); + serverLocator.setInitialMessagePacketSize(size); + } + + public void setGroupID(final String groupID) + { + serverLocator.setGroupID(groupID); + } + + public String getGroupID() + { + return serverLocator.getGroupID(); + } + + public boolean isCompressLargeMessage() + { + return serverLocator.isCompressLargeMessage(); + } + + public void setCompressLargeMessage(boolean avoidLargeMessages) + { + serverLocator.setCompressLargeMessage(avoidLargeMessages); + } + + public void close() + { + ServerLocator locator0 = serverLocator; + if (locator0 != null) + locator0.close(); + } + + public ServerLocator getServerLocator() + { + return serverLocator; + } + + public int getFactoryType() + { + return JMSFactoryType.CF.intValue(); + } + + // Package protected ---------------------------------------------------------------------------- + + // Protected ------------------------------------------------------------------------------------ + + protected synchronized ActiveMQConnection createConnectionInternal(final String username, + final String password, + final boolean isXA, + final int type) throws JMSException + { + readOnly = true; + + ClientSessionFactory factory; + + try + { + factory = serverLocator.createSessionFactory(); + } + catch (Exception e) + { + JMSException jmse = new JMSException("Failed to create session factory"); + + jmse.initCause(e); + jmse.setLinkedException(e); + + throw jmse; + } + + ActiveMQConnection connection = null; + + if (isXA) + { + if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) + { + connection = new ActiveMQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) + { + connection = + new ActiveMQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) + { + connection = + new ActiveMQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + } + else + { + if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) + { + connection = new ActiveMQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) + { + connection = + new ActiveMQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) + { + connection = + new ActiveMQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + } + + if (connection == null) + { + throw new JMSException("Failed to create connection: invalid type " + type); + } + connection.setReference(this); + + try + { + connection.authorize(); + } + catch (JMSException e) + { + try + { + connection.close(); + } + catch (JMSException me) + { + } + throw e; + } + + return connection; + } + + @Override + public String toString() + { + return "ActiveMQConnectionFactory [serverLocator=" + serverLocator + + ", clientID=" + + clientID + + ", consumerWindowSize = " + + getConsumerWindowSize() + + ", dupsOKBatchSize=" + + dupsOKBatchSize + + ", transactionBatchSize=" + + transactionBatchSize + + ", readOnly=" + + readOnly + + "]"; + } + + + // Private -------------------------------------------------------------------------------------- + + private void checkWrite() + { + if (readOnly) + { + throw new IllegalStateException("Cannot set attribute on ActiveMQConnectionFactory after it has been used"); + } + } + + @Override + protected void finalize() throws Throwable + { + try + { + serverLocator.close(); + } + catch (Exception e) + { + e.printStackTrace(); + //not much we can do here + } + super.finalize(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContext.java new file mode 100644 index 0000000..bd84ab6 --- /dev/null +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnectionForContext.java @@ -0,0 +1,34 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * + */ +package org.apache.activemq.jms.client; + +import javax.jms.JMSContext; +import javax.jms.XAJMSContext; + +/** + * Interface created to support reference counting all contexts using it. + * <p> + * Necessary to support {@code JMSContext.close()} conditions. + * @see JMSContext + */ +public interface ActiveMQConnectionForContext extends javax.jms.Connection +{ + JMSContext createContext(int sessionMode); + + XAJMSContext createXAContext(); + + void closeFromContext(); +}