http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java new file mode 100644 index 0000000..62b5056 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/hook/ReceiveMessageHookTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.jms.Message; +import org.apache.rocketmq.jms.exception.MessageExpiredException; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; + +public class ReceiveMessageHookTest { + + @Test(expected = MessageExpiredException.class) + public void testValidateFail() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + message.setJMSExpiration(new Date().getTime()); + Thread.sleep(100); + hook.before(message); + } + + @Test + public void testValidateSuccess() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + // never expired + message.setJMSExpiration(0); + hook.before(message); + + // expired in the future + message.setJMSExpiration(new SimpleDateFormat("yyyy-MM-dd").parse("2999-01-01").getTime()); + hook.before(message); + } + + @Test + public void setProviderProperties() throws Exception { + ReceiveMessageHook hook = new ReceiveMessageHook(); + + Message message = new JMSTextMessage("text"); + hook.before(message); + + assertThat(message.getLongProperty(JMSPropertiesEnum.JMSXRcvTimestamp.name()), greaterThan(0L)); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java b/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java new file mode 100644 index 0000000..29e91ec --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/hook/SendMessageHookTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.hook; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import org.apache.rocketmq.jms.RocketMQProducer; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.exception.UnsupportDeliveryModelException; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.hamcrest.core.IsNull.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SendMessageHookTest { + + @Test(expected = UnsupportDeliveryModelException.class) + public void testValidate() throws Exception { + final JMSTextMessage message = new JMSTextMessage("text"); + final RocketMQTopic destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.NON_PERSISTENT; + final int priority = 4; + final long timeToLive = 1000 * 100L; + + SendMessageHook hook = new SendMessageHook(); + hook.before(message, destination, deliveryMode, priority, timeToLive); + } + + @Test + public void testSetHeader() throws Exception { + RocketMQProducer producer = mock(RocketMQProducer.class); + when(producer.getDeliveryDelay()).thenReturn(0L); + + final JMSTextMessage message = new JMSTextMessage("text"); + final Destination destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.PERSISTENT; + final int priority = 5; + long timeToLive = JMSHeaderEnum.JMS_TIME_TO_LIVE_DEFAULT_VALUE; + SendMessageHook hook = new SendMessageHook(producer); + hook.before(message, destination, deliveryMode, priority, timeToLive); + + assertThat(message.getJMSDestination(), is(destination)); + assertThat(message.getJMSDeliveryMode(), is(JMSHeaderEnum.JMS_DELIVERY_MODE_DEFAULT_VALUE)); + assertThat(message.getJMSExpiration(), is(0L)); + assertThat(message.getJMSDeliveryTime(), notNullValue()); + assertThat(message.getJMSPriority(), is(5)); + assertThat(message.getJMSMessageID(), notNullValue()); + assertThat(message.getJMSTimestamp(), notNullValue()); + } + + /** + * Disable ID,timestamp, and set expired time + * + * @throws Exception + */ + @Test + public void testSetHeader2() throws Exception { + RocketMQProducer producer = mock(RocketMQProducer.class); + when(producer.getUserName()).thenReturn("user"); + when(producer.getDisableMessageID()).thenReturn(true); + when(producer.getDisableMessageTimestamp()).thenReturn(true); + + final JMSTextMessage message = new JMSTextMessage("text"); + final Destination destination = new RocketMQTopic("destination"); + final int deliveryMode = DeliveryMode.PERSISTENT; + final int priority = 5; + final long timeToLive = 1000 * 100L; + SendMessageHook hook = new SendMessageHook(producer); + hook.before(message, destination, deliveryMode, priority, timeToLive); + + // assert header + assertThat(message.getJMSMessageID(), nullValue()); + assertThat(message.getJMSTimestamp(), is(0L)); + assertThat(message.getJMSExpiration(), not(is(0L))); + + // assert properties + assertThat(message.getStringProperty(JMSPropertiesEnum.JMSXUserID.name()), is("user")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java b/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java new file mode 100644 index 0000000..ae08bec --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/AppConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +import javax.jms.ConnectionFactory; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.annotation.EnableJms; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.converter.SimpleMessageConverter; + +@Configuration +@ComponentScan(basePackageClasses = {RocketMQServer.class}) +@EnableJms +public class AppConfig { + + @Bean + public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory()); + factory.setConcurrency("1"); + return factory; + } + + @Bean + public ConnectionFactory connectionFactory() { +// CachingConnectionFactory factory = new CachingConnectionFactory(); +// factory.setTargetConnectionFactory(new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS)); +// return factory; + //todo + return new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS); + } + + @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + jmsTemplate.setConnectionFactory(connectionFactory()); + jmsTemplate.setMessageConverter(new SimpleMessageConverter()); + return jmsTemplate; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java b/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java new file mode 100644 index 0000000..0f7a6b1 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/Constant.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +public class Constant { + + public static final String NAME_SERVER_IP = "127.0.0.1"; + + public static final int NAME_SERVER_PORT = 9153; + + public static final String NAME_SERVER_ADDRESS = NAME_SERVER_IP + ":" + NAME_SERVER_PORT; + + public static final String BROKER_IP = "127.0.0.1"; + + public static final int BROKER_PORT = 9055; + + public static final String BROKER_ADDRESS = BROKER_IP + ":" + BROKER_PORT; + + public static final int BROKER_HA_PORT = 9043; + + public static final String CLIENT_ID = "coffee"; + + public static final String CLIENT_ID_SECOND = "tea"; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java new file mode 100644 index 0000000..e602601 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQAdmin.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +import com.google.common.collect.Sets; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_ADDRESS; +import static org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS; + +@Service +public class RocketMQAdmin { + + private static final Logger log = LoggerFactory.getLogger(RocketMQAdmin.class); + + @Autowired + // make sure RocketMQServer start ahead + private RocketMQServer rocketMQServer; + + //MQAdmin client + private DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); + + @PostConstruct + public void start() { + // reduce rebalance waiting time + System.setProperty("rocketmq.client.rebalance.waitInterval", "1000"); + + defaultMQAdminExt.setNamesrvAddr(NAME_SERVER_ADDRESS); + try { + defaultMQAdminExt.start(); + log.info("Start RocketMQAdmin Successfully"); + } + catch (MQClientException e) { + log.error("Failed to start MQAdmin", e); + System.exit(1); + } + } + + @PreDestroy + public void shutdown() { + defaultMQAdminExt.shutdown(); + } + + public void createTopic(String topic) { + createTopic(topic, 1); + } + + public void createTopic(String topic, int queueNum) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setReadQueueNums(queueNum); + topicConfig.setWriteQueueNums(queueNum); + try { + defaultMQAdminExt.createAndUpdateTopicConfig(BROKER_ADDRESS, topicConfig); + } + catch (Exception e) { + log.error("Create topic:{}, addr:{} failed:{}", topic, BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); + } + } + + public void deleteTopic(String topic) { + try { + defaultMQAdminExt.deleteTopicInBroker(Sets.newHashSet(BROKER_ADDRESS), topic); + } + catch (Exception e) { + log.error("Delete topic:{}, addr:{} failed:{}", topic, BROKER_ADDRESS, ExceptionUtils.getStackTrace(e)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java new file mode 100644 index 0000000..71ae6b1 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/RocketMQServer.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import static java.io.File.separator; +import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_HA_PORT; +import static org.apache.rocketmq.jms.integration.source.Constant.BROKER_PORT; +import static org.apache.rocketmq.jms.integration.source.Constant.NAME_SERVER_ADDRESS; + +@Service +public class RocketMQServer { + public static Logger log = LoggerFactory.getLogger(RocketMQServer.class); + private final SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss"); + private final String rootDir = System.getProperty("user.home") + separator + "rocketmq-jms" + separator; + // fixed location of config files which is updated after RMQ3.2.6 + private final String configDir = System.getProperty("user.home") + separator + "store/config"; + + private String serverDir; + private volatile boolean started = false; + + //name server + private NamesrvConfig namesrvConfig = new NamesrvConfig(); + private NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + private NamesrvController namesrvController; + + //broker + private final String brokerName = "JmsTestBrokerName"; + private BrokerController brokerController; + private BrokerConfig brokerConfig = new BrokerConfig(); + private NettyServerConfig nettyServerConfig = new NettyServerConfig(); + private NettyClientConfig nettyClientConfig = new NettyClientConfig(); + private MessageStoreConfig storeConfig = new MessageStoreConfig(); + + public RocketMQServer() { + this.storeConfig.setDiskMaxUsedSpaceRatio(95); + } + + @PostConstruct + public void start() { + if (started) { + return; + } + + createServerDir(); + + startNameServer(); + + startBroker(); + + started = true; + + log.info("Start RocketServer Successfully"); + } + + private void createServerDir() { + for (int i = 0; i < 5; i++) { + serverDir = rootDir + sf.format(new Date()); + final File file = new File(serverDir); + if (!file.exists()) { + return; + } + } + log.error("Has retry 5 times to register base dir,but still failed."); + System.exit(1); + } + + private void startNameServer() { + namesrvConfig.setKvConfigPath(serverDir + separator + "namesrv" + separator + "kvConfig.json"); + nameServerNettyServerConfig.setListenPort(Constant.NAME_SERVER_PORT); + namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + try { + namesrvController.initialize(); + log.info("Success to start Name Server:{}", NAME_SERVER_ADDRESS); + namesrvController.start(); + } + catch (Exception e) { + log.error("Failed to start Name Server", e); + System.exit(1); + } + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, NAME_SERVER_ADDRESS); + } + + private void startBroker() { + System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.98"); + brokerConfig.setBrokerName(brokerName); + brokerConfig.setBrokerIP1(Constant.BROKER_IP); + brokerConfig.setNamesrvAddr(NAME_SERVER_ADDRESS); + storeConfig.setStorePathRootDir(serverDir); + storeConfig.setStorePathCommitLog(serverDir + separator + "commitlog"); + storeConfig.setHaListenPort(BROKER_HA_PORT); + nettyServerConfig.setListenPort(BROKER_PORT); + brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + + try { + brokerController.initialize(); + log.info("Broker Start name:{} address:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr()); + brokerController.start(); + + } + catch (Exception e) { + log.error("Failed to start Broker", e); + System.exit(1); + } + } + + @PreDestroy + private void shutdown() { + brokerController.shutdown(); + namesrvController.shutdown(); + deleteFile(new File(rootDir)); + deleteFile(new File(configDir)); + } + + public void deleteFile(File file) { + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + } + else if (file.isDirectory()) { + File[] files = file.listFiles(); + for (int i = 0; i < files.length; i++) { + deleteFile(files[i]); + } + file.delete(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java b/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java new file mode 100644 index 0000000..f4f8df8 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/SimpleTextListener.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +@Component +public class SimpleTextListener { + + public static final String DESTINATION = "orderTest"; + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + private List<String> receivedMsgs = new ArrayList(); + + public SimpleTextListener() { + } + + @PostConstruct + public void init() { + this.rocketMQAdmin.createTopic(DESTINATION); + } + + @PreDestroy + public void destroy() { + this.rocketMQAdmin.deleteTopic(DESTINATION); + } + + @JmsListener(destination = DESTINATION) + public void processOrder(Message<String> message) { + receivedMsgs.add(message.getPayload()); + } + + public List<String> getReceivedMsg() { + return receivedMsgs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java new file mode 100644 index 0000000..5136f37 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/support/ConditionMatcher.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source.support; + +public interface ConditionMatcher { + + boolean match(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java b/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java new file mode 100644 index 0000000..e03c6c9 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/source/support/TimeLimitAssert.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.source.support; + +import org.apache.commons.lang.time.StopWatch; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeLimitAssert { + + public static void doAssert(ConditionMatcher conditionMatcher, int timeLimit) throws InterruptedException { + StopWatch watch = new StopWatch(); + watch.start(); + + while (!conditionMatcher.match()) { + Thread.sleep(500); + if (watch.getTime() > timeLimit * 1000) { + assertFalse(String.format("Doesn't match assert condition in %s second", timeLimit), true); + } + } + + assertTrue(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java new file mode 100644 index 0000000..018e844 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeAsynchronousTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.Constant; +import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; +import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class ConsumeAsynchronousTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + @Test + public void testConsumeAsynchronous() throws Exception { + final String rmqTopicName = "coffee-async" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //producer + TextMessage message = session.createTextMessage("mocha coffee,please"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + //consumer + final List<Message> received = new ArrayList(); + MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); + consumer.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + received.add(message); + } + }); + + connection.start(); + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 1; + } + }, 5); + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java new file mode 100644 index 0000000..8a9fa41 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/ConsumeSynchronousTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test; + +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.Constant; +import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNull.notNullValue; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class ConsumeSynchronousTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + @Test + public void testConsumeSynchronous() throws Exception { + final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //producer + TextMessage message = session.createTextMessage("a"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + //consumer + MessageConsumer consumer = session.createDurableConsumer(topic, "consumer"); + + connection.start(); + + Message msg = consumer.receive(); + + assertThat(msg, notNullValue()); + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java new file mode 100644 index 0000000..045d615 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/NonDurableConsumeTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.Constant; +import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; +import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class NonDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test messages that producer after consumer inactive will not be delivered to consumer when it start again. + * + * <p>Test step: + * 1. Create a consumer and start the connection + * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer + * 3. MsgA should be consumed successfully + * 4. Close the consumer and stop the connection + * 5. Producer sends a message(msgB) after the consumer closed + * 6. Create another consumer which is a non-durable one, and start the connection + * 7. Result: msgB should be consumed by the previous non-durable consumer + * + * @throws Exception + */ + @Test + public void testConsumeNotDurable() throws Exception { + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //consumer + final List<Message> received = new ArrayList(); + final MessageListener msgListener = new MessageListener() { + @Override public void onMessage(Message message) { + received.add(message); + } + }; + MessageConsumer consumer = session.createConsumer(topic); + consumer.setMessageListener(msgListener); + + connection.start(); + + Thread.sleep(1000 * 3); + + //producer + TextMessage message = session.createTextMessage("a"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 1; + } + }, 3); + + received.clear(); + + // close the consumer + connection.stop(); + consumer.close(); + + // send message + TextMessage lostMessage = session.createTextMessage("b"); + producer.send(lostMessage); + + Thread.sleep(1000 * 2); + + // start the non-durable consumer again + consumer = session.createConsumer(topic, "topic"); + consumer.setMessageListener(msgListener); + connection.start(); + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return received.size() == 0; + } + }, 5); + + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java new file mode 100644 index 0000000..c3731de --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/SharedDurableConsumeTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.Constant; +import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; +import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class SharedDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test messages will be deliver to every consumer if these consumers are in shared durable subscription. + * + * <p>Test step: + * 1. Create a share durable consumer(consumerA) via the first connection(connectionA) + * 2. Create a share durable consumer(consumerB) via another connection(connectionB) + * 3. The two consumer must subscribe the same topic with identical subscriptionName, + * and they also have the same clientID. + * 4. Send several(eg:10) messages to this topic + * 5. Result: all messages should be received by both consumerA and consumerB + * + * @throws Exception + */ + @Test + public void testConsumeAllMessages() throws Exception { + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connectionA = null, connectionB = null; + final String subscriptionName = "MySubscription"; + final List<Message> receivedA = new ArrayList(), receivedB = new ArrayList(); + + try { + // consumerA + connectionA = factory.createConnection(); + Session sessionA = connectionA.createSession(); + connectionA.start(); + Topic topic = sessionA.createTopic(rmqTopicName); + MessageConsumer consumerA = sessionA.createSharedDurableConsumer(topic, subscriptionName); + consumerA.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedA.add(message); + } + }); + + // consumerB + connectionB = factory.createConnection(); + Session sessionB = connectionB.createSession(); + MessageConsumer consumerB = sessionB.createSharedDurableConsumer(topic, subscriptionName); + consumerB.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedB.add(message); + } + }); + connectionB.start(); + + //producer + TextMessage message = sessionA.createTextMessage("a"); + MessageProducer producer = sessionA.createProducer(topic); + for (int i = 0; i < 10; i++) { + producer.send(message); + } + + Thread.sleep(1000 * 2); + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return receivedA.size()==10 && receivedB.size()==10; + } + },5); + } + finally { + connectionA.close(); + connectionB.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java new file mode 100644 index 0000000..809dbae --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/UnsharedDurableConsumeTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.apache.rocketmq.jms.RocketMQSession; +import org.apache.rocketmq.jms.exception.DuplicateSubscriptionException; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.Constant; +import org.apache.rocketmq.jms.integration.source.RocketMQAdmin; +import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class UnsharedDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test each message will be deliver to only one consumer if these consumers are in unshared durable subscription. + * + * <p>Test step: + * 1. Create a unshared durable consumer(consumerA) via the first connection(connectionA) + * 2. Create a unshared durable consumer(consumerB) via another connection(connectionB) + * 3. Result: + * a. The creating consumerB should throw a JMSException as consumerA and consumberB have the same subscription + * b. All messages should be received by consumerA + * + * @throws Exception + * @see {@link RocketMQSession} + */ + @Test + public void testEachMessageOnlyConsumeByOneConsumer() throws Exception { + final String rmqTopicName = "coffee" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName, 2); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connectionA = null, connectionB = null; + final String subscriptionName = "MySubscription"; + final List<Message> receivedA = new ArrayList(); + + try { + // consumerA + connectionA = factory.createConnection(); + Session sessionA = connectionA.createSession(); + connectionA.start(); + Topic topic = sessionA.createTopic(rmqTopicName); + MessageConsumer consumerA = sessionA.createDurableConsumer(topic, subscriptionName); + consumerA.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedA.add(message); + } + }); + + Thread.sleep(1000 * 2); + + // consumerB + try { + connectionB = factory.createConnection(); + Session sessionB = connectionB.createSession(); + sessionB.createDurableConsumer(topic, subscriptionName); + assertFalse("Doesn't get the expected " + DuplicateSubscriptionException.class.getSimpleName(), true); + } + catch (DuplicateSubscriptionException e) { + assertTrue(true); + } + + connectionA.start(); + + //producer + TextMessage message = sessionA.createTextMessage("a"); + MessageProducer producer = sessionA.createProducer(topic); + for (int i = 0; i < 10; i++) { + producer.send(message); + } + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return receivedA.size() == 10; + } + }, 5); + } + finally { + connectionA.close(); + connectionB.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java b/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java new file mode 100644 index 0000000..920a49d --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/integration/test/listener/SimpleTextListenerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration.test.listener; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.rocketmq.jms.integration.source.AppConfig; +import org.apache.rocketmq.jms.integration.source.SimpleTextListener; +import org.apache.rocketmq.jms.integration.source.support.ConditionMatcher; +import org.apache.rocketmq.jms.integration.source.support.TimeLimitAssert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.apache.rocketmq.jms.integration.source.SimpleTextListener.DESTINATION; + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class SimpleTextListenerTest { + + private static final Logger log = LoggerFactory.getLogger(SimpleTextListenerTest.class); + + @Autowired + private JmsTemplate jmsTemplate; + + @Autowired + private SimpleTextListener simpleTextListener; + + @Test + public void testListener() throws Exception { + jmsTemplate.convertAndSend(DESTINATION, "first"); + StopWatch watch = new StopWatch(); + watch.start(); + + TimeLimitAssert.doAssert(new ConditionMatcher() { + @Override public boolean match() { + return simpleTextListener.getReceivedMsg().size() == 1; + } + }, 60); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java b/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java new file mode 100644 index 0000000..20520f6 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSBytesMessageTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class JMSBytesMessageTest { + + private byte[] receiveData = "receive data test".getBytes(); + private byte[] sendData = "send data test".getBytes(); + + @Test + public void testGetData() throws Exception { + JMSBytesMessage readMessage = new JMSBytesMessage(receiveData); + assertThat(new String(receiveData), is(new String(readMessage.getBody()))); + + JMSBytesMessage sendMessage = new JMSBytesMessage(); + sendMessage.writeBytes(sendData, 0, sendData.length); + assertThat(new String(sendData), is(new String(sendMessage.getBody()))); + } + + @Test + public void testGetBodyLength() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + assertThat(msg.getBodyLength(), is(new Long(receiveData.length))); + } + + @Test + public void testReadBytes1() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + byte[] receiveValue = new byte[receiveData.length]; + msg.readBytes(receiveValue); + assertThat(new String(receiveValue), is(new String(receiveData))); + } + + @Test + public void testReadBytes2() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + + byte[] receiveValue1 = new byte[2]; + msg.readBytes(receiveValue1); + assertThat(new String(receiveData).substring(0, 2), is(new String(receiveValue1))); + + byte[] receiveValue2 = new byte[2]; + msg.readBytes(receiveValue2); + assertThat(new String(receiveData).substring(2, 4), is(new String(receiveValue2))); + + } + + @Test + public void testWriteBytes() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(); + msg.writeBytes(sendData); + assertThat(new String(msg.getBody()), is(new String(sendData))); + } + + @Test(expected = MessageNotReadableException.class) + public void testNotReadableException() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(); + msg.writeBoolean(true); + msg.readBoolean(); + } + + @Test(expected = MessageNotWriteableException.class) + public void testNotWritableException() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + msg.writeBoolean(true); + } + + @Test + public void testClearBody() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + msg.clearBody(); + msg.writeBoolean(true); + } + + @Test + public void testReset() throws Exception { + JMSBytesMessage msg = new JMSBytesMessage(receiveData); + byte[] b = new byte[2]; + msg.readBytes(b); + msg.reset(); + msg.readBytes(b); + assertThat(new String(receiveData).substring(0, 2), is(new String(b))); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java b/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java new file mode 100644 index 0000000..cb34653 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSMapMessageTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class JMSMapMessageTest { + + @Test + public void testGetBoolean() throws Exception { + JMSMapMessage msg = new JMSMapMessage(); + + // get an empty value will return false + assertThat(msg.getBoolean("man"), is(false)); + + // get an not empty value + msg.setBoolean("man", true); + assertThat(msg.getBoolean("man"), is(true)); + + // key is null + try { + msg.getBoolean(null); + assertTrue(false); + } + catch (JMSException e) { + assertTrue(true); + } + + // in read-only model + msg.setReadOnly(true); + try { + msg.setBoolean("man", true); + assertTrue(false); + } + catch (MessageNotWriteableException e) { + assertTrue(true); + } + + // both read and write are allowed after clearBody() + msg.clearBody(); + msg.setBoolean("man", false); + msg.getBoolean("man"); + + // map is empty after clearBody() + msg.clearBody(); + assertThat(msg.getBoolean("man"), is(false)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java b/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java new file mode 100644 index 0000000..63c03ae --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSObjectMessageTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import java.io.Serializable; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class JMSObjectMessageTest { + + @Test + public void testGetObject() throws Exception { + final User user = new User("jack", 20); + JMSObjectMessage msg = new JMSObjectMessage(user); + assertThat((User)msg.getObject(), is(user)); + } + + @Test + public void testGetBody() throws Exception { + final User user = new User("jack", 20); + JMSObjectMessage msg = new JMSObjectMessage(user); + assertThat((User)msg.getBody(Object.class), is((User)msg.getObject())); + } + + private class User implements Serializable { + private String name; + private int age; + + private User(String name, int age) { + this.name = name; + this.age = age; + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java b/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java new file mode 100644 index 0000000..d9c0cac --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/JMSTextMessageTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class JMSTextMessageTest { + private String text = "jmsRocketMQTextMessage test"; + + @Test + public void testGetBody() throws Exception { + JMSTextMessage msg = new JMSTextMessage(text); + assertThat(msg.getBody(String.class), is(text)); + } + + @Test + public void testSetText() throws Exception { + JMSTextMessage msg = new JMSTextMessage(); + msg.setText(text); + assertThat(msg.getText(), is(text)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java b/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java new file mode 100644 index 0000000..13a048c --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/convert/JMS2RMQMessageConvertTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.apache.rocketmq.jms.msg.AbstractJMSMessage; +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; +import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum; +import org.junit.Test; + +import static org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum.MSG_MODEL_NAME; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class JMS2RMQMessageConvertTest { + + @Test + public void testConvert() throws Exception { + AbstractJMSMessage jmsMessage = new JMSTextMessage("text"); + + // given + jmsMessage.setJMSDestination(new RocketMQTopic("topic")); + jmsMessage.setJMSMessageID("ID:XXX"); + jmsMessage.setJMSTimestamp(1488273583542L); + jmsMessage.setJMSExpiration(0L); + + jmsMessage.setStringProperty("MyProperty", "MyValue"); + + // when + MessageExt rmqMessage = JMS2RMQMessageConvert.convert(jmsMessage); + + // then + assertThat(rmqMessage.getTopic(), is("topic")); + assertThat(rmqMessage.getUserProperty(JMSHeaderEnum.JMSMessageID.name()), is("ID:XXX")); + assertThat(rmqMessage.getBornTimestamp(), is(1488273583542L)); + assertThat(rmqMessage.getUserProperty(JMSHeaderEnum.JMSExpiration.name()), is("0")); + assertThat(rmqMessage.getKeys(), is("ID:XXX")); + + assertThat(rmqMessage.getUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty"), is("MyValue")); + assertThat(rmqMessage.getUserProperty(MSG_MODEL_NAME), is(JMSMessageModelEnum.STRING.name())); + assertThat(new String(rmqMessage.getBody()), is("text")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java b/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java new file mode 100644 index 0000000..1d5bb11 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/convert/RMQ2JMSMessageConvertTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.convert; + +import javax.jms.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.jms.msg.JMSBytesMessage; +import org.apache.rocketmq.jms.msg.enums.JMSHeaderEnum; +import org.apache.rocketmq.jms.msg.enums.JMSMessageModelEnum; +import org.apache.rocketmq.jms.msg.enums.JMSPropertiesEnum; +import org.apache.rocketmq.jms.support.JMSUtils; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class RMQ2JMSMessageConvertTest { + + @Test + public void testConvert() throws Exception { + MessageExt rmqMessage = new MessageExt(); + + // given + rmqMessage.setBody("body".getBytes()); + rmqMessage.putUserProperty(JMSMessageModelEnum.MSG_MODEL_NAME, JMSMessageModelEnum.BYTE.name()); + rmqMessage.putUserProperty(JMSHeaderEnum.JMSMessageID.name(), "ID:YYY"); + rmqMessage.setBornTimestamp(1488273585542L); + rmqMessage.putUserProperty(JMSHeaderEnum.JMSExpiration.name(), "0"); + rmqMessage.setReconsumeTimes(2); + rmqMessage.setTopic("topic"); + + rmqMessage.putUserProperty(JMSPropertiesEnum.JMSXDeliveryCount.name(), "2"); + rmqMessage.putUserProperty(JMS2RMQMessageConvert.USER_PROPERTY_PREFIX + "MyProperty", "MyValue"); + + // when + Message jmsMessage = RMQ2JMSMessageConvert.convert(rmqMessage); + + // then + assertThat(JMSBytesMessage.class.isInstance(jmsMessage), is(true)); + assertThat(jmsMessage.getJMSMessageID(), is("ID:YYY")); + assertThat(jmsMessage.getJMSTimestamp(), is(1488273585542L)); + assertThat(jmsMessage.getJMSExpiration(), is(0L)); + assertThat(jmsMessage.getJMSRedelivered(), is(true)); + assertThat(JMSUtils.getDestinationName(jmsMessage.getJMSDestination()), is("topic")); + + assertThat(jmsMessage.getStringProperty("MyProperty"), is("MyValue")); + assertThat(jmsMessage.getIntProperty(JMSPropertiesEnum.JMSXDeliveryCount.name()), is(3)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java b/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java new file mode 100644 index 0000000..28255dd --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/enums/JMSMessageModelEnumTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.enums; + +import org.apache.rocketmq.jms.msg.JMSTextMessage; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class JMSMessageModelEnumTest { + @Test + public void testToMsgModelEnum() throws Exception { + assertThat(JMSMessageModelEnum.toMsgModelEnum(new JMSTextMessage("text")), is(JMSMessageModelEnum.STRING)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java b/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java new file mode 100644 index 0000000..5204fb3 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/serialize/MapSerializeTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class MapSerializeTest { + + @Test + public void serializeAndDeserialize() throws Exception { + Map map = new HashMap(); + map.put("name", "John"); + map.put("age", 20); + + byte[] bytes = MapSerialize.instance().serialize(map); + Map newMap = MapSerialize.instance().deserialize(bytes); + + assertThat(map.size(), is(newMap.size())); + assertThat(newMap.get("name").toString(), is("John")); + assertThat(newMap.get("age").toString(), is("20")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java b/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java new file mode 100644 index 0000000..1661b08 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/serialize/ObjectSerializeTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import java.io.Serializable; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class ObjectSerializeTest { + + @Test + public void serializeAndDeserialize() throws Exception { + Person person = new Person(); + person.setName("John"); + person.setAge(30); + + byte[] bytes = ObjectSerialize.instance().serialize(person); + Person newPerson = (Person)ObjectSerialize.instance().deserialize(bytes); + + assertThat(newPerson.getName(), is(person.getName())); + assertThat(newPerson.getAge(), is(person.getAge())); + } + + private static class Person implements Serializable { + private static final long serialVersionUID = -4981805070659153282L; + + private String name; + private int age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java b/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java new file mode 100644 index 0000000..4e6a54a --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/msg/serialize/StringSerializeTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.msg.serialize; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class StringSerializeTest { + + @Test + public void serializeAndDeserialize() throws Exception { + String text = "MyText"; + + byte[] bytes = StringSerialize.instance().serialize(text); + String newText = StringSerialize.instance().deserialize(bytes); + + assertThat(text, is(newText)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/db8e0dd1/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java b/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java new file mode 100644 index 0000000..db15fee --- /dev/null +++ b/src/test/java/org/apache/rocketmq/jms/support/JMSUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.support; + +import org.apache.rocketmq.jms.destination.RocketMQQueue; +import org.apache.rocketmq.jms.destination.RocketMQTopic; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; + +public class JMSUtilsTest { + + @Test + public void getTopicName() throws Exception { + RocketMQTopic topic = new RocketMQTopic("topic"); + assertThat(JMSUtils.getDestinationName(topic), is("topic")); + + RocketMQQueue queue = new RocketMQQueue("queue"); + assertThat(JMSUtils.getDestinationName(queue), is("queue")); + } + + @Test + public void getConsumerGroup() throws Exception { + final String subscriptionName = "subscriptionName"; + final String clientID = "clientID"; + String consumerGroupA = JMSUtils.getConsumerGroup(subscriptionName, clientID, true); + assertThat(consumerGroupA.contains(subscriptionName), is(true)); + assertThat(consumerGroupA.contains(clientID), is(true)); + assertThat(consumerGroupA.substring(subscriptionName.length() + clientID.length() + 2).length(), is(36)); + + String consumerGroupB = JMSUtils.getConsumerGroup(subscriptionName, clientID, false); + assertThat(consumerGroupB.contains(subscriptionName), is(true)); + assertThat(consumerGroupB.contains(clientID), is(true)); + assertThat(consumerGroupB.length(), is(subscriptionName.length() + clientID.length() + 1)); + + String consumerGroupC = JMSUtils.getConsumerGroup(null, null, true); + assertThat(consumerGroupC.length(), is(36)); + } + + @Test + public void uuid() throws Exception { + assertThat(JMSUtils.uuid(), notNullValue()); + } + +} \ No newline at end of file