http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQSession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQSession.java deleted file mode 100644 index f37da08..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQSession.java +++ /dev/null @@ -1,1276 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.TransactionInProgressException; -import javax.transaction.xa.XAResource; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQQueueExistsException; -import org.apache.activemq.selector.filter.FilterException; -import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientConsumer; -import org.apache.activemq.api.core.client.ClientProducer; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSession.AddressQuery; -import org.apache.activemq.api.core.client.ClientSession.QueueQuery; - -/** - * HornetQ implementation of a JMS Session. - * <br> - * Note that we *do not* support JMS ASF (Application Server Facilities) optional - * constructs such as ConnectionConsumer - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - * - * - */ -public class HornetQSession implements QueueSession, TopicSession -{ - public static final int TYPE_GENERIC_SESSION = 0; - - public static final int TYPE_QUEUE_SESSION = 1; - - public static final int TYPE_TOPIC_SESSION = 2; - - private static SimpleString REJECTING_FILTER = new SimpleString("_HQX=-1"); - - private final HornetQConnection connection; - - private final ClientSession session; - - private final int sessionType; - - private final int ackMode; - - private final boolean transacted; - - private final boolean xa; - - private boolean recoverCalled; - - private final Set<HornetQMessageConsumer> consumers = new HashSet<HornetQMessageConsumer>(); - - // Constructors -------------------------------------------------- - - protected HornetQSession(final HornetQConnection connection, - final boolean transacted, - final boolean xa, - final int ackMode, - final ClientSession session, - final int sessionType) - { - this.connection = connection; - - this.ackMode = ackMode; - - this.session = session; - - this.sessionType = sessionType; - - this.transacted = transacted; - - this.xa = xa; - } - - // Session implementation ---------------------------------------- - - public BytesMessage createBytesMessage() throws JMSException - { - checkClosed(); - - return new HornetQBytesMessage(session); - } - - public MapMessage createMapMessage() throws JMSException - { - checkClosed(); - - return new HornetQMapMessage(session); - } - - public Message createMessage() throws JMSException - { - checkClosed(); - - return new HornetQMessage(session); - } - - public ObjectMessage createObjectMessage() throws JMSException - { - checkClosed(); - - return new HornetQObjectMessage(session); - } - - public ObjectMessage createObjectMessage(final Serializable object) throws JMSException - { - checkClosed(); - - HornetQObjectMessage msg = new HornetQObjectMessage(session); - - msg.setObject(object); - - return msg; - } - - public StreamMessage createStreamMessage() throws JMSException - { - checkClosed(); - - return new HornetQStreamMessage(session); - } - - public TextMessage createTextMessage() throws JMSException - { - checkClosed(); - - HornetQTextMessage msg = new HornetQTextMessage(session); - - msg.setText(null); - - return msg; - } - - public TextMessage createTextMessage(final String text) throws JMSException - { - checkClosed(); - - HornetQTextMessage msg = new HornetQTextMessage(session); - - msg.setText(text); - - return msg; - } - - public boolean getTransacted() throws JMSException - { - checkClosed(); - - return transacted; - } - - public int getAcknowledgeMode() throws JMSException - { - checkClosed(); - - return ackMode; - } - - public boolean isXA() - { - return xa; - } - - public void commit() throws JMSException - { - if (!transacted) - { - throw new IllegalStateException("Cannot commit a non-transacted session"); - } - if (xa) - { - throw new TransactionInProgressException("Cannot call commit on an XA session"); - } - try - { - session.commit(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void rollback() throws JMSException - { - if (!transacted) - { - throw new IllegalStateException("Cannot rollback a non-transacted session"); - } - if (xa) - { - throw new TransactionInProgressException("Cannot call rollback on an XA session"); - } - - try - { - session.rollback(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void close() throws JMSException - { - connection.getThreadAwareContext().assertNotCompletionListenerThread(); - connection.getThreadAwareContext().assertNotMessageListenerThread(); - synchronized (connection) - { - try - { - for (HornetQMessageConsumer cons : new HashSet<HornetQMessageConsumer>(consumers)) - { - cons.close(); - } - - session.close(); - - connection.removeSession(this); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - } - - public void recover() throws JMSException - { - if (transacted) - { - throw new IllegalStateException("Cannot recover a transacted session"); - } - - try - { - session.rollback(true); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - - recoverCalled = true; - } - - public MessageListener getMessageListener() throws JMSException - { - checkClosed(); - - return null; - } - - public void setMessageListener(final MessageListener listener) throws JMSException - { - checkClosed(); - } - - public void run() - { - } - - public MessageProducer createProducer(final Destination destination) throws JMSException - { - if (destination != null && !(destination instanceof HornetQDestination)) - { - throw new InvalidDestinationException("Not a HornetQ Destination:" + destination); - } - - try - { - HornetQDestination jbd = (HornetQDestination)destination; - - if (jbd != null) - { - ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); - } - - connection.addKnownDestination(jbd.getSimpleAddress()); - } - - ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress()); - - return new HornetQMessageProducer(connection, producer, jbd, session); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public MessageConsumer createConsumer(final Destination destination) throws JMSException - { - return createConsumer(destination, null, false); - } - - public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException - { - return createConsumer(destination, messageSelector, false); - } - - public MessageConsumer createConsumer(final Destination destination, - final String messageSelector, - final boolean noLocal) throws JMSException - { - if (destination == null) - { - throw new InvalidDestinationException("Cannot create a consumer with a null destination"); - } - - if (!(destination instanceof HornetQDestination)) - { - throw new InvalidDestinationException("Not a HornetQDestination:" + destination); - } - - HornetQDestination jbdest = (HornetQDestination)destination; - - if (jbdest.isTemporary() && !connection.containsTemporaryQueue(jbdest.getSimpleAddress())) - { - throw new JMSException("Can not create consumer for temporary destination " + destination + - " from another JMS connection"); - } - - return createConsumer(jbdest, null, messageSelector, noLocal, ConsumerDurability.NON_DURABLE); - } - - public Queue createQueue(final String queueName) throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) - { - throw new IllegalStateException("Cannot create a queue using a TopicSession"); - } - - try - { - HornetQQueue queue = lookupQueue(queueName, false); - - if (queue == null) - { - queue = lookupQueue(queueName, true); - } - - if (queue == null) - { - throw new JMSException("There is no queue with name " + queueName); - } - else - { - return queue; - } - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public Topic createTopic(final String topicName) throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a topic on a QueueSession"); - } - - try - { - HornetQTopic topic = lookupTopic(topicName, false); - - if (topic == null) - { - topic = lookupTopic(topicName, true); - } - - if (topic == null) - { - throw new JMSException("There is no topic with name " + topicName); - } - else - { - return topic; - } - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException - { - return createDurableSubscriber(topic, name, null, false); - } - - public TopicSubscriber createDurableSubscriber(final Topic topic, - final String name, - String messageSelector, - final boolean noLocal) throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession"); - } - checkTopic(topic); - if (!(topic instanceof HornetQDestination)) - { - throw new InvalidDestinationException("Not a HornetQTopic:" + topic); - } - if ("".equals(messageSelector)) - { - messageSelector = null; - } - - HornetQDestination jbdest = (HornetQDestination)topic; - - if (jbdest.isQueue()) - { - throw new InvalidDestinationException("Cannot create a subscriber on a queue"); - } - - return createConsumer(jbdest, name, messageSelector, noLocal, ConsumerDurability.DURABLE); - } - - private void checkTopic(Topic topic) throws InvalidDestinationException - { - if (topic == null) - { - throw HornetQJMSClientBundle.BUNDLE.nullTopic(); - } - } - - @Override - public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException - { - return createSharedConsumer(topic, sharedSubscriptionName, null); - } - - /** - * Note: Needs to throw an exception if a subscriptionName is already in use by another topic, or if the messageSelector is different - * - * validate multiple subscriptions on the same session. - * validate multiple subscriptions on different sessions - * validate failure in one connection while another connection stills fine. - * Validate different filters in different possible scenarios - * - * @param topic - * @param name - * @param messageSelector - * @return - * @throws JMSException - */ - @Override - public MessageConsumer createSharedConsumer(Topic topic, String name, String messageSelector) throws JMSException - { - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a shared consumer on a QueueSession"); - } - checkTopic(topic); - HornetQTopic localTopic; - if (topic instanceof HornetQTopic) - { - localTopic = (HornetQTopic)topic; - } - else - { - localTopic = new HornetQTopic(topic.getTopicName()); - } - return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.NON_DURABLE, true); - } - - @Override - public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException - { - return createDurableConsumer(topic, name, null, false); - } - - @Override - public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException - { - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a durable consumer on a QueueSession"); - } - checkTopic(topic); - HornetQTopic localTopic; - if (topic instanceof HornetQTopic) - { - localTopic = (HornetQTopic)topic; - } - else - { - localTopic = new HornetQTopic(topic.getTopicName()); - } - return createConsumer(localTopic, name, messageSelector, noLocal, ConsumerDurability.DURABLE); - } - - @Override - public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException - { - return createSharedDurableConsumer(topic, name, null); - } - - @Override - public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException - { - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a shared durable consumer on a QueueSession"); - } - - checkTopic(topic); - - HornetQTopic localTopic; - - if (topic instanceof HornetQTopic) - { - localTopic = (HornetQTopic)topic; - } - else - { - localTopic = new HornetQTopic(topic.getTopicName()); - } - return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.DURABLE, true); - } - - enum ConsumerDurability - { - DURABLE, NON_DURABLE; - } - - - /** - * This is an internal method for shared consumers - */ - private HornetQMessageConsumer internalCreateSharedConsumer(final HornetQDestination dest, - final String subscriptionName, - String selectorString, - ConsumerDurability durability, - final boolean shared) throws JMSException - { - try - { - - if (dest.isQueue()) - { - // This is not really possible unless someone makes a mistake on code - // createSharedConsumer only accpets Topics by declaration - throw new RuntimeException("Internal error: createSharedConsumer is only meant for Topics"); - } - - if (subscriptionName == null) - { - throw HornetQJMSClientBundle.BUNDLE.invalidSubscriptionName(); - } - - selectorString = "".equals(selectorString) ? null : selectorString; - - SimpleString coreFilterString = null; - - if (selectorString != null) - { - coreFilterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString)); - } - - ClientConsumer consumer; - - SimpleString autoDeleteQueueName = null; - - AddressQuery response = session.addressQuery(dest.getSimpleAddress()); - - if (!response.isExists()) - { - throw HornetQJMSClientBundle.BUNDLE.destinationDoesNotExist(dest.getSimpleAddress()); - } - - SimpleString queueName; - - if (dest.isTemporary() && durability == ConsumerDurability.DURABLE) - { - throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); - } - - queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), - subscriptionName)); - - if (durability == ConsumerDurability.DURABLE) - { - try - { - session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); - } - catch (ActiveMQQueueExistsException ignored) - { - // We ignore this because querying and then creating the queue wouldn't be idempotent - // we could also add a parameter to ignore existence what would require a bigger work around to avoid - // compatibility. - } - } - else - { - session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); - } - - consumer = session.createConsumer(queueName, null, false); - - HornetQMessageConsumer jbc = new HornetQMessageConsumer(connection, this, - consumer, - false, - dest, - selectorString, - autoDeleteQueueName); - - consumers.add(jbc); - - return jbc; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - - - private HornetQMessageConsumer createConsumer(final HornetQDestination dest, - final String subscriptionName, - String selectorString, final boolean noLocal, - ConsumerDurability durability) throws JMSException - { - try - { - selectorString = "".equals(selectorString) ? null : selectorString; - - if (noLocal) - { - connection.setHasNoLocal(); - - String filter; - if (connection.getClientID() != null) - { - filter = - HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getClientID() + - "'"; - } - else - { - filter = HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getUID() + "'"; - } - - if (selectorString != null) - { - selectorString += " AND " + filter; - } - else - { - selectorString = filter; - } - } - - SimpleString coreFilterString = null; - - if (selectorString != null) - { - coreFilterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString)); - } - - ClientConsumer consumer; - - SimpleString autoDeleteQueueName = null; - - if (dest.isQueue()) - { - AddressQuery response = session.addressQuery(dest.getSimpleAddress()); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist"); - } - - connection.addKnownDestination(dest.getSimpleAddress()); - - consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false); - } - else - { - AddressQuery response = session.addressQuery(dest.getSimpleAddress()); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); - } - - connection.addKnownDestination(dest.getSimpleAddress()); - - SimpleString queueName; - - if (subscriptionName == null) - { - if (durability != ConsumerDurability.NON_DURABLE) - throw new RuntimeException(); - // Non durable sub - - queueName = new SimpleString(UUID.randomUUID().toString()); - - session.createTemporaryQueue(dest.getSimpleAddress(), queueName, coreFilterString); - - consumer = session.createConsumer(queueName, null, false); - - autoDeleteQueueName = queueName; - } - else - { - // Durable sub - if (durability != ConsumerDurability.DURABLE) - throw new RuntimeException(); - if (connection.getClientID() == null) - { - throw new IllegalStateException("Cannot create durable subscription - client ID has not been set"); - } - - if (dest.isTemporary()) - { - throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); - } - - queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), - subscriptionName)); - - QueueQuery subResponse = session.queueQuery(queueName); - - if (!subResponse.isExists()) - { - session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); - } - else - { - // Already exists - if (subResponse.getConsumerCount() > 0) - { - throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); - } - - // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1): - // A client can change an existing durable subscription by - // creating a durable - // TopicSubscriber with the same name and a new topic and/or - // message selector. - // Changing a durable subscriber is equivalent to - // unsubscribing (deleting) the old - // one and creating a new one. - - SimpleString oldFilterString = subResponse.getFilterString(); - - boolean selectorChanged = coreFilterString == null && oldFilterString != null || - oldFilterString == null && - coreFilterString != null || - oldFilterString != null && - coreFilterString != null && - !oldFilterString.equals(coreFilterString); - - SimpleString oldTopicName = subResponse.getAddress(); - - boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress()); - - if (selectorChanged || topicChanged) - { - // Delete the old durable sub - session.deleteQueue(queueName); - - // Create the new one - session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); - } - } - - consumer = session.createConsumer(queueName, null, false); - } - } - - HornetQMessageConsumer jbc = new HornetQMessageConsumer(connection, - this, - consumer, - noLocal, - dest, - selectorString, - autoDeleteQueueName); - - consumers.add(jbc); - - return jbc; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void ackAllConsumers() throws JMSException - { - checkClosed(); - } - - public QueueBrowser createBrowser(final Queue queue) throws JMSException - { - return createBrowser(queue, null); - } - - public QueueBrowser createBrowser(final Queue queue, String filterString) throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) - { - throw new IllegalStateException("Cannot create a browser on a TopicSession"); - } - if (queue == null) - { - throw new InvalidDestinationException("Cannot create a browser with a null queue"); - } - if (!(queue instanceof HornetQDestination)) - { - throw new InvalidDestinationException("Not a HornetQQueue:" + queue); - } - if ("".equals(filterString)) - { - filterString = null; - } - - // eager test of the filter syntax as required by JMS spec - try - { - if (filterString != null) - { - SelectorParser.parse(filterString.trim()); - } - } - catch (FilterException e) - { - throw JMSExceptionHelper.convertFromHornetQException(HornetQJMSClientBundle.BUNDLE.invalidFilter(e, new SimpleString(filterString))); - } - - HornetQDestination jbq = (HornetQDestination)queue; - - if (!jbq.isQueue()) - { - throw new InvalidDestinationException("Cannot create a browser on a topic"); - } - - try - { - AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress())); - if (!message.isExists()) - { - throw new InvalidDestinationException(jbq.getAddress() + " does not exist"); - } - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - - return new HornetQQueueBrowser((HornetQQueue)jbq, filterString, session); - - } - - public TemporaryQueue createTemporaryQueue() throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) - { - throw new IllegalStateException("Cannot create a temporary queue using a TopicSession"); - } - - try - { - HornetQTemporaryQueue queue = HornetQDestination.createTemporaryQueue(this); - - SimpleString simpleAddress = queue.getSimpleAddress(); - - session.createTemporaryQueue(simpleAddress, simpleAddress); - - connection.addTemporaryQueue(simpleAddress); - - return queue; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public TemporaryTopic createTemporaryTopic() throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot create a temporary topic on a QueueSession"); - } - - try - { - HornetQTemporaryTopic topic = HornetQDestination.createTemporaryTopic(this); - - SimpleString simpleAddress = topic.getSimpleAddress(); - - // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS - // checks when routing messages to a topic that - // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no - // subscriptions - core has no notion of a topic - - session.createTemporaryQueue(simpleAddress, simpleAddress, HornetQSession.REJECTING_FILTER); - - connection.addTemporaryQueue(simpleAddress); - - return topic; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void unsubscribe(final String name) throws JMSException - { - // As per spec. section 4.11 - if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) - { - throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); - } - - SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), - name)); - - try - { - QueueQuery response = session.queueQuery(queueName); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Cannot unsubscribe, subscription with name " + name + - " does not exist"); - } - - if (response.getConsumerCount() != 0) - { - throw new IllegalStateException("Cannot unsubscribe durable subscription " + name + - " since it has active subscribers"); - } - - session.deleteQueue(queueName); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - // XASession implementation - - public Session getSession() throws JMSException - { - if (!xa) - { - throw new IllegalStateException("Isn't an XASession"); - } - - return this; - } - - public XAResource getXAResource() - { - return session.getXAResource(); - } - - // QueueSession implementation - - public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException - { - return (QueueReceiver)createConsumer(queue, messageSelector); - } - - public QueueReceiver createReceiver(final Queue queue) throws JMSException - { - return (QueueReceiver)createConsumer(queue); - } - - public QueueSender createSender(final Queue queue) throws JMSException - { - return (QueueSender)createProducer(queue); - } - - // XAQueueSession implementation - - public QueueSession getQueueSession() throws JMSException - { - return (QueueSession)getSession(); - } - - // TopicSession implementation - - public TopicPublisher createPublisher(final Topic topic) throws JMSException - { - return (TopicPublisher)createProducer(topic); - } - - public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException - { - return (TopicSubscriber)createConsumer(topic, messageSelector, noLocal); - } - - public TopicSubscriber createSubscriber(final Topic topic) throws JMSException - { - return (TopicSubscriber)createConsumer(topic); - } - - // XATopicSession implementation - - public TopicSession getTopicSession() throws JMSException - { - return (TopicSession)getSession(); - } - - // Public -------------------------------------------------------- - - @Override - public String toString() - { - return "HornetQSession->" + session; - } - - public ClientSession getCoreSession() - { - return session; - } - - public boolean isRecoverCalled() - { - return recoverCalled; - } - - public void setRecoverCalled(final boolean recoverCalled) - { - this.recoverCalled = recoverCalled; - } - - public void deleteTemporaryTopic(final HornetQDestination tempTopic) throws JMSException - { - if (!tempTopic.isTemporary()) - { - throw new InvalidDestinationException("Not a temporary topic " + tempTopic); - } - - try - { - AddressQuery response = session.addressQuery(tempTopic.getSimpleAddress()); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() + - " does not exist"); - } - - if (response.getQueueNames().size() > 1) - { - throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() + - " since it has subscribers"); - } - - SimpleString address = tempTopic.getSimpleAddress(); - - session.deleteQueue(address); - - connection.removeTemporaryQueue(address); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void deleteTemporaryQueue(final HornetQDestination tempQueue) throws JMSException - { - if (!tempQueue.isTemporary()) - { - throw new InvalidDestinationException("Not a temporary queue " + tempQueue); - } - try - { - QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress()); - - if (!response.isExists()) - { - throw new InvalidDestinationException("Cannot delete temporary queue " + tempQueue.getName() + - " does not exist"); - } - - if (response.getConsumerCount() > 0) - { - throw new IllegalStateException("Cannot delete temporary queue " + tempQueue.getName() + - " since it has subscribers"); - } - - SimpleString address = tempQueue.getSimpleAddress(); - - session.deleteQueue(address); - - connection.removeTemporaryQueue(address); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void start() throws JMSException - { - try - { - session.start(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void stop() throws JMSException - { - try - { - session.stop(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void removeConsumer(final HornetQMessageConsumer consumer) - { - consumers.remove(consumer); - } - - // Package protected --------------------------------------------- - - void deleteQueue(final SimpleString queueName) throws JMSException - { - if (!session.isClosed()) - { - try - { - session.deleteQueue(queueName); - } - catch (ActiveMQException ignore) - { - // Exception on deleting queue shouldn't prevent close from completing - } - } - } - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - private void checkClosed() throws JMSException - { - if (session.isClosed()) - { - throw new IllegalStateException("Session is closed"); - } - } - - private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException - { - HornetQQueue queue; - - if (isTemporary) - { - queue = HornetQDestination.createTemporaryQueue(queueName); - } - else - { - queue = HornetQDestination.createQueue(queueName); - } - - QueueQuery response = session.queueQuery(queue.getSimpleAddress()); - - if (response.isExists()) - { - return queue; - } - else - { - return null; - } - } - - private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary) throws ActiveMQException - { - - HornetQTopic topic; - - if (isTemporary) - { - topic = HornetQDestination.createTemporaryTopic(topicName); - } - else - { - topic = HornetQDestination.createTopic(topicName); - } - - AddressQuery query = session.addressQuery(topic.getSimpleAddress()); - - if (!query.isExists()) - { - return null; - } - else - { - return topic; - } - } - - // Inner classes ------------------------------------------------- - -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQStreamMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQStreamMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQStreamMessage.java deleted file mode 100644 index 5274557..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQStreamMessage.java +++ /dev/null @@ -1,466 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.StreamMessage; - -import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.Message; -import org.apache.activemq.api.core.Pair; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.core.client.impl.ClientMessageImpl; -import org.apache.activemq.utils.DataConstants; - -import static org.apache.activemq.reader.StreamMessageUtil.streamReadBoolean; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadByte; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadBytes; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadChar; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadDouble; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadFloat; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadInteger; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadLong; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadObject; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadShort; -import static org.apache.activemq.reader.StreamMessageUtil.streamReadString; - -/** - * HornetQ implementation of a JMS StreamMessage. - * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * - * Some parts based on JBM 1.x class by: - * - * @author Norbert Lataille (norbert.latai...@m4x.org) - * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - */ -public final class HornetQStreamMessage extends HornetQMessage implements StreamMessage -{ - public static final byte TYPE = Message.STREAM_TYPE; - - protected HornetQStreamMessage(final ClientSession session) - { - super(HornetQStreamMessage.TYPE, session); - } - - protected HornetQStreamMessage(final ClientMessage message, final ClientSession session) - { - super(message, session); - } - - public HornetQStreamMessage(final StreamMessage foreign, final ClientSession session) throws JMSException - { - super(foreign, HornetQStreamMessage.TYPE, session); - - foreign.reset(); - - try - { - while (true) - { - Object obj = foreign.readObject(); - writeObject(obj); - } - } - catch (MessageEOFException e) - { - // Ignore - } - } - - // For testing only - public HornetQStreamMessage() - { - message = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1500); - } - - // Public -------------------------------------------------------- - - @Override - public byte getType() - { - return HornetQStreamMessage.TYPE; - } - - // StreamMessage implementation ---------------------------------- - - public boolean readBoolean() throws JMSException - { - checkRead(); - try - { - return streamReadBoolean(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public byte readByte() throws JMSException - { - checkRead(); - - try - { - return streamReadByte(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public short readShort() throws JMSException - { - checkRead(); - try - { - return streamReadShort(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public char readChar() throws JMSException - { - checkRead(); - try - { - return streamReadChar(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public int readInt() throws JMSException - { - checkRead(); - try - { - return streamReadInteger(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public long readLong() throws JMSException - { - checkRead(); - try - { - return streamReadLong(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public float readFloat() throws JMSException - { - checkRead(); - try - { - return streamReadFloat(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public double readDouble() throws JMSException - { - checkRead(); - try - { - return streamReadDouble(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public String readString() throws JMSException - { - checkRead(); - try - { - return streamReadString(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - /** - * len here is used to control how many more bytes to read - */ - private int len = 0; - - public int readBytes(final byte[] value) throws JMSException - { - checkRead(); - try - { - Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); - - len = pairRead.getA(); - return pairRead.getB(); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public Object readObject() throws JMSException - { - checkRead(); - try - { - return streamReadObject(message); - } - catch (IllegalStateException e) - { - throw new MessageFormatException(e.getMessage()); - } - catch (IndexOutOfBoundsException e) - { - throw new MessageEOFException(""); - } - } - - public void writeBoolean(final boolean value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.BOOLEAN); - getBuffer().writeBoolean(value); - } - - public void writeByte(final byte value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.BYTE); - getBuffer().writeByte(value); - } - - public void writeShort(final short value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.SHORT); - getBuffer().writeShort(value); - } - - public void writeChar(final char value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.CHAR); - getBuffer().writeShort((short)value); - } - - public void writeInt(final int value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.INT); - getBuffer().writeInt(value); - } - - public void writeLong(final long value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.LONG); - getBuffer().writeLong(value); - } - - public void writeFloat(final float value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.FLOAT); - getBuffer().writeInt(Float.floatToIntBits(value)); - } - - public void writeDouble(final double value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.DOUBLE); - getBuffer().writeLong(Double.doubleToLongBits(value)); - } - - public void writeString(final String value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.STRING); - getBuffer().writeNullableString(value); - } - - public void writeBytes(final byte[] value) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.BYTES); - getBuffer().writeInt(value.length); - getBuffer().writeBytes(value); - } - - public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException - { - checkWrite(); - getBuffer().writeByte(DataConstants.BYTES); - getBuffer().writeInt(length); - getBuffer().writeBytes(value, offset, length); - } - - public void writeObject(final Object value) throws JMSException - { - if (value instanceof String) - { - writeString((String)value); - } - else if (value instanceof Boolean) - { - writeBoolean((Boolean)value); - } - else if (value instanceof Byte) - { - writeByte((Byte)value); - } - else if (value instanceof Short) - { - writeShort((Short)value); - } - else if (value instanceof Integer) - { - writeInt((Integer)value); - } - else if (value instanceof Long) - { - writeLong((Long)value); - } - else if (value instanceof Float) - { - writeFloat((Float)value); - } - else if (value instanceof Double) - { - writeDouble((Double)value); - } - else if (value instanceof byte[]) - { - writeBytes((byte[])value); - } - else if (value instanceof Character) - { - writeChar((Character)value); - } - else if (value == null) - { - writeString(null); - } - else - { - throw new MessageFormatException("Invalid object type: " + value.getClass()); - } - } - - public void reset() throws JMSException - { - if (!readOnly) - { - readOnly = true; - } - getBuffer().resetReaderIndex(); - } - - // HornetQRAMessage overrides ---------------------------------------- - - @Override - public void clearBody() throws JMSException - { - super.clearBody(); - - getBuffer().clear(); - } - - @Override - public void doBeforeSend() throws Exception - { - reset(); - } - - private ActiveMQBuffer getBuffer() - { - return message.getBodyBuffer(); - } - - @SuppressWarnings("rawtypes") - @Override - public boolean isBodyAssignableTo(Class c) - { - return false; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryQueue.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryQueue.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryQueue.java deleted file mode 100644 index 6d06ac5..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryQueue.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.TemporaryQueue; - - -/** - * HornetQ implementation of a JMS TemporaryQueue. - * <br> - * This class can be instantiated directly. - * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @version <tt>$Revision: 3569 $</tt> - * - */ -public class HornetQTemporaryQueue extends HornetQQueue implements TemporaryQueue -{ - // Constants ----------------------------------------------------- - - private static final long serialVersionUID = -4624930377557954624L; - - // Static -------------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Constructors -------------------------------------------------- - - - // TemporaryQueue implementation ------------------------------------------ - - // Public -------------------------------------------------------- - - /** - * @param address - * @param name - * @param session - */ - public HornetQTemporaryQueue(String address, String name, HornetQSession session) - { - super(address, name, true, session); - } - - @Override - public String toString() - { - return "HornetQTemporaryQueue[" + name + "]"; - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryTopic.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryTopic.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryTopic.java deleted file mode 100644 index 103619e..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTemporaryTopic.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.TemporaryTopic; - -/** - * A HornetQTemporaryTopic - * - * @author <a href="mailto:clebert.suco...@jboss.org">Clebert Suconic</a> - * - * - */ -public class HornetQTemporaryTopic extends HornetQTopic implements TemporaryTopic -{ - - // Constants ----------------------------------------------------- - - private static final long serialVersionUID = 845450764835635266L; - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - protected HornetQTemporaryTopic(final String address, final String name, - final HornetQSession session) - { - super(address, name, true, session); - } - - // Public -------------------------------------------------------- - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTextMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTextMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTextMessage.java deleted file mode 100644 index 65bdb82..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTextMessage.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSException; -import javax.jms.TextMessage; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.Message; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; - -import static org.apache.activemq.reader.TextMessageUtil.readBodyText; -import static org.apache.activemq.reader.TextMessageUtil.writeBodyText; - - -/** - * HornetQ implementation of a JMS TextMessage. - * <br> - * This class was ported from SpyTextMessage in JBossMQ. - * - * @author Norbert Lataille (norbert.latai...@m4x.org) - * @author <a href="mailto:ja...@planet57.com">Jason Dillon</a> - * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - * @version $Revision: 3412 $ - */ -public class HornetQTextMessage extends HornetQMessage implements TextMessage -{ - // Constants ----------------------------------------------------- - - public static final byte TYPE = Message.TEXT_TYPE; - - // Attributes ---------------------------------------------------- - - // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write - // methods are more efficient for a SimpleString - private SimpleString text; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public HornetQTextMessage(final ClientSession session) - { - super(HornetQTextMessage.TYPE, session); - } - - public HornetQTextMessage(final ClientMessage message, final ClientSession session) - { - super(message, session); - } - - /** - * A copy constructor for non-HornetQ JMS TextMessages. - */ - public HornetQTextMessage(final TextMessage foreign, final ClientSession session) throws JMSException - { - super(foreign, HornetQTextMessage.TYPE, session); - - setText(foreign.getText()); - } - - // Public -------------------------------------------------------- - - @Override - public byte getType() - { - return HornetQTextMessage.TYPE; - } - - // TextMessage implementation ------------------------------------ - - public void setText(final String text) throws JMSException - { - checkWrite(); - - if (text != null) - { - this.text = new SimpleString(text); - } - else - { - this.text = null; - } - - writeBodyText(message, this.text); - } - - public String getText() - { - if (text != null) - { - return text.toString(); - } - else - { - return null; - } - } - - @Override - public void clearBody() throws JMSException - { - super.clearBody(); - - text = null; - } - - // HornetQRAMessage override ----------------------------------------- - - @Override - public void doBeforeReceive() throws ActiveMQException - { - super.doBeforeReceive(); - - text = readBodyText(message); - } - - @Override - protected <T> T getBodyInternal(Class<T> c) - { - return (T) getText(); - } - - @Override - public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c) - { - if (text == null) - return true; - return c.isAssignableFrom(java.lang.String.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopic.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopic.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopic.java deleted file mode 100644 index 94408c8..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopic.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.Topic; - -import org.apache.activemq.api.core.SimpleString; - -/** - * HornetQ implementation of a JMS Topic. - * <br> - * This class can be instantiated directly. - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @version <tt>$Revision: 8737 $</tt> - * - */ -public class HornetQTopic extends HornetQDestination implements Topic -{ - // Constants ----------------------------------------------------- - - private static final long serialVersionUID = 7873614001276404156L; - // Static -------------------------------------------------------- - - public static SimpleString createAddressFromName(final String name) - { - return new SimpleString(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name); - } - - // Attributes ---------------------------------------------------- - - // Constructors -------------------------------------------------- - - public HornetQTopic(final String name) - { - super(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null); - } - - - /** - * @param address - * @param name - * @param temporary - * @param session - */ - protected HornetQTopic(String address, String name, boolean temporary, HornetQSession session) - { - super(address, name, temporary, false, session); - } - - - // Topic implementation ------------------------------------------ - - public String getTopicName() - { - return name; - } - - // Public -------------------------------------------------------- - - @Override - public String toString() - { - return "HornetQTopic[" + name + "]"; - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopicConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopicConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopicConnectionFactory.java deleted file mode 100644 index 13e187f..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQTopicConnectionFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.TopicConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; - -/** - * A class that represents a TopicConnectionFactory. - * - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - */ -public class HornetQTopicConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory -{ - private static final long serialVersionUID = 7317051989866548455L; - - /** - * - */ - public HornetQTopicConnectionFactory() - { - super(); - } - - /** - * @param serverLocator - */ - public HornetQTopicConnectionFactory(ServerLocator serverLocator) - { - super(serverLocator); - } - - - /** - * @param ha - * @param groupConfiguration - */ - public HornetQTopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - super(ha, groupConfiguration); - } - - /** - * @param ha - * @param initialConnectors - */ - public HornetQTopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) - { - super(ha, initialConnectors); - } - - public int getFactoryType() - { - return JMSFactoryType.TOPIC_CF.intValue(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnection.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnection.java deleted file mode 100644 index 29b6ec0..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnection.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.XAQueueConnection; -import javax.jms.XAQueueSession; -import javax.jms.XASession; -import javax.jms.XATopicConnection; -import javax.jms.XATopicSession; - -import org.apache.activemq.api.core.client.ClientSessionFactory; - -/** - * HornetQ implementation of a JMS XAConnection. - * <p> - * The flat implementation of {@link XATopicConnection} and {@link XAQueueConnection} is per design, - * following common practices of JMS 1.1. - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - */ -public final class HornetQXAConnection extends HornetQConnection implements XATopicConnection, XAQueueConnection -{ - - public HornetQXAConnection(final String username, final String password, final int connectionType, - final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, - final ClientSessionFactory sessionFactory) - { - super(username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory); - } - - @Override - public XASession createXASession() throws JMSException - { - checkClosed(); - return (XASession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_GENERIC_SESSION); - } - - @Override - public XAQueueSession createXAQueueSession() throws JMSException - { - checkClosed(); - return (XAQueueSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_QUEUE_SESSION); - - } - - @Override - public XATopicSession createXATopicSession() throws JMSException - { - checkClosed(); - return (XATopicSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_TOPIC_SESSION); - } - - @Override - protected boolean isXA() - { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnectionFactory.java deleted file mode 100644 index 3eae974..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAConnectionFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.XAQueueConnectionFactory; -import javax.jms.XATopicConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; - -/** - * A class that represents a XAConnectionFactory. - * <p> - * We consider the XAConnectionFactory to be the most complete possible option. It can be casted to any other connection factory since it is fully functional - * - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - */ -public class HornetQXAConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory, - XAQueueConnectionFactory -{ - private static final long serialVersionUID = 743611571839154115L; - - /** - * - */ - public HornetQXAConnectionFactory() - { - super(); - } - - /** - * @param serverLocator - */ - public HornetQXAConnectionFactory(ServerLocator serverLocator) - { - super(serverLocator); - } - - /** - * @param ha - * @param groupConfiguration - */ - public HornetQXAConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - super(ha, groupConfiguration); - } - - /** - * @param ha - * @param initialConnectors - */ - public HornetQXAConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) - { - super(ha, initialConnectors); - } - - @Override - public int getFactoryType() - { - return JMSFactoryType.XA_CF.intValue(); - } - - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAJMSContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAJMSContext.java deleted file mode 100644 index 65ab212..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAJMSContext.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.XAJMSContext; - -public class HornetQXAJMSContext extends HornetQJMSContext implements XAJMSContext -{ - public HornetQXAJMSContext(HornetQConnectionForContext connection, ThreadAwareContext threadAwareContext) - { - super(connection, threadAwareContext); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAQueueConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAQueueConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAQueueConnectionFactory.java deleted file mode 100644 index 1d8d622..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXAQueueConnectionFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.XAQueueConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; - -/** - * A class that represents a XAQueueConnectionFactory. - * - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - * - */ -public class HornetQXAQueueConnectionFactory extends HornetQConnectionFactory implements XAQueueConnectionFactory -{ - private static final long serialVersionUID = 8612457847251087454L; - - /** - * - */ - public HornetQXAQueueConnectionFactory() - { - super(); - } - - /** - * @param serverLocator - */ - public HornetQXAQueueConnectionFactory(ServerLocator serverLocator) - { - super(serverLocator); - } - - /** - * @param ha - * @param groupConfiguration - */ - public HornetQXAQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - super(ha, groupConfiguration); - } - - /** - * @param ha - * @param initialConnectors - */ - public HornetQXAQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) - { - super(ha, initialConnectors); - } - - public int getFactoryType() - { - return JMSFactoryType.QUEUE_XA_CF.intValue(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXASession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXASession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXASession.java deleted file mode 100644 index 390fff0..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXASession.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.XAQueueSession; -import javax.jms.XATopicSession; - -import org.apache.activemq.api.core.client.ClientSession; - -/** - * A HornetQXASession - * - * @author clebertsuconic - * - * - */ -public class HornetQXASession extends HornetQSession implements XAQueueSession, XATopicSession -{ - - /** - * @param connection - * @param transacted - * @param xa - * @param ackMode - * @param session - * @param sessionType - */ - protected HornetQXASession(HornetQConnection connection, - boolean transacted, - boolean xa, - int ackMode, - ClientSession session, - int sessionType) - { - super(connection, transacted, xa, ackMode, session, sessionType); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXATopicConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXATopicConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXATopicConnectionFactory.java deleted file mode 100644 index 462d240..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQXATopicConnectionFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.XATopicConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; - -/** - * A class that represents a XATopicConnectionFactory. - * - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - */ -public class HornetQXATopicConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory -{ - private static final long serialVersionUID = -7018290426884419693L; - - /** - * - */ - public HornetQXATopicConnectionFactory() - { - super(); - } - - /** - * @param serverLocator - */ - public HornetQXATopicConnectionFactory(final ServerLocator serverLocator) - { - super(serverLocator); - } - - /** - * @param ha - * @param groupConfiguration - */ - public HornetQXATopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - super(ha, groupConfiguration); - } - - /** - * @param ha - * @param initialConnectors - */ - public HornetQXATopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) - { - super(ha, initialConnectors); - } - - public int getFactoryType() - { - return JMSFactoryType.TOPIC_XA_CF.intValue(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/JMSExceptionHelper.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/JMSExceptionHelper.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/JMSExceptionHelper.java index c61f48a..b957801 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/JMSExceptionHelper.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/JMSExceptionHelper.java @@ -29,7 +29,7 @@ import org.apache.activemq.api.core.ActiveMQException; public final class JMSExceptionHelper { - public static JMSException convertFromHornetQException(final ActiveMQException me) + public static JMSException convertFromActiveMQException(final ActiveMQException me) { JMSException je; switch (me.getType())