http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientBundle.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientBundle.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientBundle.java deleted file mode 100644 index e5a6c0b..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientBundle.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
/**
 * Message bundle of the JMS client. Every method returns an exception whose text and numeric id
 * come from the method's {@link Message} annotation; the implementation is looked up through
 * {@link Messages#getBundle(Class)} and exposed as {@link #BUNDLE}.
 * <p>
 * All messages use {@code Message.Format.MESSAGE_FORMAT}, so <code>{0}</code> is replaced by the
 * first method argument and a literal single quote has to be written as two single quotes.
 *
 * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
 *         3/12/12
 *
 * Logger Code 12
 *
 * Each message id must be 6 digits long and start with 12; the 3rd digit is 9 for this bundle,
 * so the ids range from 129000 to 129999.
 */
@MessageBundle(projectCode = "HQ")
public interface HornetQJMSClientBundle
{
   /** Shared bundle instance. */
   HornetQJMSClientBundle BUNDLE = Messages.getBundle(HornetQJMSClientBundle.class);

   /**
    * @param e      underlying failure, attached as the cause
    * @param filter the rejected filter expression, inserted into the message
    */
   @Message(id = 129000, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQInvalidFilterExpressionException invalidFilter(@Cause Throwable e, SimpleString filter);

   /** Exception for a missing subscription name. */
   @Message(id = 129001, value = "Invalid Subscription Name. It is required to set the subscription name", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQIllegalStateException invalidSubscriptionName();

   /** @param destination name of the destination that was not found, inserted into the message */
   @Message(id = 129002, value = "Destination {0} does not exist", format = Message.Format.MESSAGE_FORMAT)
   ActiveMQNonExistentQueueException destinationDoesNotExist(SimpleString destination);

   /** Exception for a {@code null} name. */
   @Message(id = 129003, value = "name cannot be null", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nameCannotBeNull();

   /** Exception for an empty name. */
   @Message(id = 129004, value = "name cannot be empty", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nameCannotBeEmpty();

   /** Unchecked variant of {@link #callingMethodFromListener()} (same text, different id and type). */
   @Message(id = 129005, value = "It is illegal to call this method from within a Message Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateRuntimeException callingMethodFromListenerRuntime();

   /** Checked variant of {@link #callingMethodFromListenerRuntime()}. */
   @Message(id = 129006, value = "It is illegal to call this method from within a Message Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException callingMethodFromListener();

   /** Unchecked variant of {@link #callingMethodFromCompletionListener()} (same text, different id and type). */
   @Message(id = 129007, value = "It is illegal to call this method from within a Completion Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateRuntimeException callingMethodFromCompletionListenerRuntime();

   /** Checked variant of {@link #callingMethodFromCompletionListenerRuntime()}. */
   @Message(id = 129008, value = "It is illegal to call this method from within a Completion Listener", format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException callingMethodFromCompletionListener();

   /** @param type description of the argument that was {@code null}, inserted into the message */
   @Message(id = 129009, value = "Null {0} is not allowed", format = Message.Format.MESSAGE_FORMAT)
   IllegalArgumentException nullArgumentNotAllowed(String type);

   /** Exception for a {@code null} topic. */
   @Message(id = 129010, value = "Topic (Destination) cannot be null", format = Message.Format.MESSAGE_FORMAT)
   InvalidDestinationException nullTopic();

   /** Exception for large-message streaming attempted on an unsupported message type. */
   @Message(id = 129011, value = "LargeMessage streaming is only possible on ByteMessage or StreamMessage",
            format = Message.Format.MESSAGE_FORMAT)
   IllegalStateException onlyValidForByteOrStreamMessages();

   /** @param propertyName the rejected property name; the doubled quotes render as single quotes around it */
   @Message(id = 129012, value = "The property name ''{0}'' is not a valid java identifier.",
            format = Message.Format.MESSAGE_FORMAT)
   JMSRuntimeException invalidJavaIdentifier(String propertyName);

   /** Exception for a write attempted on a read-only message. */
   @Message(id = 129013, value = "Message is read-only", format = Message.Format.MESSAGE_FORMAT)
   MessageNotWriteableException messageNotWritable();

   /** Exception for a read attempted on a write-only message. */
   @Message(id = 129014, value = "Message is write-only", format = Message.Format.MESSAGE_FORMAT)
   MessageNotReadableException messageNotReadable();

   /** @param deliveryMode the rejected delivery mode value, inserted into the message */
   @Message(id = 129015, value = "Illegal deliveryMode value: {0}", format = Message.Format.MESSAGE_FORMAT)
   JMSException illegalDeliveryMode(int deliveryMode);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientLogger.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientLogger.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientLogger.java deleted file mode 100644 index 180e8f1..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSClientLogger.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
/**
 * Typed logger of the JMS client. Each method logs the text of its {@link Message} annotation at
 * the level given by its {@link LogMessage} annotation; the {@code @Cause} argument is attached
 * as the logged exception.
 *
 * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
 *         3/15/12
 *
 * Logger Code 12
 *
 * Each message id must be 6 digits long and start with 12; the 3rd digit denotes the level:
 *
 * INFO  1
 * WARN  2
 * DEBUG 3
 * ERROR 4
 * TRACE 5
 * FATAL 6
 *
 * so an INFO message would be 121000 to 121999
 */
@MessageLogger(projectCode = "HQ")
public interface HornetQJMSClientLogger extends BasicLogger
{
   /**
    * The default logger. Its category is the name of this interface's package.
    */
   HornetQJMSClientLogger LOGGER = Logger.getMessageLogger(HornetQJMSClientLogger.class, HornetQJMSClientLogger.class.getPackage().getName());

   /** @param e exception logged with the warning; per the message text its stack trace shows where the connection was created */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 122000, value = "I''m closing a JMS connection you left open. Please make sure you close all JMS connections explicitly before letting them go out of scope! see stacktrace to find out where it was created" , format = Message.Format.MESSAGE_FORMAT)
   void connectionLeftOpen(@Cause Exception e);

   /** @param e exception that escaped a listener's {@code onMessage} */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 122001, value = "Unhandled exception thrown from onMessage" , format = Message.Format.MESSAGE_FORMAT)
   void onMessageError(@Cause Exception e);

   /** @param e failure raised while invoking the JMS exception listener */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124000, value = "Failed to call JMS exception listener" , format = Message.Format.MESSAGE_FORMAT)
   void errorCallingExcListener(@Cause Exception e);

   // NOTE: id 124001 is not used in this interface.

   /** @param e failure raised while a queue browser created a message */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124002, value = "Queue Browser failed to create message" , format = Message.Format.MESSAGE_FORMAT)
   void errorCreatingMessage(@Cause Throwable e);

   /** @param e failure raised while preparing a message for receipt */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124003, value = "Message Listener failed to prepare message for receipt" , format = Message.Format.MESSAGE_FORMAT)
   void errorPreparingMessageForReceipt(@Cause Throwable e);

   /** @param e failure raised while processing a message */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124004, value = "Message Listener failed to process message" , format = Message.Format.MESSAGE_FORMAT)
   void errorProcessingMessage(@Cause Throwable e);

   /** @param e failure raised while recovering the session */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124005, value = "Message Listener failed to recover session" , format = Message.Format.MESSAGE_FORMAT)
   void errorRecoveringSession(@Cause Throwable e);

   /** @param e failure raised while invoking the failover listener */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 124006, value = "Failed to call Failover listener" , format = Message.Format.MESSAGE_FORMAT)
   void errorCallingFailoverListener(@Cause Exception e);

}
b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConnectionFactory.java deleted file mode 100644 index dc76692..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConnectionFactory.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; - - -/** - * A class that represents a ConnectionFactory. 
- * - * @author <a href="mailto:h...@redhat.com">Howard Gao</a> - */ -public class HornetQJMSConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory, QueueConnectionFactory -{ - - private static final long serialVersionUID = -2810634789345348326L; - - /** - * - */ - public HornetQJMSConnectionFactory() - { - super(); - } - - /** - * @param serverLocator - */ - public HornetQJMSConnectionFactory(ServerLocator serverLocator) - { - super(serverLocator); - } - - /** - * @param ha - * @param groupConfiguration - */ - public HornetQJMSConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration) - { - super(ha, groupConfiguration); - } - - /** - * @param ha - * @param initialConnectors - */ - public HornetQJMSConnectionFactory(boolean ha, TransportConfiguration... initialConnectors) - { - super(ha, initialConnectors); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConsumer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConsumer.java deleted file mode 100644 index b9ba41f..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSConsumer.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. 
See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSConsumer; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; - -/** - * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc. - */ -public class HornetQJMSConsumer implements JMSConsumer -{ - - private final HornetQJMSContext context; - private final MessageConsumer consumer; - - HornetQJMSConsumer(HornetQJMSContext context, MessageConsumer consumer) - { - this.context = context; - this.consumer = consumer; - } - - @Override - public String getMessageSelector() - { - try - { - return consumer.getMessageSelector(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public MessageListener getMessageListener() throws JMSRuntimeException - { - try - { - return consumer.getMessageListener(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void setMessageListener(MessageListener listener) throws JMSRuntimeException - { - try - { - consumer.setMessageListener(new MessageListenerWrapper(listener)); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Message receive() - { - try - { - return context.setLastMessage(this, consumer.receive()); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Message receive(long timeout) - { - try - { - return context.setLastMessage(this, consumer.receive(timeout)); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Message receiveNoWait() - { - try - { - return context.setLastMessage(this, 
consumer.receiveNoWait()); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void close() - { - try - { - consumer.close(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public <T> T receiveBody(Class<T> c) - { - try - { - Message message = consumer.receive(); - context.setLastMessage(HornetQJMSConsumer.this, message); - return message == null ? null : message.getBody(c); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public <T> T receiveBody(Class<T> c, long timeout) - { - try - { - Message message = consumer.receive(timeout); - context.setLastMessage(HornetQJMSConsumer.this, message); - return message == null ? null : message.getBody(c); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public <T> T receiveBodyNoWait(Class<T> c) - { - try - { - Message message = consumer.receiveNoWait(); - context.setLastMessage(HornetQJMSConsumer.this, message); - return message == null ? 
null : message.getBody(c); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - final class MessageListenerWrapper implements MessageListener - { - private final MessageListener wrapped; - - public MessageListenerWrapper(MessageListener wrapped) - { - this.wrapped = wrapped; - } - - @Override - public void onMessage(Message message) - { - context.setLastMessage(HornetQJMSConsumer.this, message); - - context.getThreadAwareContext().setCurrentThread(false); - try - { - wrapped.onMessage(message); - } - finally - { - context.getThreadAwareContext().clearCurrentThread(false); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSContext.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSContext.java deleted file mode 100644 index d314336..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSContext.java +++ /dev/null @@ -1,769 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.client; - -import javax.jms.BytesMessage; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateRuntimeException; -import javax.jms.JMSConsumer; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import javax.jms.JMSProducer; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.XAConnection; -import javax.jms.XASession; -import javax.transaction.xa.XAResource; -import java.io.Serializable; - -/** - * HornetQ implementation of a JMSContext. - * - * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc - */ -public class HornetQJMSContext implements JMSContext -{ - private static final boolean DEFAULT_AUTO_START = true; - private final int sessionMode; - - private final ThreadAwareContext threadAwareContext; - - /** - * Client ACK needs to hold last acked messages, so context.ack calls will be respected. 
- */ - private volatile Message lastMessagesWaitingAck; - - private final HornetQConnectionForContext connection; - private Session session; - private boolean autoStart = HornetQJMSContext.DEFAULT_AUTO_START; - private MessageProducer innerProducer; - private boolean xa; - private boolean closed; - - HornetQJMSContext(final HornetQConnectionForContext connection, final int ackMode, final boolean xa, ThreadAwareContext threadAwareContext) - { - this.connection = connection; - this.sessionMode = ackMode; - this.xa = xa; - this.threadAwareContext = threadAwareContext; - } - - public HornetQJMSContext(HornetQConnectionForContext connection, int ackMode, ThreadAwareContext threadAwareContext) - { - this(connection, ackMode, false, threadAwareContext); - } - - public HornetQJMSContext(HornetQConnectionForContext connection, ThreadAwareContext threadAwareContext) - { - this(connection, SESSION_TRANSACTED, true, threadAwareContext); - } - - // XAJMSContext implementation ------------------------------------- - - public JMSContext getContext() - { - return this; - } - - public Session getSession() - { - return session; - } - - public XAResource getXAResource() - { - checkSession(); - return ((XASession) session).getXAResource(); - } - - // JMSContext implementation ------------------------------------- - - @Override - public JMSContext createContext(int sessionMode) - { - return connection.createContext(sessionMode); - } - - @Override - public JMSProducer createProducer() - { - checkSession(); - try - { - return new HornetQJMSProducer(this, getInnerProducer()); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - private synchronized MessageProducer getInnerProducer() throws JMSException - { - if (innerProducer == null) - { - innerProducer = session.createProducer(null); - } - - return innerProducer; - } - - /** - * - */ - private void checkSession() - { - if (session == null) - { - synchronized (this) - { - if (closed) - 
throw new IllegalStateRuntimeException("Context is closed"); - if (session == null) - { - try - { - if (xa) - { - session = ((XAConnection) connection).createXASession(); - } - else - { - session = connection.createSession(sessionMode); - } - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - } - } - } - - @Override - public String getClientID() - { - try - { - return connection.getClientID(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void setClientID(String clientID) - { - try - { - connection.setClientID(clientID); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public ConnectionMetaData getMetaData() - { - try - { - return connection.getMetaData(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public ExceptionListener getExceptionListener() - { - try - { - return connection.getExceptionListener(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void setExceptionListener(ExceptionListener listener) - { - try - { - connection.setExceptionListener(listener); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void start() - { - try - { - connection.start(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void stop() - { - threadAwareContext.assertNotMessageListenerThreadRuntime(); - try - { - connection.stop(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void setAutoStart(boolean autoStart) - { - this.autoStart = autoStart; - } - - @Override - public boolean getAutoStart() - { - return autoStart; - } - - @Override - public void 
close() - { - threadAwareContext.assertNotCompletionListenerThreadRuntime(); - threadAwareContext.assertNotMessageListenerThreadRuntime(); - try - { - synchronized (this) - { - if (session != null) - session.close(); - connection.closeFromContext(); - closed = true; - } - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public BytesMessage createBytesMessage() - { - checkSession(); - try - { - return session.createBytesMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public MapMessage createMapMessage() - { - checkSession(); - try - { - return session.createMapMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Message createMessage() - { - checkSession(); - try - { - return session.createMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public ObjectMessage createObjectMessage() - { - checkSession(); - try - { - return session.createObjectMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public ObjectMessage createObjectMessage(Serializable object) - { - checkSession(); - try - { - return session.createObjectMessage(object); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public StreamMessage createStreamMessage() - { - checkSession(); - try - { - return session.createStreamMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public TextMessage createTextMessage() - { - checkSession(); - try - { - return session.createTextMessage(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public TextMessage 
createTextMessage(String text) - { - checkSession(); - try - { - return session.createTextMessage(text); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public boolean getTransacted() - { - checkSession(); - try - { - return session.getTransacted(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public int getSessionMode() - { - return sessionMode; - } - - @Override - public void commit() - { - threadAwareContext.assertNotCompletionListenerThreadRuntime(); - checkSession(); - try - { - session.commit(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void rollback() - { - threadAwareContext.assertNotCompletionListenerThreadRuntime(); - checkSession(); - try - { - session.rollback(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void recover() - { - checkSession(); - try - { - session.recover(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createConsumer(Destination destination) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createConsumer(destination)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createConsumer(Destination destination, String messageSelector) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createConsumer(destination, messageSelector)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createConsumer(Destination destination, String 
messageSelector, boolean noLocal) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createConsumer(destination, messageSelector, noLocal)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Queue createQueue(String queueName) - { - checkSession(); - try - { - return session.createQueue(queueName); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public Topic createTopic(String topicName) - { - checkSession(); - try - { - return session.createTopic(topicName); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createDurableConsumer(Topic topic, String name) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createDurableConsumer(topic, name)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createDurableConsumer(topic, name, messageSelector, noLocal)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createSharedDurableConsumer(Topic topic, String name) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createSharedDurableConsumer(topic, name)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer 
createSharedDurableConsumer(Topic topic, String name, String messageSelector) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createSharedDurableConsumer(topic, name, messageSelector)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createSharedConsumer(topic, sharedSubscriptionName)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) - { - checkSession(); - try - { - HornetQJMSConsumer consumer = new HornetQJMSConsumer(this, session.createSharedConsumer(topic, sharedSubscriptionName, messageSelector)); - checkAutoStart(); - return consumer; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public QueueBrowser createBrowser(Queue queue) - { - checkSession(); - try - { - QueueBrowser browser = session.createBrowser(queue); - checkAutoStart(); - return browser; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public QueueBrowser createBrowser(Queue queue, String messageSelector) - { - checkSession(); - try - { - QueueBrowser browser = session.createBrowser(queue, messageSelector); - checkAutoStart(); - return browser; - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public TemporaryQueue createTemporaryQueue() - { - checkSession(); - try - { - return session.createTemporaryQueue(); - } - catch (JMSException e) - { - throw 
JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public TemporaryTopic createTemporaryTopic() - { - checkSession(); - try - { - return session.createTemporaryTopic(); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void unsubscribe(String name) - { - checkSession(); - try - { - session.unsubscribe(name); - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - @Override - public void acknowledge() - { - checkSession(); - if (closed) - throw new IllegalStateRuntimeException("Context is closed"); - try - { - if (lastMessagesWaitingAck != null) - { - lastMessagesWaitingAck.acknowledge(); - } - } - catch (JMSException e) - { - throw JmsExceptionUtils.convertToRuntimeException(e); - } - } - - /** - * This is to be used on tests only. It's not part of the interface and it's not guaranteed to be kept - * on the API contract. - * - * @return - */ - public Session getUsedSession() - { - return this.session; - } - - private synchronized void checkAutoStart() throws JMSException - { - if (closed) - throw new IllegalStateRuntimeException("Context is closed"); - if (autoStart) - { - connection.start(); - } - } - - /** - * this is to ensure Context.acknowledge would work on ClientACK - */ - Message setLastMessage(final JMSConsumer consumer, final Message lastMessageReceived) - { - if (sessionMode == CLIENT_ACKNOWLEDGE) - { - lastMessagesWaitingAck = lastMessageReceived; - } - return lastMessageReceived; - } - - public ThreadAwareContext getThreadAwareContext() - { - return threadAwareContext; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSProducer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSProducer.java 
// b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSProducer.java deleted file mode 100644 index 86a5f7c..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQJMSProducer.java +++ /dev/null @@ -1,800 +0,0 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq.jms.client;

import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.JMSRuntimeException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageFormatRuntimeException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.activemq.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.utils.TypedProperties;

/**
 * {@link JMSProducer} implementation that collects headers and properties locally and applies
 * them to every message just before handing it to the wrapped {@link MessageProducer}.
 * <p>
 * NOTE: this class forwards {@link #setDisableMessageID(boolean)} and
 * {@link #setDisableMessageTimestamp(boolean)} calls to their equivalents on the
 * {@link MessageProducer}. If the user is using the producer in async mode, this may lead to races.
 * We allow/tolerate this because these are just optional optimizations.
 *
 * @author <a href="http://jmesnil.net/">Jeff Mesnil</a> (c) 2013 Red Hat inc.
 */
public final class HornetQJMSProducer implements JMSProducer
{
   private final HornetQJMSContext context;
   private final MessageProducer producer;

   // String values are stored as SimpleString; they are converted back to String wherever a
   // value leaves this class (see toJmsValue).
   private final TypedProperties properties = new TypedProperties();

   private volatile CompletionListener completionListener;

   private Destination jmsHeaderReplyTo;
   private String jmsHeaderCorrelationID;
   private byte[] jmsHeaderCorrelationIDAsBytes;
   private String jmsHeaderType;

   HornetQJMSProducer(HornetQJMSContext context, MessageProducer producer)
   {
      this.context = context;
      this.producer = producer;
   }

   /**
    * Applies the headers and properties configured on this producer to {@code message} and sends
    * it, asynchronously when a completion listener has been set.
    *
    * @param destination the destination to send to
    * @param message     the message to send
    * @return this producer
    * @throws MessageFormatRuntimeException if {@code message} is {@code null}
    * @throws JMSRuntimeException           if the underlying producer fails
    */
   @Override
   public JMSProducer send(Destination destination, Message message)
   {
      if (message == null)
      {
         throw new MessageFormatRuntimeException("null message");
      }

      try
      {
         applyHeaders(message);
         setProperties(message);

         // Read the volatile field once so that a concurrent setAsync(null) cannot slip in
         // between the null check and the use.
         final CompletionListener listener = completionListener;
         if (listener != null)
         {
            producer.send(destination, message, new CompletionListenerWrapper(listener));
         }
         else
         {
            producer.send(destination, message);
         }
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   /**
    * Copies the JMS headers held by this producer onto the message. Headers that were never set
    * are left untouched on the message.
    */
   private void applyHeaders(Message message) throws JMSException
   {
      if (jmsHeaderCorrelationID != null)
      {
         message.setJMSCorrelationID(jmsHeaderCorrelationID);
      }
      if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0)
      {
         message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
      }
      if (jmsHeaderReplyTo != null)
      {
         message.setJMSReplyTo(jmsHeaderReplyTo);
      }
      if (jmsHeaderType != null)
      {
         message.setJMSType(jmsHeaderType);
      }
   }

   /**
    * Sets all properties we carry onto the message.
    * <p>
    * Values are converted with {@link #toJmsValue(Object)} first: the message may be a foreign
    * JMS implementation (HORNETQ-1209), which would reject a {@link SimpleString} value.
    *
    * @param message the message that receives the properties
    * @throws JMSException if the message rejects a property
    */
   private void setProperties(Message message) throws JMSException
   {
      for (SimpleString name : properties.getPropertyNames())
      {
         message.setObjectProperty(name.toString(), toJmsValue(properties.getProperty(name)));
      }
   }

   /** Maps the internal {@link SimpleString} representation back to a plain {@link String}. */
   private static Object toJmsValue(Object value)
   {
      return value instanceof SimpleString ? value.toString() : value;
   }

   /** Wraps a checked JMS failure in a {@link JMSRuntimeException}, keeping error code and cause. */
   private static JMSRuntimeException runtimeError(JMSException cause)
   {
      return new JMSRuntimeException(cause.getMessage(), cause.getErrorCode(), cause);
   }

   /** Wraps an unexpected runtime failure in a {@link JMSRuntimeException}, keeping the cause. */
   private static JMSRuntimeException runtimeError(RuntimeException cause)
   {
      return new JMSRuntimeException(cause.getMessage(), null, cause);
   }

   /** Wraps a failure to build a message body, keeping error code and cause. */
   private static MessageFormatRuntimeException formatError(JMSException cause)
   {
      return new MessageFormatRuntimeException(cause.getMessage(), cause.getErrorCode(), cause);
   }

   /** Wraps a property type-conversion failure, keeping the cause. */
   private static MessageFormatRuntimeException formatError(ActiveMQPropertyConversionException cause)
   {
      return new MessageFormatRuntimeException(cause.getMessage(), null, cause);
   }

   /**
    * Sends a {@link TextMessage} with the given body.
    *
    * @return this producer
    */
   @Override
   public JMSProducer send(Destination destination, String body)
   {
      TextMessage message = context.createTextMessage(body);
      send(destination, message);
      return this;
   }

   /**
    * Sends a {@link MapMessage} filled from {@code body}; a {@code null} body yields an empty
    * map message.
    *
    * @return this producer
    * @throws MessageFormatRuntimeException if an entry cannot be stored in the map message
    */
   @Override
   public JMSProducer send(Destination destination, Map<String, Object> body)
   {
      MapMessage message = context.createMapMessage();
      if (body != null)
      {
         try
         {
            for (Entry<String, Object> entry : body.entrySet())
            {
               putMapEntry(message, entry.getKey(), entry.getValue());
            }
         }
         catch (JMSException e)
         {
            throw formatError(e);
         }
      }
      send(destination, message);
      return this;
   }

   /** Stores one entry using the typed setter matching the value's class. */
   private static void putMapEntry(MapMessage message, String name, Object v) throws JMSException
   {
      if (v instanceof String)
      {
         message.setString(name, (String) v);
      }
      else if (v instanceof Long)
      {
         message.setLong(name, (Long) v);
      }
      else if (v instanceof Double)
      {
         message.setDouble(name, (Double) v);
      }
      else if (v instanceof Integer)
      {
         message.setInt(name, (Integer) v);
      }
      else if (v instanceof Character)
      {
         message.setChar(name, (Character) v);
      }
      else if (v instanceof Short)
      {
         message.setShort(name, (Short) v);
      }
      else if (v instanceof Boolean)
      {
         message.setBoolean(name, (Boolean) v);
      }
      else if (v instanceof Float)
      {
         message.setFloat(name, (Float) v);
      }
      else if (v instanceof Byte)
      {
         message.setByte(name, (Byte) v);
      }
      else if (v instanceof byte[])
      {
         byte[] array = (byte[]) v;
         message.setBytes(name, array, 0, array.length);
      }
      else
      {
         message.setObject(name, v);
      }
   }

   /**
    * Sends a {@link BytesMessage}; a {@code null} body yields an empty bytes message.
    *
    * @return this producer
    * @throws MessageFormatRuntimeException if the body cannot be written
    */
   @Override
   public JMSProducer send(Destination destination, byte[] body)
   {
      BytesMessage message = context.createBytesMessage();
      if (body != null)
      {
         try
         {
            message.writeBytes(body);
         }
         catch (JMSException e)
         {
            throw formatError(e);
         }
      }
      send(destination, message);
      return this;
   }

   /**
    * Sends an {@link ObjectMessage} with the given body.
    *
    * @return this producer
    */
   @Override
   public JMSProducer send(Destination destination, Serializable body)
   {
      ObjectMessage message = context.createObjectMessage(body);
      send(destination, message);
      return this;
   }

   @Override
   public JMSProducer setDisableMessageID(boolean value)
   {
      try
      {
         producer.setDisableMessageID(value);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   @Override
   public boolean getDisableMessageID()
   {
      try
      {
         return producer.getDisableMessageID();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSProducer setDisableMessageTimestamp(boolean value)
   {
      try
      {
         producer.setDisableMessageTimestamp(value);
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
      return this;
   }

   @Override
   public boolean getDisableMessageTimestamp()
   {
      try
      {
         return producer.getDisableMessageTimestamp();
      }
      catch (JMSException e)
      {
         throw JmsExceptionUtils.convertToRuntimeException(e);
      }
   }

   @Override
   public JMSProducer setDeliveryMode(int deliveryMode)
   {
      try
      {
         producer.setDeliveryMode(deliveryMode);
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
      return this;
   }

   @Override
   public int getDeliveryMode()
   {
      try
      {
         return producer.getDeliveryMode();
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
   }

   @Override
   public JMSProducer setPriority(int priority)
   {
      try
      {
         producer.setPriority(priority);
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
      return this;
   }

   @Override
   public int getPriority()
   {
      try
      {
         return producer.getPriority();
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
   }

   @Override
   public JMSProducer setTimeToLive(long timeToLive)
   {
      try
      {
         producer.setTimeToLive(timeToLive);
         return this;
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
   }

   @Override
   public long getTimeToLive()
   {
      try
      {
         return producer.getTimeToLive();
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
   }

   @Override
   public JMSProducer setDeliveryDelay(long deliveryDelay)
   {
      try
      {
         producer.setDeliveryDelay(deliveryDelay);
         return this;
      }
      catch (JMSException e)
      {
         throw runtimeError(e);
      }
   }

   /**
    * Returns the delivery delay of the underlying producer, or 0 when it cannot be read.
    */
   @Override
   public long getDeliveryDelay()
   {
      long deliveryDelay = 0;
      try
      {
         deliveryDelay = producer.getDeliveryDelay();
      }
      catch (Exception ignored)
      {
         // Best-effort: fall back to 0 instead of failing.
         // NOTE(review): every other getter here rethrows; confirm the swallow is intended.
      }
      return deliveryDelay;
   }

   @Override
   public JMSProducer setAsync(CompletionListener completionListener)
   {
      this.completionListener = completionListener;
      return this;
   }

   @Override
   public CompletionListener getAsync()
   {
      return completionListener;
   }

   @Override
   public JMSProducer setProperty(String name, boolean value)
   {
      checkName(name);
      properties.putBooleanProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, byte value)
   {
      checkName(name);
      properties.putByteProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, short value)
   {
      checkName(name);
      properties.putShortProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, int value)
   {
      checkName(name);
      properties.putIntProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, long value)
   {
      checkName(name);
      properties.putLongProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, float value)
   {
      checkName(name);
      properties.putFloatProperty(new SimpleString(name), value);
      return this;
   }

   @Override
   public JMSProducer setProperty(String name, double value)
   {
      checkName(name);
      properties.putDoubleProperty(new SimpleString(name), value);
      return this;
   }

   /**
    * Sets a String property. A {@code null} value is stored as a null property instead of
    * failing with a {@link NullPointerException}.
    */
   @Override
   public JMSProducer setProperty(String name, String value)
   {
      checkName(name);
      properties.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
      return this;
   }

   /**
    * Sets a property from an object value.
    *
    * @throws MessageFormatRuntimeException if the value's type cannot be stored as a property
    */
   @Override
   public JMSProducer setProperty(String name, Object value)
   {
      checkName(name);
      try
      {
         TypedProperties.setObjectProperty(new SimpleString(name), value, properties);
      }
      catch (ActiveMQPropertyConversionException hqe)
      {
         throw formatError(hqe);
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
      return this;
   }

   @Override
   public JMSProducer clearProperties()
   {
      try
      {
         properties.clear();
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
      return this;
   }

   @Override
   public boolean propertyExists(String name)
   {
      return properties.containsProperty(new SimpleString(name));
   }

   @Override
   public boolean getBooleanProperty(String name)
   {
      try
      {
         return properties.getBooleanProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
   }

   @Override
   public byte getByteProperty(String name)
   {
      try
      {
         return properties.getByteProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public short getShortProperty(String name)
   {
      try
      {
         return properties.getShortProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public int getIntProperty(String name)
   {
      try
      {
         return properties.getIntProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public long getLongProperty(String name)
   {
      try
      {
         return properties.getLongProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public float getFloatProperty(String name)
   {
      try
      {
         return properties.getFloatProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public double getDoubleProperty(String name)
   {
      try
      {
         return properties.getDoubleProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
   }

   @Override
   public String getStringProperty(String name)
   {
      try
      {
         SimpleString prop = properties.getSimpleStringProperty(new SimpleString(name));
         return prop == null ? null : prop.toString();
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
   }

   /**
    * Returns the property in its objectified form. String properties are returned as
    * {@link String} however they were set; a missing or null property yields {@code null}.
    */
   @Override
   public Object getObjectProperty(String name)
   {
      try
      {
         // Decide by the stored value's type, so the answer cannot go stale when a name is
         // re-set with a different type.
         return toJmsValue(properties.getProperty(new SimpleString(name)));
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
   }

   /**
    * @return a new set holding the names of all properties currently set on this producer
    */
   @Override
   public Set<String> getPropertyNames()
   {
      try
      {
         Set<SimpleString> simplePropNames = properties.getPropertyNames();
         Set<String> propNames = new HashSet<String>(simplePropNames.size());

         for (SimpleString str : simplePropNames)
         {
            propNames.add(str.toString());
         }
         return propNames;
      }
      catch (ActiveMQPropertyConversionException ce)
      {
         throw formatError(ce);
      }
      catch (RuntimeException e)
      {
         throw runtimeError(e);
      }
   }

   /**
    * Stores a defensive copy of the given correlation ID.
    *
    * @throws JMSRuntimeException if {@code correlationID} is {@code null} or empty
    */
   @Override
   public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID)
   {
      if (correlationID == null || correlationID.length == 0)
      {
         throw new JMSRuntimeException("Please specify a non-zero length byte[]");
      }
      jmsHeaderCorrelationIDAsBytes = Arrays.copyOf(correlationID, correlationID.length);
      return this;
   }

   /**
    * @return a copy of the byte correlation ID, or {@code null} when none has been set
    */
   @Override
   public byte[] getJMSCorrelationIDAsBytes()
   {
      final byte[] current = jmsHeaderCorrelationIDAsBytes;
      // Without this guard an unset correlation ID caused a NullPointerException.
      return current == null ? null : Arrays.copyOf(current, current.length);
   }

   @Override
   public JMSProducer setJMSCorrelationID(String correlationID)
   {
      jmsHeaderCorrelationID = correlationID;
      return this;
   }

   @Override
   public String getJMSCorrelationID()
   {
      return jmsHeaderCorrelationID;
   }

   @Override
   public JMSProducer setJMSType(String type)
   {
      jmsHeaderType = type;
      return this;
   }

   @Override
   public String getJMSType()
   {
      return jmsHeaderType;
   }

   @Override
   public JMSProducer setJMSReplyTo(Destination replyTo)
   {
      jmsHeaderReplyTo = replyTo;
      return this;
   }

   @Override
   public Destination getJMSReplyTo()
   {
      return jmsHeaderReplyTo;
   }

   /** Rejects {@code null} and empty property names. */
   private void checkName(String name)
   {
      if (name == null)
      {
         throw HornetQJMSClientBundle.BUNDLE.nameCannotBeNull();
      }
      if (name.equals(""))
      {
         throw HornetQJMSClientBundle.BUNDLE.nameCannotBeEmpty();
      }
   }

   /**
    * Completion listener that flags the context's {@code ThreadAwareContext} around each
    * callback to the user's listener, clearing the flag even when the callback throws.
    */
   final class CompletionListenerWrapper implements CompletionListener
   {

      private final CompletionListener wrapped;

      public CompletionListenerWrapper(CompletionListener wrapped)
      {
         this.wrapped = wrapped;
      }

      @Override
      public void onCompletion(Message message)
      {
         context.getThreadAwareContext().setCurrentThread(true);
         try
         {
            wrapped.onCompletion(message);
         }
         finally
         {
            context.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public void onException(Message message, Exception exception)
      {
         context.getThreadAwareContext().setCurrentThread(true);
         try
         {
            wrapped.onException(message, exception);
         }
         finally
         {
            context.getThreadAwareContext().clearCurrentThread(true);
         }
      }
   }
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMapMessage.java ---------------------------------------------------------------------- diff --git
// a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMapMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMapMessage.java deleted file mode 100644 index 635854b..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMapMessage.java +++ /dev/null @@ -1,450 +0,0 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq.jms.client;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;

import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.api.core.Message;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.utils.TypedProperties;


import static org.apache.activemq.reader.MapMessageUtil.writeBodyMap;
import static org.apache.activemq.reader.MapMessageUtil.readBodyMap;

/**
 * HornetQ implementation of a JMS MapMessage.
 * <p>
 * Entries are held in a {@link TypedProperties} instance and written into the message body
 * before sending, whenever the map has changed since it was last written.
 *
 * @author Norbert Lataille (norbert.latai...@m4x.org)
 * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a>
 * @author <a href="mailto:tim....@jboss.com">Tim Fox</a>
 * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a>
 * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a>
 * @version $Revision: 3412 $
 */
public final class HornetQMapMessage extends HornetQMessage implements MapMessage
{
   // Constants -----------------------------------------------------

   public static final byte TYPE = Message.MAP_TYPE;

   // Attributes ----------------------------------------------------

   private final TypedProperties map = new TypedProperties();

   // true when the map has not yet been written into the message body
   private boolean invalid;

   // Static --------------------------------------------------------

   /** Converts a property conversion failure into a JMS exception, keeping the cause. */
   private static MessageFormatException formatError(final ActiveMQPropertyConversionException cause)
   {
      final MessageFormatException converted = new MessageFormatException(cause.getMessage());
      converted.initCause(cause);
      return converted;
   }

   // Constructors --------------------------------------------------

   /*
    * This constructor is used to construct messages prior to sending
    */
   protected HornetQMapMessage(final ClientSession session)
   {
      super(HornetQMapMessage.TYPE, session);

      invalid = true;
   }

   /*
    * This constructor is used during reading
    */
   protected HornetQMapMessage(final ClientMessage message, final ClientSession session)
   {
      super(message, session);

      invalid = false;
   }

   public HornetQMapMessage()
   {
      invalid = false;
   }

   /**
    * Constructor for a foreign MapMessage
    *
    * @param foreign the foreign message whose entries are copied
    * @param session the session this message belongs to
    * @throws JMSException if the foreign message cannot be read or an entry cannot be stored
    */
   public HornetQMapMessage(final MapMessage foreign, final ClientSession session) throws JMSException
   {
      super(foreign, HornetQMapMessage.TYPE, session);
      Enumeration<?> names = foreign.getMapNames();
      while (names.hasMoreElements())
      {
         String name = (String) names.nextElement();
         Object obj = foreign.getObject(name);
         setObject(name, obj);
      }
      // Like the send-side constructor, always mark the body as pending. Without this an
      // empty foreign map left the flag false and the body map was never written.
      invalid = true;
   }

   // Public --------------------------------------------------------

   @Override
   public byte getType()
   {
      return HornetQMapMessage.TYPE;
   }

   // MapMessage implementation -------------------------------------

   public void setBoolean(final String name, final boolean value) throws JMSException
   {
      checkName(name);
      map.putBooleanProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setByte(final String name, final byte value) throws JMSException
   {
      checkName(name);
      map.putByteProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setShort(final String name, final short value) throws JMSException
   {
      checkName(name);
      map.putShortProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setChar(final String name, final char value) throws JMSException
   {
      checkName(name);
      map.putCharProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setInt(final String name, final int value) throws JMSException
   {
      checkName(name);
      map.putIntProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setLong(final String name, final long value) throws JMSException
   {
      checkName(name);
      map.putLongProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setFloat(final String name, final float value) throws JMSException
   {
      checkName(name);
      map.putFloatProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setDouble(final String name, final double value) throws JMSException
   {
      checkName(name);
      map.putDoubleProperty(new SimpleString(name), value);
      invalid = true;
   }

   public void setString(final String name, final String value) throws JMSException
   {
      checkName(name);
      map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
      invalid = true;
   }

   public void setBytes(final String name, final byte[] value) throws JMSException
   {
      checkName(name);
      map.putBytesProperty(new SimpleString(name), value);
      invalid = true;
   }

   /**
    * Stores a copy of {@code length} bytes of {@code value}, starting at {@code offset}.
    *
    * @throws JMSException if the name is invalid, or if {@code offset}/{@code length} are
    *                      negative or reach past the end of {@code value}
    */
   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
   {
      checkName(name);
      // Written as a subtraction so that offset + length cannot overflow; negative arguments
      // are rejected here instead of failing later with an array exception.
      if (offset < 0 || length < 0 || offset > value.length - length)
      {
         throw new JMSException("Invalid offset/length");
      }
      byte[] newBytes = new byte[length];
      System.arraycopy(value, offset, newBytes, 0, length);
      map.putBytesProperty(new SimpleString(name), newBytes);
      invalid = true;
   }

   public void setObject(final String name, final Object value) throws JMSException
   {
      checkName(name);
      try
      {
         TypedProperties.setObjectProperty(new SimpleString(name), value, map);
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
      invalid = true;
   }

   public boolean getBoolean(final String name) throws JMSException
   {
      try
      {
         return map.getBooleanProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public byte getByte(final String name) throws JMSException
   {
      try
      {
         return map.getByteProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public short getShort(final String name) throws JMSException
   {
      try
      {
         return map.getShortProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public char getChar(final String name) throws JMSException
   {
      try
      {
         return map.getCharProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public int getInt(final String name) throws JMSException
   {
      try
      {
         return map.getIntProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public long getLong(final String name) throws JMSException
   {
      try
      {
         return map.getLongProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public float getFloat(final String name) throws JMSException
   {
      try
      {
         return map.getFloatProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public double getDouble(final String name) throws JMSException
   {
      try
      {
         return map.getDoubleProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public String getString(final String name) throws JMSException
   {
      try
      {
         SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
         return str == null ? null : str.toString();
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   public byte[] getBytes(final String name) throws JMSException
   {
      try
      {
         return map.getBytesProperty(new SimpleString(name));
      }
      catch (ActiveMQPropertyConversionException e)
      {
         throw formatError(e);
      }
   }

   /**
    * Returns the stored value for {@code name}; strings held as {@link SimpleString} are
    * returned as {@link String}.
    */
   public Object getObject(final String name) throws JMSException
   {
      Object val = map.getProperty(new SimpleString(name));

      if (val instanceof SimpleString)
      {
         val = ((SimpleString) val).toString();
      }

      return val;
   }

   /**
    * @return an enumeration over a copy of the entry names currently in the map
    */
   public Enumeration getMapNames() throws JMSException
   {
      Set<SimpleString> simplePropNames = map.getPropertyNames();
      Set<String> propNames = new HashSet<String>(simplePropNames.size());

      for (SimpleString str : simplePropNames)
      {
         propNames.add(str.toString());
      }

      return Collections.enumeration(propNames);
   }

   public boolean itemExists(final String name) throws JMSException
   {
      return map.containsProperty(new SimpleString(name));
   }


   // HornetQMessage overrides ----------------------------------------

   @Override
   public void clearBody() throws JMSException
   {
      super.clearBody();

      map.clear();

      invalid = true;
   }

   @Override
   public void doBeforeSend() throws Exception
   {
      // Only write the map into the body when it changed since the last write.
      if (invalid)
      {
         writeBodyMap(message, map);
         invalid = false;
      }

      super.doBeforeSend();
   }

   @Override
   public void doBeforeReceive() throws ActiveMQException
   {
      super.doBeforeReceive();

      readBodyMap(message, map);
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   /**
    * Checks that the message is writable and that the entry name is neither null nor empty.
    *
    * @param name the name
    */
   private void checkName(final String name) throws JMSException
   {
      checkWrite();

      if (name == null)
      {
         throw HornetQJMSClientBundle.BUNDLE.nameCannotBeNull();
      }
      if (name.equals(""))
      {
         throw HornetQJMSClientBundle.BUNDLE.nameCannotBeEmpty();
      }
   }

   @Override
   protected boolean hasNoBody()
   {
      return map.isEmpty();
   }

   @Override
   public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes")
                                     Class c)
   {
      if (hasNoBody())
      {
         return true;
      }
      return c.isAssignableFrom(java.util.Map.class);
   }

   @SuppressWarnings("unchecked")
   @Override
   protected <T> T getBodyInternal(Class<T> c)
   {
      return (T) map.getMap();
   }
}