Support Headers and refactor XXXMessage & MessageConverter
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/37576dd4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/37576dd4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/37576dd4 Branch: refs/heads/jms-dev-1.1.0 Commit: 37576dd4990ff8e10ee0b91e013cca6b3f5835a2 Parents: ec4228e Author: zhangke <zhangke_beij...@qq.com> Authored: Thu Feb 23 17:55:47 2017 +0800 Committer: zhangke <zhangke_beij...@qq.com> Committed: Thu Feb 23 17:55:47 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/rocketmq/jms/Constant.java | 15 +- .../rocketmq/jms/DeliverMessageService.java | 18 +- .../org/apache/rocketmq/jms/JMSHeaderEnum.java | 41 ++ .../rocketmq/jms/JMSMessageModelEnum.java | 52 ++ .../apache/rocketmq/jms/RocketMQProducer.java | 130 ++--- .../apache/rocketmq/jms/RocketMQSession.java | 18 +- .../org/apache/rocketmq/jms/RocketMQTopic.java | 28 +- .../apache/rocketmq/jms/SendMessageHook.java | 27 ++ .../rocketmq/jms/msg/AbstractJMSMessage.java | 400 +++++++++++++++ .../rocketmq/jms/msg/JMSBytesMessage.java | 484 +++++++++++++++++++ .../apache/rocketmq/jms/msg/JMSMapMessage.java | 229 +++++++++ .../rocketmq/jms/msg/JMSObjectMessage.java | 61 +++ .../apache/rocketmq/jms/msg/JMSTextMessage.java | 66 +++ .../rocketmq/jms/msg/RocketMQBytesMessage.java | 457 ----------------- .../rocketmq/jms/msg/RocketMQMapMessage.java | 210 -------- .../rocketmq/jms/msg/RocketMQMessage.java | 431 ----------------- .../rocketmq/jms/msg/RocketMQObjectMessage.java | 40 -- .../rocketmq/jms/msg/RocketMQTextMessage.java | 47 -- .../jms/msg/serialize/MapSerialize.java | 33 ++ .../jms/msg/serialize/ObjectSerialize.java | 59 +++ .../rocketmq/jms/msg/serialize/Serialize.java | 29 ++ .../jms/msg/serialize/StringSerialize.java | 52 ++ .../rocketmq/jms/support/MessageConverter.java | 294 +++-------- .../rocketmq/jms/msg/JmsMapMessageTest.java | 70 +++ .../jms/msg/RocketMQBytesMessageTest.java | 20 +- .../jms/msg/RocketMQMapMessageTest.java | 70 --- .../jms/msg/RocketMQObjectMessageTest.java | 4 +- .../jms/msg/RocketMQTextMessageTest.java | 4 +- .../jms/support/MessageConvertTest.java | 13 +- 29 files changed, 1777 insertions(+), 1625 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/Constant.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/Constant.java b/core/src/main/java/org/apache/rocketmq/jms/Constant.java index 332ba1a..9519bea 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/Constant.java +++ b/core/src/main/java/org/apache/rocketmq/jms/Constant.java @@ -25,19 +25,6 @@ public interface Constant { boolean DEFAULT_DURABLE = false; - //------------------------JMS message header constant--------------------------------- - String JMS_DESTINATION = "jmsDestination"; - String JMS_DELIVERY_MODE = "jmsDeliveryMode"; - String JMS_EXPIRATION = "jmsExpiration"; - String JMS_DELIVERY_TIME = "jmsDeliveryTime"; - String JMS_PRIORITY = "jmsPriority"; - String JMS_MESSAGE_ID = "jmsMessageID"; - String JMS_TIMESTAMP = "jmsTimestamp"; - String JMS_CORRELATION_ID = "jmsCorrelationID"; - String JMS_REPLY_TO = "jmsReplyTo"; - String JMS_TYPE = "jmsType"; - String JMS_REDELIVERED = "jmsRedelivered"; - //-------------------------JMS defined properties constant---------------------------- /** * The identity of the user sending the Send message @@ -90,4 +77,6 @@ public interface Constant { * Default Jms Type */ String DEFAULT_JMS_TYPE = "rocketmq"; + + String MESSAGE_ID_PREFIX = "ID:"; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java index b25a339..da8196f 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -17,13 +17,6 @@ package org.apache.rocketmq.jms; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; -import org.apache.rocketmq.client.consumer.PullResult; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +31,13 @@ import javax.jms.JMSRuntimeException; import javax.jms.Message; import javax.jms.MessageConsumer; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.jms.support.JmsHelper; import org.apache.rocketmq.jms.support.MessageConverter; import org.slf4j.Logger; @@ -184,6 +184,10 @@ public class DeliverMessageService extends ServiceThread { */ private void handleMessage(MessageExt msg, MessageQueue mq) throws InterruptedException, JMSException { Message jmsMessage = MessageConverter.convert2JMSMessage(msg); + if (jmsMessage.getJMSExpiration() != 0 && System.currentTimeMillis() > jmsMessage.getJMSExpiration()) { + log.debug("The message[id={}] has been expired", msg.getMsgId()); + return; + } final MessageWrapper wrapper = new MessageWrapper(jmsMessage, this.consumer, mq, msg.getQueueOffset()); switch (this.consumeModel) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java new file mode 100644 index 0000000..a9c758e --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +public enum JMSHeaderEnum { + + JMSDestination, + JMSDeliveryMode, + JMSMessageID, + JMSTimestamp, + JMSCorrelationID, + JMSReplyTo, + JMSRedelivered, + JMSType, + JMSExpiration, + JMSPriority, + JMSDeliveryTime; + + public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 0; + public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0; + public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false; + public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0; + public static final int JMS_PRIORITY_DEFAULT_VALUE = 5; + public static final int JMS_DELIVERY_TIME_DEFAULT_VALUE = 0; + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java new file mode 100644 index 0000000..0659f92 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; + +public enum JMSMessageModelEnum { + BYTE(JMSBytesMessage.class), + MAP(JMSMapMessage.class), + OBJECT(JMSObjectMessage.class), + STRING(JMSTextMessage.class); + + public static final String MSG_MODEL_NAME = "MsgModel"; + + private Class jmsClass; + + JMSMessageModelEnum(Class jmsClass) { + this.jmsClass = jmsClass; + } + + public static JMSMessageModelEnum toMsgModelEnum(Class clazz) { + for (JMSMessageModelEnum e : values()) { + if (e.getJmsClass() == clazz) { + return e; + } + } + + throw new IllegalArgumentException(String.format("Not supported class[%s]", clazz)); + } + + public Class getJmsClass() { + return jmsClass; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java index 1c0b7d1..8cc5903 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java @@ -17,11 +17,6 @@ package org.apache.rocketmq.jms; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; import java.util.UUID; import javax.jms.CompletionListener; import javax.jms.Destination; @@ -29,8 +24,13 @@ import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.jms.Message; import javax.jms.MessageProducer; -import org.apache.rocketmq.jms.msg.RocketMQMessage; -import org.apache.rocketmq.jms.support.JmsHelper; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; import org.apache.rocketmq.jms.support.MessageConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,27 +47,30 @@ import static org.apache.rocketmq.jms.Constant.JMS_EXPIRATION; import static org.apache.rocketmq.jms.Constant.JMS_PRIORITY; import static org.apache.rocketmq.jms.Constant.JMS_TIMESTAMP; import static org.apache.rocketmq.jms.Constant.JMS_TYPE; +import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object; public class RocketMQProducer implements MessageProducer { private static final Logger log = LoggerFactory.getLogger(RocketMQProducer.class); - private RocketMQSession session; - - private final DefaultMQProducer mqProducer; - + private final DefaultMQProducer rocketMQProducer; private Destination destination; + private boolean disableMessageID; + private boolean disableMessageTimestamp; + private long timeToLive; + public RocketMQProducer(RocketMQSession session, Destination destination) { this.session = session; this.destination = destination; - this.mqProducer = new DefaultMQProducer(UUID.randomUUID().toString()); + this.rocketMQProducer = new DefaultMQProducer(UUID.randomUUID().toString()); ClientConfig clientConfig = this.session.getConnection().getClientConfig(); - this.mqProducer.setNamesrvAddr(clientConfig.getNamesrvAddr()); - this.mqProducer.setInstanceName(clientConfig.getInstanceName()); + this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr()); + this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName()); try { - this.mqProducer.start(); + this.rocketMQProducer.start(); } catch (MQClientException e) { throw new JMSRuntimeException(format("Fail to start producer, error msg:%s", getStackTrace(e))); @@ -76,24 +79,22 @@ public class RocketMQProducer implements MessageProducer { @Override public void setDisableMessageID(boolean value) throws JMSException { - //todo + this.disableMessageID = value; } @Override public boolean getDisableMessageID() throws JMSException { - //todo - return false; + return this.disableMessageID; } @Override public void setDisableMessageTimestamp(boolean value) throws JMSException { - //todo + this.disableMessageTimestamp = value; } @Override public boolean getDisableMessageTimestamp() throws JMSException { - //todo - return false; + return this.disableMessageTimestamp; } @Override @@ -120,13 +121,12 @@ public class RocketMQProducer implements MessageProducer { @Override public void setTimeToLive(long timeToLive) throws JMSException { - //todo + this.timeToLive = timeToLive; } @Override public long getTimeToLive() throws JMSException { - //todo - return 0; + return this.getTimeToLive(); } @Override @@ -148,7 +148,7 @@ public class RocketMQProducer implements MessageProducer { @Override public void close() throws JMSException { - this.mqProducer.shutdown(); + this.rocketMQProducer.shutdown(); } @Override @@ -170,23 +170,12 @@ public class RocketMQProducer implements MessageProducer { @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - String topicName = JmsHelper.getTopicName(destination); - - org.apache.rocketmq.common.message.Message rmqMsg = createRmqMessage(message, topicName); - - sendSync(rmqMsg); - } - private void sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException { - SendResult sendResult; + before(message); - try { - sendResult = mqProducer.send(rmqMsg); - } - catch (Exception e) { - throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); - } + MessageExt rmqMsg = createRocketMQMessage(message); + SendResult sendResult = sendSync(rmqMsg); if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { log.debug("Success to send message[key={}]", rmqMsg.getKeys()); return; @@ -196,29 +185,35 @@ public class RocketMQProducer implements MessageProducer { } } + private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException { + + try { + return rocketMQProducer.send(rmqMsg); + } + catch (Exception e) { + throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); + } + } + private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg, CompletionListener completionListener) throws JMSException { try { - mqProducer.send(rmqMsg, new SendCompletionListener(completionListener)); + rocketMQProducer.send(rmqMsg, new SendCompletionListener(completionListener)); } catch (Exception e) { throw new JMSException(format("Fail to send message. Error: %s", getStackTrace(e))); } } - private org.apache.rocketmq.common.message.Message createRmqMessage(Message message, - String topicName) throws JMSException { - RocketMQMessage jmsMsg = (RocketMQMessage) message; + private MessageExt createRocketMQMessage(Message message) throws JMSException { + AbstractJMSMessage jmsMsg = convert2Object(message, AbstractJMSMessage.class); initJMSHeaders(jmsMsg, destination); - org.apache.rocketmq.common.message.Message rmqMsg = null; try { - rmqMsg = MessageConverter.convert2RMQMessage(jmsMsg); + return MessageConverter.convert2RMQMessage(jmsMsg); } catch (Exception e) { throw new JMSException(format("Fail to convert to RocketMQ message. Error: %s", getStackTrace(e))); } - - return rmqMsg; } /** @@ -226,26 +221,26 @@ public class RocketMQProducer implements MessageProducer { * <p/> * <P>JMS providers init message's headers. Do not allow user to set these by yourself. * - * @param rmqJmsMsg message + * @param jmsMsg message * @param destination * @throws javax.jms.JMSException * @see <CODE>Destination</CODE> */ - private void initJMSHeaders(RocketMQMessage rmqJmsMsg, Destination destination) throws JMSException { + private void initJMSHeaders(AbstractJMSMessage jmsMsg, Destination destination) throws JMSException { //JMS_DESTINATION default:"topic:message" - rmqJmsMsg.setHeader(JMS_DESTINATION, destination); + jmsMsg.setHeader(JMS_DESTINATION, destination); //JMS_DELIVERY_MODE default : PERSISTENT - rmqJmsMsg.setHeader(JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); + jmsMsg.setHeader(JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE); //JMS_TIMESTAMP default : current time - rmqJmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis()); + jmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis()); //JMS_EXPIRATION default : 3 days //JMS_EXPIRATION = currentTime + time_to_live - rmqJmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + DEFAULT_TIME_TO_LIVE); + jmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + DEFAULT_TIME_TO_LIVE); //JMS_PRIORITY default : 4 - rmqJmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); + jmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY); //JMS_TYPE default : ons(open notification service) - rmqJmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE); + jmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE); //JMS_REPLY_TO,JMS_CORRELATION_ID default : null //JMS_MESSAGE_ID is set by sendResult. //JMS_REDELIVERED is set by broker. @@ -271,11 +266,32 @@ public class RocketMQProducer implements MessageProducer { @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - String topicName = JmsHelper.getTopicName(destination); - org.apache.rocketmq.common.message.Message rmqMsg = createRmqMessage(message, topicName); + before(message); + + MessageExt rmqMsg = createRocketMQMessage(message); sendAsync(rmqMsg, completionListener); } + private void before(Message message) throws JMSException { + // timestamp + if (!getDisableMessageTimestamp()) { + message.setJMSTimestamp(System.currentTimeMillis()); + } + + // messageID is also required in async model, so {@link MessageExt#getMsgId()} can't be used. + if (!getDisableMessageID()) { + message.setJMSMessageID(new StringBuffer(MESSAGE_ID_PREFIX).append(UUID.randomUUID().getLeastSignificantBits()).toString()); + } + + // expiration + if (getTimeToLive() != 0) { + message.setJMSExpiration(System.currentTimeMillis() + getTimeToLive()); + } + else { + message.setJMSExpiration(0l); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java index 954f80a..c14c85d 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java @@ -43,9 +43,9 @@ import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; -import org.apache.rocketmq.jms.msg.RocketMQBytesMessage; -import org.apache.rocketmq.jms.msg.RocketMQObjectMessage; -import org.apache.rocketmq.jms.msg.RocketMQTextMessage; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSObjectMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +84,7 @@ public class RocketMQSession implements Session { @Override public BytesMessage createBytesMessage() throws JMSException { - return new RocketMQBytesMessage(); + return new JMSBytesMessage(); } @Override @@ -95,17 +95,17 @@ public class RocketMQSession implements Session { @Override public Message createMessage() throws JMSException { - return new RocketMQBytesMessage(); + return new JMSBytesMessage(); } @Override public ObjectMessage createObjectMessage() throws JMSException { - return new RocketMQObjectMessage(); + return new JMSObjectMessage(); } @Override public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return new RocketMQObjectMessage(serializable); + return new JMSObjectMessage(serializable); } @Override @@ -116,12 +116,12 @@ public class RocketMQSession implements Session { @Override public TextMessage createTextMessage() throws JMSException { - return new RocketMQTextMessage(); + return new JMSTextMessage(); } @Override public TextMessage createTextMessage(String text) throws JMSException { - return new RocketMQTextMessage(text); + return new JMSTextMessage(text); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java index 5fe1448..c1ff87b 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java @@ -17,28 +17,18 @@ package org.apache.rocketmq.jms; -import com.google.common.base.Joiner; import javax.jms.JMSException; import javax.jms.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR; - public class RocketMQTopic implements Topic { private static final Logger log = LoggerFactory.getLogger(RocketMQTopic.class); private String name; - private String type; public RocketMQTopic(String name) { this.name = name; - this.type = NO_MESSAGE_SELECTOR; - } - - public RocketMQTopic(String name, String type) { - this.name = name; - this.type = type; } @Override @@ -46,22 +36,8 @@ public class RocketMQTopic implements Topic { return this.name; } - public String getTypeName() throws JMSException { - return this.type; - } - - public String setTypeName(String type) throws JMSException { - return this.type = type; - } - + @Override public String toString() { - String print = ""; - try { - print = Joiner.on(":").join(this.getTopicName(), this.getTypeName()); - } - catch (JMSException e) { - log.error("Exception Caught in toString, e: {}", e); - } - return print; + return this.name; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java new file mode 100644 index 0000000..0dee423 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms; + +import javax.jms.Message; + +public class SendMessageHook { + + public void before(Message message) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java new file mode 100644 index 0000000..ef47db0 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.rocketmq.jms.JMSHeaderEnum; + +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSCorrelationID; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDeliveryMode; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDeliveryTime; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDestination; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSExpiration; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSMessageID; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSPriority; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSRedelivered; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSReplyTo; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSTimestamp; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSType; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_REDELIVERED_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_TIMESTAMP_DEFAULT_VALUE; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Boolean; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Integer; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Long; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object; +import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2String; + +public abstract class AbstractJMSMessage implements javax.jms.Message { + + protected Map<JMSHeaderEnum, Object> headers = Maps.newHashMap(); + protected Map<String, Object> properties = Maps.newHashMap(); + + protected boolean writeOnly; + + @Override + public String getJMSMessageID() { + return convert2String(headers.get(JMSMessageID)); + } + + @Override + public void setJMSMessageID(String id) { + setHeader(JMSMessageID, id); + } + + @Override + public long getJMSTimestamp() { + if (headers.containsKey(JMSTimestamp)) { + return convert2Long(headers.get(JMSTimestamp)); + } + return JMS_TIMESTAMP_DEFAULT_VALUE; + } + + @Override + public void setJMSTimestamp(long timestamp) { + setHeader(JMSTimestamp, timestamp); + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() { + String jmsCorrelationID = getJMSCorrelationID(); + if (jmsCorrelationID != null) { + try { + return BaseEncoding.base64().decode(jmsCorrelationID); + } + catch (Exception e) { + return jmsCorrelationID.getBytes(); + } + } + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] correlationID) { + String encodedText = BaseEncoding.base64().encode(correlationID); + setJMSCorrelationID(encodedText); + } + + @Override + public String getJMSCorrelationID() { + return convert2String(headers.get(JMSCorrelationID)); + } + + @Override + public void setJMSCorrelationID(String correlationID) { + setHeader(JMSCorrelationID, correlationID); + } + + @Override + public Destination getJMSReplyTo() { + return convert2Object(headers.get(JMSReplyTo), Destination.class); + } + + @Override + public void setJMSReplyTo(Destination replyTo) { + setHeader(JMSReplyTo, replyTo); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + + @Override + public Destination getJMSDestination() { + return convert2Object(headers.get(JMSDestination), Destination.class); + } + + @Override + public void setJMSDestination(Destination destination) { + setHeader(JMSDestination, destination); + } + + @SuppressWarnings("unchecked") + public abstract <T> T getBody(Class<T> clazz) throws JMSException; + + public abstract byte[] getBody() throws JMSException; + + @Override + public int getJMSDeliveryMode() { + if (headers.containsKey(JMSDeliveryMode)) { + return convert2Integer(headers.get(JMSDeliveryMode)); + } + return JMS_DELIVERY_MODE_DEFAULT_VALUE; + } + + @Override + public void setJMSDeliveryMode(int deliveryMode) { + setHeader(JMSDeliveryMode, deliveryMode); + } + + @Override + public boolean getJMSRedelivered() { + if (headers.containsKey(JMSRedelivered)) { + return convert2Boolean(headers.get(JMSRedelivered)); + } + return JMS_REDELIVERED_DEFAULT_VALUE; + } + + @Override + public void setJMSRedelivered(boolean redelivered) { + setHeader(JMSRedelivered, redelivered); + } + + @Override + public String getJMSType() { + return convert2String(headers.get(JMSType)); + } + + @Override + public void setJMSType(String type) { + setHeader(JMSType, type); + } + + public Map<JMSHeaderEnum, Object> getHeaders() { + return this.headers; + } + + @Override + public long getJMSExpiration() { + if (headers.containsKey(JMSExpiration)) { + return convert2Long(headers.get(JMSExpiration)); + } + return JMS_EXPIRATION_DEFAULT_VALUE; + } + + @Override + public void setJMSExpiration(long expiration) { + setHeader(JMSExpiration, expiration); + } + + @Override + public int getJMSPriority() { + if (headers.containsKey(JMSPriority)) { + return convert2Integer(headers.get(JMSPriority)); + } + return JMS_PRIORITY_DEFAULT_VALUE; + } + + @Override + public void setJMSPriority(int priority) { + setHeader(JMSPriority, priority); + } + + @Override + public long getJMSDeliveryTime() throws JMSException { + if (headers.containsKey(JMSDeliveryTime)) { + return convert2Long(headers.get(JMSDeliveryTime)); + } + return JMS_DELIVERY_TIME_DEFAULT_VALUE; + } + + @Override + public void setJMSDeliveryTime(long deliveryTime) throws JMSException { + setHeader(JMSDeliveryTime, deliveryTime); + } + + private void setHeader(JMSHeaderEnum name, Object value) { + this.headers.put(name, value); + } + + public Map<String, Object> getProperties() { + return this.properties; + } + + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + @Override + public void acknowledge() throws JMSException { + //todo + throw new UnsupportedOperationException("Unsupported!"); + } + + @Override + public void clearProperties() { + this.properties.clear(); + } + + @Override + public void clearBody() { + this.writeOnly = true; + } + + @Override + public boolean propertyExists(String name) { + return properties.containsKey(name); + } + + @Override + public boolean getBooleanProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Boolean.valueOf(value.toString()); + } + return false; + } + + @Override + public byte getByteProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Byte.valueOf(value.toString()); + } + return 0; + } + + @Override + public short getShortProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Short.valueOf(value.toString()); + } + return 0; + } + + @Override + public int getIntProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Integer.valueOf(value.toString()); + } + return 0; + } + + @Override + public long getLongProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Long.valueOf(value.toString()); + } + return 0L; + } + + @Override + public float getFloatProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Float.valueOf(value.toString()); + } + return 0f; + } + + @Override + public double getDoubleProperty(String name) throws JMSException { + if (propertyExists(name)) { + Object value = getObjectProperty(name); + return Double.valueOf(value.toString()); + } + return 0d; + } + + @Override + public String getStringProperty(String name) throws JMSException { + if (propertyExists(name)) { + return getObjectProperty(name).toString(); + } + return null; + } + + @Override + public Object getObjectProperty(String name) throws JMSException { + return this.properties.get(name); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + return Collections.enumeration(this.properties.keySet()); + } + + @Override + public void setBooleanProperty(String name, boolean value) { + setObjectProperty(name, value); + } + + @Override + public void setByteProperty(String name, byte value) { + setObjectProperty(name, value); + } + + @Override + public void setShortProperty(String name, short value) { + setObjectProperty(name, value); + } + + @Override + public void setIntProperty(String name, int value) { + setObjectProperty(name, value); + } + + @Override + public void setLongProperty(String name, long value) { + setObjectProperty(name, value); + } + + public void setFloatProperty(String name, float value) { + setObjectProperty(name, value); + } + + @Override + public void setDoubleProperty(String name, double value) { + setObjectProperty(name, value); + } + + @Override + public void setStringProperty(String name, String value) { + setObjectProperty(name, value); + } + + @Override + public abstract boolean isBodyAssignableTo(Class c) throws JMSException; + + @Override + public void setObjectProperty(String name, Object value) { + if (value instanceof Number || value instanceof String || value instanceof Boolean) { + this.properties.put(name, value); + } + else { + throw new IllegalArgumentException( + "Value should be boolean, byte, short, int, long, float, double, and String."); + } + } + + protected boolean isWriteOnly() { + return writeOnly; + } + + protected void checkIsWriteOnly() throws MessageNotWriteableException { + if (!writeOnly) { + throw new MessageNotWriteableException("Not writable"); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java new file mode 100644 index 0000000..1e40b40 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import javax.jms.IllegalStateRuntimeException; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.rocketmq.jms.support.JmsHelper; + +import static java.lang.String.format; + +/** + * RocketMQ ByteMessage. + */ +public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage { + + private byte[] bytesIn; + private DataInputStream dataAsInput; + + private ByteArrayOutputStream bytesOut; + private DataOutputStream dataAsOutput; + + protected boolean readOnly; + + /** + * Message created for reading + * + * @param data + */ + public JMSBytesMessage(byte[] data) { + this.bytesIn = data; + this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length)); + this.readOnly = true; + this.writeOnly = false; + } + + /** + * Message created to be sent + */ + public JMSBytesMessage() { + this.bytesOut = new ByteArrayOutputStream(); + this.dataAsOutput = new DataOutputStream(this.bytesOut); + this.readOnly = false; + this.writeOnly = true; + } + + @Override public byte[] getBody(Class clazz) throws JMSException { + byte[] result; + if (isBodyAssignableTo(clazz)) { + if (isWriteOnly()) { + result = bytesOut.toByteArray(); + this.reset(); + return result; + } + else if (isReadOnly()) { + result = Arrays.copyOf(bytesIn, bytesIn.length); + this.reset(); + return result; + } + else { + throw new IllegalStateRuntimeException("Message must be in write only or read only status"); + } + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + byte[] result = Arrays.copyOf(bytesIn, bytesIn.length); + this.reset(); + return result; + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return byte[].class.isAssignableFrom(c); + } + + @Override public long getBodyLength() throws JMSException { + if (isWriteOnly()) { + return bytesOut.size(); + } + else if (isReadOnly()) { + return bytesIn.length; + } + else { + throw new IllegalStateRuntimeException("Message must be in write only or read only status"); + } + } + + public boolean readBoolean() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readBoolean(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + private void checkIsReadOnly() throws MessageNotReadableException { + if (!isReadOnly()) { + throw new MessageNotReadableException("Not readable"); + } + if (dataAsInput == null) { + throw new MessageNotReadableException("No data to read"); + } + } + + public byte readByte() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readByte(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readUnsignedByte() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUnsignedByte(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public short readShort() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readShort(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readUnsignedShort() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUnsignedShort(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public char readChar() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readChar(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readInt() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readInt(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public long readLong() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readLong(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public float readFloat() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readFloat(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public double readDouble() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readDouble(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public String readUTF() throws JMSException { + checkIsReadOnly(); + + try { + return dataAsInput.readUTF(); + } + catch (IOException e) { + throw handleInputException(e); + } + } + + public int readBytes(byte[] value) throws JMSException { + checkIsReadOnly(); + + return readBytes(value, value.length); + } + + public int readBytes(byte[] value, int length) throws JMSException { + checkIsReadOnly(); + + if (length > value.length) { + throw new IndexOutOfBoundsException("length must be smaller than the length of value"); + } + if (dataAsInput == null) { + throw new MessageNotReadableException("Message is not readable! "); + } + try { + int offset = 0; + while (offset < length) { + int read = dataAsInput.read(value, offset, length - offset); + if (read < 0) { + break; + } + offset += read; + } + + if (offset == 0 && length != 0) { + return -1; + } + else { + return offset; + } + } + catch (IOException e) { + throw handleInputException(e); + } + + } + + public void writeBoolean(boolean value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeBoolean(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + private void initializeWriteIfNecessary() { + if (bytesOut == null) { + bytesOut = new ByteArrayOutputStream(); + } + if (dataAsOutput == null) { + dataAsOutput = new DataOutputStream(bytesOut); + } + } + + public void writeByte(byte value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeByte(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeShort(short value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeShort(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeChar(char value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeChar(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeInt(int value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeInt(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeLong(long value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeLong(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeFloat(float value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeFloat(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeDouble(double value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeDouble(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeUTF(String value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + try { + dataAsOutput.writeUTF(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeBytes(byte[] value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! "); + } + try { + dataAsOutput.write(value); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeBytes(byte[] value, int offset, int length) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + if (dataAsOutput == null) { + throw new MessageNotWriteableException("Message is not writable! "); + } + try { + dataAsOutput.write(value, offset, length); + } + catch (IOException e) { + throw handleOutputException(e); + } + } + + public void writeObject(Object value) throws JMSException { + checkIsWriteOnly(); + initializeWriteIfNecessary(); + + JmsHelper.handleUnSupportedException(); + } + + public void reset() throws JMSException { + try { + if (bytesOut != null) { + bytesOut.reset(); + } + if (this.dataAsInput != null) { + this.dataAsInput.reset(); + } + + this.readOnly = true; + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + } + + @Override public void clearBody() { + super.clearBody(); + this.bytesOut = null; + this.dataAsOutput = null; + this.dataAsInput = null; + this.bytesIn = null; + } + + private JMSException handleOutputException(final IOException e) { + JMSException ex = new JMSException(e.getMessage()); + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } + + private JMSException handleInputException(final IOException e) { + JMSException ex; + if (e instanceof EOFException) { + ex = new MessageEOFException(e.getMessage()); + } + else { + ex = new MessageFormatException(e.getMessage()); + } + ex.initCause(e); + ex.setLinkedException(e); + return ex; + } + + protected boolean isReadOnly() { + return readOnly; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java new file mode 100644 index 0000000..d1dd15d --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.jms.msg.serialize.MapSerialize; + +import static java.lang.String.format; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Boolean; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Byte; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2ByteArray; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Char; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Double; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Float; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Int; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Long; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Short; +import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2String; + +/** + * Message can only be accessed by a thread at a time. + */ +public class JMSMapMessage extends AbstractJMSMessage implements MapMessage { + + private Map<String, Object> map; + + protected boolean readOnly; + + public JMSMapMessage(Map<String, Object> map) { + this.map = map; + } + + public JMSMapMessage() { + this.map = new HashMap(); + } + + @Override public Map<String, Object> getBody(Class clazz) throws JMSException { + if (isBodyAssignableTo(clazz)) { + return this.map; + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + return new MapSerialize().serialize(this.map); + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return Map.class.isAssignableFrom(c); + } + + @Override public boolean getBoolean(String name) throws JMSException { + checkName(name); + + return convert2Boolean(map.get(name)); + } + + private void checkName(String name) throws JMSException { + if (StringUtils.isBlank(name)) { + throw new JMSException("Name is required"); + } + } + + @Override public byte getByte(String name) throws JMSException { + checkName(name); + + return convert2Byte(map.get(name)); + } + + @Override public short getShort(String name) throws JMSException { + checkName(name); + + return convert2Short(map.get(name)); + } + + @Override public char getChar(String name) throws JMSException { + checkName(name); + + return convert2Char(map.get(name)); + } + + @Override public int getInt(String name) throws JMSException { + checkName(name); + + return convert2Int(map.get(name)); + } + + @Override public long getLong(String name) throws JMSException { + checkName(name); + + return convert2Long(map.get(name)); + } + + @Override public float getFloat(String name) throws JMSException { + checkName(name); + + return convert2Float(map.get(name)); + } + + @Override public double getDouble(String name) throws JMSException { + checkName(name); + + return convert2Double(map.get(name)); + } + + @Override public String getString(String name) throws JMSException { + checkName(name); + + return convert2String(map.get(name)); + } + + @Override public byte[] getBytes(String name) throws JMSException { + checkName(name); + + return convert2ByteArray(map.get(name)); + } + + @Override public Object getObject(String name) throws JMSException { + checkName(name); + + return map.get(name); + } + + @Override public Enumeration getMapNames() throws JMSException { + return Collections.enumeration(map.keySet()); + } + + @Override public void setBoolean(String name, boolean value) throws JMSException { + putProperty(name, value); + } + + private void putProperty(String name, Object obj) throws JMSException { + if (isReadOnly()) { + throw new MessageNotWriteableException("Message is not writable"); + } + + checkName(name); + + map.put(name, obj); + } + + @Override public void setByte(String name, byte value) throws JMSException { + putProperty(name, value); + } + + @Override public void setShort(String name, short value) throws JMSException { + putProperty(name, value); + } + + @Override public void setChar(String name, char value) throws JMSException { + putProperty(name, value); + } + + @Override public void setInt(String name, int value) throws JMSException { + putProperty(name, value); + } + + @Override public void setLong(String name, long value) throws JMSException { + putProperty(name, value); + } + + @Override public void setFloat(String name, float value) throws JMSException { + putProperty(name, value); + } + + @Override public void setDouble(String name, double value) throws JMSException { + putProperty(name, value); + } + + @Override public void setString(String name, String value) throws JMSException { + putProperty(name, value); + } + + @Override public void setBytes(String name, byte[] value) throws JMSException { + putProperty(name, value); + } + + @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException { + putProperty(name, value); + } + + @Override public void setObject(String name, Object value) throws JMSException { + putProperty(name, value); + } + + @Override public boolean itemExists(String name) throws JMSException { + checkName(name); + + return map.containsKey(name); + } + + @Override public void clearBody() { + super.clearBody(); + this.map.clear(); + this.readOnly = false; + } + + protected boolean isReadOnly() { + return this.readOnly; + } + + public void setReadOnly(boolean readOnly) { + this.readOnly = readOnly; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java new file mode 100644 index 0000000..239ecc7 --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.io.IOException; +import java.io.Serializable; +import javax.jms.JMSException; +import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; + +public class JMSObjectMessage extends AbstractJMSMessage implements javax.jms.ObjectMessage { + + private Serializable body; + + public JMSObjectMessage(Serializable object) { + this.body = object; + } + + public JMSObjectMessage() { + + } + + @Override public Serializable getBody(Class clazz) throws JMSException { + return body; + } + + @Override public byte[] getBody() throws JMSException { + try { + return ObjectSerialize.serialize(body); + } + catch (IOException e) { + throw new JMSException(e.getMessage()); + } + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return true; + } + + public Serializable getObject() throws JMSException { + return this.body; + } + + public void setObject(Serializable object) throws JMSException { + this.body = object; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java new file mode 100644 index 0000000..13e344d --- /dev/null +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import static java.lang.String.format; + +public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage { + + private String text; + + public JMSTextMessage() { + + } + + public JMSTextMessage(String text) { + setText(text); + } + + @Override public String getBody(Class clazz) throws JMSException { + if (isBodyAssignableTo(clazz)) { + return text; + } + + throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString())); + } + + @Override public byte[] getBody() throws JMSException { + return new byte[0]; + } + + @Override public boolean isBodyAssignableTo(Class c) throws JMSException { + return String.class.isAssignableFrom(c); + } + + public void clearBody() { + super.clearBody(); + this.text = null; + } + + public String getText() throws JMSException { + return this.text; + } + + public void setText(String text) { + this.text = text; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java deleted file mode 100644 index f172fc2..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import javax.jms.IllegalStateRuntimeException; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import org.apache.rocketmq.jms.support.JmsHelper; - -/** - * RocketMQ ByteMessage. - */ -public class RocketMQBytesMessage extends RocketMQMessage implements javax.jms.BytesMessage { - - private byte[] bytesIn; - private DataInputStream dataAsInput; - - private ByteArrayOutputStream bytesOut; - private DataOutputStream dataAsOutput; - - protected boolean readOnly; - - /** - * Message created for reading - * - * @param data - */ - public RocketMQBytesMessage(byte[] data) { - this.bytesIn = data; - this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length)); - this.readOnly = true; - this.writeOnly = false; - } - - /** - * Message created to be sent - */ - public RocketMQBytesMessage() { - this.bytesOut = new ByteArrayOutputStream(); - this.dataAsOutput = new DataOutputStream(this.bytesOut); - this.readOnly = false; - this.writeOnly = true; - } - - public long getBodyLength() throws JMSException { - return getData().length; - } - - /** - * @return the data - */ - public byte[] getData() { - if (isWriteOnly()) { - return bytesOut.toByteArray(); - } - else if (isReadOnly()) { - return bytesIn; - } - else { - throw new IllegalStateRuntimeException("Message must be in write only or read only status"); - } - } - - public boolean readBoolean() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readBoolean(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - private void checkIsReadOnly() throws MessageNotReadableException { - if (!isReadOnly()) { - throw new MessageNotReadableException("Not readable"); - } - if (dataAsInput == null) { - throw new MessageNotReadableException("No data to read"); - } - } - - public byte readByte() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readByte(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readUnsignedByte() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUnsignedByte(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public short readShort() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readShort(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readUnsignedShort() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUnsignedShort(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public char readChar() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readChar(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readInt() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readInt(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public long readLong() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readLong(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public float readFloat() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readFloat(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public double readDouble() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readDouble(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public String readUTF() throws JMSException { - checkIsReadOnly(); - - try { - return dataAsInput.readUTF(); - } - catch (IOException e) { - throw handleInputException(e); - } - } - - public int readBytes(byte[] value) throws JMSException { - checkIsReadOnly(); - - return readBytes(value, value.length); - } - - public int readBytes(byte[] value, int length) throws JMSException { - checkIsReadOnly(); - - if (length > value.length) { - throw new IndexOutOfBoundsException("length must be smaller than the length of value"); - } - if (dataAsInput == null) { - throw new MessageNotReadableException("Message is not readable! "); - } - try { - int offset = 0; - while (offset < length) { - int read = dataAsInput.read(value, offset, length - offset); - if (read < 0) { - break; - } - offset += read; - } - - if (offset == 0 && length != 0) { - return -1; - } - else { - return offset; - } - } - catch (IOException e) { - throw handleInputException(e); - } - - } - - public void writeBoolean(boolean value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeBoolean(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - private void initializeWriteIfNecessary() { - if (bytesOut == null) { - bytesOut = new ByteArrayOutputStream(); - } - if (dataAsOutput == null) { - dataAsOutput = new DataOutputStream(bytesOut); - } - } - - public void writeByte(byte value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeByte(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeShort(short value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeShort(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeChar(char value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeChar(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeInt(int value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeInt(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeLong(long value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeLong(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeFloat(float value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeFloat(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeDouble(double value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeDouble(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeUTF(String value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - try { - dataAsOutput.writeUTF(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeBytes(byte[] value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - if (dataAsOutput == null) { - throw new MessageNotWriteableException("Message is not writable! "); - } - try { - dataAsOutput.write(value); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeBytes(byte[] value, int offset, int length) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - if (dataAsOutput == null) { - throw new MessageNotWriteableException("Message is not writable! "); - } - try { - dataAsOutput.write(value, offset, length); - } - catch (IOException e) { - throw handleOutputException(e); - } - } - - public void writeObject(Object value) throws JMSException { - checkIsWriteOnly(); - initializeWriteIfNecessary(); - - JmsHelper.handleUnSupportedException(); - } - - public void reset() throws JMSException { - try { - if (bytesOut != null) { - bytesOut.reset(); - } - if (this.dataAsInput != null) { - this.dataAsInput.reset(); - } - - this.readOnly = true; - } - catch (IOException e) { - throw new JMSException(e.getMessage()); - } - } - - @Override public void clearBody() { - super.clearBody(); - this.bytesOut = null; - this.dataAsOutput = null; - this.dataAsInput = null; - this.bytesIn = null; - } - - private JMSException handleOutputException(final IOException e) { - JMSException ex = new JMSException(e.getMessage()); - ex.initCause(e); - ex.setLinkedException(e); - return ex; - } - - private JMSException handleInputException(final IOException e) { - JMSException ex; - if (e instanceof EOFException) { - ex = new MessageEOFException(e.getMessage()); - } - else { - ex = new MessageFormatException(e.getMessage()); - } - ex.initCause(e); - ex.setLinkedException(e); - return ex; - } - - protected boolean isReadOnly() { - return readOnly; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java deleted file mode 100644 index 79351ee..0000000 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.msg; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageNotWriteableException; -import org.apache.commons.lang.StringUtils; - -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Boolean; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Byte; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2ByteArray; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Char; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Double; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Float; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Int; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Long; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Short; -import static org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2String; - -/** - * Message can only be accessed by a thread at a time. - */ -public class RocketMQMapMessage extends RocketMQMessage implements MapMessage { - - private Map<String, Object> map; - - protected boolean readOnly; - - public RocketMQMapMessage(Map<String, Object> map) { - this.map = map; - } - - public RocketMQMapMessage() { - this.map = new HashMap(); - } - - @Override public boolean getBoolean(String name) throws JMSException { - checkName(name); - - return convert2Boolean(map.get(name)); - } - - private void checkName(String name) throws JMSException { - if (StringUtils.isBlank(name)) { - throw new JMSException("Name is required"); - } - } - - @Override public byte getByte(String name) throws JMSException { - checkName(name); - - return convert2Byte(map.get(name)); - } - - @Override public short getShort(String name) throws JMSException { - checkName(name); - - return convert2Short(map.get(name)); - } - - @Override public char getChar(String name) throws JMSException { - checkName(name); - - return convert2Char(map.get(name)); - } - - @Override public int getInt(String name) throws JMSException { - checkName(name); - - return convert2Int(map.get(name)); - } - - @Override public long getLong(String name) throws JMSException { - checkName(name); - - return convert2Long(map.get(name)); - } - - @Override public float getFloat(String name) throws JMSException { - checkName(name); - - return convert2Float(map.get(name)); - } - - @Override public double getDouble(String name) throws JMSException { - checkName(name); - - return convert2Double(map.get(name)); - } - - @Override public String getString(String name) throws JMSException { - checkName(name); - - return convert2String(map.get(name)); - } - - @Override public byte[] getBytes(String name) throws JMSException { - checkName(name); - - return convert2ByteArray(map.get(name)); - } - - @Override public Object getObject(String name) throws JMSException { - checkName(name); - - return map.get(name); - } - - @Override public Enumeration getMapNames() throws JMSException { - return Collections.enumeration(map.keySet()); - } - - @Override public void setBoolean(String name, boolean value) throws JMSException { - putProperty(name, value); - } - - private void putProperty(String name, Object obj) throws JMSException { - if (isReadOnly()) { - throw new MessageNotWriteableException("Message is not writable"); - } - - checkName(name); - - map.put(name, obj); - } - - @Override public void setByte(String name, byte value) throws JMSException { - putProperty(name, value); - } - - @Override public void setShort(String name, short value) throws JMSException { - putProperty(name, value); - } - - @Override public void setChar(String name, char value) throws JMSException { - putProperty(name, value); - } - - @Override public void setInt(String name, int value) throws JMSException { - putProperty(name, value); - } - - @Override public void setLong(String name, long value) throws JMSException { - putProperty(name, value); - } - - @Override public void setFloat(String name, float value) throws JMSException { - putProperty(name, value); - } - - @Override public void setDouble(String name, double value) throws JMSException { - putProperty(name, value); - } - - @Override public void setString(String name, String value) throws JMSException { - putProperty(name, value); - } - - @Override public void setBytes(String name, byte[] value) throws JMSException { - putProperty(name, value); - } - - @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException { - putProperty(name, value); - } - - @Override public void setObject(String name, Object value) throws JMSException { - putProperty(name, value); - } - - @Override public boolean itemExists(String name) throws JMSException { - checkName(name); - - return map.containsKey(name); - } - - @Override public void clearBody() { - super.clearBody(); - this.map.clear(); - this.readOnly = false; - } - - protected boolean isReadOnly() { - return this.readOnly; - } - - public void setReadOnly(boolean readOnly) { - this.readOnly = readOnly; - } -}