http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
new file mode 100644
index 0000000..62b5056
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.jms.Message;
+import org.apache.rocketmq.jms.exception.MessageExpiredException;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+
+/**
+ * Unit tests for {@code ReceiveMessageHook.before}: expiration checking and the
+ * provider-set properties on a received message.
+ */
+public class ReceiveMessageHookTest {
+
+    /**
+     * A message whose expiration timestamp lies in the past is expected to be rejected with
+     * {@link MessageExpiredException}.
+     */
+    @Test(expected = MessageExpiredException.class)
+    public void testValidateFail() throws Exception {
+        ReceiveMessageHook hook = new ReceiveMessageHook();
+
+        Message message = new JMSTextMessage("text");
+        // Expire the message one second ago rather than stamping "now" and sleeping: the test
+        // stays fast and does not depend on how far the clock advances during a sleep.
+        message.setJMSExpiration(new Date().getTime() - 1000L);
+        hook.before(message);
+    }
+
+    /**
+     * Neither an expiration of {@code 0} nor one far in the future may make the hook throw.
+     */
+    @Test
+    public void testValidateSuccess() throws Exception {
+        ReceiveMessageHook hook = new ReceiveMessageHook();
+
+        Message message = new JMSTextMessage("text");
+        // never expired
+        message.setJMSExpiration(0);
+        hook.before(message);
+
+        // expired in the future
+        message.setJMSExpiration(new SimpleDateFormat("yyyy-MM-dd").parse("2999-01-01").getTime());
+        hook.before(message);
+    }
+
+    /**
+     * After the hook has run, the {@code JMSXRcvTimestamp} property must hold a positive value.
+     */
+    @Test
+    public void setProviderProperties() throws Exception {
+        ReceiveMessageHook hook = new ReceiveMessageHook();
+
+        Message message = new JMSTextMessage("text");
+        hook.before(message);
+
+        assertThat(message.getLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name()), greaterThan(0L));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
new file mode 100644
index 0000000..29e91ec
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.hook;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import org.apache.rocketmq.jms.RocketMQProducer;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code SendMessageHook.before}: delivery-mode validation and the headers and
+ * properties found on the message after the hook has run.
+ */
+public class SendMessageHookTest {
+
+    /**
+     * Sending with {@code DeliveryMode.NON_PERSISTENT} is expected to raise
+     * {@link UnsupportDeliveryModelException}.
+     */
+    @Test(expected = UnsupportDeliveryModelException.class)
+    public void testValidate() throws Exception {
+        final JMSTextMessage textMessage = new JMSTextMessage("text");
+        final RocketMQTopic topic = new RocketMQTopic("destination");
+        final long ttlMillis = 1000 * 100L;
+
+        final SendMessageHook sendHook = new SendMessageHook();
+        sendHook.before(textMessage, topic, DeliveryMode.NON_PERSISTENT, 4, ttlMillis);
+    }
+
+    /**
+     * Default producer settings (mocked, delivery delay 0) with the default time-to-live.
+     */
+    @Test
+    public void testSetHeader() throws Exception {
+        final RocketMQProducer mockProducer = mock(RocketMQProducer.class);
+        when(mockProducer.getDeliveryDelay()).thenReturn(0L);
+
+        final JMSTextMessage textMessage = new JMSTextMessage("text");
+        final Destination topic = new RocketMQTopic("destination");
+        final long ttlMillis = JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE;
+
+        final SendMessageHook sendHook = new SendMessageHook(mockProducer);
+        sendHook.before(textMessage, topic, DeliveryMode.PERSISTENT, 5, ttlMillis);
+
+        assertThat(textMessage.getJMSDestination(), is(topic));
+        assertThat(textMessage.getJMSDeliveryMode(), is(JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE));
+        assertThat(textMessage.getJMSExpiration(), is(0L));
+        assertThat(textMessage.getJMSDeliveryTime(), notNullValue());
+        assertThat(textMessage.getJMSPriority(), is(5));
+        assertThat(textMessage.getJMSMessageID(), notNullValue());
+        assertThat(textMessage.getJMSTimestamp(), notNullValue());
+    }
+
+    /**
+     * Message ID and timestamp generation disabled on the producer, non-default time-to-live:
+     * the ID stays null, the timestamp stays 0 and an expiration is set.
+     *
+     * @throws Exception if the hook fails unexpectedly
+     */
+    @Test
+    public void testSetHeader2() throws Exception {
+        final RocketMQProducer mockProducer = mock(RocketMQProducer.class);
+        when(mockProducer.getUserName()).thenReturn("user");
+        when(mockProducer.getDisableMessageID()).thenReturn(true);
+        when(mockProducer.getDisableMessageTimestamp()).thenReturn(true);
+
+        final JMSTextMessage textMessage = new JMSTextMessage("text");
+        final Destination topic = new RocketMQTopic("destination");
+        final long ttlMillis = 1000 * 100L;
+
+        final SendMessageHook sendHook = new SendMessageHook(mockProducer);
+        sendHook.before(textMessage, topic, DeliveryMode.PERSISTENT, 5, ttlMillis);
+
+        // headers
+        assertThat(textMessage.getJMSMessageID(), nullValue());
+        assertThat(textMessage.getJMSTimestamp(), is(0L));
+        assertThat(textMessage.getJMSExpiration(), not(is(0L)));
+
+        // properties
+        assertThat(textMessage.getStringProperty(JMSPropertiesEnum.JMSXUserID.name()), is("user"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java b/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
new file mode 100644
index 0000000..ae08bec
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+import javax.jms.ConnectionFactory;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.annotation.EnableJms;
+import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.converter.SimpleMessageConverter;
+
+/**
+ * Spring configuration for the JMS integration tests: scans the package of
+ * {@link RocketMQServer}, enables {@code @JmsListener} processing and exposes the JMS beans.
+ */
+@Configuration
+@ComponentScan(basePackageClasses = {RocketMQServer.class})
+@EnableJms
+public class AppConfig {
+
+    /**
+     * Listener container factory backed by {@link #connectionFactory()}, concurrency "1".
+     */
+    @Bean
+    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
+        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
+        containerFactory.setConcurrency("1");
+        containerFactory.setConnectionFactory(connectionFactory());
+        return containerFactory;
+    }
+
+    /**
+     * Connection factory pointing at {@code Constant.NAME_SERVER_ADDRESS}.
+     */
+    @Bean
+    public ConnectionFactory connectionFactory() {
+        // TODO: an earlier draft wrapped this factory in Spring's CachingConnectionFactory;
+        // the plain RocketMQ factory is returned until that is revisited.
+        return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS);
+    }
+
+    /**
+     * {@link JmsTemplate} using {@link #connectionFactory()} and a {@link SimpleMessageConverter}.
+     */
+    @Bean
+    public JmsTemplate jmsTemplate() {
+        final JmsTemplate template = new JmsTemplate();
+        template.setMessageConverter(new SimpleMessageConverter());
+        template.setConnectionFactory(connectionFactory());
+        return template;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java b/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java
new file mode 100644
index 0000000..0f7a6b1
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+public class Constant {
+
+    public static final String NAME_SERVER_IP = "127.0.0.1";
+
+    public static final int NAME_SERVER_PORT = 9153;
+
+    public static final String NAME_SERVER_ADDRESS = NAME_SERVER_IP + ":" + 
NAME_SERVER_PORT;
+
+    public static final String BROKER_IP = "127.0.0.1";
+
+    public static final int BROKER_PORT = 9055;
+
+    public static final String BROKER_ADDRESS = BROKER_IP + ":" + BROKER_PORT;
+
+    public static final int BROKER_HA_PORT = 9043;
+
+    public static final String CLIENT_ID = "coffee";
+
+    public static final String CLIENT_ID_SECOND = "tea";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java
new file mode 100644
index 0000000..e602601
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+import com.google.common.collect.Sets;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import static 
org.apache.rocketmq.jms.integration.source.Constant.BROKER_ADDRESS;
+import static 
org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS;
+
+@Service
+public class RocketMQAdmin {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQAdmin.class);
+
+    @Autowired
+    // make sure RocketMQServer start ahead
+    private RocketMQServer rocketMQServer;
+
+    //MQAdmin client
+    private DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+
+    @PostConstruct
+    public void start() {
+        // reduce rebalance waiting time
+        System.setProperty("rocketmq.client.rebalance.waitInterval", "1000");
+
+        defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS);
+        try {
+            defaultMQAdminExt.start();
+            log.info("Start RocketMQAdmin Successfully");
+        }
+        catch (MQClientException e) {
+            log.error("Failed to start MQAdmin", e);
+            System.exit(1);
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        defaultMQAdminExt.shutdown();
+    }
+
+    public void createTopic(String topic) {
+        createTopic(topic, 1);
+    }
+
+    public void createTopic(String topic, int queueNum) {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setReadQueueNums(queueNum);
+        topicConfig.setWriteQueueNums(queueNum);
+        try {
+            defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, 
topicConfig);
+        }
+        catch (Exception e) {
+            log.error("Create topic:{}, addr:{} failed:{}", topic, 
BROKER_ADDRESS, ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public void deleteTopic(String topic) {
+        try {
+            
defaultMQAdminExt.deleteTopicInBroker(Sets.newHashSet(BROKER_ADDRESS), topic);
+        }
+        catch (Exception e) {
+            log.error("Delete topic:{}, addr:{} failed:{}", topic, 
BROKER_ADDRESS, ExceptionUtils.getStackTrace(e));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java
new file mode 100644
index 0000000..71ae6b1
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static java.io.File.separator;
+import static 
org.apache.rocketmq.jms.integration.source.Constant.BROKER_HA_PORT;
+import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_PORT;
+import static 
org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS;
+
+@Service
+public class RocketMQServer {
+    public static Logger log = LoggerFactory.getLogger(RocketMQServer.class);
+    private final SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss");
+    private final String rootDir = System.getProperty("user.home") + separator 
+ "rocketmq-jms" + separator;
+    // fixed location of config files which is updated after RMQ3.2.6
+    private final String configDir = System.getProperty("user.home") + 
separator + "store/config";
+
+    private String serverDir;
+    private volatile boolean started = false;
+
+    //name server
+    private NamesrvConfig namesrvConfig = new NamesrvConfig();
+    private NettyServerConfig nameServerNettyServerConfig = new 
NettyServerConfig();
+    private NamesrvController namesrvController;
+
+    //broker
+    private final String brokerName = "JmsTestBrokerName";
+    private BrokerController brokerController;
+    private BrokerConfig brokerConfig = new BrokerConfig();
+    private NettyServerConfig nettyServerConfig = new NettyServerConfig();
+    private NettyClientConfig nettyClientConfig = new NettyClientConfig();
+    private MessageStoreConfig storeConfig = new MessageStoreConfig();
+
+    public RocketMQServer() {
+        this.storeConfig.setDiskMaxUsedSpaceRatio(95);
+    }
+
+    @PostConstruct
+    public void start() {
+        if (started) {
+            return;
+        }
+
+        createServerDir();
+
+        startNameServer();
+
+        startBroker();
+
+        started = true;
+
+        log.info("Start RocketServer Successfully");
+    }
+
+    private void createServerDir() {
+        for (int i = 0; i < 5; i++) {
+            serverDir = rootDir + sf.format(new Date());
+            final File file = new File(serverDir);
+            if (!file.exists()) {
+                return;
+            }
+        }
+        log.error("Has retry 5 times to register base dir,but still failed.");
+        System.exit(1);
+    }
+
+    private void startNameServer() {
+        namesrvConfig.setKvConfigPath(serverDir + separator + "namesrv" + 
separator + "kvConfig.json");
+        nameServerNettyServerConfig.setListenPort(Constant.NAME_SERVER_PORT);
+        namesrvController = new NamesrvController(namesrvConfig, 
nameServerNettyServerConfig);
+        try {
+            namesrvController.initialize();
+            log.info("Success to start Name Server:{}", NAME_SERVER_ADDRESS);
+            namesrvController.start();
+        }
+        catch (Exception e) {
+            log.error("Failed to start Name Server", e);
+            System.exit(1);
+        }
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, NAME_SERVER_ADDRESS);
+    }
+
+    private void startBroker() {
+        System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", 
"0.98");
+        brokerConfig.setBrokerName(brokerName);
+        brokerConfig.setBrokerIP1(Constant.BROKER_IP);
+        brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS);
+        storeConfig.setStorePathRootDir(serverDir);
+        storeConfig.setStorePathCommitLog(serverDir + separator + "commitlog");
+        storeConfig.setHaListenPort(BROKER_HA_PORT);
+        nettyServerConfig.setListenPort(BROKER_PORT);
+        brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, storeConfig);
+
+        try {
+            brokerController.initialize();
+            log.info("Broker Start name:{} address:{}", 
brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+
+        }
+        catch (Exception e) {
+            log.error("Failed to start Broker", e);
+            System.exit(1);
+        }
+    }
+
+    @PreDestroy
+    private void shutdown() {
+        brokerController.shutdown();
+        namesrvController.shutdown();
+        deleteFile(new File(rootDir));
+        deleteFile(new File(configDir));
+    }
+
+    public void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        }
+        else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                deleteFile(files[i]);
+            }
+            file.delete();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java b/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java
new file mode 100644
index 0000000..f4f8df8
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jms.annotation.JmsListener;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+/**
+ * JMS listener for the {@link #DESTINATION} topic that records the payload of every message
+ * it receives. The topic is created when the bean is initialised and deleted when it is
+ * destroyed.
+ */
+@Component
+public class SimpleTextListener {
+
+    public static final String DESTINATION = "orderTest";
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    // NOTE(review): presumably appended to by the listener container thread and read by the
+    // test thread; a plain ArrayList gives no visibility guarantee between them - confirm.
+    private final List<String> receivedMsgs = new ArrayList<String>();
+
+    public SimpleTextListener() {
+    }
+
+    /** Creates the {@link #DESTINATION} topic. */
+    @PostConstruct
+    public void init() {
+        this.rocketMQAdmin.createTopic(DESTINATION);
+    }
+
+    /** Deletes the {@link #DESTINATION} topic. */
+    @PreDestroy
+    public void destroy() {
+        this.rocketMQAdmin.deleteTopic(DESTINATION);
+    }
+
+    /**
+     * Stores the payload of a message received on {@link #DESTINATION}.
+     *
+     * @param message the received message
+     */
+    @JmsListener(destination = DESTINATION)
+    public void processOrder(Message<String> message) {
+        receivedMsgs.add(message.getPayload());
+    }
+
+    /**
+     * @return the live list of payloads received so far, in arrival order (not a copy)
+     */
+    public List<String> getReceivedMsg() {
+        return receivedMsgs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java
new file mode 100644
index 0000000..5136f37
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source.support;
+
+/**
+ * A condition that can be evaluated on demand; {@code TimeLimitAssert.doAssert} polls it
+ * until it reports a match.
+ */
+public interface ConditionMatcher {
+
+    /**
+     * @return {@code true} when the condition currently holds
+     */
+    boolean match();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java
new file mode 100644
index 0000000..e03c6c9
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.source.support;
+
+import org.apache.commons.lang.time.StopWatch;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Assertion helper that waits, up to a time limit, for a {@link ConditionMatcher} to match.
+ */
+public final class TimeLimitAssert {
+
+    private TimeLimitAssert() {
+        // static helper only
+    }
+
+    /**
+     * Polls {@code conditionMatcher} every 500 ms until it matches, and fails with an
+     * {@link AssertionError} once more than {@code timeLimit} seconds have elapsed without a
+     * match.
+     *
+     * @param conditionMatcher condition to wait for
+     * @param timeLimit maximum time to wait, in seconds
+     * @throws InterruptedException if the thread is interrupted while sleeping between polls
+     */
+    public static void doAssert(ConditionMatcher conditionMatcher, int timeLimit) throws InterruptedException {
+        // Convert in long arithmetic; "timeLimit * 1000" would overflow int for large limits.
+        final long timeLimitMillis = timeLimit * 1000L;
+        StopWatch watch = new StopWatch();
+        watch.start();
+
+        // The condition is evaluated again after every sleep before the deadline is checked,
+        // so a match that arrives during the last sleep still counts.
+        while (!conditionMatcher.match()) {
+            if (watch.getTime() > timeLimitMillis) {
+                throw new AssertionError(String.format("Doesn't match assert condition in %s second", timeLimit));
+            }
+            Thread.sleep(500);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java
new file mode 100644
index 0000000..018e844
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.Constant;
+import org.apache.rocketmq.jms.integration.source.RocketMQAdmin;
+import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class ConsumeAsynchronousTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Sends one text message to a freshly created topic and checks that a durable consumer
+     * registered with a {@link MessageListener} receives it within five seconds after the
+     * connection has been started.
+     *
+     * @throws Exception if the topic, connection, producer or consumer cannot be set up
+     */
+    @Test
+    public void testConsumeAsynchronous() throws Exception {
+        final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = null;
+
+        try {
+            // Created inside the try block so that a failure here still reaches the clean-up below.
+            connection = factory.createConnection();
+            Session session = connection.createSession();
+            Topic topic = session.createTopic(rmqTopicName);
+
+            //producer
+            TextMessage message = session.createTextMessage("mocha coffee,please");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            //consumer
+            // The listener is invoked asynchronously by the JMS provider while this thread polls the
+            // size, so the list has to be safe for concurrent access.
+            final List<Message> received = java.util.Collections.synchronizedList(new ArrayList<Message>());
+            MessageConsumer consumer = session.createDurableConsumer(topic, "consumer");
+            consumer.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    received.add(message);
+                }
+            });
+
+            connection.start();
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 1;
+                }
+            }, 5);
+        }
+        finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+            finally {
+                // Always remove the temporary topic, even when closing the connection fails.
+                rocketMQAdmin.deleteTopic(rmqTopicName);
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java
 
b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java
new file mode 100644
index 0000000..8a9fa41
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test;
+
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.Constant;
+import org.apache.rocketmq.jms.integration.source.RocketMQAdmin;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class ConsumeSynchronousTest {
+
+    /** Upper bound for the blocking receive, so that a lost message fails the test instead of hanging it. */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 10 * 1000L;
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Sends one text message to a freshly created topic and checks that a durable consumer
+     * obtains it through a blocking {@code receive} call.
+     *
+     * @throws Exception if the topic, connection, producer or consumer cannot be set up
+     */
+    @Test
+    public void testConsumeSynchronous() throws Exception {
+        final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = null;
+
+        try {
+            // Created inside the try block so that a failure here still reaches the clean-up below.
+            connection = factory.createConnection();
+            Session session = connection.createSession();
+            Topic topic = session.createTopic(rmqTopicName);
+
+            //producer
+            TextMessage message = session.createTextMessage("a");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            //consumer
+            MessageConsumer consumer = session.createDurableConsumer(topic, "consumer");
+
+            // Start delivery once, after the consumer exists.
+            connection.start();
+
+            // receive(timeout) returns null when nothing arrives in time, which the assertion reports.
+            Message msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS);
+
+            assertThat("No message received within " + RECEIVE_TIMEOUT_MILLIS + " ms", msg, notNullValue());
+        }
+        finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+            finally {
+                // Always remove the temporary topic, even when closing the connection fails.
+                rocketMQAdmin.deleteTopic(rmqTopicName);
+            }
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java
 
b/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java
new file mode 100644
index 0000000..045d615
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.Constant;
+import org.apache.rocketmq.jms.integration.source.RocketMQAdmin;
+import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class NonDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test messages that producer after consumer inactive will not be 
delivered to consumer when it start again.
+     *
+     * <p>Test step:
+     * 1. Create a consumer and start the connection
+     * 2. Create a producer and send a message(msgA) to the topic subscribed 
by previous consumer
+     * 3. MsgA should be consumed successfully
+     * 4. Close the consumer and stop the connection
+     * 5. Producer sends a message(msgB) after the consumer closed
+     * 6. Create another consumer which is a non-durable one, and start the 
connection
+     * 7. Result: msgB should be consumed by the previous non-durable consumer
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConsumeNotDurable() throws Exception {
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession();
+        connection.start();
+        Topic topic = session.createTopic(rmqTopicName);
+
+        try {
+            //consumer
+            final List<Message> received = new ArrayList();
+            final MessageListener msgListener = new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    received.add(message);
+                }
+            };
+            MessageConsumer consumer = session.createConsumer(topic);
+            consumer.setMessageListener(msgListener);
+
+            connection.start();
+
+            Thread.sleep(1000 * 3);
+
+            //producer
+            TextMessage message = session.createTextMessage("a");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 1;
+                }
+            }, 3);
+
+            received.clear();
+
+            // close the consumer
+            connection.stop();
+            consumer.close();
+
+            // send message
+            TextMessage lostMessage = session.createTextMessage("b");
+            producer.send(lostMessage);
+
+            Thread.sleep(1000 * 2);
+
+            // start the non-durable consumer again
+            consumer = session.createConsumer(topic, "topic");
+            consumer.setMessageListener(msgListener);
+            connection.start();
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return received.size() == 0;
+                }
+            }, 5);
+
+        }
+        finally {
+            connection.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java
 
b/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java
new file mode 100644
index 0000000..c3731de
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.Constant;
+import org.apache.rocketmq.jms.integration.source.RocketMQAdmin;
+import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class SharedDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test that messages are delivered to every consumer if these consumers are in a shared
+     * durable subscription.
+     *
+     * <p>Test step:
+     * 1. Create a shared durable consumer(consumerA) via the first connection(connectionA)
+     * 2. Create a shared durable consumer(consumerB) via another connection(connectionB)
+     * 3. The two consumers must subscribe to the same topic with an identical subscriptionName,
+     * and they also have the same clientID.
+     * 4. Send several(eg:10) messages to this topic
+     * 5. Result: all messages should be received by both consumerA and consumerB
+     *
+     * @throws Exception if the topic, connections, producer or consumers cannot be set up
+     */
+    @Test
+    public void testConsumeAllMessages() throws Exception {
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connectionA = null, connectionB = null;
+        final String subscriptionName = "MySubscription";
+        // The listeners are invoked asynchronously by the JMS provider while this thread polls the
+        // sizes, so both lists have to be safe for concurrent access.
+        final List<Message> receivedA = java.util.Collections.synchronizedList(new ArrayList<Message>());
+        final List<Message> receivedB = java.util.Collections.synchronizedList(new ArrayList<Message>());
+
+        try {
+            // consumerA
+            connectionA = factory.createConnection();
+            Session sessionA = connectionA.createSession();
+            connectionA.start();
+            Topic topic = sessionA.createTopic(rmqTopicName);
+            MessageConsumer consumerA = sessionA.createSharedDurableConsumer(topic, subscriptionName);
+            consumerA.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedA.add(message);
+                }
+            });
+
+            // consumerB
+            connectionB = factory.createConnection();
+            Session sessionB = connectionB.createSession();
+            MessageConsumer consumerB = sessionB.createSharedDurableConsumer(topic, subscriptionName);
+            consumerB.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedB.add(message);
+                }
+            });
+            connectionB.start();
+
+            //producer
+            TextMessage message = sessionA.createTextMessage("a");
+            MessageProducer producer = sessionA.createProducer(topic);
+            for (int i = 0; i < 10; i++) {
+                producer.send(message);
+            }
+
+            Thread.sleep(1000 * 2);
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return receivedA.size() == 10 && receivedB.size() == 10;
+                }
+            }, 5);
+        }
+        finally {
+            // Either connection may still be null if set-up failed early; guarding the close keeps
+            // the original failure from being masked by a NullPointerException.
+            try {
+                if (connectionA != null) {
+                    connectionA.close();
+                }
+            }
+            finally {
+                try {
+                    if (connectionB != null) {
+                        connectionB.close();
+                    }
+                }
+                finally {
+                    // Always remove the temporary topic, even when closing a connection fails.
+                    rocketMQAdmin.deleteTopic(rmqTopicName);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java
 
b/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java
new file mode 100644
index 0000000..809dbae
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.apache.rocketmq.jms.RocketMQSession;
+import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.Constant;
+import org.apache.rocketmq.jms.integration.source.RocketMQAdmin;
+import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class UnsharedDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test each message will be deliver to only one consumer if these 
consumers are in unshared durable subscription.
+     *
+     * <p>Test step:
+     * 1. Create a unshared durable consumer(consumerA) via the first 
connection(connectionA)
+     * 2. Create a unshared durable consumer(consumerB) via another 
connection(connectionB)
+     * 3. Result:
+     * a. The creating consumerB should throw a JMSException as consumerA and 
consumberB have the same subscription
+     * b. All messages should be received by consumerA
+     *
+     * @throws Exception
+     * @see {@link RocketMQSession}
+     */
+    @Test
+    public void testEachMessageOnlyConsumeByOneConsumer() throws Exception {
+        final String rmqTopicName = "coffee" + UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName, 2);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connectionA = null, connectionB = null;
+        final String subscriptionName = "MySubscription";
+        final List<Message> receivedA = new ArrayList();
+
+        try {
+            // consumerA
+            connectionA = factory.createConnection();
+            Session sessionA = connectionA.createSession();
+            connectionA.start();
+            Topic topic = sessionA.createTopic(rmqTopicName);
+            MessageConsumer consumerA = sessionA.createDurableConsumer(topic, 
subscriptionName);
+            consumerA.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedA.add(message);
+                }
+            });
+
+            Thread.sleep(1000 * 2);
+
+            // consumerB
+            try {
+                connectionB = factory.createConnection();
+                Session sessionB = connectionB.createSession();
+                sessionB.createDurableConsumer(topic, subscriptionName);
+                assertFalse("Doesn't get the expected " + 
DuplicateSubscriptionException.class.getSimpleName(), true);
+            }
+            catch (DuplicateSubscriptionException e) {
+                assertTrue(true);
+            }
+
+            connectionA.start();
+
+            //producer
+            TextMessage message = sessionA.createTextMessage("a");
+            MessageProducer producer = sessionA.createProducer(topic);
+            for (int i = 0; i < 10; i++) {
+                producer.send(message);
+            }
+
+            TimeLimitAssert.doAssert(new ConditionMatcher() {
+                @Override public boolean match() {
+                    return receivedA.size() == 10;
+                }
+            }, 5);
+        }
+        finally {
+            connectionA.close();
+            connectionB.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java
 
b/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java
new file mode 100644
index 0000000..920a49d
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration.test.listener;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.rocketmq.jms.integration.source.AppConfig;
+import org.apache.rocketmq.jms.integration.source.SimpleTextListener;
+import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher;
+import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static 
org.apache.rocketmq.jms.integration.source.SimpleTextListener.DESTINATION;
+
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class SimpleTextListenerTest {
+
+    private static final Logger log = LoggerFactory.getLogger(SimpleTextListenerTest.class);
+
+    @Autowired
+    private JmsTemplate jmsTemplate;
+
+    @Autowired
+    private SimpleTextListener simpleTextListener;
+
+    /**
+     * Sends one text message to {@code DESTINATION} through the {@link JmsTemplate} and waits up
+     * to 60 seconds for {@link SimpleTextListener} to report exactly one received message.
+     *
+     * @throws Exception if sending fails or the wait is interrupted
+     */
+    @Test
+    public void testListener() throws Exception {
+        jmsTemplate.convertAndSend(DESTINATION, "first");
+
+        // TimeLimitAssert does its own time keeping, so no stop watch is needed here.
+        TimeLimitAssert.doAssert(new ConditionMatcher() {
+            @Override public boolean match() {
+                return simpleTextListener.getReceivedMsg().size() == 1;
+            }
+        }, 60);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java
new file mode 100644
index 0000000..20520f6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class JMSBytesMessageTest {
+
+    /** Payload used to build messages in the "received" state (constructed from existing bytes). */
+    private byte[] receiveData = "receive data test".getBytes();
+    /** Payload written into freshly created messages. */
+    private byte[] sendData = "send data test".getBytes();
+
+    /** The body of a message equals the bytes it was constructed from or that were written to it. */
+    @Test
+    public void testGetData() throws Exception {
+        JMSBytesMessage readMessage = new JMSBytesMessage(receiveData);
+        // assertThat takes the actual value first; keeping that order makes failure output accurate.
+        assertThat(new String(readMessage.getBody()), is(new String(receiveData)));
+
+        JMSBytesMessage sendMessage = new JMSBytesMessage();
+        sendMessage.writeBytes(sendData, 0, sendData.length);
+        assertThat(new String(sendMessage.getBody()), is(new String(sendData)));
+    }
+
+    /** The reported body length equals the number of bytes the message was constructed from. */
+    @Test
+    public void testGetBodyLength() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+        // A cast instead of the deprecated Long constructor; the matcher still compares Long values.
+        assertThat(msg.getBodyLength(), is((long) receiveData.length));
+    }
+
+    /** Reading into a buffer as large as the body yields the complete body. */
+    @Test
+    public void testReadBytes1() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+        byte[] receiveValue = new byte[receiveData.length];
+        msg.readBytes(receiveValue);
+        assertThat(new String(receiveValue), is(new String(receiveData)));
+    }
+
+    /** Consecutive reads continue where the previous read stopped. */
+    @Test
+    public void testReadBytes2() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+
+        byte[] receiveValue1 = new byte[2];
+        msg.readBytes(receiveValue1);
+        assertThat(new String(receiveValue1), is(new String(receiveData).substring(0, 2)));
+
+        byte[] receiveValue2 = new byte[2];
+        msg.readBytes(receiveValue2);
+        assertThat(new String(receiveValue2), is(new String(receiveData).substring(2, 4)));
+
+    }
+
+    /** Bytes written to a new message show up unchanged in its body. */
+    @Test
+    public void testWriteBytes() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage();
+        msg.writeBytes(sendData);
+        assertThat(new String(msg.getBody()), is(new String(sendData)));
+    }
+
+    /** A message that has been written to cannot be read. */
+    @Test(expected = MessageNotReadableException.class)
+    public void testNotReadableException() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage();
+        msg.writeBoolean(true);
+        msg.readBoolean();
+    }
+
+    /** A message constructed from existing bytes cannot be written to. */
+    @Test(expected = MessageNotWriteableException.class)
+    public void testNotWritableException() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+        msg.writeBoolean(true);
+    }
+
+    /** After clearBody() a message built from existing bytes accepts writes; passing means no exception. */
+    @Test
+    public void testClearBody() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+        msg.clearBody();
+        msg.writeBoolean(true);
+    }
+
+    /** reset() moves the read position back to the start of the body. */
+    @Test
+    public void testReset() throws Exception {
+        JMSBytesMessage msg = new JMSBytesMessage(receiveData);
+        byte[] b = new byte[2];
+        msg.readBytes(b);
+        msg.reset();
+        msg.readBytes(b);
+        assertThat(new String(b), is(new String(receiveData).substring(0, 2)));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java
new file mode 100644
index 0000000..cb34653
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class JMSMapMessageTest {
+
+    @Test
+    public void testGetBoolean() throws Exception {
+        JMSMapMessage msg = new JMSMapMessage();
+
+        // get an empty value will return false
+        assertThat(msg.getBoolean("man"), is(false));
+
+        // get an not empty value
+        msg.setBoolean("man", true);
+        assertThat(msg.getBoolean("man"), is(true));
+
+        // key is null
+        try {
+            msg.getBoolean(null);
+            assertTrue(false);
+        }
+        catch (JMSException e) {
+            assertTrue(true);
+        }
+
+        // in read-only model
+        msg.setReadOnly(true);
+        try {
+            msg.setBoolean("man", true);
+            assertTrue(false);
+        }
+        catch (MessageNotWriteableException e) {
+            assertTrue(true);
+        }
+
+        // both read and write are allowed after clearBody()
+        msg.clearBody();
+        msg.setBoolean("man", false);
+        msg.getBoolean("man");
+
+        // map is empty after clearBody()
+        msg.clearBody();
+        assertThat(msg.getBoolean("man"), is(false));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java
new file mode 100644
index 0000000..63c03ae
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import java.io.Serializable;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Checks that JMSObjectMessage hands back the object it was constructed with,
+ * through both getObject() and getBody().
+ */
+public class JMSObjectMessageTest {
+
+    @Test
+    public void testGetObject() throws Exception {
+        final User user = new User("jack", 20);
+        JMSObjectMessage msg = new JMSObjectMessage(user);
+        assertThat((User)msg.getObject(), is(user));
+    }
+
+    @Test
+    public void testGetBody() throws Exception {
+        final User user = new User("jack", 20);
+        JMSObjectMessage msg = new JMSObjectMessage(user);
+        // Compare against the input itself rather than against another getter of
+        // the same message.
+        assertThat((User)msg.getBody(Object.class), is(user));
+    }
+
+    /**
+     * Serializable payload used by the tests.
+     *
+     * Declared static so that instances hold no hidden reference to the
+     * enclosing test class, which is not Serializable; Java serialization of a
+     * non-static inner class would try to write that outer instance as well.
+     */
+    private static class User implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String name;
+        private int age;
+
+        private User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return EqualsBuilder.reflectionEquals(this, obj);
+        }
+
+        @Override
+        public int hashCode() {
+            // Kept consistent with equals(): derived from the instance fields only.
+            return 31 * (name == null ? 0 : name.hashCode()) + age;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java
new file mode 100644
index 0000000..d9c0cac
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Round-trip checks for the text payload of JMSTextMessage.
+ */
+public class JMSTextMessageTest {
+    private String text = "jmsRocketMQTextMessage test";
+
+    /**
+     * Text passed to the constructor is readable through getBody(String.class).
+     */
+    @Test
+    public void testGetBody() throws Exception {
+        JMSTextMessage textMessage = new JMSTextMessage(text);
+        String body = textMessage.getBody(String.class);
+        assertThat(body, is(text));
+    }
+
+    /**
+     * Text stored with setText() is returned by getText().
+     */
+    @Test
+    public void testSetText() throws Exception {
+        JMSTextMessage textMessage = new JMSTextMessage();
+        textMessage.setText(text);
+        String storedText = textMessage.getText();
+        assertThat(storedText, is(text));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
 
b/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
new file mode 100644
index 0000000..13a048c
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.apache.rocketmq.jms.msg.AbstractJMSMessage;
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum;
+import org.junit.Test;
+
+import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Verifies what JMS2RMQMessageConvert.convert() copies from a JMS message into
+ * the resulting RocketMQ message.
+ */
+public class JMS2RMQMessageConvertTest {
+
+    @Test
+    public void testConvert() throws Exception {
+        // given: a text message with headers and one user-defined property
+        AbstractJMSMessage source = new JMSTextMessage("text");
+        source.setJMSDestination(new RocketMQTopic("topic"));
+        source.setJMSMessageID("ID:XXX");
+        source.setJMSTimestamp(1488273583542L);
+        source.setJMSExpiration(0L);
+        source.setStringProperty("MyProperty", "MyValue");
+
+        // when
+        MessageExt converted = JMS2RMQMessageConvert.convert(source);
+
+        // then: destination, JMS headers and message key
+        final String messageIdKey = JMSHeaderEnum.JMSMessageID.name();
+        final String expirationKey = JMSHeaderEnum.JMSExpiration.name();
+        assertThat(converted.getTopic(), is("topic"));
+        assertThat(converted.getUserProperty(messageIdKey), is("ID:XXX"));
+        assertThat(converted.getBornTimestamp(), is(1488273583542L));
+        assertThat(converted.getUserProperty(expirationKey), is("0"));
+        assertThat(converted.getKeys(), is("ID:XXX"));
+
+        // then: the user property is stored under the prefixed name, and the
+        // message model and body are carried over
+        final String prefixedProperty = JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty";
+        assertThat(converted.getUserProperty(prefixedProperty), is("MyValue"));
+        assertThat(converted.getUserProperty(MSG_MODEL_NAME), is(JMSMessageModelEnum.STRING.name()));
+        assertThat(new String(converted.getBody()), is("text"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
 
b/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
new file mode 100644
index 0000000..1d5bb11
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.convert;
+
+import javax.jms.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum;
+import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum;
+import org.apache.rocketmq.jms.support.JMSUtils;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Verifies what RMQ2JMSMessageConvert.convert() derives from a RocketMQ
+ * message when building the JMS message.
+ */
+public class RMQ2JMSMessageConvertTest {
+
+    @Test
+    public void testConvert() throws Exception {
+        // given: a RocketMQ message marked with the BYTE model
+        MessageExt source = new MessageExt();
+        source.setBody("body".getBytes());
+        source.putUserProperty(JMSMessageModelEnum.MSG_MODEL_NAME, JMSMessageModelEnum.BYTE.name());
+        source.putUserProperty(JMSHeaderEnum.JMSMessageID.name(), "ID:YYY");
+        source.setBornTimestamp(1488273585542L);
+        source.putUserProperty(JMSHeaderEnum.JMSExpiration.name(), "0");
+        source.setReconsumeTimes(2);
+        source.setTopic("topic");
+
+        final String deliveryCountKey = JMSPropertiesEnum.JMSXDeliveryCount.name();
+        source.putUserProperty(deliveryCountKey, "2");
+        source.putUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty", "MyValue");
+
+        // when
+        Message converted = RMQ2JMSMessageConvert.convert(source);
+
+        // then: message type and JMS headers
+        assertThat(converted instanceof JMSBytesMessage, is(true));
+        assertThat(converted.getJMSMessageID(), is("ID:YYY"));
+        assertThat(converted.getJMSTimestamp(), is(1488273585542L));
+        assertThat(converted.getJMSExpiration(), is(0L));
+        assertThat(converted.getJMSRedelivered(), is(true));
+        String destinationName = JMSUtils.getDestinationName(converted.getJMSDestination());
+        assertThat(destinationName, is("topic"));
+
+        // then: the user property is exposed without its prefix; the delivery
+        // count is expected to be 3 for a message with reconsumeTimes == 2
+        assertThat(converted.getStringProperty("MyProperty"), is("MyValue"));
+        assertThat(converted.getIntProperty(deliveryCountKey), is(3));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java
new file mode 100644
index 0000000..28255dd
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.enums;
+
+import org.apache.rocketmq.jms.msg.JMSTextMessage;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class JMSMessageModelEnumTest {
+    @Test
+    public void testToMsgModelEnum() throws Exception {
+        assertThat(JMSMessageModelEnum.toMsgModelEnum(new 
JMSTextMessage("text")), is(JMSMessageModelEnum.STRING));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java
new file mode 100644
index 0000000..5204fb3
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Round-trips a map through MapSerialize and checks that its entries survive.
+ */
+public class MapSerializeTest {
+
+    @Test
+    public void serializeAndDeserialize() throws Exception {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("name", "John");
+        map.put("age", 20);
+
+        byte[] bytes = MapSerialize.instance().serialize(map);
+        // Wildcard type: only size() and get() are needed on the result.
+        Map<?, ?> newMap = MapSerialize.instance().deserialize(bytes);
+
+        // actual value first, expected value inside the matcher
+        assertThat(newMap.size(), is(map.size()));
+        // Values are compared in String form, so the test does not depend on the
+        // concrete value types produced by deserialize().
+        assertThat(newMap.get("name").toString(), is("John"));
+        assertThat(newMap.get("age").toString(), is("20"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java
new file mode 100644
index 0000000..1661b08
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import java.io.Serializable;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Round-trips a Serializable bean through ObjectSerialize and compares the
+ * restored properties with the originals.
+ */
+public class ObjectSerializeTest {
+
+    @Test
+    public void serializeAndDeserialize() throws Exception {
+        Person original = new Person();
+        original.setName("John");
+        original.setAge(30);
+
+        byte[] serialized = ObjectSerialize.instance().serialize(original);
+        Person restored = (Person)ObjectSerialize.instance().deserialize(serialized);
+
+        String restoredName = restored.getName();
+        assertThat(restoredName, is(original.getName()));
+        int restoredAge = restored.getAge();
+        assertThat(restoredAge, is(original.getAge()));
+    }
+
+    /**
+     * Simple serializable bean with a name and an age.
+     */
+    private static class Person implements Serializable {
+        private static final long serialVersionUID = -4981805070659153282L;
+
+        private String name;
+        private int age;
+
+        public String getName() {
+            return this.name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public int getAge() {
+            return this.age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java 
b/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java
new file mode 100644
index 0000000..4e6a54a
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.msg.serialize;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Round-trips a string through StringSerialize.
+ */
+public class StringSerializeTest {
+
+    @Test
+    public void serializeAndDeserialize() throws Exception {
+        String text = "MyText";
+
+        byte[] bytes = StringSerialize.instance().serialize(text);
+        String newText = StringSerialize.instance().deserialize(bytes);
+
+        // actual value first, expected value inside the matcher, so a failure
+        // reports the deserialized text as the mismatching value
+        assertThat(newText, is(text));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java 
b/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
new file mode 100644
index 0000000..db15fee
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.support;
+
+import org.apache.rocketmq.jms.destination.RocketMQQueue;
+import org.apache.rocketmq.jms.destination.RocketMQTopic;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the static helpers of JMSUtils.
+ */
+public class JMSUtilsTest {
+
+    /**
+     * getDestinationName() returns the name a topic or queue was created with.
+     */
+    @Test
+    public void getTopicName() throws Exception {
+        RocketMQTopic topic = new RocketMQTopic("topic");
+        String topicName = JMSUtils.getDestinationName(topic);
+        assertThat(topicName, is("topic"));
+
+        RocketMQQueue queue = new RocketMQQueue("queue");
+        String queueName = JMSUtils.getDestinationName(queue);
+        assertThat(queueName, is("queue"));
+    }
+
+    /**
+     * getConsumerGroup() builds the group name from the subscription name and
+     * client id; with the flag set, a 36-character suffix is appended
+     * (presumably a UUID string).
+     */
+    @Test
+    public void getConsumerGroup() throws Exception {
+        final String subscription = "subscriptionName";
+        final String client = "clientID";
+        final int namesLength = subscription.length() + client.length();
+
+        // flag = true: both names plus two separator characters, then the suffix
+        String groupWithSuffix = JMSUtils.getConsumerGroup(subscription, client, true);
+        assertThat(groupWithSuffix.contains(subscription), is(true));
+        assertThat(groupWithSuffix.contains(client), is(true));
+        String suffix = groupWithSuffix.substring(namesLength + 2);
+        assertThat(suffix.length(), is(36));
+
+        // flag = false: both names plus a single separator character
+        String groupWithoutSuffix = JMSUtils.getConsumerGroup(subscription, client, false);
+        assertThat(groupWithoutSuffix.contains(subscription), is(true));
+        assertThat(groupWithoutSuffix.contains(client), is(true));
+        assertThat(groupWithoutSuffix.length(), is(namesLength + 1));
+
+        // no names at all: only the 36-character part remains
+        String suffixOnlyGroup = JMSUtils.getConsumerGroup(null, null, true);
+        assertThat(suffixOnlyGroup.length(), is(36));
+    }
+
+    /**
+     * uuid() must return a non-null value.
+     */
+    @Test
+    public void uuid() throws Exception {
+        assertThat(JMSUtils.uuid(), notNullValue());
+    }
+
+}
\ No newline at end of file


Reply via email to