http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessage.java deleted file mode 100644 index 4f8038f..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessage.java +++ /dev/null @@ -1,1089 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.client; - -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.JMSRuntimeException; -import javax.jms.Message; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; - -import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQPropertyConversionException; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.jms.HornetQJMSConstants; -import org.apache.activemq.core.message.impl.MessageInternal; -import org.apache.activemq.reader.MessageUtil; -import org.apache.activemq.utils.UUID; - - -/** - * HornetQ implementation of a JMS Message. 
- * <br> - * JMS Messages only live on the client side - the server only deals with MessageImpl - * instances - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:bersh...@yahoo.com">Tyronne Wickramarathne</a> Partially ported from JBossMQ implementation - * originally written by: - * @author Norbert Lataille (norbert.latai...@m4x.org) - * @author Hiram Chirino (cojonud...@hotmail.com) - * @author David Maplesden (david.maples...@orion.co.nz) - * @author <a href="mailto:adr...@jboss.org">Adrian Brock</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - * @author <a href="mailto:clebert.suco...@jboss.org">Clebert Suconic</a> - */ -public class HornetQMessage implements javax.jms.Message -{ - // Constants ----------------------------------------------------- - public static final byte TYPE = org.apache.activemq.api.core.Message.DEFAULT_TYPE; - - public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage) - { - Map<String, Object> jmsMessage = new HashMap<String, Object>(); - - String deliveryMode = (Boolean)coreMessage.get("durable") ? 
"PERSISTENT" : "NON_PERSISTENT"; - byte priority = (Byte)coreMessage.get("priority"); - long timestamp = (Long)coreMessage.get("timestamp"); - long expiration = (Long)coreMessage.get("expiration"); - - jmsMessage.put("JMSPriority", priority); - jmsMessage.put("JMSTimestamp", timestamp); - jmsMessage.put("JMSExpiration", expiration); - jmsMessage.put("JMSDeliveryMode", deliveryMode); - - for (Map.Entry<String, Object> entry : coreMessage.entrySet()) - { - if (entry.getKey().equals("type") || entry.getKey().equals("durable") || - entry.getKey().equals("expiration") || - entry.getKey().equals("timestamp") || - entry.getKey().equals("priority")) - { - // Ignore - } - else if (entry.getKey().equals("userID")) - { - jmsMessage.put("JMSMessageID", entry.getValue().toString()); - } - else - { - Object value = entry.getValue(); - if (value instanceof SimpleString) - { - jmsMessage.put(entry.getKey(), value.toString()); - } - else - { - jmsMessage.put(entry.getKey(), value); - } - } - } - - return jmsMessage; - } - - // Static -------------------------------------------------------- - - private static final HashSet<String> reservedIdentifiers = new HashSet<String>(); - static - { - HornetQMessage.reservedIdentifiers.add("NULL"); - HornetQMessage.reservedIdentifiers.add("TRUE"); - HornetQMessage.reservedIdentifiers.add("FALSE"); - HornetQMessage.reservedIdentifiers.add("NOT"); - HornetQMessage.reservedIdentifiers.add("AND"); - HornetQMessage.reservedIdentifiers.add("OR"); - HornetQMessage.reservedIdentifiers.add("BETWEEN"); - HornetQMessage.reservedIdentifiers.add("LIKE"); - HornetQMessage.reservedIdentifiers.add("IN"); - HornetQMessage.reservedIdentifiers.add("IS"); - HornetQMessage.reservedIdentifiers.add("ESCAPE"); - } - - public static HornetQMessage createMessage(final ClientMessage message, final ClientSession session) - { - int type = message.getType(); - - HornetQMessage msg; - - switch (type) - { - case HornetQMessage.TYPE: // 0 - { - msg = new 
HornetQMessage(message, session); - break; - } - case HornetQBytesMessage.TYPE: // 4 - { - msg = new HornetQBytesMessage(message, session); - break; - } - case HornetQMapMessage.TYPE: // 5 - { - msg = new HornetQMapMessage(message, session); - break; - } - case HornetQObjectMessage.TYPE: - { - msg = new HornetQObjectMessage(message, session); - break; - } - case HornetQStreamMessage.TYPE: // 6 - { - msg = new HornetQStreamMessage(message, session); - break; - } - case HornetQTextMessage.TYPE: // 3 - { - msg = new HornetQTextMessage(message, session); - break; - } - default: - { - throw new JMSRuntimeException("Invalid message type " + type); - } - } - - return msg; - } - - // Attributes ---------------------------------------------------- - - // The underlying message - protected ClientMessage message; - - private ClientSession session; - - // Read-only? - protected boolean readOnly; - - // Properties read-only? - protected boolean propertiesReadOnly; - - // Cache it - private Destination dest; - - // Cache it - private String msgID; - - // Cache it - private Destination replyTo; - - // Cache it - private String jmsCorrelationID; - - // Cache it - private String jmsType; - - private boolean individualAck; - - private long jmsDeliveryTime; - - // Constructors -------------------------------------------------- - - /* - * Create a new message prior to sending - */ - protected HornetQMessage(final byte type, final ClientSession session) - { - message = session.createMessage(type, true, 0, System.currentTimeMillis(), (byte)4); - - } - - protected HornetQMessage(final ClientSession session) - { - this(HornetQMessage.TYPE, session); - } - - /** - * Constructor for when receiving a message from the server - */ - public HornetQMessage(final ClientMessage message, final ClientSession session) - { - this.message = message; - - readOnly = true; - - propertiesReadOnly = true; - - this.session = session; - } - - /* - * A constructor that takes a foreign message - */ - public 
HornetQMessage(final Message foreign, final ClientSession session) throws JMSException - { - this(foreign, HornetQMessage.TYPE, session); - } - - public HornetQMessage() - { - } - - protected HornetQMessage(final Message foreign, final byte type, final ClientSession session) throws JMSException - { - this(type, session); - - setJMSTimestamp(foreign.getJMSTimestamp()); - - String value = System.getProperty(HornetQJMSConstants.JMS_HORNETQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME); - - boolean supportBytesId = !"false".equals(value); - - if (supportBytesId) - { - try - { - byte[] corrIDBytes = foreign.getJMSCorrelationIDAsBytes(); - setJMSCorrelationIDAsBytes(corrIDBytes); - } - catch (JMSException e) - { - // specified as String - String corrIDString = foreign.getJMSCorrelationID(); - if (corrIDString != null) - { - setJMSCorrelationID(corrIDString); - } - } - } - else - { - // Some providers, like WSMQ do automatic conversions between native byte[] correlation id - // and String correlation id. 
This makes it impossible for HQ to guarantee to return the correct - // type as set by the user - // So we allow the behaviour to be overridden by a system property - // https://jira.jboss.org/jira/browse/HORNETQ-356 - // https://jira.jboss.org/jira/browse/HORNETQ-332 - String corrIDString = foreign.getJMSCorrelationID(); - if (corrIDString != null) - { - setJMSCorrelationID(corrIDString); - } - } - - setJMSReplyTo(foreign.getJMSReplyTo()); - setJMSDestination(foreign.getJMSDestination()); - setJMSDeliveryMode(foreign.getJMSDeliveryMode()); - setJMSExpiration(foreign.getJMSExpiration()); - setJMSPriority(foreign.getJMSPriority()); - setJMSType(foreign.getJMSType()); - - // We can't avoid a cast warning here since getPropertyNames() is on the JMS API - for (Enumeration<String> props = foreign.getPropertyNames(); props.hasMoreElements();) - { - String name = props.nextElement(); - - Object prop = foreign.getObjectProperty(name); - - setObjectProperty(name, prop); - } - } - - // javax.jmx.Message implementation ------------------------------ - - public String getJMSMessageID() - { - if (msgID == null) - { - UUID uid = message.getUserID(); - - msgID = uid == null ? 
null : "ID:" + uid.toString(); - } - return msgID; - } - - public void setJMSMessageID(final String jmsMessageID) throws JMSException - { - if (jmsMessageID != null && !jmsMessageID.startsWith("ID:")) - { - throw new JMSException("JMSMessageID must start with ID:"); - } - - message.setUserID(null); - - msgID = jmsMessageID; - } - - public long getJMSTimestamp() throws JMSException - { - return message.getTimestamp(); - } - - public void setJMSTimestamp(final long timestamp) throws JMSException - { - message.setTimestamp(timestamp); - } - - public byte[] getJMSCorrelationIDAsBytes() throws JMSException - { - return MessageUtil.getJMSCorrelationIDAsBytes(message); - } - - public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException - { - try - { - MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID); - } - catch (ActiveMQException e) - { - JMSException ex = new JMSException(e.getMessage()); - ex.initCause(e); - throw ex; - } - } - - public void setJMSCorrelationID(final String correlationID) throws JMSException - { - MessageUtil.setJMSCorrelationID(message, correlationID); - jmsCorrelationID = correlationID; - } - - public String getJMSCorrelationID() throws JMSException - { - if (jmsCorrelationID == null) - { - jmsCorrelationID = MessageUtil.getJMSCorrelationID(message); - } - - return jmsCorrelationID; - } - - public Destination getJMSReplyTo() throws JMSException - { - if (replyTo == null) - { - - SimpleString repl = MessageUtil.getJMSReplyTo(message); - - if (repl != null) - { - replyTo = HornetQDestination.fromAddress(repl.toString()); - } - } - return replyTo; - } - - public void setJMSReplyTo(final Destination dest) throws JMSException - { - - if (dest == null) - { - MessageUtil.setJMSReplyTo(message, null); - replyTo = null; - } - else - { - if (dest instanceof HornetQDestination == false) - { - throw new InvalidDestinationException("Not a HornetQ destination " + dest); - } - - HornetQDestination jbd = 
(HornetQDestination)dest; - - MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress()); - - replyTo = jbd; - } - } - - public Destination getJMSDestination() throws JMSException - { - if (dest == null) - { - SimpleString sdest = message.getAddress(); - - dest = sdest == null ? null : HornetQDestination.fromAddress(sdest.toString()); - } - - return dest; - } - - public void setJMSDestination(final Destination destination) throws JMSException - { - dest = destination; - } - - public int getJMSDeliveryMode() throws JMSException - { - return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; - } - - public void setJMSDeliveryMode(final int deliveryMode) throws JMSException - { - if (deliveryMode == DeliveryMode.PERSISTENT) - { - message.setDurable(true); - } - else if (deliveryMode == DeliveryMode.NON_PERSISTENT) - { - message.setDurable(false); - } - else - { - throw HornetQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode); - } - } - - public boolean getJMSRedelivered() throws JMSException - { - return message.getDeliveryCount() > 1; - } - - public void setJMSRedelivered(final boolean redelivered) throws JMSException - { - if (!redelivered) - { - message.setDeliveryCount(1); - } - else - { - if (message.getDeliveryCount() > 1) - { - // do nothing - } - else - { - message.setDeliveryCount(2); - } - } - } - - public void setJMSType(final String type) throws JMSException - { - if (type != null) - { - MessageUtil.setJMSType(message, type); - - jmsType = type; - } - } - - public String getJMSType() throws JMSException - { - if (jmsType == null) - { - jmsType = MessageUtil.getJMSType(message); - } - return jmsType; - } - - public long getJMSExpiration() throws JMSException - { - return message.getExpiration(); - } - - public void setJMSExpiration(final long expiration) throws JMSException - { - message.setExpiration(expiration); - } - - public int getJMSPriority() throws JMSException - { - return message.getPriority(); - } - - public void 
setJMSPriority(final int priority) throws JMSException - { - checkPriority(priority); - - message.setPriority((byte)priority); - } - - public void clearProperties() throws JMSException - { - - MessageUtil.clearProperties(message); - - propertiesReadOnly = false; - } - - public void clearBody() throws JMSException - { - readOnly = false; - } - - public boolean propertyExists(final String name) throws JMSException - { - return MessageUtil.propertyExists(message, name); - } - - public boolean getBooleanProperty(final String name) throws JMSException - { - try - { - return message.getBooleanProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public byte getByteProperty(final String name) throws JMSException - { - try - { - return message.getByteProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public short getShortProperty(final String name) throws JMSException - { - try - { - return message.getShortProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public int getIntProperty(final String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { - return message.getDeliveryCount(); - } - - try - { - return message.getIntProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public long getLongProperty(final String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { - return message.getDeliveryCount(); - } - - try - { - return message.getLongProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public float 
getFloatProperty(final String name) throws JMSException - { - try - { - return message.getFloatProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public double getDoubleProperty(final String name) throws JMSException - { - try - { - return message.getDoubleProperty(new SimpleString(name)); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public String getStringProperty(final String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { - return String.valueOf(message.getDeliveryCount()); - } - - try - { - if (MessageUtil.JMSXGROUPID.equals(name)) - { - return message.getStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID); - } - else - { - return message.getStringProperty(new SimpleString(name)); - } - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public Object getObjectProperty(final String name) throws JMSException - { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) - { - return String.valueOf(message.getDeliveryCount()); - } - - Object val = message.getObjectProperty(name); - if (val instanceof SimpleString) - { - val = ((SimpleString)val).toString(); - } - return val; - } - - @SuppressWarnings("rawtypes") - @Override - public Enumeration getPropertyNames() throws JMSException - { - return Collections.enumeration(MessageUtil.getPropertyNames(message)); - } - - public void setBooleanProperty(final String name, final boolean value) throws JMSException - { - checkProperty(name); - - message.putBooleanProperty(new SimpleString(name), value); - } - - public void setByteProperty(final String name, final byte value) throws JMSException - { - checkProperty(name); - message.putByteProperty(new SimpleString(name), value); - } - - public void setShortProperty(final String name, final 
short value) throws JMSException - { - checkProperty(name); - message.putShortProperty(new SimpleString(name), value); - } - - public void setIntProperty(final String name, final int value) throws JMSException - { - checkProperty(name); - message.putIntProperty(new SimpleString(name), value); - } - - public void setLongProperty(final String name, final long value) throws JMSException - { - checkProperty(name); - message.putLongProperty(new SimpleString(name), value); - } - - public void setFloatProperty(final String name, final float value) throws JMSException - { - checkProperty(name); - message.putFloatProperty(new SimpleString(name), value); - } - - public void setDoubleProperty(final String name, final double value) throws JMSException - { - checkProperty(name); - message.putDoubleProperty(new SimpleString(name), value); - } - - public void setStringProperty(final String name, final String value) throws JMSException - { - checkProperty(name); - - if (MessageUtil.JMSXGROUPID.equals(name)) - { - message.putStringProperty(org.apache.activemq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value)); - } - else - { - message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value)); - } - } - - public void setObjectProperty(final String name, final Object value) throws JMSException - { - if (HornetQJMSConstants.JMS_HORNETQ_OUTPUT_STREAM.equals(name)) - { - setOutputStream((OutputStream)value); - - return; - } - else if (HornetQJMSConstants.JMS_HORNETQ_SAVE_STREAM.equals(name)) - { - saveToOutputStream((OutputStream)value); - - return; - } - - checkProperty(name); - - if (HornetQJMSConstants.JMS_HORNETQ_INPUT_STREAM.equals(name)) - { - setInputStream((InputStream)value); - - return; - } - - try - { - message.putObjectProperty(new SimpleString(name), value); - } - catch (ActiveMQPropertyConversionException e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - public void acknowledge() throws JMSException - { - if (session 
!= null) - { - try - { - if (individualAck) - { - message.individualAcknowledge(); - } - - session.commit(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - } - - @Override - public long getJMSDeliveryTime() throws JMSException - { - Long value; - try - { - value = message.getLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME); - } - catch (Exception e) - { - return 0; - } - - if (value == null) - { - return 0; - } - else - { - return value.longValue(); - } - } - - @Override - public void setJMSDeliveryTime(long deliveryTime) throws JMSException - { - message.putLongProperty(org.apache.activemq.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime); - } - - @Override - public <T> T getBody(Class<T> c) throws JMSException - { - if (isBodyAssignableTo(c)) - { - return getBodyInternal(c); - } - // XXX HORNETQ-1209 Do we need translations here? - throw new MessageFormatException("Body not assignable to " + c); - } - - @SuppressWarnings("unchecked") - protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException - { - InputStream is = ((MessageInternal)message).getBodyInputStream(); - try - { - ObjectInputStream ois = new ObjectInputStream(is); - return (T)ois.readObject(); - } - catch (Exception e) - { - throw new MessageFormatException(e.getMessage()); - } - } - - - @Override - public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c) - { - /** - * From the specs: - * <p> - * If the message is a {@code Message} (but not one of its subtypes) then this method will - * return true irrespective of the value of this parameter. - */ - return true; - } - - /** - * Helper method for {@link #isBodyAssignableTo(Class)}. - * @return true if the message has no body. 
- */ - protected boolean hasNoBody() - { - return message.getBodySize() == 0; - } - - // Public -------------------------------------------------------- - - public void setIndividualAcknowledge() - { - this.individualAck = true; - } - - public void resetMessageID(final String newMsgID) - { - this.msgID = newMsgID; - } - - public ClientMessage getCoreMessage() - { - return message; - } - - public void doBeforeSend() throws Exception - { - message.getBodyBuffer().resetReaderIndex(); - } - - public void checkBuffer() - { - message.getBodyBuffer(); - } - - public void doBeforeReceive() throws ActiveMQException - { - message.checkCompletion(); - - ActiveMQBuffer body = message.getBodyBuffer(); - - if (body != null) - { - body.resetReaderIndex(); - } - } - - public byte getType() - { - return HornetQMessage.TYPE; - } - - public void setInputStream(final InputStream input) throws JMSException - { - checkStream(); - if (readOnly) - { - throw HornetQJMSClientBundle.BUNDLE.messageNotWritable(); - } - - message.setBodyInputStream(input); - } - - public void setOutputStream(final OutputStream output) throws JMSException - { - checkStream(); - if (!readOnly) - { - throw new IllegalStateException("OutputStream property is only valid on received messages"); - } - - try - { - message.setOutputStream(output); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void saveToOutputStream(final OutputStream output) throws JMSException - { - checkStream(); - if (!readOnly) - { - throw new IllegalStateException("OutputStream property is only valid on received messages"); - } - - try - { - message.saveToOutputStream(output); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public boolean waitCompletionOnStream(final long timeWait) throws JMSException - { - checkStream(); - try - { - return message.waitOutputStreamCompletion(timeWait); - } - catch (ActiveMQException e) 
- { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - @Override - public String toString() - { - StringBuffer sb = new StringBuffer("HornetQMessage["); - sb.append(getJMSMessageID()); - sb.append("]:"); - sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT"); - return sb.toString(); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - protected void checkWrite() throws JMSException - { - if (readOnly) - { - throw HornetQJMSClientBundle.BUNDLE.messageNotWritable(); - } - } - - protected void checkRead() throws JMSException - { - if (!readOnly) - { - throw HornetQJMSClientBundle.BUNDLE.messageNotReadable(); - } - } - - // Private ------------------------------------------------------------ - - private void checkStream() throws JMSException - { - if (!(message.getType() == HornetQBytesMessage.TYPE || message.getType() == HornetQStreamMessage.TYPE)) - { - throw HornetQJMSClientBundle.BUNDLE.onlyValidForByteOrStreamMessages(); - } - } - - private void checkProperty(final String name) throws JMSException - { - if (propertiesReadOnly) - { - if (name.equals(HornetQJMSConstants.JMS_HORNETQ_INPUT_STREAM)) - { - throw new MessageNotWriteableException("You cannot set the Input Stream on received messages. 
Did you mean " + HornetQJMSConstants.JMS_HORNETQ_OUTPUT_STREAM + - " or " + - HornetQJMSConstants.JMS_HORNETQ_SAVE_STREAM + - "?"); - } - else - { - throw HornetQJMSClientBundle.BUNDLE.messageNotWritable(); - } - } - - if (name == null) - { - throw HornetQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("property"); - } - - if (name.equals("")) - { - throw new IllegalArgumentException("The name of a property must not be an empty String."); - } - - if (!isValidJavaIdentifier(name)) - { - throw HornetQJMSClientBundle.BUNDLE.invalidJavaIdentifier(name); - } - - if (HornetQMessage.reservedIdentifiers.contains(name)) - { - throw new JMSRuntimeException("The property name '" + name + "' is reserved due to selector syntax."); - } - - if (name.startsWith("JMS_HORNETQ")) - { - throw new JMSRuntimeException("The property name '" + name + "' is illegal since it starts with JMS_HORNETQ"); - } - } - - private boolean isValidJavaIdentifier(final String s) - { - if (s == null || s.length() == 0) - { - return false; - } - - char[] c = s.toCharArray(); - - if (!Character.isJavaIdentifierStart(c[0])) - { - return false; - } - - for (int i = 1; i < c.length; i++) - { - if (!Character.isJavaIdentifierPart(c[i])) - { - return false; - } - } - - return true; - } - - private void checkPriority(final int priority) throws JMSException - { - if (priority < 0 || priority > 9) - { - throw new JMSException(priority + " is not valid: priority must be between 0 and 9"); - } - } - - // Inner classes ------------------------------------------------- -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageConsumer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageConsumer.java deleted file mode 100644 index 48dbabb..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageConsumer.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.QueueReceiver; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientConsumer; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.MessageHandler; -import org.apache.activemq.api.jms.HornetQJMSConstants; - -/** - * HornetQ implementation of a JMS MessageConsumer. 
- * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - */ -public final class HornetQMessageConsumer implements QueueReceiver, TopicSubscriber -{ - private final ClientConsumer consumer; - - private MessageListener listener; - - private MessageHandler coreListener; - - private final HornetQConnection connection; - - private final HornetQSession session; - - private final int ackMode; - - private final boolean noLocal; - - private final HornetQDestination destination; - - private final String selector; - - private final SimpleString autoDeleteQueueName; - - // Constructors -------------------------------------------------- - - protected HornetQMessageConsumer(final HornetQConnection connection, - final HornetQSession session, - final ClientConsumer consumer, - final boolean noLocal, - final HornetQDestination destination, - final String selector, - final SimpleString autoDeleteQueueName) throws JMSException - { - this.connection = connection; - - this.session = session; - - this.consumer = consumer; - - ackMode = session.getAcknowledgeMode(); - - this.noLocal = noLocal; - - this.destination = destination; - - this.selector = selector; - - this.autoDeleteQueueName = autoDeleteQueueName; - } - - // MessageConsumer implementation -------------------------------- - - public String getMessageSelector() throws JMSException - { - checkClosed(); - - return selector; - } - - public MessageListener getMessageListener() throws JMSException - { - checkClosed(); - - return listener; - } - - public void setMessageListener(final MessageListener listener) throws JMSException - { - this.listener = listener; - - coreListener = listener == null ? 
null : new JMSMessageListenerWrapper(connection, session, consumer, listener, ackMode); - - try - { - consumer.setMessageHandler(coreListener); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public Message receive() throws JMSException - { - return getMessage(0, false); - } - - public Message receive(final long timeout) throws JMSException - { - return getMessage(timeout, false); - } - - public Message receiveNoWait() throws JMSException - { - return getMessage(0, true); - } - - public void close() throws JMSException - { - try - { - consumer.close(); - - if (autoDeleteQueueName != null) - { - // If non durable subscriber need to delete subscription too - session.deleteQueue(autoDeleteQueueName); - } - - session.removeConsumer(this); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - // QueueReceiver implementation ---------------------------------- - - public Queue getQueue() throws JMSException - { - checkClosed(); - - return (Queue)destination; - } - - // TopicSubscriber implementation -------------------------------- - - public Topic getTopic() throws JMSException - { - checkClosed(); - - return (Topic)destination; - } - - public boolean getNoLocal() throws JMSException - { - checkClosed(); - - return noLocal; - } - - // Public -------------------------------------------------------- - - @Override - public String toString() - { - return "HornetQMessageConsumer[" + consumer + "]"; - } - - public boolean isClosed() - { - return consumer.isClosed(); - } - - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - private void checkClosed() throws JMSException - { - if (consumer.isClosed() || session.getCoreSession().isClosed()) - { - throw new IllegalStateException("Consumer is closed"); - } 
- } - - private HornetQMessage getMessage(final long timeout, final boolean noWait) throws JMSException - { - try - { - ClientMessage coreMessage; - - if (noWait) - { - coreMessage = consumer.receiveImmediate(); - } - else - { - coreMessage = consumer.receive(timeout); - } - - HornetQMessage jmsMsg = null; - - if (coreMessage != null) - { - boolean needSession = - ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE; - jmsMsg = HornetQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null); - - jmsMsg.doBeforeReceive(); - - // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered - // https://issues.jboss.org/browse/JBPAPP-6110 - if (session.getAcknowledgeMode() == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) - { - jmsMsg.setIndividualAcknowledge(); - } - else - { - coreMessage.acknowledge(); - } - } - - return jmsMsg; - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - // Inner classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageProducer.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageProducer.java deleted file mode 100644 index 6f083ab..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQMessageProducer.java +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.BytesMessage; -import javax.jms.CompletionListener; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueSender; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientProducer; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.utils.UUID; -import org.apache.activemq.utils.UUIDGenerator; -/** - * HornetQ implementation of a JMS MessageProducer. 
- * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - */ -public class HornetQMessageProducer implements MessageProducer, QueueSender, TopicPublisher -{ - private final HornetQConnection connection; - - private final SimpleString connID; - - private final ClientProducer clientProducer; - private final ClientSession clientSession; - - private boolean disableMessageID = false; - - private boolean disableMessageTimestamp = false; - - private int defaultPriority = Message.DEFAULT_PRIORITY; - private long defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; - private int defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; - private long defaultDeliveryDelay = Message.DEFAULT_DELIVERY_DELAY; - - private final HornetQDestination defaultDestination; - // Constructors -------------------------------------------------- - - protected HornetQMessageProducer(final HornetQConnection connection, final ClientProducer producer, - final HornetQDestination defaultDestination, final ClientSession clientSession) throws JMSException - { - this.connection = connection; - - connID = connection.getClientID() != null ? 
new SimpleString(connection.getClientID()) : connection.getUID(); - - this.clientProducer = producer; - - this.defaultDestination = defaultDestination; - - this.clientSession = clientSession; - } - - // MessageProducer implementation -------------------------------- - - public void setDisableMessageID(final boolean value) throws JMSException - { - checkClosed(); - - disableMessageID = value; - } - - public boolean getDisableMessageID() throws JMSException - { - checkClosed(); - - return disableMessageID; - } - - public void setDisableMessageTimestamp(final boolean value) throws JMSException - { - checkClosed(); - - disableMessageTimestamp = value; - } - - public boolean getDisableMessageTimestamp() throws JMSException - { - checkClosed(); - - return disableMessageTimestamp; - } - - public void setDeliveryMode(final int deliveryMode) throws JMSException - { - checkClosed(); - if (deliveryMode != DeliveryMode.NON_PERSISTENT && deliveryMode != DeliveryMode.PERSISTENT) - { - throw HornetQJMSClientBundle.BUNDLE.illegalDeliveryMode(deliveryMode); - } - - defaultDeliveryMode = deliveryMode; - } - - public int getDeliveryMode() throws JMSException - { - checkClosed(); - - return defaultDeliveryMode; - } - - public void setPriority(final int defaultPriority) throws JMSException - { - checkClosed(); - - if (defaultPriority < 0 || defaultPriority > 9) - { - throw new JMSException("Illegal priority value: " + defaultPriority); - } - - this.defaultPriority = defaultPriority; - } - - public int getPriority() throws JMSException - { - checkClosed(); - - return defaultPriority; - } - - public void setTimeToLive(final long timeToLive) throws JMSException - { - checkClosed(); - - defaultTimeToLive = timeToLive; - } - - public long getTimeToLive() throws JMSException - { - checkClosed(); - - return defaultTimeToLive; - } - - public Destination getDestination() throws JMSException - { - checkClosed(); - - return defaultDestination; - } - - public void close() throws JMSException - { - 
connection.getThreadAwareContext().assertNotCompletionListenerThread(); - try - { - clientProducer.close(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - public void send(final Message message) throws JMSException - { - checkDefaultDestination(); - doSendx(defaultDestination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, null); - } - - public void send(final Message message, - final int deliveryMode, - final int priority, final long timeToLive) throws JMSException - { - checkDefaultDestination(); - doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, null); - } - - public void send(final Destination destination, final Message message) throws JMSException - { - send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive); - } - - public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, - final long timeToLive) throws JMSException - { - checkClosed(); - - checkDestination(destination); - - doSendx((HornetQDestination)destination, message, deliveryMode, priority, timeToLive, null); - } - - @Override - public void setDeliveryDelay(long deliveryDelay) throws JMSException - { - this.defaultDeliveryDelay = deliveryDelay; - } - - @Override - public long getDeliveryDelay() throws JMSException - { - return defaultDeliveryDelay; - } - - @Override - public void send(Message message, CompletionListener completionListener) throws JMSException - { - send(message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException - { - checkCompletionListener(completionListener); - checkDefaultDestination(); - doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, completionListener); - } - - @Override - public 
void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException - { - send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive, completionListener); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, - CompletionListener completionListener) throws JMSException - { - checkClosed(); - - checkCompletionListener(completionListener); - - checkDestination(destination); - - doSendx((HornetQDestination)destination, message, deliveryMode, priority, timeToLive, completionListener); - } - - // TopicPublisher Implementation --------------------------------- - - public Topic getTopic() throws JMSException - { - return (Topic)getDestination(); - } - - public void publish(final Message message) throws JMSException - { - send(message); - } - - public void publish(final Topic topic, final Message message) throws JMSException - { - send(topic, message); - } - - public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException - { - send(message, deliveryMode, priority, timeToLive); - } - - public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, - final long timeToLive) throws JMSException - { - checkDestination(topic); - doSendx((HornetQDestination)topic, message, deliveryMode, priority, timeToLive, null); - } - - // QueueSender Implementation ------------------------------------ - - public void send(final Queue queue, final Message message) throws JMSException - { - send((Destination)queue, message); - } - - public void send(final Queue queue, final Message message, final int deliveryMode, final int priority, - final long timeToLive) throws JMSException - { - checkDestination(queue); - doSendx((HornetQDestination)queue, message, deliveryMode, priority, timeToLive, null); - } - - public Queue getQueue() throws JMSException 
- { - return (Queue)getDestination(); - } - - // Public -------------------------------------------------------- - - @Override - public String toString() - { - return "HornetQMessageProducer->" + clientProducer; - } - - /** - * Check if the default destination has been set - */ - private void checkDefaultDestination() - { - if (defaultDestination == null) - { - throw new UnsupportedOperationException("Cannot specify destination if producer has a default destination"); - } - } - - /** - * Check if the destination is sent correctly - */ - private void checkDestination(Destination destination) throws InvalidDestinationException - { - if (destination != null && !(destination instanceof HornetQDestination)) - { - throw new InvalidDestinationException("Not a HornetQ Destination:" + destination); - } - if (destination != null && defaultDestination != null) - { - throw new UnsupportedOperationException("Cannot specify destination if producer has a default destination"); - } - if (destination == null) - { - throw HornetQJMSClientBundle.BUNDLE.nullTopic(); - } - } - - private void checkCompletionListener(CompletionListener completionListener) - { - if (completionListener == null) - { - throw HornetQJMSClientBundle.BUNDLE.nullArgumentNotAllowed("CompletionListener"); - } - } - - - private void doSendx(HornetQDestination destination, final Message jmsMessage, final int deliveryMode, - final int priority, final long timeToLive, - CompletionListener completionListener) throws JMSException - { - - jmsMessage.setJMSDeliveryMode(deliveryMode); - - jmsMessage.setJMSPriority(priority); - - - if (timeToLive == 0) - { - jmsMessage.setJMSExpiration(0); - } - else - { - jmsMessage.setJMSExpiration(System.currentTimeMillis() + timeToLive); - } - - if (!disableMessageTimestamp) - { - jmsMessage.setJMSTimestamp(System.currentTimeMillis()); - } - else - { - jmsMessage.setJMSTimestamp(0); - } - - SimpleString address = null; - - if (destination == null) - { - if (defaultDestination == null) - 
{ - throw new UnsupportedOperationException("Destination must be specified on send with an anonymous producer"); - } - - destination = defaultDestination; - } - else - { - if (defaultDestination != null) - { - if (!destination.equals(defaultDestination)) - { - throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " - + "specified in the arguments to the send, " - + "these destinations must be equal"); - } - } - - address = destination.getSimpleAddress(); - - if (!connection.containsKnownDestination(address)) - { - try - { - ClientSession.AddressQuery query = clientSession.addressQuery(address); - if (!query.isExists()) - { - throw new InvalidDestinationException("Destination " + address + " does not exist"); - } - else - { - connection.addKnownDestination(address); - } - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - } - - HornetQMessage hqJmsMessage; - - boolean foreign = false; - - // First convert from foreign message if appropriate - if (!(jmsMessage instanceof HornetQMessage)) - { - // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client, - // a message whose implementation is not one of its own. 
- - if (jmsMessage instanceof BytesMessage) - { - hqJmsMessage = new HornetQBytesMessage((BytesMessage)jmsMessage, clientSession); - } - else if (jmsMessage instanceof MapMessage) - { - hqJmsMessage = new HornetQMapMessage((MapMessage)jmsMessage, clientSession); - } - else if (jmsMessage instanceof ObjectMessage) - { - hqJmsMessage = new HornetQObjectMessage((ObjectMessage)jmsMessage, clientSession); - } - else if (jmsMessage instanceof StreamMessage) - { - hqJmsMessage = new HornetQStreamMessage((StreamMessage)jmsMessage, clientSession); - } - else if (jmsMessage instanceof TextMessage) - { - hqJmsMessage = new HornetQTextMessage((TextMessage)jmsMessage, clientSession); - } - else - { - hqJmsMessage = new HornetQMessage(jmsMessage, clientSession); - } - - // Set the destination on the original message - jmsMessage.setJMSDestination(destination); - - foreign = true; - } - else - { - hqJmsMessage = (HornetQMessage)jmsMessage; - } - - if (!disableMessageID) - { - // Generate a JMS id - - UUID uid = UUIDGenerator.getInstance().generateUUID(); - - hqJmsMessage.getCoreMessage().setUserID(uid); - - hqJmsMessage.resetMessageID(null); - } - - if (foreign) - { - jmsMessage.setJMSMessageID(hqJmsMessage.getJMSMessageID()); - } - - hqJmsMessage.setJMSDestination(destination); - - try - { - hqJmsMessage.doBeforeSend(); - } - catch (Exception e) - { - JMSException je = new JMSException(e.getMessage()); - - je.initCause(e); - - throw je; - } - - if (defaultDeliveryDelay > 0) - { - hqJmsMessage.setJMSDeliveryTime(System.currentTimeMillis() + defaultDeliveryDelay); - } - - ClientMessage coreMessage = hqJmsMessage.getCoreMessage(); - coreMessage.putStringProperty(HornetQConnection.CONNECTION_ID_PROPERTY_NAME, connID); - - try - { - /** - * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper}, - * so we avoid it if we can. 
- */ - if (completionListener != null) - { - clientProducer.send(address, coreMessage, new CompletionListenerWrapper(completionListener, jmsMessage, this)); - } - else - { - clientProducer.send(address, coreMessage); - } - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - - private void checkClosed() throws JMSException - { - if (clientProducer.isClosed() || clientSession.isClosed()) - { - throw new IllegalStateException("Producer is closed"); - } - } - - private static final class CompletionListenerWrapper implements SendAcknowledgementHandler - { - private final CompletionListener completionListener; - private final Message jmsMessage; - private final HornetQMessageProducer producer; - - /** - * @param jmsMessage - * @param producer - */ - public CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, HornetQMessageProducer producer) - { - this.completionListener = listener; - this.jmsMessage = jmsMessage; - this.producer = producer; - } - - @Override - public void sendAcknowledged(org.apache.activemq.api.core.Message clientMessage) - { - if (jmsMessage instanceof StreamMessage) - { - try - { - ((StreamMessage)jmsMessage).reset(); - } - catch (JMSException e) - { - // HORNETQ-1209 XXX ignore? - } - } - if (jmsMessage instanceof BytesMessage) - { - try - { - ((BytesMessage)jmsMessage).reset(); - } - catch (JMSException e) - { - // HORNETQ-1209 XXX ignore? 
- } - } - - try - { - producer.connection.getThreadAwareContext().setCurrentThread(true); - completionListener.onCompletion(jmsMessage); - } - finally - { - producer.connection.getThreadAwareContext().clearCurrentThread(true); - } - } - - @Override - public String toString() - { - return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQObjectMessage.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQObjectMessage.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQObjectMessage.java deleted file mode 100644 index e4d98e4..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQObjectMessage.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.client; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.ObjectMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.Message; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; - -/** - * HornetQ implementation of a JMS ObjectMessage. - * <br> - * Don't used ObjectMessage if you want good performance! - * <p> - * Serialization is slooooow! - * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:atay...@redhat.com">Andy Taylor</a> - */ -public class HornetQObjectMessage extends HornetQMessage implements ObjectMessage -{ - // Constants ----------------------------------------------------- - - public static final byte TYPE = Message.OBJECT_TYPE; - - // Attributes ---------------------------------------------------- - - // keep a snapshot of the Serializable Object as a byte[] to provide Object isolation - private byte[] data; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - protected HornetQObjectMessage(final ClientSession session) - { - super(HornetQObjectMessage.TYPE, session); - } - - protected HornetQObjectMessage(final ClientMessage message, final ClientSession session) - { - super(message, session); - } - - /** - * A copy constructor for foreign JMS ObjectMessages. 
- */ - public HornetQObjectMessage(final ObjectMessage foreign, final ClientSession session) throws JMSException - { - super(foreign, HornetQObjectMessage.TYPE, session); - - setObject(foreign.getObject()); - } - - // Public -------------------------------------------------------- - - @Override - public byte getType() - { - return HornetQObjectMessage.TYPE; - } - - @Override - public void doBeforeSend() throws Exception - { - message.getBodyBuffer().clear(); - if (data != null) - { - message.getBodyBuffer().writeInt(data.length); - message.getBodyBuffer().writeBytes(data); - } - - super.doBeforeSend(); - } - - @Override - public void doBeforeReceive() throws ActiveMQException - { - super.doBeforeReceive(); - try - { - int len = message.getBodyBuffer().readInt(); - data = new byte[len]; - message.getBodyBuffer().readBytes(data); - } - catch (Exception e) - { - data = null; - } - - } - - // ObjectMessage implementation ---------------------------------- - - public void setObject(final Serializable object) throws JMSException - { - checkWrite(); - - if (object != null) - { - try - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - - ObjectOutputStream oos = new ObjectOutputStream(baos); - - oos.writeObject(object); - - oos.flush(); - - data = baos.toByteArray(); - } - catch (Exception e) - { - JMSException je = new JMSException("Failed to serialize object"); - je.setLinkedException(e); - je.initCause(e); - throw je; - } - } - } - - // lazy deserialize the Object the first time the client requests it - public Serializable getObject() throws JMSException - { - if (data == null || data.length == 0) - { - return null; - } - - try - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - ObjectInputStream ois = new org.apache.activemq.utils.ObjectInputStreamWithClassLoader(bais); - Serializable object = (Serializable)ois.readObject(); - return object; - } - catch (Exception e) - { - JMSException je = new JMSException(e.getMessage()); - 
je.setStackTrace(e.getStackTrace()); - throw je; - } - } - - @Override - public void clearBody() throws JMSException - { - super.clearBody(); - - data = null; - } - - @Override - protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException - { - try - { - return (T)getObject(); - } - catch (JMSException e) - { - throw new MessageFormatException("Deserialization error on HornetQObjectMessage"); - } - } - - @Override - public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") - Class c) - { - if (data == null) // we have no body - return true; - try - { - return Serializable.class == c || Object.class == c || c.isInstance(getObject()); - } - catch (JMSException e) - { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueue.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueue.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueue.java deleted file mode 100644 index 68e50cf..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueue.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.jms.client; - -import javax.jms.Queue; - -import org.apache.activemq.api.core.SimpleString; - -/** - * HornetQ implementation of a JMS Queue. - * <br> - * This class can be instantiated directly. - * - * @author <a href="mailto:ovi...@feodorov.com">Ovidiu Feodorov</a> - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @version <tt>$Revision: 8737 $</tt> - * - */ -public class HornetQQueue extends HornetQDestination implements Queue -{ - // Constants ----------------------------------------------------- - private static final long serialVersionUID = -1106092883162295462L; - - // Static -------------------------------------------------------- - - public static SimpleString createAddressFromName(final String name) - { - return new SimpleString(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + name); - } - - // Attributes ---------------------------------------------------- - - // Constructors -------------------------------------------------- - - public HornetQQueue(final String name) - { - super(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX + name, name, false, true, null); - } - - - - /** - * @param address - * @param name - * @param temporary - * @param session - */ - public HornetQQueue(String address, String name, boolean temporary, HornetQSession session) - { - super(address, name, temporary, true, session); - } - - public HornetQQueue(final String address, final String name) - { - super(address, name, false, true, null); - } - - // Queue implementation ------------------------------------------ - - // Public -------------------------------------------------------- - - public String getQueueName() - { - return name; - } - - @Override - public String toString() - { - return "HornetQQueue[" + name + "]"; - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // 
Inner classes ------------------------------------------------- -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueBrowser.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueBrowser.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueBrowser.java deleted file mode 100644 index 869d299..0000000 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueBrowser.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import java.util.Enumeration; -import java.util.NoSuchElementException; - -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueBrowser; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientConsumer; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientSession; - -/** - * HornetQ implementation of a JMS QueueBrowser. 
- * - * @author <a href="mailto:tim....@jboss.com">Tim Fox</a> - * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> - * - */ -public final class HornetQQueueBrowser implements QueueBrowser -{ - // Constants ------------------------------------------------------------------------------------ - - // Static --------------------------------------------------------------------------------------- - - // Attributes ----------------------------------------------------------------------------------- - - private final ClientSession session; - - private ClientConsumer consumer; - - private final HornetQQueue queue; - - private SimpleString filterString; - - // Constructors --------------------------------------------------------------------------------- - - protected HornetQQueueBrowser(final HornetQQueue queue, final String messageSelector, final ClientSession session) throws JMSException - { - this.session = session; - this.queue = queue; - if (messageSelector != null) - { - filterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(messageSelector)); - } - } - - // QueueBrowser implementation ------------------------------------------------------------------- - - public void close() throws JMSException - { - if (consumer != null) - { - try - { - consumer.close(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - } - } - - public Enumeration getEnumeration() throws JMSException - { - try - { - close(); - - consumer = session.createConsumer(queue.getSimpleAddress(), filterString, true); - - return new BrowserEnumeration(); - } - catch (ActiveMQException e) - { - throw JMSExceptionHelper.convertFromHornetQException(e); - } - - } - - public String getMessageSelector() throws JMSException - { - return filterString == null ? 
null : filterString.toString(); - } - - public Queue getQueue() throws JMSException - { - return queue; - } - - // Public --------------------------------------------------------------------------------------- - - @Override - public String toString() - { - return "HornetQQueueBrowser->" + consumer; - } - - // Package protected ---------------------------------------------------------------------------- - - // Protected ------------------------------------------------------------------------------------ - - // Private -------------------------------------------------------------------------------------- - - // Inner classes -------------------------------------------------------------------------------- - - private final class BrowserEnumeration implements Enumeration<HornetQMessage> - { - ClientMessage current = null; - - public boolean hasMoreElements() - { - if (current == null) - { - try - { - current = consumer.receiveImmediate(); - } - catch (ActiveMQException e) - { - return false; - } - } - return current != null; - } - - public HornetQMessage nextElement() - { - HornetQMessage msg; - if (hasMoreElements()) - { - ClientMessage next = current; - current = null; - msg = HornetQMessage.createMessage(next, session); - try - { - msg.doBeforeReceive(); - } - catch (Exception e) - { - HornetQJMSClientLogger.LOGGER.errorCreatingMessage(e); - - return null; - } - return msg; - } - else - { - throw new NoSuchElementException(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueConnectionFactory.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueConnectionFactory.java deleted file mode 100644 index 68ddd1b..0000000 --- 
a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/HornetQQueueConnectionFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.jms.client; - -import javax.jms.QueueConnectionFactory; - -import org.apache.activemq.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.api.jms.JMSFactoryType; -

/**
 * Connection factory that additionally implements {@link QueueConnectionFactory}.
 * <p>
 * Every constructor forwards its arguments unchanged to the {@code HornetQConnectionFactory}
 * constructor with the same parameter list. The only behavior defined in this class itself is
 * {@link #getFactoryType()}.
 *
 * @author <a href="mailto:h...@redhat.com">Howard Gao</a>
 */
public class HornetQQueueConnectionFactory extends HornetQConnectionFactory implements QueueConnectionFactory
{
   private static final long serialVersionUID = 5312455021322463546L;

   /**
    * Creates a factory through the superclass no-argument constructor.
    */
   public HornetQQueueConnectionFactory()
   {
      // The superclass no-argument constructor is invoked implicitly.
   }

   /**
    * Creates a factory from an existing locator.
    *
    * @param serverLocator passed as-is to the superclass constructor
    */
   public HornetQQueueConnectionFactory(final ServerLocator serverLocator)
   {
      super(serverLocator);
   }

   /**
    * Creates a factory from a discovery group configuration.
    *
    * @param ha                 passed as-is to the superclass constructor (presumably the
    *                           high-availability switch - confirm against the superclass)
    * @param groupConfiguration passed as-is to the superclass constructor
    */
   public HornetQQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
   {
      super(ha, groupConfiguration);
   }

   /**
    * Creates a factory from one or more transport configurations.
    *
    * @param ha                passed as-is to the superclass constructor (presumably the
    *                          high-availability switch - confirm against the superclass)
    * @param initialConnectors passed as-is to the superclass constructor
    */
   public HornetQQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
   {
      super(ha, initialConnectors);
   }

   /**
    * @return the integer value of {@code JMSFactoryType.QUEUE_CF}
    */
   public int getFactoryType()
   {
      return JMSFactoryType.QUEUE_CF.intValue();
   }
}