Add integration test: UnDurableConsumeTest, and fix some bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/e1a80ff8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/e1a80ff8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/e1a80ff8 Branch: refs/heads/jms-dev-1.1.0 Commit: e1a80ff8ebc3922c8ff92e2e924aa4599cdef6a6 Parents: b9a7ffc Author: zhangke <zhangke_beij...@qq.com> Authored: Tue Feb 28 23:49:20 2017 +0800 Committer: zhangke <zhangke_beij...@qq.com> Committed: Tue Feb 28 23:49:20 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/jms/DeliverMessageService.java | 5 +- .../apache/rocketmq/jms/RocketMQConnection.java | 1 - .../apache/rocketmq/jms/RocketMQSession.java | 4 +- .../jms/msg/convert/JMS2RMQMessageConvert.java | 4 +- .../jms/msg/convert/RMQ2JMSMessageConvert.java | 7 +- .../msg/convert/JMS2RMQMessageConvertTest.java | 2 +- .../msg/convert/RMQ2JMSMessageConvertTest.java | 4 +- .../rocketmq/jms/integration/AppConfig.java | 1 + test/src/main/resources/logback.xml | 56 +++++++++ .../integration/ConsumeAsynchronousTest.java | 87 +++++++++++++ .../jms/integration/ConsumeNormallyTest.java | 122 ------------------ .../jms/integration/ConsumeSynchronousTest.java | 79 ++++++++++++ .../jms/integration/UnDurableConsumeTest.java | 123 +++++++++++++++++++ 13 files changed, 361 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java index e86ce92..5cc412e 100644 --- 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -127,7 +127,7 @@ public class DeliverMessageService extends ServiceThread { pullMessage(); } catch (InterruptedException e) { - log.info("Pulling messages service has been interrupted"); + log.debug("Pulling messages service has been interrupted"); } catch (Exception e) { log.error("Error during pulling messages", e); @@ -245,7 +245,6 @@ public class DeliverMessageService extends ServiceThread { } public void close() { - log.info("Begin to close message delivery service:{}", getServiceName()); this.stop(); @@ -253,7 +252,7 @@ public class DeliverMessageService extends ServiceThread { this.shutdown(true); - log.info("Success to close message delivery service:{}", getServiceName()); + log.debug("Success to close message delivery service:{}", getServiceName()); } public void setMessageSelector(String messageSelector) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java index db6d897..727ebca 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQConnection.java @@ -202,7 +202,6 @@ public class RocketMQConnection implements Connection { @Override public void close() throws JMSException { - log.info("Begin to close connection:{}", toString()); for (RocketMQSession session : sessionList) { session.close(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java index b42d4bb..8825dff 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java +++ b/core/src/main/java/org/apache/rocketmq/jms/RocketMQSession.java @@ -45,6 +45,7 @@ import javax.jms.TopicSubscriber; import org.apache.rocketmq.jms.destination.RocketMQQueue; import org.apache.rocketmq.jms.destination.RocketMQTopic; import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.JMSMapMessage; import org.apache.rocketmq.jms.msg.JMSObjectMessage; import org.apache.rocketmq.jms.msg.JMSTextMessage; import org.slf4j.Logger; @@ -90,8 +91,7 @@ public class RocketMQSession implements Session { @Override public MapMessage createMapMessage() throws JMSException { - //todo - throw new JMSException("Not support yet"); + return new JMSMapMessage(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java index 7495171..ca2cbed 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvert.java @@ -30,6 +30,8 @@ import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.toMsgModelEn public class JMS2RMQMessageConvert { + public static final String USER_PROPERTY_PREFIX = "USER:"; + public static MessageExt convert(AbstractJMSMessage jmsMsg) throws Exception { MessageExt rmqMsg = new MessageExt(); @@ -53,7 +55,7 @@ public class JMS2RMQMessageConvert { private static void handleProperties(AbstractJMSMessage jmsMsg, 
MessageExt rmqMsg) { Map<String, Object> userProps = jmsMsg.getProperties(); for (Map.Entry<String, Object> entry : userProps.entrySet()) { - rmqMsg.putUserProperty(entry.getKey(), entry.getValue().toString()); + rmqMsg.putUserProperty(new StringBuffer(USER_PROPERTY_PREFIX).append(entry.getKey()).toString(), entry.getValue().toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java index 7f9bb77..4adb692 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java +++ b/core/src/main/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvert.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.jms.msg.serialize.ObjectSerialize; import org.apache.rocketmq.jms.msg.serialize.StringSerialize; import static java.lang.String.format; +import static org.apache.rocketmq.jms.msg.convert.JMS2RMQMessageConvert.USER_PROPERTY_PREFIX; import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; public class RMQ2JMSMessageConvert { @@ -92,8 +93,10 @@ public class RMQ2JMSMessageConvert { Map<String, String> propertiesMap = rmqMsg.getProperties(); if (propertiesMap != null) { for (String properName : propertiesMap.keySet()) { - String properValue = propertiesMap.get(properName); - jmsMsg.setStringProperty(properName, properValue); + if (properName.startsWith(USER_PROPERTY_PREFIX)) { + String properValue = propertiesMap.get(properName); + jmsMsg.setStringProperty(properName.substring(USER_PROPERTY_PREFIX.length()), properValue); + } } } } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java index acb3637..13a048c 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java @@ -53,7 +53,7 @@ public class JMS2RMQMessageConvertTest { assertThat(rmqMessage.getUserProperty(JMSHeaderEnum.JMSExpiration.name()), is("0")); assertThat(rmqMessage.getKeys(), is("ID:XXX")); - assertThat(rmqMessage.getUserProperty("MyProperty"), is("MyValue")); + assertThat(rmqMessage.getUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty"), is("MyValue")); assertThat(rmqMessage.getUserProperty(MSG_MODEL_NAME), is(JMSMessageModelEnum.STRING.name())); assertThat(new String(rmqMessage.getBody()), is("text")); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java index 530cc12..1d5bb11 100644 --- a/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java +++ b/core/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java @@ -45,7 +45,7 @@ public class RMQ2JMSMessageConvertTest { rmqMessage.setTopic("topic"); rmqMessage.putUserProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), "2"); - 
rmqMessage.putUserProperty("MyProperty", "MyValue"); + rmqMessage.putUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty", "MyValue"); // when Message jmsMessage = RMQ2JMSMessageConvert.convert(rmqMessage); @@ -59,7 +59,7 @@ public class RMQ2JMSMessageConvertTest { assertThat(JMSUtils.getDestinationName(jmsMessage.getJMSDestination()), is("topic")); assertThat(jmsMessage.getStringProperty("MyProperty"), is("MyValue")); - assertThat(jmsMessage.getIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name()), is(2)); + assertThat(jmsMessage.getIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name()), is(3)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java b/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java index dc1f3ef..b7b2a43 100644 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java +++ b/test/src/main/java/org/apache/rocketmq/jms/integration/AppConfig.java @@ -45,6 +45,7 @@ public class AppConfig { // CachingConnectionFactory factory = new CachingConnectionFactory(); // factory.setTargetConnectionFactory(new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS)); // return factory; + //todo return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/test/src/main/resources/logback.xml b/test/src/main/resources/logback.xml new file mode 100644 index 0000000..6757b08 --- /dev/null +++ b/test/src/main/resources/logback.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache 
Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="DefaultAppender" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${user.home}/logs/rocketmq/jms.log</file> + <append>true</append> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>${user.home}/logs/rocketmq/otherdays/jms.%i.log + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>10</maxIndex> + </rollingPolicy> + <triggeringPolicy + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <append>true</append> + <encoder> + <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + <logger name="org.apache.rocketmq.jms"> + <level value="INFO"/> + </logger> + + <root> + <level value="ERROR"/> + <appender-ref ref="STDOUT"/> + </root> +</configuration> 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java new file mode 100644 index 0000000..fc2cc34 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeAsynchronousTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.integration; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class ConsumeAsynchronousTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + @Test + public void testConsumeAsynchronous() throws Exception { + final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //producer + TextMessage message = session.createTextMessage("mocha coffee,please"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + //consumer + final List<Message> received = new ArrayList(); + MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); + consumer.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + received.add(message); + } + }); + + connection.start(); + + 
Thread.sleep(1000 * 5); + + assertThat(received.size(), is(1)); + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java deleted file mode 100644 index 9467117..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeNormallyTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.IsNull.notNullValue; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class ConsumeNormallyTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - @Test - public void testConsumeSynchronous() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - - connection.start(); - - Message msg = consumer.receive(); - - assertThat(msg, notNullValue()); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } - - @Test - 
public void testConsumeAsynchronous() throws Exception { - final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //producer - TextMessage message = session.createTextMessage("mocha coffee,please"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - //consumer - final List<Message> received = new ArrayList(); - MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); - consumer.setMessageListener(new MessageListener() { - @Override public void onMessage(Message message) { - received.add(message); - } - }); - - connection.start(); - - Thread.sleep(1000 * 5); - - assertThat(received.size(), is(1)); - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java new file mode 100644 index 0000000..a7a7e5e --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/ConsumeSynchronousTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class ConsumeSynchronousTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + @Test + public void testConsumeSynchronous() throws Exception { + final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); 
+ + try { + //producer + TextMessage message = session.createTextMessage("a"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + //consumer + MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); + + connection.start(); + + Message msg = consumer.receive(); + + assertThat(msg, notNullValue()); + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e1a80ff8/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java new file mode 100644 index 0000000..c03e86d --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.integration; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class UnDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test that messages produced while the consumer is inactive will not be delivered to the consumer when it starts again. + * + * <p>Test step: + * 1. Create a consumer and start the connection + * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer + * 3. MsgA should be consumed successfully + * 4. Close the consumer and stop the connection + * 5. Producer sends a message(msgB) after the consumer closed + * 6. Create another consumer which is an un-durable one, and start the connection + * 7. 
Result: msgB should NOT be delivered to the newly created un-durable consumer + * + * @throws Exception + */ + @Test + public void testConsumeNotDurable() throws Exception { + final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //consumer + final List<Message> received = new ArrayList(); + final MessageListener msgListener = new MessageListener() { + @Override public void onMessage(Message message) { + received.add(message); + } + }; + MessageConsumer consumer = session.createConsumer(topic); + consumer.setMessageListener(msgListener); + + connection.start(); + + //producer + TextMessage message = session.createTextMessage("a"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + Thread.sleep(1000 * 2); + + assertThat(received.size(), is(1)); + received.clear(); + + // close the consumer + connection.stop(); + consumer.close(); + + // send message + TextMessage lostMessage = session.createTextMessage("b"); + producer.send(lostMessage); + + Thread.sleep(1000 * 2); + + // start the un-durable consumer again + consumer = session.createConsumer(topic, "topic"); + consumer.setMessageListener(msgListener); + connection.start(); + + Thread.sleep(1000 * 3); + + assertThat(received.size(), is(0)); + + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +}