Support Headers and refactor XXXMessage & MessageConverter

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/37576dd4
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/37576dd4
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/37576dd4

Branch: refs/heads/jms-dev-1.1.0
Commit: 37576dd4990ff8e10ee0b91e013cca6b3f5835a2
Parents: ec4228e
Author: zhangke <zhangke_beij...@qq.com>
Authored: Thu Feb 23 17:55:47 2017 +0800
Committer: zhangke <zhangke_beij...@qq.com>
Committed: Thu Feb 23 17:55:47 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/rocketmq/jms/Constant.java  |  15 +-
 .../rocketmq/jms/DeliverMessageService.java     |  18 +-
 .../org/apache/rocketmq/jms/JMSHeaderEnum.java  |  41 ++
 .../rocketmq/jms/JMSMessageModelEnum.java       |  52 ++
 .../apache/rocketmq/jms/RocketMQProducer.java   | 130 ++---
 .../apache/rocketmq/jms/RocketMQSession.java    |  18 +-
 .../org/apache/rocketmq/jms/RocketMQTopic.java  |  28 +-
 .../apache/rocketmq/jms/SendMessageHook.java    |  27 ++
 .../rocketmq/jms/msg/AbstractJMSMessage.java    | 400 +++++++++++++++
 .../rocketmq/jms/msg/JMSBytesMessage.java       | 484 +++++++++++++++++++
 .../apache/rocketmq/jms/msg/JMSMapMessage.java  | 229 +++++++++
 .../rocketmq/jms/msg/JMSObjectMessage.java      |  61 +++
 .../apache/rocketmq/jms/msg/JMSTextMessage.java |  66 +++
 .../rocketmq/jms/msg/RocketMQBytesMessage.java  | 457 -----------------
 .../rocketmq/jms/msg/RocketMQMapMessage.java    | 210 --------
 .../rocketmq/jms/msg/RocketMQMessage.java       | 431 -----------------
 .../rocketmq/jms/msg/RocketMQObjectMessage.java |  40 --
 .../rocketmq/jms/msg/RocketMQTextMessage.java   |  47 --
 .../jms/msg/serialize/MapSerialize.java         |  33 ++
 .../jms/msg/serialize/ObjectSerialize.java      |  59 +++
 .../rocketmq/jms/msg/serialize/Serialize.java   |  29 ++
 .../jms/msg/serialize/StringSerialize.java      |  52 ++
 .../rocketmq/jms/support/MessageConverter.java  | 294 +++--------
 .../rocketmq/jms/msg/JmsMapMessageTest.java     |  70 +++
 .../jms/msg/RocketMQBytesMessageTest.java       |  20 +-
 .../jms/msg/RocketMQMapMessageTest.java         |  70 ---
 .../jms/msg/RocketMQObjectMessageTest.java      |   4 +-
 .../jms/msg/RocketMQTextMessageTest.java        |   4 +-
 .../jms/support/MessageConvertTest.java         |  13 +-
 29 files changed, 1777 insertions(+), 1625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/Constant.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/Constant.java 
b/core/src/main/java/org/apache/rocketmq/jms/Constant.java
index 332ba1a..9519bea 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/Constant.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/Constant.java
@@ -25,19 +25,6 @@ public interface Constant {
 
     boolean DEFAULT_DURABLE = false;
 
-    //------------------------JMS message header 
constant---------------------------------
-    String JMS_DESTINATION = "jmsDestination";
-    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
-    String JMS_EXPIRATION = "jmsExpiration";
-    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
-    String JMS_PRIORITY = "jmsPriority";
-    String JMS_MESSAGE_ID = "jmsMessageID";
-    String JMS_TIMESTAMP = "jmsTimestamp";
-    String JMS_CORRELATION_ID = "jmsCorrelationID";
-    String JMS_REPLY_TO = "jmsReplyTo";
-    String JMS_TYPE = "jmsType";
-    String JMS_REDELIVERED = "jmsRedelivered";
-
     //-------------------------JMS defined properties 
constant----------------------------
     /**
      * The identity of the user sending the Send message
@@ -90,4 +77,6 @@ public interface Constant {
      * Default Jms Type
      */
     String DEFAULT_JMS_TYPE = "rocketmq";
+
+    String MESSAGE_ID_PREFIX = "ID:";
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
index b25a339..da8196f 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -17,13 +17,6 @@
 
 package org.apache.rocketmq.jms;
 
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +31,13 @@ import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.jms.support.JmsHelper;
 import org.apache.rocketmq.jms.support.MessageConverter;
 import org.slf4j.Logger;
@@ -184,6 +184,10 @@ public class DeliverMessageService extends ServiceThread {
      */
     private void handleMessage(MessageExt msg, MessageQueue mq) throws 
InterruptedException, JMSException {
         Message jmsMessage = MessageConverter.convert2JMSMessage(msg);
+        if (jmsMessage.getJMSExpiration() != 0 && System.currentTimeMillis() > 
jmsMessage.getJMSExpiration()) {
+            log.debug("The message[id={}] has been expired", msg.getMsgId());
+            return;
+        }
         final MessageWrapper wrapper = new MessageWrapper(jmsMessage, 
this.consumer, mq, msg.getQueueOffset());
 
         switch (this.consumeModel) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java 
b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
new file mode 100644
index 0000000..a9c758e
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/JMSHeaderEnum.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+/**
+ * Names of the JMS message headers, plus the default values used for the
+ * headers that have a primitive type.
+ */
+public enum JMSHeaderEnum {
+
+    JMSDestination,
+    JMSDeliveryMode,
+    JMSMessageID,
+    JMSTimestamp,
+    JMSCorrelationID,
+    JMSReplyTo,
+    JMSRedelivered,
+    JMSType,
+    JMSExpiration,
+    JMSPriority,
+    JMSDeliveryTime;
+
+    // NOTE(review): 0 is neither DeliveryMode.NON_PERSISTENT (1) nor
+    // DeliveryMode.PERSISTENT (2); presumably it stands for "not set" -- confirm.
+    public static final int JMS_DELIVERY_MODE_DEFAULT_VALUE = 0;
+    public static final int JMS_TIMESTAMP_DEFAULT_VALUE = 0;
+    public static final boolean JMS_REDELIVERED_DEFAULT_VALUE = false;
+    public static final int JMS_EXPIRATION_DEFAULT_VALUE = 0;
+    // NOTE(review): RocketMQProducer.initJMSHeaders uses javax.jms.Message.DEFAULT_PRIORITY
+    // (documented there as 4), which differs from the 5 used here -- confirm which is intended.
+    public static final int JMS_PRIORITY_DEFAULT_VALUE = 5;
+    public static final int JMS_DELIVERY_TIME_DEFAULT_VALUE = 0;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java 
b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
new file mode 100644
index 0000000..0659f92
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/JMSMessageModelEnum.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+
+/**
+ * Identifies a JMS message model (bytes, map, object or text) by the
+ * implementation class that represents it.
+ */
+public enum JMSMessageModelEnum {
+    BYTE(JMSBytesMessage.class),
+    MAP(JMSMapMessage.class),
+    OBJECT(JMSObjectMessage.class),
+    STRING(JMSTextMessage.class);
+
+    // NOTE(review): presumably the property key under which the model name travels
+    // with a message; the usage is not visible in this file -- confirm.
+    public static final String MSG_MODEL_NAME = "MsgModel";
+
+    /** Implementation class bound to this constant; assigned once, hence final. */
+    private final Class<?> jmsClass;
+
+    JMSMessageModelEnum(Class<?> jmsClass) {
+        this.jmsClass = jmsClass;
+    }
+
+    /**
+     * Finds the constant bound to exactly the given class. The comparison is by
+     * identity, so a subclass of a supported class does not match.
+     *
+     * @param clazz message implementation class to look up
+     * @return the constant bound to {@code clazz}
+     * @throws IllegalArgumentException if no constant is bound to {@code clazz}
+     */
+    public static JMSMessageModelEnum toMsgModelEnum(Class<?> clazz) {
+        for (JMSMessageModelEnum model : values()) {
+            if (model.jmsClass == clazz) {
+                return model;
+            }
+        }
+
+        throw new IllegalArgumentException(String.format("Not supported class[%s]", clazz));
+    }
+
+    /**
+     * Returns the implementation class bound to this constant. The return type is
+     * left raw so that callers written against the original signature still compile.
+     *
+     * @return the implementation class bound to this constant
+     */
+    public Class getJmsClass() {
+        return jmsClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
index 1c0b7d1..8cc5903 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQProducer.java
@@ -17,11 +17,6 @@
 
 package org.apache.rocketmq.jms;
 
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
 import java.util.UUID;
 import javax.jms.CompletionListener;
 import javax.jms.Destination;
@@ -29,8 +24,13 @@ import javax.jms.JMSException;
 import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
-import org.apache.rocketmq.jms.msg.RocketMQMessage;
-import org.apache.rocketmq.jms.support.JmsHelper;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
 import org.apache.rocketmq.jms.support.MessageConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,27 +47,30 @@ import static 
org.apache.rocketmq.jms.Constant.JMS_EXPIRATION;
 import static org.apache.rocketmq.jms.Constant.JMS_PRIORITY;
 import static org.apache.rocketmq.jms.Constant.JMS_TIMESTAMP;
 import static org.apache.rocketmq.jms.Constant.JMS_TYPE;
+import static org.apache.rocketmq.jms.Constant.MESSAGE_ID_PREFIX;
+import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object;
 
 public class RocketMQProducer implements MessageProducer {
 
     private static final Logger log = 
LoggerFactory.getLogger(RocketMQProducer.class);
-
     private RocketMQSession session;
-
-    private final DefaultMQProducer mqProducer;
-
+    private final DefaultMQProducer rocketMQProducer;
     private Destination destination;
 
+    private boolean disableMessageID;
+    private boolean disableMessageTimestamp;
+    private long timeToLive;
+
     public RocketMQProducer(RocketMQSession session, Destination destination) {
         this.session = session;
         this.destination = destination;
 
-        this.mqProducer = new DefaultMQProducer(UUID.randomUUID().toString());
+        this.rocketMQProducer = new 
DefaultMQProducer(UUID.randomUUID().toString());
         ClientConfig clientConfig = 
this.session.getConnection().getClientConfig();
-        this.mqProducer.setNamesrvAddr(clientConfig.getNamesrvAddr());
-        this.mqProducer.setInstanceName(clientConfig.getInstanceName());
+        this.rocketMQProducer.setNamesrvAddr(clientConfig.getNamesrvAddr());
+        this.rocketMQProducer.setInstanceName(clientConfig.getInstanceName());
         try {
-            this.mqProducer.start();
+            this.rocketMQProducer.start();
         }
         catch (MQClientException e) {
             throw new JMSRuntimeException(format("Fail to start producer, 
error msg:%s", getStackTrace(e)));
@@ -76,24 +79,22 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public void setDisableMessageID(boolean value) throws JMSException {
-        //todo
+        this.disableMessageID = value;
     }
 
     @Override
     public boolean getDisableMessageID() throws JMSException {
-        //todo
-        return false;
+        return this.disableMessageID;
     }
 
     @Override
     public void setDisableMessageTimestamp(boolean value) throws JMSException {
-        //todo
+        this.disableMessageTimestamp = value;
     }
 
     @Override
     public boolean getDisableMessageTimestamp() throws JMSException {
-        //todo
-        return false;
+        return this.disableMessageTimestamp;
     }
 
     @Override
@@ -120,13 +121,12 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public void setTimeToLive(long timeToLive) throws JMSException {
-        //todo
+        this.timeToLive = timeToLive;
     }
 
     @Override
     public long getTimeToLive() throws JMSException {
-        //todo
-        return 0;
+        // Return the value stored by setTimeToLive(long). Calling this.getTimeToLive()
+        // here would recurse until StackOverflowError, and before() reads this getter
+        // on every send.
+        return this.timeToLive;
     }
 
     @Override
@@ -148,7 +148,7 @@ public class RocketMQProducer implements MessageProducer {
 
     @Override
     public void close() throws JMSException {
-        this.mqProducer.shutdown();
+        this.rocketMQProducer.shutdown();
     }
 
     @Override
@@ -170,23 +170,12 @@ public class RocketMQProducer implements MessageProducer {
     @Override
     public void send(Destination destination, Message message, int 
deliveryMode, int priority,
         long timeToLive) throws JMSException {
-        String topicName = JmsHelper.getTopicName(destination);
-
-        org.apache.rocketmq.common.message.Message rmqMsg = 
createRmqMessage(message, topicName);
-
-        sendSync(rmqMsg);
-    }
 
-    private void sendSync(org.apache.rocketmq.common.message.Message rmqMsg) 
throws JMSException {
-        SendResult sendResult;
+        before(message);
 
-        try {
-            sendResult = mqProducer.send(rmqMsg);
-        }
-        catch (Exception e) {
-            throw new JMSException(format("Fail to send message. Error: %s", 
getStackTrace(e)));
-        }
+        MessageExt rmqMsg = createRocketMQMessage(message);
 
+        SendResult sendResult = sendSync(rmqMsg);
         if (sendResult != null && sendResult.getSendStatus() == 
SendStatus.SEND_OK) {
             log.debug("Success to send message[key={}]", rmqMsg.getKeys());
             return;
@@ -196,29 +185,35 @@ public class RocketMQProducer implements MessageProducer {
         }
     }
 
+    /**
+     * Sends the message synchronously through the underlying RocketMQ producer.
+     *
+     * @param rmqMsg RocketMQ message to send
+     * @return whatever {@code rocketMQProducer.send(rmqMsg)} returns
+     * @throws JMSException if the send throws; the message still embeds the stack
+     * trace text, and the original exception is attached as linked exception and cause
+     */
+    private SendResult sendSync(org.apache.rocketmq.common.message.Message rmqMsg) throws JMSException {
+        try {
+            return rocketMQProducer.send(rmqMsg);
+        }
+        catch (Exception e) {
+            JMSException failure = new JMSException(format("Fail to send message. Error: %s", getStackTrace(e)));
+            // Keep the original exception reachable instead of only its printed trace.
+            failure.setLinkedException(e);
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
     private void sendAsync(org.apache.rocketmq.common.message.Message rmqMsg,
         CompletionListener completionListener) throws JMSException {
         try {
-            mqProducer.send(rmqMsg, new 
SendCompletionListener(completionListener));
+            rocketMQProducer.send(rmqMsg, new 
SendCompletionListener(completionListener));
         }
         catch (Exception e) {
             throw new JMSException(format("Fail to send message. Error: %s", 
getStackTrace(e)));
         }
     }
 
-    private org.apache.rocketmq.common.message.Message 
createRmqMessage(Message message,
-        String topicName) throws JMSException {
-        RocketMQMessage jmsMsg = (RocketMQMessage) message;
+    private MessageExt createRocketMQMessage(Message message) throws 
JMSException {
+        AbstractJMSMessage jmsMsg = convert2Object(message, 
AbstractJMSMessage.class);
         initJMSHeaders(jmsMsg, destination);
-        org.apache.rocketmq.common.message.Message rmqMsg = null;
         try {
-            rmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
+            return MessageConverter.convert2RMQMessage(jmsMsg);
         }
         catch (Exception e) {
             throw new JMSException(format("Fail to convert to RocketMQ 
message. Error: %s", getStackTrace(e)));
         }
-
-        return rmqMsg;
     }
 
     /**
@@ -226,26 +221,26 @@ public class RocketMQProducer implements MessageProducer {
      * <p/>
      * <P>JMS providers init message's headers. Do not allow user to set these 
by yourself.
      *
-     * @param rmqJmsMsg message
+     * @param jmsMsg message
      * @param destination
      * @throws javax.jms.JMSException
      * @see <CODE>Destination</CODE>
      */
-    private void initJMSHeaders(RocketMQMessage rmqJmsMsg, Destination 
destination) throws JMSException {
+    private void initJMSHeaders(AbstractJMSMessage jmsMsg, Destination 
destination) throws JMSException {
 
         //JMS_DESTINATION default:"topic:message"
-        rmqJmsMsg.setHeader(JMS_DESTINATION, destination);
+        jmsMsg.setHeader(JMS_DESTINATION, destination);
         //JMS_DELIVERY_MODE default : PERSISTENT
-        rmqJmsMsg.setHeader(JMS_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_DELIVERY_MODE);
+        jmsMsg.setHeader(JMS_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_DELIVERY_MODE);
         //JMS_TIMESTAMP default : current time
-        rmqJmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis());
+        jmsMsg.setHeader(JMS_TIMESTAMP, System.currentTimeMillis());
         //JMS_EXPIRATION default :  3 days
         //JMS_EXPIRATION = currentTime + time_to_live
-        rmqJmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + 
DEFAULT_TIME_TO_LIVE);
+        jmsMsg.setHeader(JMS_EXPIRATION, System.currentTimeMillis() + 
DEFAULT_TIME_TO_LIVE);
         //JMS_PRIORITY default : 4
-        rmqJmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
+        jmsMsg.setHeader(JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
         //JMS_TYPE default : ons(open notification service)
-        rmqJmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE);
+        jmsMsg.setHeader(JMS_TYPE, DEFAULT_JMS_TYPE);
         //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
         //JMS_MESSAGE_ID is set by sendResult.
         //JMS_REDELIVERED is set by broker.
@@ -271,11 +266,32 @@ public class RocketMQProducer implements MessageProducer {
     @Override
     public void send(Destination destination, Message message, int 
deliveryMode, int priority, long timeToLive,
         CompletionListener completionListener) throws JMSException {
-        String topicName = JmsHelper.getTopicName(destination);
 
-        org.apache.rocketmq.common.message.Message rmqMsg = 
createRmqMessage(message, topicName);
+        before(message);
+
+        MessageExt rmqMsg = createRocketMQMessage(message);
 
         sendAsync(rmqMsg, completionListener);
     }
 
+    /**
+     * Stamps the provider-managed headers on a message before it is converted and sent.
+     * <ul>
+     * <li>JMSTimestamp: current time, unless message timestamps are disabled.</li>
+     * <li>JMSMessageID: {@code MESSAGE_ID_PREFIX} followed by the least significant
+     * bits of a random UUID, unless message IDs are disabled.</li>
+     * <li>JMSExpiration: current time plus the time to live, or 0 when the time to
+     * live is 0.</li>
+     * </ul>
+     *
+     * @param message message whose headers are set
+     * @throws JMSException if reading the producer settings or setting a header fails
+     */
+    private void before(Message message) throws JMSException {
+        final boolean timestampEnabled = !getDisableMessageTimestamp();
+        if (timestampEnabled) {
+            message.setJMSTimestamp(System.currentTimeMillis());
+        }
+
+        // The ID is generated here because it is also needed in the async model,
+        // so MessageExt#getMsgId() can't be used.
+        final boolean messageIdEnabled = !getDisableMessageID();
+        if (messageIdEnabled) {
+            message.setJMSMessageID(MESSAGE_ID_PREFIX + UUID.randomUUID().getLeastSignificantBits());
+        }
+
+        final long ttl = getTimeToLive();
+        message.setJMSExpiration(ttl == 0 ? 0L : System.currentTimeMillis() + ttl);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
index 954f80a..c14c85d 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
@@ -43,9 +43,9 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
-import org.apache.rocketmq.jms.msg.RocketMQBytesMessage;
-import org.apache.rocketmq.jms.msg.RocketMQObjectMessage;
-import org.apache.rocketmq.jms.msg.RocketMQTextMessage;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSObjectMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +84,7 @@ public class RocketMQSession implements Session {
 
     @Override
     public BytesMessage createBytesMessage() throws JMSException {
-        return new RocketMQBytesMessage();
+        return new JMSBytesMessage();
     }
 
     @Override
@@ -95,17 +95,17 @@ public class RocketMQSession implements Session {
 
     @Override
     public Message createMessage() throws JMSException {
-        return new RocketMQBytesMessage();
+        return new JMSBytesMessage();
     }
 
     @Override
     public ObjectMessage createObjectMessage() throws JMSException {
-        return new RocketMQObjectMessage();
+        return new JMSObjectMessage();
     }
 
     @Override
     public ObjectMessage createObjectMessage(Serializable serializable) throws 
JMSException {
-        return new RocketMQObjectMessage(serializable);
+        return new JMSObjectMessage(serializable);
     }
 
     @Override
@@ -116,12 +116,12 @@ public class RocketMQSession implements Session {
 
     @Override
     public TextMessage createTextMessage() throws JMSException {
-        return new RocketMQTextMessage();
+        return new JMSTextMessage();
     }
 
     @Override
     public TextMessage createTextMessage(String text) throws JMSException {
-        return new RocketMQTextMessage(text);
+        return new JMSTextMessage(text);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java
index 5fe1448..c1ff87b 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQTopic.java
@@ -17,28 +17,18 @@
 
 package org.apache.rocketmq.jms;
 
-import com.google.common.base.Joiner;
 import javax.jms.JMSException;
 import javax.jms.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.rocketmq.jms.Constant.NO_MESSAGE_SELECTOR;
-
 public class RocketMQTopic implements Topic {
     private static final Logger log = 
LoggerFactory.getLogger(RocketMQTopic.class);
 
     private String name;
-    private String type;
 
     public RocketMQTopic(String name) {
         this.name = name;
-        this.type = NO_MESSAGE_SELECTOR;
-    }
-
-    public RocketMQTopic(String name, String type) {
-        this.name = name;
-        this.type = type;
     }
 
     @Override
@@ -46,22 +36,8 @@ public class RocketMQTopic implements Topic {
         return this.name;
     }
 
-    public String getTypeName() throws JMSException {
-        return this.type;
-    }
-
-    public String setTypeName(String type) throws JMSException {
-        return this.type = type;
-    }
-
+    @Override
     public String toString() {
-        String print = "";
-        try {
-            print = Joiner.on(":").join(this.getTopicName(), 
this.getTypeName());
-        }
-        catch (JMSException e) {
-            log.error("Exception Caught in toString, e: {}", e);
-        }
-        return print;
+        return this.name;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java 
b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
new file mode 100644
index 0000000..0dee423
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/SendMessageHook.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import javax.jms.Message;
+
+/**
+ * Hook with a single callback, {@link #before(Message)}, whose body is empty.
+ * NOTE(review): presumably meant as an extension point run before a message is
+ * sent; nothing visible in this change calls it (RocketMQProducer uses its own
+ * private before(Message)) -- confirm the intended wiring.
+ */
+public class SendMessageHook {
+
+    /**
+     * Does nothing in this implementation.
+     *
+     * @param message the message passed to the hook; not read or modified here
+     */
+    public void before(Message message) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java
new file mode 100644
index 0000000..ef47db0
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/AbstractJMSMessage.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.jms.JMSHeaderEnum;
+
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSCorrelationID;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDeliveryMode;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDeliveryTime;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSDestination;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSExpiration;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSMessageID;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSPriority;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSRedelivered;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSReplyTo;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSTimestamp;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMSType;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_DELIVERY_TIME_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_EXPIRATION_DEFAULT_VALUE;
+import static org.apache.rocketmq.jms.JMSHeaderEnum.JMS_PRIORITY_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_REDELIVERED_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.JMSHeaderEnum.JMS_TIMESTAMP_DEFAULT_VALUE;
+import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Boolean;
+import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Integer;
+import static org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Long;
+import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2Object;
+import static 
org.apache.rocketmq.jms.support.DirectTypeConverter.convert2String;
+
+/**
+ * Skeleton implementation of {@link javax.jms.Message}. JMS headers are kept in a map keyed by
+ * {@link JMSHeaderEnum}, application properties in a map keyed by property name, and everything
+ * related to the body is left to the concrete message types.
+ */
+public abstract class AbstractJMSMessage implements javax.jms.Message {
+
+    /** JMS header values keyed by header. */
+    protected Map<JMSHeaderEnum, Object> headers = Maps.newHashMap();
+    /** Application-defined properties keyed by property name. */
+    protected Map<String, Object> properties = Maps.newHashMap();
+
+    /** Write-mode flag of the body; consulted by {@link #checkIsWriteOnly()}. */
+    protected boolean writeOnly;
+
+    /** Returns the stored {@code JMSMessageID} header converted to a string. */
+    @Override
+    public String getJMSMessageID() {
+        return convert2String(headers.get(JMSMessageID));
+    }
+
+    @Override
+    public void setJMSMessageID(String id) {
+        setHeader(JMSMessageID, id);
+    }
+
+    /** Returns the stored timestamp, or {@code JMS_TIMESTAMP_DEFAULT_VALUE} when none is set. */
+    @Override
+    public long getJMSTimestamp() {
+        if (headers.containsKey(JMSTimestamp)) {
+            return convert2Long(headers.get(JMSTimestamp));
+        }
+        return JMS_TIMESTAMP_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSTimestamp(long timestamp) {
+        setHeader(JMSTimestamp, timestamp);
+    }
+
+    /**
+     * Returns the correlation ID as bytes.
+     *
+     * <p>The ID is first treated as the Base64 text produced by
+     * {@link #setJMSCorrelationIDAsBytes(byte[])}. If it is not valid Base64 (i.e. it was set as a
+     * plain string) its UTF-8 bytes are returned instead, so the result does not depend on the
+     * platform default charset.
+     *
+     * @return the decoded bytes, or {@code null} when no correlation ID is set
+     */
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        String jmsCorrelationID = getJMSCorrelationID();
+        if (jmsCorrelationID == null) {
+            return null;
+        }
+        try {
+            return BaseEncoding.base64().decode(jmsCorrelationID);
+        }
+        catch (IllegalArgumentException notBase64) {
+            // Not Base64 text: fall back to the raw characters of the ID.
+            return jmsCorrelationID.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * Stores the correlation ID as Base64 text.
+     *
+     * @param correlationID the raw bytes; {@code null} clears the correlation ID
+     */
+    @Override
+    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
+        if (correlationID == null) {
+            setJMSCorrelationID(null);
+            return;
+        }
+        setJMSCorrelationID(BaseEncoding.base64().encode(correlationID));
+    }
+
+    @Override
+    public String getJMSCorrelationID() {
+        return convert2String(headers.get(JMSCorrelationID));
+    }
+
+    @Override
+    public void setJMSCorrelationID(String correlationID) {
+        setHeader(JMSCorrelationID, correlationID);
+    }
+
+    @Override
+    public Destination getJMSReplyTo() {
+        return convert2Object(headers.get(JMSReplyTo), Destination.class);
+    }
+
+    @Override
+    public void setJMSReplyTo(Destination replyTo) {
+        setHeader(JMSReplyTo, replyTo);
+    }
+
+    /** Dumps every field reflectively; intended for logging and debugging. */
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+
+    @Override
+    public Destination getJMSDestination() {
+        return convert2Object(headers.get(JMSDestination), Destination.class);
+    }
+
+    @Override
+    public void setJMSDestination(Destination destination) {
+        setHeader(JMSDestination, destination);
+    }
+
+    /**
+     * Returns the body as an instance of the requested type.
+     *
+     * @param clazz the type the caller wants the body as
+     * @throws JMSException if the body cannot be provided as {@code clazz}
+     */
+    @SuppressWarnings("unchecked")
+    public abstract <T> T getBody(Class<T> clazz) throws JMSException;
+
+    /**
+     * Returns the body in its byte form.
+     *
+     * @throws JMSException if the body cannot be turned into bytes
+     */
+    public abstract byte[] getBody() throws JMSException;
+
+    /** Returns the stored delivery mode, or {@code JMS_DELIVERY_MODE_DEFAULT_VALUE} when unset. */
+    @Override
+    public int getJMSDeliveryMode() {
+        if (headers.containsKey(JMSDeliveryMode)) {
+            return convert2Integer(headers.get(JMSDeliveryMode));
+        }
+        return JMS_DELIVERY_MODE_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSDeliveryMode(int deliveryMode) {
+        setHeader(JMSDeliveryMode, deliveryMode);
+    }
+
+    /** Returns the stored redelivered flag, or {@code JMS_REDELIVERED_DEFAULT_VALUE} when unset. */
+    @Override
+    public boolean getJMSRedelivered() {
+        if (headers.containsKey(JMSRedelivered)) {
+            return convert2Boolean(headers.get(JMSRedelivered));
+        }
+        return JMS_REDELIVERED_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSRedelivered(boolean redelivered) {
+        setHeader(JMSRedelivered, redelivered);
+    }
+
+    @Override
+    public String getJMSType() {
+        return convert2String(headers.get(JMSType));
+    }
+
+    @Override
+    public void setJMSType(String type) {
+        setHeader(JMSType, type);
+    }
+
+    /** Returns the live header map (not a copy). */
+    public Map<JMSHeaderEnum, Object> getHeaders() {
+        return this.headers;
+    }
+
+    /** Returns the stored expiration, or {@code JMS_EXPIRATION_DEFAULT_VALUE} when unset. */
+    @Override
+    public long getJMSExpiration() {
+        if (headers.containsKey(JMSExpiration)) {
+            return convert2Long(headers.get(JMSExpiration));
+        }
+        return JMS_EXPIRATION_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSExpiration(long expiration) {
+        setHeader(JMSExpiration, expiration);
+    }
+
+    /** Returns the stored priority, or {@code JMS_PRIORITY_DEFAULT_VALUE} when unset. */
+    @Override
+    public int getJMSPriority() {
+        if (headers.containsKey(JMSPriority)) {
+            return convert2Integer(headers.get(JMSPriority));
+        }
+        return JMS_PRIORITY_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSPriority(int priority) {
+        setHeader(JMSPriority, priority);
+    }
+
+    /** Returns the stored delivery time, or {@code JMS_DELIVERY_TIME_DEFAULT_VALUE} when unset. */
+    @Override
+    public long getJMSDeliveryTime() throws JMSException {
+        if (headers.containsKey(JMSDeliveryTime)) {
+            return convert2Long(headers.get(JMSDeliveryTime));
+        }
+        return JMS_DELIVERY_TIME_DEFAULT_VALUE;
+    }
+
+    @Override
+    public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
+        setHeader(JMSDeliveryTime, deliveryTime);
+    }
+
+    /** Single write path for all header setters. */
+    private void setHeader(JMSHeaderEnum name, Object value) {
+        this.headers.put(name, value);
+    }
+
+    /** Returns the live property map (not a copy). */
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    /**
+     * Replaces the property map with the given one (the map is used as-is, not copied).
+     *
+     * @param properties the new properties; {@code null} is treated as "no properties" so that
+     * the property accessors keep working afterwards
+     */
+    public void setProperties(Map<String, Object> properties) {
+        if (properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+        else {
+            this.properties = properties;
+        }
+    }
+
+    /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void acknowledge() throws JMSException {
+        //todo
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public void clearProperties() {
+        this.properties.clear();
+    }
+
+    /** Switches the message to write mode; subclasses are expected to drop their body as well. */
+    @Override
+    public void clearBody() {
+        this.writeOnly = true;
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        return properties.containsKey(name);
+    }
+
+    /**
+     * Looks a property up for the typed getters.
+     *
+     * @return the stored value, or {@code null} when the property is absent or holds {@code null}
+     */
+    private Object findProperty(String name) throws JMSException {
+        return propertyExists(name) ? getObjectProperty(name) : null;
+    }
+
+    /** Returns the property parsed as a boolean, or {@code false} when it is absent. */
+    @Override
+    public boolean getBooleanProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        return value != null && Boolean.parseBoolean(value.toString());
+    }
+
+    /** Returns the property parsed as a byte, or {@code 0} when it is absent. */
+    @Override
+    public byte getByteProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return Byte.parseByte(value.toString());
+    }
+
+    /** Returns the property parsed as a short, or {@code 0} when it is absent. */
+    @Override
+    public short getShortProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return Short.parseShort(value.toString());
+    }
+
+    /** Returns the property parsed as an int, or {@code 0} when it is absent. */
+    @Override
+    public int getIntProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return Integer.parseInt(value.toString());
+    }
+
+    /** Returns the property parsed as a long, or {@code 0} when it is absent. */
+    @Override
+    public long getLongProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0L;
+        }
+        return Long.parseLong(value.toString());
+    }
+
+    /** Returns the property parsed as a float, or {@code 0} when it is absent. */
+    @Override
+    public float getFloatProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0f;
+        }
+        return Float.parseFloat(value.toString());
+    }
+
+    /** Returns the property parsed as a double, or {@code 0} when it is absent. */
+    @Override
+    public double getDoubleProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        if (value == null) {
+            return 0d;
+        }
+        return Double.parseDouble(value.toString());
+    }
+
+    /** Returns the property's string form, or {@code null} when it is absent. */
+    @Override
+    public String getStringProperty(String name) throws JMSException {
+        Object value = findProperty(name);
+        return value == null ? null : value.toString();
+    }
+
+    @Override
+    public Object getObjectProperty(String name) throws JMSException {
+        return this.properties.get(name);
+    }
+
+    @Override
+    public Enumeration<?> getPropertyNames() throws JMSException {
+        return Collections.enumeration(this.properties.keySet());
+    }
+
+    @Override
+    public void setBooleanProperty(String name, boolean value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setByteProperty(String name, byte value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setShortProperty(String name, short value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setIntProperty(String name, int value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setLongProperty(String name, long value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setFloatProperty(String name, float value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setDoubleProperty(String name, double value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setStringProperty(String name, String value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public abstract boolean isBodyAssignableTo(Class c) throws JMSException;
+
+    /**
+     * Stores a property value.
+     *
+     * @param name the property name
+     * @param value a {@link Number}, {@link String} or {@link Boolean}
+     * @throws IllegalArgumentException if {@code value} is {@code null} or of any other type
+     */
+    @Override
+    public void setObjectProperty(String name, Object value) {
+        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
+            this.properties.put(name, value);
+        }
+        else {
+            throw new IllegalArgumentException(
+                "Value should be boolean, byte, short, int, long, float, double, and String.");
+        }
+    }
+
+    protected boolean isWriteOnly() {
+        return writeOnly;
+    }
+
+    /**
+     * Guard for body write methods.
+     *
+     * @throws MessageNotWriteableException if the message is not in write mode
+     */
+    protected void checkIsWriteOnly() throws MessageNotWriteableException {
+        if (!writeOnly) {
+            throw new MessageNotWriteableException("Not writable");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
new file mode 100644
index 0000000..1e40b40
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSBytesMessage.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.support.JmsHelper;
+
+import static java.lang.String.format;
+
+/**
+ * RocketMQ implementation of {@link javax.jms.BytesMessage}.
+ *
+ * <p>A message built from a byte array starts in read mode; a message built with the no-arg
+ * constructor starts in write mode. Written bytes are collected in an in-memory buffer and become
+ * visible to the {@code readXxx} methods after {@link #reset()}.
+ */
+public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage {
+
+    /** Bytes served by the read methods. */
+    private byte[] bytesIn;
+    private DataInputStream dataAsInput;
+
+    /** Buffer collecting everything the write methods produce. */
+    private ByteArrayOutputStream bytesOut;
+    private DataOutputStream dataAsOutput;
+
+    protected boolean readOnly;
+
+    /**
+     * Message created for reading
+     *
+     * @param data the body; must not be {@code null}
+     */
+    public JMSBytesMessage(byte[] data) {
+        this.bytesIn = data;
+        this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
+        this.readOnly = true;
+        this.writeOnly = false;
+    }
+
+    /**
+     * Message created to be sent
+     */
+    public JMSBytesMessage() {
+        this.bytesOut = new ByteArrayOutputStream();
+        this.dataAsOutput = new DataOutputStream(this.bytesOut);
+        this.readOnly = false;
+        this.writeOnly = true;
+    }
+
+    /**
+     * Returns a copy of the body and then calls {@link #reset()}.
+     *
+     * @param clazz {@code byte[].class} or any type a {@code byte[]} can be assigned to
+     * @return the bytes written so far (write mode) or the bytes the message was built from
+     * @throws MessageFormatException if a {@code byte[]} cannot be assigned to {@code clazz}
+     */
+    @Override public byte[] getBody(Class clazz) throws JMSException {
+        byte[] result;
+        if (isBodyAssignableTo(clazz)) {
+            if (isWriteOnly()) {
+                result = bytesOut.toByteArray();
+                this.reset();
+                return result;
+            }
+            else if (isReadOnly()) {
+                result = Arrays.copyOf(bytesIn, bytesIn.length);
+                this.reset();
+                return result;
+            }
+            else {
+                throw new IllegalStateRuntimeException("Message must be in write only or read only status");
+            }
+        }
+
+        throw new MessageFormatException(format("The type[%s] can't be casted to byte[]", clazz.toString()));
+    }
+
+    /**
+     * Same as {@code getBody(byte[].class)}; works for messages in write mode as well as read mode.
+     */
+    @Override public byte[] getBody() throws JMSException {
+        return getBody(byte[].class);
+    }
+
+    /** Returns whether a {@code byte[]} can be assigned to a variable of type {@code c}. */
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return c.isAssignableFrom(byte[].class);
+    }
+
+    /** Returns the number of bytes written so far (write mode) or available for reading. */
+    @Override public long getBodyLength() throws JMSException {
+        if (isWriteOnly()) {
+            return bytesOut.size();
+        }
+        else if (isReadOnly()) {
+            return bytesIn.length;
+        }
+        else {
+            throw new IllegalStateRuntimeException("Message must be in write only or read only status");
+        }
+    }
+
+    public boolean readBoolean() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readBoolean();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    /**
+     * Guard for the read methods.
+     *
+     * @throws MessageNotReadableException if the message is not in read mode or has nothing to read
+     */
+    private void checkIsReadOnly() throws MessageNotReadableException {
+        if (!isReadOnly()) {
+            throw new MessageNotReadableException("Not readable");
+        }
+        if (dataAsInput == null) {
+            throw new MessageNotReadableException("No data to read");
+        }
+    }
+
+    public byte readByte() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readByte();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readUnsignedByte() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUnsignedByte();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public short readShort() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readShort();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readUnsignedShort() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUnsignedShort();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public char readChar() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readChar();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public int readInt() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readInt();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public long readLong() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readLong();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public float readFloat() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readFloat();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public double readDouble() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readDouble();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public String readUTF() throws JMSException {
+        checkIsReadOnly();
+
+        try {
+            return dataAsInput.readUTF();
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    /** Fills {@code value} as far as possible; see {@link #readBytes(byte[], int)}. */
+    public int readBytes(byte[] value) throws JMSException {
+        checkIsReadOnly();
+
+        return readBytes(value, value.length);
+    }
+
+    /**
+     * Reads up to {@code length} bytes into the start of {@code value}.
+     *
+     * @param value the destination buffer
+     * @param length the number of bytes wanted; between 0 and {@code value.length}
+     * @return the number of bytes read, or -1 when bytes were requested but the end of the body
+     * had already been reached
+     * @throws IndexOutOfBoundsException if {@code length} is negative or exceeds {@code value.length}
+     */
+    public int readBytes(byte[] value, int length) throws JMSException {
+        checkIsReadOnly();
+
+        if (length < 0 || length > value.length) {
+            throw new IndexOutOfBoundsException("length must be between 0 and the length of value");
+        }
+        try {
+            int offset = 0;
+            // A single read() may return fewer bytes than asked for, so loop until done or EOF.
+            while (offset < length) {
+                int read = dataAsInput.read(value, offset, length - offset);
+                if (read < 0) {
+                    break;
+                }
+                offset += read;
+            }
+
+            if (offset == 0 && length != 0) {
+                return -1;
+            }
+            else {
+                return offset;
+            }
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+
+    }
+
+    public void writeBoolean(boolean value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeBoolean(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    /** Lazily creates the output buffer and its data stream if either is missing. */
+    private void initializeWriteIfNecessary() {
+        if (bytesOut == null) {
+            bytesOut = new ByteArrayOutputStream();
+        }
+        if (dataAsOutput == null) {
+            dataAsOutput = new DataOutputStream(bytesOut);
+        }
+    }
+
+    public void writeByte(byte value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeByte(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeShort(short value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeShort(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeChar(char value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeChar(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeInt(int value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeInt(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeLong(long value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeLong(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeFloat(float value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeFloat(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeDouble(double value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeDouble(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeUTF(String value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.writeUTF(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.write(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        try {
+            dataAsOutput.write(value, offset, length);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    /** Not supported; delegates to {@code JmsHelper.handleUnSupportedException()}. */
+    public void writeObject(Object value) throws JMSException {
+        checkIsWriteOnly();
+        initializeWriteIfNecessary();
+
+        JmsHelper.handleUnSupportedException();
+    }
+
+    /**
+     * Makes the body readable from its first byte.
+     *
+     * <p>If anything has been written, the written bytes become the content served by the read
+     * methods (they are kept, not discarded). Otherwise the existing input is rewound.
+     *
+     * @throws JMSException if the input stream cannot be rewound
+     */
+    public void reset() throws JMSException {
+        try {
+            if (bytesOut != null) {
+                // Snapshot what has been written so the read methods can see it.
+                this.bytesIn = bytesOut.toByteArray();
+                this.dataAsInput = new DataInputStream(new ByteArrayInputStream(bytesIn, 0, bytesIn.length));
+            }
+            else if (this.dataAsInput != null) {
+                this.dataAsInput.reset();
+            }
+
+            this.readOnly = true;
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    /** Drops the body and leaves the message empty and in write mode. */
+    @Override public void clearBody() {
+        super.clearBody();
+        // Start from a fresh, empty buffer so that length/body queries work right after clearing.
+        this.bytesOut = new ByteArrayOutputStream();
+        this.dataAsOutput = new DataOutputStream(this.bytesOut);
+        this.dataAsInput = null;
+        this.bytesIn = null;
+        this.readOnly = false;
+    }
+
+    /** Wraps an I/O failure in a {@link JMSException}, keeping the original as cause. */
+    private JMSException handleOutputException(final IOException e) {
+        JMSException ex = new JMSException(e.getMessage());
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    /**
+     * Wraps a read failure: {@link EOFException} becomes {@link MessageEOFException}, anything
+     * else {@link MessageFormatException}; the original is kept as cause.
+     */
+    private JMSException handleInputException(final IOException e) {
+        JMSException ex;
+        if (e instanceof EOFException) {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    protected boolean isReadOnly() {
+        return readOnly;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
new file mode 100644
index 0000000..d1dd15d
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSMapMessage.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.msg.serialize.MapSerialize;
+
+import static java.lang.String.format;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Boolean;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Byte;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2ByteArray;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Char;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Double;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Float;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Int;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Long;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Short;
+import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2String;
+
+/**
+ * RocketMQ implementation of {@link MapMessage}, backed by a plain {@link Map} of name/value
+ * pairs.
+ *
+ * <p>Message can only be accessed by a thread at a time.
+ */
+public class JMSMapMessage extends AbstractJMSMessage implements MapMessage {
+
+    private Map<String, Object> map;
+
+    /** When set, every {@code setXxx} call is rejected. */
+    protected boolean readOnly;
+
+    /**
+     * Creates a message on top of the given map (the map is used as-is, not copied).
+     *
+     * @param map the body entries
+     */
+    public JMSMapMessage(Map<String, Object> map) {
+        this.map = map;
+    }
+
+    /** Creates an empty message. */
+    public JMSMapMessage() {
+        this.map = new HashMap<String, Object>();
+    }
+
+    /**
+     * Returns the backing map.
+     *
+     * @param clazz {@code Map.class} or a sub-type of it
+     * @throws MessageFormatException if {@code clazz} is not a map type
+     */
+    @Override public Map<String, Object> getBody(Class clazz) throws JMSException {
+        if (isBodyAssignableTo(clazz)) {
+            return this.map;
+        }
+
+        throw new MessageFormatException(format("The type[%s] can't be casted to Map", clazz.toString()));
+    }
+
+    /** Returns the map serialized by {@link MapSerialize}. */
+    @Override public byte[] getBody() throws JMSException {
+        return new MapSerialize().serialize(this.map);
+    }
+
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return Map.class.isAssignableFrom(c);
+    }
+
+    @Override public boolean getBoolean(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Boolean(map.get(name));
+    }
+
+    /**
+     * Rejects missing entry names.
+     *
+     * @throws JMSException if {@code name} is {@code null}, empty or whitespace only
+     */
+    private void checkName(String name) throws JMSException {
+        if (StringUtils.isBlank(name)) {
+            throw new JMSException("Name is required");
+        }
+    }
+
+    @Override public byte getByte(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Byte(map.get(name));
+    }
+
+    @Override public short getShort(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Short(map.get(name));
+    }
+
+    @Override public char getChar(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Char(map.get(name));
+    }
+
+    @Override public int getInt(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Int(map.get(name));
+    }
+
+    @Override public long getLong(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Long(map.get(name));
+    }
+
+    @Override public float getFloat(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Float(map.get(name));
+    }
+
+    @Override public double getDouble(String name) throws JMSException {
+        checkName(name);
+
+        return convert2Double(map.get(name));
+    }
+
+    @Override public String getString(String name) throws JMSException {
+        checkName(name);
+
+        return convert2String(map.get(name));
+    }
+
+    @Override public byte[] getBytes(String name) throws JMSException {
+        checkName(name);
+
+        return convert2ByteArray(map.get(name));
+    }
+
+    @Override public Object getObject(String name) throws JMSException {
+        checkName(name);
+
+        return map.get(name);
+    }
+
+    @Override public Enumeration getMapNames() throws JMSException {
+        return Collections.enumeration(map.keySet());
+    }
+
+    @Override public void setBoolean(String name, boolean value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    /**
+     * Single write path for all setters.
+     *
+     * @throws MessageNotWriteableException if the message is read-only
+     * @throws JMSException if {@code name} is blank
+     */
+    private void putProperty(String name, Object obj) throws JMSException {
+        if (isReadOnly()) {
+            throw new MessageNotWriteableException("Message is not writable");
+        }
+
+        checkName(name);
+
+        map.put(name, obj);
+    }
+
+    @Override public void setByte(String name, byte value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setShort(String name, short value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setChar(String name, char value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setInt(String name, int value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setLong(String name, long value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setFloat(String name, float value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setDouble(String name, double value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public void setString(String name, String value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    /**
+     * Stores a copy of {@code value}, so later changes to the caller's array do not alter the
+     * message.
+     */
+    @Override public void setBytes(String name, byte[] value) throws JMSException {
+        putProperty(name, value == null ? null : value.clone());
+    }
+
+    /**
+     * Stores a copy of {@code length} bytes of {@code value}, starting at {@code offset}.
+     *
+     * @throws IndexOutOfBoundsException if the requested range does not lie inside {@code value}
+     */
+    @Override public void setBytes(String name, byte[] value, int offset, int length) throws JMSException {
+        if (value == null) {
+            putProperty(name, null);
+            return;
+        }
+        if (offset < 0 || length < 0 || length > value.length - offset) {
+            throw new IndexOutOfBoundsException(
+                format("offset[%d] and length[%d] are out of range for an array of %d bytes",
+                    offset, length, value.length));
+        }
+
+        byte[] slice = new byte[length];
+        System.arraycopy(value, offset, slice, 0, length);
+        putProperty(name, slice);
+    }
+
+    @Override public void setObject(String name, Object value) throws JMSException {
+        putProperty(name, value);
+    }
+
+    @Override public boolean itemExists(String name) throws JMSException {
+        checkName(name);
+
+        return map.containsKey(name);
+    }
+
+    /** Removes every entry and makes the message writable again. */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.map.clear();
+        this.readOnly = false;
+    }
+
+    protected boolean isReadOnly() {
+        return this.readOnly;
+    }
+
+    public void setReadOnly(boolean readOnly) {
+        this.readOnly = readOnly;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
new file mode 100644
index 0000000..239ecc7
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSObjectMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
+
+public class JMSObjectMessage extends AbstractJMSMessage implements 
javax.jms.ObjectMessage {
+
+    private Serializable body;
+
+    public JMSObjectMessage(Serializable object) {
+        this.body = object;
+    }
+
+    public JMSObjectMessage() {
+
+    }
+
+    @Override public Serializable getBody(Class clazz) throws JMSException {
+        return body;
+    }
+
+    @Override public byte[] getBody() throws JMSException {
+        try {
+            return ObjectSerialize.serialize(body);
+        }
+        catch (IOException e) {
+            throw new JMSException(e.getMessage());
+        }
+    }
+
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        return true;
+    }
+
+    public Serializable getObject() throws JMSException {
+        return this.body;
+    }
+
+    public void setObject(Serializable object) throws JMSException {
+        this.body = object;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
new file mode 100644
index 0000000..13e344d
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/jms/msg/JMSTextMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import static java.lang.String.format;
+
+/**
+ * {@link javax.jms.TextMessage} implementation whose body is a single
+ * {@link String} held in memory.
+ */
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage {
+
+    private String text;
+
+    /**
+     * Creates a message without a body.
+     */
+    public JMSTextMessage() {
+
+    }
+
+    /**
+     * Creates a message carrying the given text.
+     *
+     * @param text body of the message, may be {@code null}
+     */
+    public JMSTextMessage(String text) {
+        // Assign directly: calling the overridable setText from a constructor
+        // would run subclass code before the subclass is initialized.
+        this.text = text;
+    }
+
+    /**
+     * Returns the text body if it can be assigned to the requested type.
+     *
+     * @param clazz requested body type
+     * @return the text, or {@code null} when the message has no body
+     * @throws MessageFormatException if a {@code String} body cannot be assigned to {@code clazz}
+     */
+    @Override public String getBody(Class clazz) throws JMSException {
+        if (isBodyAssignableTo(clazz)) {
+            return text;
+        }
+
+        throw new MessageFormatException(format("The type[%s] can't be casted to String", clazz.toString()));
+    }
+
+    /**
+     * Returns the text body encoded as bytes.
+     *
+     * <p>NOTE(review): UTF-8 is assumed here; confirm it matches the charset
+     * used where the payload is decoded back into text.
+     *
+     * @return the UTF-8 bytes of the text, or an empty array when there is no text
+     */
+    @Override public byte[] getBody() throws JMSException {
+        if (text == null) {
+            return new byte[0];
+        }
+        return text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Tells whether the body can be assigned to a variable of the given type.
+     *
+     * @param c candidate target type
+     * @return {@code true} when the message has no body, or when {@code c} is
+     * {@code String} or one of its supertypes
+     */
+    @Override public boolean isBodyAssignableTo(Class c) throws JMSException {
+        // The target type must be able to hold a String, not the other way round.
+        return text == null || c.isAssignableFrom(String.class);
+    }
+
+    /**
+     * Clears the message body: runs the superclass clean-up and drops the text.
+     */
+    @Override public void clearBody() {
+        super.clearBody();
+        this.text = null;
+    }
+
+    /**
+     * @return the text body, possibly {@code null}
+     */
+    @Override public String getText() throws JMSException {
+        return this.text;
+    }
+
+    /**
+     * Replaces the text body.
+     *
+     * @param text new body, may be {@code null}
+     */
+    @Override public void setText(String text) {
+        this.text = text;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java
deleted file mode 100644
index f172fc2..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQBytesMessage.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.msg;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import javax.jms.IllegalStateRuntimeException;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import org.apache.rocketmq.jms.support.JmsHelper;
-
-/**
- * RocketMQ ByteMessage.
- */
-public class RocketMQBytesMessage extends RocketMQMessage implements 
javax.jms.BytesMessage {
-
-    private byte[] bytesIn;
-    private DataInputStream dataAsInput;
-
-    private ByteArrayOutputStream bytesOut;
-    private DataOutputStream dataAsOutput;
-
-    protected boolean readOnly;
-
-    /**
-     * Message created for reading
-     *
-     * @param data
-     */
-    public RocketMQBytesMessage(byte[] data) {
-        this.bytesIn = data;
-        this.dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 
0, data.length));
-        this.readOnly = true;
-        this.writeOnly = false;
-    }
-
-    /**
-     * Message created to be sent
-     */
-    public RocketMQBytesMessage() {
-        this.bytesOut = new ByteArrayOutputStream();
-        this.dataAsOutput = new DataOutputStream(this.bytesOut);
-        this.readOnly = false;
-        this.writeOnly = true;
-    }
-
-    public long getBodyLength() throws JMSException {
-        return getData().length;
-    }
-
-    /**
-     * @return the data
-     */
-    public byte[] getData() {
-        if (isWriteOnly()) {
-            return bytesOut.toByteArray();
-        }
-        else if (isReadOnly()) {
-            return bytesIn;
-        }
-        else {
-            throw new IllegalStateRuntimeException("Message must be in write 
only or read only status");
-        }
-    }
-
-    public boolean readBoolean() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readBoolean();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    private void checkIsReadOnly() throws MessageNotReadableException {
-        if (!isReadOnly()) {
-            throw new MessageNotReadableException("Not readable");
-        }
-        if (dataAsInput == null) {
-            throw new MessageNotReadableException("No data to read");
-        }
-    }
-
-    public byte readByte() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readByte();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public int readUnsignedByte() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readUnsignedByte();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public short readShort() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readShort();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public int readUnsignedShort() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readUnsignedShort();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public char readChar() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readChar();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public int readInt() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readInt();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public long readLong() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readLong();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public float readFloat() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readFloat();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public double readDouble() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readDouble();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public String readUTF() throws JMSException {
-        checkIsReadOnly();
-
-        try {
-            return dataAsInput.readUTF();
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-    }
-
-    public int readBytes(byte[] value) throws JMSException {
-        checkIsReadOnly();
-
-        return readBytes(value, value.length);
-    }
-
-    public int readBytes(byte[] value, int length) throws JMSException {
-        checkIsReadOnly();
-
-        if (length > value.length) {
-            throw new IndexOutOfBoundsException("length must be smaller than 
the length of value");
-        }
-        if (dataAsInput == null) {
-            throw new MessageNotReadableException("Message is not readable! ");
-        }
-        try {
-            int offset = 0;
-            while (offset < length) {
-                int read = dataAsInput.read(value, offset, length - offset);
-                if (read < 0) {
-                    break;
-                }
-                offset += read;
-            }
-
-            if (offset == 0 && length != 0) {
-                return -1;
-            }
-            else {
-                return offset;
-            }
-        }
-        catch (IOException e) {
-            throw handleInputException(e);
-        }
-
-    }
-
-    public void writeBoolean(boolean value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeBoolean(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    private void initializeWriteIfNecessary() {
-        if (bytesOut == null) {
-            bytesOut = new ByteArrayOutputStream();
-        }
-        if (dataAsOutput == null) {
-            dataAsOutput = new DataOutputStream(bytesOut);
-        }
-    }
-
-    public void writeByte(byte value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeByte(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeShort(short value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeShort(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeChar(char value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeChar(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeInt(int value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeInt(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeLong(long value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeLong(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeFloat(float value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeFloat(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeDouble(double value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeDouble(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeUTF(String value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        try {
-            dataAsOutput.writeUTF(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeBytes(byte[] value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        if (dataAsOutput == null) {
-            throw new MessageNotWriteableException("Message is not writable! 
");
-        }
-        try {
-            dataAsOutput.write(value);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeBytes(byte[] value, int offset, int length) throws 
JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        if (dataAsOutput == null) {
-            throw new MessageNotWriteableException("Message is not writable! 
");
-        }
-        try {
-            dataAsOutput.write(value, offset, length);
-        }
-        catch (IOException e) {
-            throw handleOutputException(e);
-        }
-    }
-
-    public void writeObject(Object value) throws JMSException {
-        checkIsWriteOnly();
-        initializeWriteIfNecessary();
-
-        JmsHelper.handleUnSupportedException();
-    }
-
-    public void reset() throws JMSException {
-        try {
-            if (bytesOut != null) {
-                bytesOut.reset();
-            }
-            if (this.dataAsInput != null) {
-                this.dataAsInput.reset();
-            }
-
-            this.readOnly = true;
-        }
-        catch (IOException e) {
-            throw new JMSException(e.getMessage());
-        }
-    }
-
-    @Override public void clearBody() {
-        super.clearBody();
-        this.bytesOut = null;
-        this.dataAsOutput = null;
-        this.dataAsInput = null;
-        this.bytesIn = null;
-    }
-
-    private JMSException handleOutputException(final IOException e) {
-        JMSException ex = new JMSException(e.getMessage());
-        ex.initCause(e);
-        ex.setLinkedException(e);
-        return ex;
-    }
-
-    private JMSException handleInputException(final IOException e) {
-        JMSException ex;
-        if (e instanceof EOFException) {
-            ex = new MessageEOFException(e.getMessage());
-        }
-        else {
-            ex = new MessageFormatException(e.getMessage());
-        }
-        ex.initCause(e);
-        ex.setLinkedException(e);
-        return ex;
-    }
-
-    protected boolean isReadOnly() {
-        return readOnly;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/37576dd4/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java 
b/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java
deleted file mode 100644
index 79351ee..0000000
--- a/core/src/main/java/org/apache/rocketmq/jms/msg/RocketMQMapMessage.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms.msg;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageNotWriteableException;
-import org.apache.commons.lang.StringUtils;
-
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Boolean;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Byte;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2ByteArray;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Char;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Double;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Float;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Int;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Long;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2Short;
-import static 
org.apache.rocketmq.jms.support.PrimitiveTypeConverter.convert2String;
-
-/**
- * Message can only be accessed by a thread at a time.
- */
-public class RocketMQMapMessage extends RocketMQMessage implements MapMessage {
-
-    private Map<String, Object> map;
-
-    protected boolean readOnly;
-
-    public RocketMQMapMessage(Map<String, Object> map) {
-        this.map = map;
-    }
-
-    public RocketMQMapMessage() {
-        this.map = new HashMap();
-    }
-
-    @Override public boolean getBoolean(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Boolean(map.get(name));
-    }
-
-    private void checkName(String name) throws JMSException {
-        if (StringUtils.isBlank(name)) {
-            throw new JMSException("Name is required");
-        }
-    }
-
-    @Override public byte getByte(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Byte(map.get(name));
-    }
-
-    @Override public short getShort(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Short(map.get(name));
-    }
-
-    @Override public char getChar(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Char(map.get(name));
-    }
-
-    @Override public int getInt(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Int(map.get(name));
-    }
-
-    @Override public long getLong(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Long(map.get(name));
-    }
-
-    @Override public float getFloat(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Float(map.get(name));
-    }
-
-    @Override public double getDouble(String name) throws JMSException {
-        checkName(name);
-
-        return convert2Double(map.get(name));
-    }
-
-    @Override public String getString(String name) throws JMSException {
-        checkName(name);
-
-        return convert2String(map.get(name));
-    }
-
-    @Override public byte[] getBytes(String name) throws JMSException {
-        checkName(name);
-
-        return convert2ByteArray(map.get(name));
-    }
-
-    @Override public Object getObject(String name) throws JMSException {
-        checkName(name);
-
-        return map.get(name);
-    }
-
-    @Override public Enumeration getMapNames() throws JMSException {
-        return Collections.enumeration(map.keySet());
-    }
-
-    @Override public void setBoolean(String name, boolean value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    private void putProperty(String name, Object obj) throws JMSException {
-        if (isReadOnly()) {
-            throw new MessageNotWriteableException("Message is not writable");
-        }
-
-        checkName(name);
-
-        map.put(name, obj);
-    }
-
-    @Override public void setByte(String name, byte value) throws JMSException 
{
-        putProperty(name, value);
-    }
-
-    @Override public void setShort(String name, short value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setChar(String name, char value) throws JMSException 
{
-        putProperty(name, value);
-    }
-
-    @Override public void setInt(String name, int value) throws JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setLong(String name, long value) throws JMSException 
{
-        putProperty(name, value);
-    }
-
-    @Override public void setFloat(String name, float value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setDouble(String name, double value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setString(String name, String value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setBytes(String name, byte[] value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setBytes(String name, byte[] value, int offset, int 
length) throws JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public void setObject(String name, Object value) throws 
JMSException {
-        putProperty(name, value);
-    }
-
-    @Override public boolean itemExists(String name) throws JMSException {
-        checkName(name);
-
-        return map.containsKey(name);
-    }
-
-    @Override public void clearBody() {
-        super.clearBody();
-        this.map.clear();
-        this.readOnly = false;
-    }
-
-    protected boolean isReadOnly() {
-        return this.readOnly;
-    }
-
-    public void setReadOnly(boolean readOnly) {
-        this.readOnly = readOnly;
-    }
-}


Reply via email to