Add integration test: UnDurableConsumeTest, and fix some bugs

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/e1a80ff8
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/e1a80ff8
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/e1a80ff8

Branch: refs/heads/jms-dev-1.1.0
Commit: e1a80ff8ebc3922c8ff92e2e924aa4599cdef6a6
Parents: b9a7ffc
Author: zhangke <zhangke_beij...@qq.com>
Authored: Tue Feb 28 23:49:20 2017 +0800
Committer: zhangke <zhangke_beij...@qq.com>
Committed: Tue Feb 28 23:49:20 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/jms/DeliverMessageService.java     |   5 +-
 .../apache/rocketmq/jms/RocketMQConnection.java |   1 -
 .../apache/rocketmq/jms/RocketMQSession.java    |   4 +-
 .../jms/msg/convert/JMS2RMQMessageConvert.java  |   4 +-
 .../jms/msg/convert/RMQ2JMSMessageConvert.java  |   7 +-
 .../msg/convert/JMS2RMQMessageConvertTest.java  |   2 +-
 .../msg/convert/RMQ2JMSMessageConvertTest.java  |   4 +-
 .../rocketmq/jms/integration/AppConfig.java     |   1 +
 test/src/main/resources/logback.xml             |  56 +++++++++
 .../integration/ConsumeAsynchronousTest.java    |  87 +++++++++++++
 .../jms/integration/ConsumeNormallyTest.java    | 122 ------------------
 .../jms/integration/ConsumeSynchronousTest.java |  79 ++++++++++++
 .../jms/integration/UnDurableConsumeTest.java   | 123 +++++++++++++++++++
 13 files changed, 361 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
index e86ce92..5cc412e 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -127,7 +127,7 @@ public class DeliverMessageService extends ServiceThread {
                 pullMessage();
             }
             catch (InterruptedException e) {
-                log.info("Pulling messages service has been interrupted");
+                log.debug("Pulling messages service has been interrupted");
             }
             catch (Exception e) {
                 log.error("Error during pulling messages", e);
@@ -245,7 +245,6 @@ public class DeliverMessageService extends ServiceThread {
     }
 
     public void close() {
-        log.info("Begin to close message delivery service:{}", 
getServiceName());
 
         this.stop();
 
@@ -253,7 +252,7 @@ public class DeliverMessageService extends ServiceThread {
 
         this.shutdown(true);
 
-        log.info("Success to close message delivery service:{}", 
getServiceName());
+        log.debug("Success to close message delivery service:{}", 
getServiceName());
     }
 
     public void setMessageSelector(String messageSelector) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
index db6d897..727ebca 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java
@@ -202,7 +202,6 @@ public class RocketMQConnection implements Connection {
 
     @Override
     public void close() throws JMSException {
-        log.info("Begin to close connection:{}", toString());
 
         for (RocketMQSession session : sessionList) {
             session.close();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java 
b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
index b42d4bb..8825dff 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java
@@ -45,6 +45,7 @@ import javax.jms.TopicSubscriber;
 import org.apache.rocketmq.jms.destination.RocketMQQueue;
 import org.apache.rocketmq.jms.destination.RocketMQTopic;
 import org.apache.rocketmq.jms.msg.JMSBytesMessage;
+import org.apache.rocketmq.jms.msg.JMSMapMessage;
 import org.apache.rocketmq.jms.msg.JMSObjectMessage;
 import org.apache.rocketmq.jms.msg.JMSTextMessage;
 import org.slf4j.Logger;
@@ -90,8 +91,7 @@ public class RocketMQSession implements Session {
 
     @Override
     public MapMessage createMapMessage() throws JMSException {
-        //todo
-        throw new JMSException("Not support yet");
+        return new JMSMapMessage();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
index 7495171..ca2cbed 100644
--- 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java
@@ -30,6 +30,8 @@ import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEn
 
 public class JMS2RMQMessageConvert {
 
+    public static final String USER_PROPERTY_PREFIX = "USER:";
+
     public static MessageExt convert(AbstractJMSMessage jmsMsg) throws 
Exception {
         MessageExt rmqMsg = new MessageExt();
 
@@ -53,7 +55,7 @@ public class JMS2RMQMessageConvert {
     private static void handleProperties(AbstractJMSMessage jmsMsg, MessageExt 
rmqMsg) {
         Map<String, Object> userProps = jmsMsg.getProperties();
         for (Map.Entry<String, Object> entry : userProps.entrySet()) {
-            rmqMsg.putUserProperty(entry.getKey(), 
entry.getValue().toString());
+            rmqMsg.putUserProperty(new 
StringBuffer(USER_PROPERTY_PREFIX).append(entry.getKey()).toString(), 
entry.getValue().toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
index 7f9bb77..4adb692 100644
--- 
a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
+++ 
b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize;
 import org.apache.rocketmq.jms.msg.serialize.StringSerialize;
 
 import static java.lang.String.format;
+import static 
org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX;
 import static 
org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME;
 
 public class RMQ2JMSMessageConvert {
@@ -92,8 +93,10 @@ public class RMQ2JMSMessageConvert {
         Map<String, String> propertiesMap = rmqMsg.getProperties();
         if (propertiesMap != null) {
             for (String properName : propertiesMap.keySet()) {
-                String properValue = propertiesMap.get(properName);
-                jmsMsg.setStringProperty(properName, properValue);
+                if (properName.startsWith(USER_PROPERTY_PREFIX)) {
+                    String properValue = propertiesMap.get(properName);
+                    
jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), 
properValue);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
index acb3637..13a048c 100644
--- 
a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
+++ 
b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java
@@ -53,7 +53,7 @@ public class JMS2RMQMessageConvertTest {
         
assertThat(rmqMessage.getUserProperty(JMSHeaderEnum.JMSExpiration.name()), 
is("0"));
         assertThat(rmqMessage.getKeys(), is("ID:XXX"));
 
-        assertThat(rmqMessage.getUserProperty("MyProperty"), is("MyValue"));
+        
assertThat(rmqMessage.getUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX
 + "MyProperty"), is("MyValue"));
         assertThat(rmqMessage.getUserProperty(MSG_MODEL_NAME), 
is(JMSMessageModelEnum.STRING.name()));
         assertThat(new String(rmqMessage.getBody()), is("text"));
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
 
b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
index 530cc12..1d5bb11 100644
--- 
a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
+++ 
b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java
@@ -45,7 +45,7 @@ public class RMQ2JMSMessageConvertTest {
         rmqMessage.setTopic("topic");
 
         rmqMessage.putUserProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), 
"2");
-        rmqMessage.putUserProperty("MyProperty", "MyValue");
+        rmqMessage.putUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX 
+ "MyProperty", "MyValue");
 
         // when
         Message jmsMessage = RMQ2JMSMessageConvert.convert(rmqMessage);
@@ -59,7 +59,7 @@ public class RMQ2JMSMessageConvertTest {
         
assertThat(JMSUtils.getDestinationName(jmsMessage.getJMSDestination()), 
is("topic"));
 
         assertThat(jmsMessage.getStringProperty("MyProperty"), is("MyValue"));
-        
assertThat(jmsMessage.getIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name()),
 is(2));
+        
assertThat(jmsMessage.getIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name()),
 is(3));
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java 
b/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java
index dc1f3ef..b7b2a43 100644
--- a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java
+++ b/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java
@@ -45,6 +45,7 @@ public class AppConfig {
 //        CachingConnectionFactory factory = new CachingConnectionFactory();
 //        factory.setTargetConnectionFactory(new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS));
 //        return factory;
+        //todo
         return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/test/src/main/resources/logback.xml 
b/test/src/main/resources/logback.xml
new file mode 100644
index 0000000..6757b08
--- /dev/null
+++ b/test/src/main/resources/logback.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="DefaultAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmq/jms.log</file>
+        <append>true</append>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append>
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.rocketmq.jms">
+        <level value="INFO"/>
+    </logger>
+
+    <root>
+        <level value="ERROR"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
new file mode 100644
index 0000000..fc2cc34
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class ConsumeAsynchronousTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    @Test
+    public void testConsumeAsynchronous() throws Exception {
+        final String rmqTopicName = "coffee-async" + 
UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession();
+        connection.start();
+        Topic topic = session.createTopic(rmqTopicName);
+
+        try {
+            //producer
+            TextMessage message = session.createTextMessage("mocha 
coffee,please");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            //consumer
+            final List<Message> received = new ArrayList();
+            MessageConsumer consumer = session.createDurableConsumer(topic, 
"consumer");
+            consumer.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    received.add(message);
+                }
+            });
+
+            connection.start();
+
+            Thread.sleep(1000 * 5);
+
+            assertThat(received.size(), is(1));
+        }
+        finally {
+            connection.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java
deleted file mode 100644
index 9467117..0000000
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.jms.integration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.rocketmq.jms.RocketMQConnectionFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNull.notNullValue;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes = AppConfig.class)
-public class ConsumeNormallyTest {
-
-    @Autowired
-    private RocketMQAdmin rocketMQAdmin;
-
-    @Test
-    public void testConsumeSynchronous() throws Exception {
-        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
-        rocketMQAdmin.createTopic(rmqTopicName);
-
-        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
-        Connection connection = factory.createConnection();
-        Session session = connection.createSession();
-        connection.start();
-        Topic topic = session.createTopic(rmqTopicName);
-
-        try {
-            //producer
-            TextMessage message = session.createTextMessage("a");
-            MessageProducer producer = session.createProducer(topic);
-            producer.send(message);
-
-            //consumer
-            MessageConsumer consumer = session.createDurableConsumer(topic, 
"consumer");
-
-            connection.start();
-
-            Message msg = consumer.receive();
-
-            assertThat(msg, notNullValue());
-        }
-        finally {
-            connection.close();
-            rocketMQAdmin.deleteTopic(rmqTopicName);
-        }
-
-    }
-
-    @Test
-    public void testConsumeAsynchronous() throws Exception {
-        final String rmqTopicName = "coffee-async" + 
UUID.randomUUID().toString();
-        rocketMQAdmin.createTopic(rmqTopicName);
-
-        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
-        Connection connection = factory.createConnection();
-        Session session = connection.createSession();
-        connection.start();
-        Topic topic = session.createTopic(rmqTopicName);
-
-        try {
-            //producer
-            TextMessage message = session.createTextMessage("mocha 
coffee,please");
-            MessageProducer producer = session.createProducer(topic);
-            producer.send(message);
-
-            //consumer
-            final List<Message> received = new ArrayList();
-            MessageConsumer consumer = session.createDurableConsumer(topic, 
"consumer");
-            consumer.setMessageListener(new MessageListener() {
-                @Override public void onMessage(Message message) {
-                    received.add(message);
-                }
-            });
-
-            connection.start();
-
-            Thread.sleep(1000 * 5);
-
-            assertThat(received.size(), is(1));
-        }
-        finally {
-            connection.close();
-            rocketMQAdmin.deleteTopic(rmqTopicName);
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java
new file mode 100644
index 0000000..a7a7e5e
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class ConsumeSynchronousTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    @Test
+    public void testConsumeSynchronous() throws Exception {
+        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession();
+        connection.start();
+        Topic topic = session.createTopic(rmqTopicName);
+
+        try {
+            //producer
+            TextMessage message = session.createTextMessage("a");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            //consumer
+            MessageConsumer consumer = session.createDurableConsumer(topic, 
"consumer");
+
+            connection.start();
+
+            Message msg = consumer.receive();
+
+            assertThat(msg, notNullValue());
+        }
+        finally {
+            connection.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
new file mode 100644
index 0000000..c03e86d
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class UnDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test that messages produced while the consumer is inactive will not be 
delivered to the consumer when it starts again.
+     *
+     * <p>Test step:
+     * 1. Create a consumer and start the connection
+     * 2. Create a producer and send a message(msgA) to the topic subscribed 
by previous consumer
+     * 3. MsgA should be consumed successfully
+     * 4. Close the consumer and stop the connection
+     * 5. Producer sends a message(msgB) after the consumer closed
+     * 6. Create another consumer which is an un-durable one, and start the 
connection
+     * 7. Result: msgB should not be delivered to the new un-durable consumer
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConsumeNotDurable() throws Exception {
+        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession();
+        connection.start();
+        Topic topic = session.createTopic(rmqTopicName);
+
+        try {
+            //consumer
+            final List<Message> received = new ArrayList();
+            final MessageListener msgListener = new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    received.add(message);
+                }
+            };
+            MessageConsumer consumer = session.createConsumer(topic);
+            consumer.setMessageListener(msgListener);
+
+            connection.start();
+
+            //producer
+            TextMessage message = session.createTextMessage("a");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            Thread.sleep(1000 * 2);
+
+            assertThat(received.size(), is(1));
+            received.clear();
+
+            // close the consumer
+            connection.stop();
+            consumer.close();
+
+            // send message
+            TextMessage lostMessage = session.createTextMessage("b");
+            producer.send(lostMessage);
+
+            Thread.sleep(1000 * 2);
+
+            // start the un-durable consumer again
+            consumer = session.createConsumer(topic, "topic");
+            consumer.setMessageListener(msgListener);
+            connection.start();
+
+            Thread.sleep(1000 * 3);
+
+            assertThat(received.size(), is(0));
+
+        }
+        finally {
+            connection.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

Reply via email to