http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java new file mode 100644 index 0000000..995bf41 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.tag; + +import java.util.Collection; +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.factory.TagMessage; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class TagMessageWithMulConsumerIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + String consumerId = initConsumerGroup(); + logger.info(String.format("use topic: %s; consumerId: %s !", topic, consumerId)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSendTwoTag() { + String tag1 = "jueyin1"; + String tag2 = "jueyin2"; + int msgSize = 10; + RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tag1, + new RMQNormalListner()); + RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tag2, + new RMQNormalListner()); + + List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); + producer.send(tag1Msgs); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); + producer.send(tag2Msgs); + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + + consumerTag1.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag1Msgs), + consumeTime); + consumerTag2.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), + consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag1Msgs)); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); + } + + @Test + public void testSendMessagesWithTwoTag() { + String tags[] = {"jueyin1", "jueyin2"}; + int msgSize = 10; + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tags[0], + new RMQNormalListner()); + RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tags[1], + new RMQNormalListner()); + + List<Object> tagMsgs = tagMessage.getMixedTagMessages(); + producer.send(tagMsgs); + Assert.assertEquals("Not all are sent", msgSize * tags.length, + producer.getAllUndupMsgBody().size()); + + consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), + consumeTime); + consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[1]), + consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[1])); + } + + @Test + public void testTwoConsumerOneMatchOneOtherMatchAll() { + String tags[] = {"jueyin1", "jueyin2"}; + String sub1 = String.format("%s||%s", tags[0], tags[1]); + String sub2 = String.format("%s|| noExist", tags[0]); + int msgSize = 10; + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, sub1, + new RMQNormalListner()); + RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, sub2, + new RMQNormalListner()); + + List<Object> tagMsgs = tagMessage.getMixedTagMessages(); + producer.send(tagMsgs); + Assert.assertEquals("Not all are sent", msgSize * tags.length, + producer.getAllUndupMsgBody().size()); + + consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), + consumeTime); + consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), + consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerTag2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); + } + + @Test + public void testSubKindsOf() { + String tags[] = {"jueyin1", "jueyin2"}; + String sub1 = String.format("%s||%s", tags[0], tags[1]); + String sub2 = String.format("%s|| noExist", tags[0]); + String sub3 = tags[0]; + String sub4 = "*"; + int msgSize = 10; + + RMQNormalConsumer consumerSubTwoMatchAll = getConsumer(nsAddr, topic, sub1, + new RMQNormalListner()); + RMQNormalConsumer consumerSubTwoMachieOne = getConsumer(nsAddr, topic, sub2, + new RMQNormalListner()); + RMQNormalConsumer consumerSubTag1 = getConsumer(nsAddr, topic, sub3, + new RMQNormalListner()); + RMQNormalConsumer consumerSubAll = getConsumer(nsAddr, topic, sub4, + new RMQNormalListner()); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + Collection<Object> msgsWithNoTag = producer.getMsgBodysCopy(); + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + List<Object> tagMsgs = tagMessage.getMixedTagMessages(); + producer.send(tagMsgs); + Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); + + consumerSubTwoMatchAll.getListner() + .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), consumeTime); + consumerSubTwoMachieOne.getListner() + .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), consumeTime); + consumerSubTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), + consumeTime); + consumerSubAll.getListner().waitForMessageConsume( + MQMessageFactory.getMessage(msgsWithNoTag, tagMessage.getAllTagMessageBody()), + consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerSubTwoMatchAll.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerSubTwoMachieOne.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerSubTag1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0])); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumerSubAll.getListner().getAllMsgBody())) + .containsExactlyElementsIn(MQMessageFactory.getMessage(msgsWithNoTag, + tagMessage.getAllTagMessageBody())); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java new file mode 100644 index 0000000..03e81eb --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.tag; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.RandomUtils; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class TagMessageWithSameGroupConsumerIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerWithSameGroup() { + String tag = "jueyin"; + int msgSize = 20; + String originMsgDCName = RandomUtils.getStringByUUID(); + String msgBodyDCName = RandomUtils.getStringByUUID(); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + producer.send(tag, msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testConsumerStartWithInterval() { + String tag = "jueyin"; + int msgSize = 100; + String originMsgDCName = RandomUtils.getStringByUUID(); + String msgBodyDCName = RandomUtils.getStringByUUID(); + + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + producer.send(tag, msgSize, 100); + TestUtils.waitForMonment(5); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + TestUtils.waitForMonment(5); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testConsumerStartTwoAndCrashOnsAfterWhile() { + String tag = "jueyin"; + int msgSize = 100; + String originMsgDCName = RandomUtils.getStringByUUID(); + String msgBodyDCName = RandomUtils.getStringByUUID(); + + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, + new RMQNormalListner(originMsgDCName, msgBodyDCName)); + + producer.send(tag, msgSize, 100); + TestUtils.waitForMonment(5); + consumer2.shutdown(); + mqClients.remove(1); + TestUtils.waitForMonment(5); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java new file mode 100644 index 0000000..98d858b --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.topic; + +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class MulConsumerMulTopicIT extends BaseConf { + private RMQNormalProducer producer = null; + + @Before + public void setUp() { + producer = getProducer(nsAddr, null); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSynSendMessage() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer1.subscribe(topic2, "*"); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1, + "*", new RMQNormalListner()); + consumer2.subscribe(topic2, "*"); + + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize)); + Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } + + @Test + public void testConsumeWithDiffTag() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + String tag = "jueyin_tag"; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer1.subscribe(topic2, tag); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic1, + "*", new RMQNormalListner()); + consumer2.subscribe(topic2, tag); + + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag)); + Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } + + @Test + public void testConsumeWithDiffTagAndFilter() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + String tag1 = "jueyin_tag_1"; + String tag2 = "jueyin_tag_2"; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer1.subscribe(topic2, tag1); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer2.subscribe(topic2, tag1); + + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2)); + producer.clearMsg(); + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1)); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java new file mode 100644 index 0000000..969fa79 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.topic; + +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OneConsumerMulTopicIT extends BaseConf { + private RMQNormalProducer producer = null; + + @Before + public void setUp() { + producer = getProducer(nsAddr, null); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSynSendMessage() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer.subscribe(topic2, "*"); + + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize)); + + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testConsumeWithDiffTag() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + String tag = "jueyin_tag"; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer.subscribe(topic2, tag); + + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag)); + + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testConsumeWithDiffTagAndFilter() { + int msgSize = 10; + String topic1 = initTopic(); + String topic2 = initTopic(); + String tag1 = "jueyin_tag_1"; + String tag2 = "jueyin_tag_2"; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new RMQNormalListner()); + consumer.subscribe(topic2, tag1); + + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2)); + producer.clearMsg(); + producer.send(MQMessageFactory.getMsg(topic1, msgSize)); + producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1)); + + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java new file mode 100644 index 0000000..4125433 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.async; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.factory.SendCallBackFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.apache.rocketmq.test.util.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class AsyncSendExceptionIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSendCallBackNull() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + SendCallback sendCallback = null; + producer.send(msg, sendCallback); + } + + @Test(expected = java.lang.NullPointerException.class) + public void testSendMQNull() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + MessageQueue messageQueue = null; + producer.send(msg, messageQueue, SendCallBackFactory.getSendCallBack()); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSendSelectorNull() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + MessageQueueSelector selector = null; + producer.send(msg, selector, 100, SendCallBackFactory.getSendCallBack()); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSelectorThrowsException() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + String str = null; + return list.get(str.length()); + } + }, null, SendCallBackFactory.getSendCallBack()); + } + + @Test + public void testQueueIdBigThanQueueNum() throws Exception { + int queueId = 100; + sendFail = false; + MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + + producer.send(msg, mq, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + } + + @Override + public void onException(Throwable throwable) { + sendFail = true; + } + }); + + int checkNum = 50; + while (!sendFail && checkNum > 0) { + checkNum--; + TestUtils.waitForMonment(100); + } + producer.shutdown(); + assertThat(sendFail).isEqualTo(true); + } + + @Test + public void testQueueIdSmallZero() throws Exception { + int queueId = -100; + sendFail = true; + MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + + producer.send(msg, mq, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + sendFail = false; + } + + @Override + public void onException(Throwable throwable) { + sendFail = true; + } + }); + + int checkNum = 50; + while (sendFail && checkNum > 0) { + checkNum--; + TestUtils.waitForMonment(100); + } + producer.shutdown(); + assertThat(sendFail).isEqualTo(false); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java new file mode 100644 index 0000000..53a992c --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.async; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class AsyncSendWithMessageQueueIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testAsyncSendWithMQ() { + int msgSize = 20; + int queueId = 0; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); + + producer.asyncSend(msgSize, mq); + producer.waitForResponse(5 * 1000); + assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + + producer.clearMsg(); + consumer.clearMsg(); + + mq = new MessageQueue(topic, broker2Name, queueId); + producer.asyncSend(msgSize, mq); + producer.waitForResponse(5 * 1000); + assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java new file mode 100644 index 0000000..68c2b0e --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.async; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class AsyncSendWithMessageQueueSelectorIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSendWithSelector() { + int msgSize = 20; + final int queueId = 0; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + producer.asyncSend(msgSize, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + for (MessageQueue mq : list) { + if (mq.getQueueId() == queueId && mq.getBrokerName().equals(broker1Name)) { + return mq; + } + } + return list.get(0); + } + }); + producer.waitForResponse(5 * 1000); + assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + + producer.clearMsg(); + consumer.clearMsg(); + + producer.asyncSend(msgSize, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + for (MessageQueue mq : list) { + if (mq.getQueueId() == queueId && mq.getBrokerName().equals(broker2Name)) { + return mq; + } + } + return list.get(8); + } + }); + producer.waitForResponse(5 * 1000); + assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java new file mode 100644 index 0000000..51aeef4 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.async; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class AsyncSendWithOnlySendCallBackIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSendWithOnlyCallBack() { + int msgSize = 20; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + producer.asyncSend(msgSize); + producer.waitForResponse(10 * 1000); + assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java new file mode 100644 index 0000000..e524fb3 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.exception.msg; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.factory.MessageFactory; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class ChinaPropIT extends BaseConf { + private static DefaultMQProducer producer = null; + private static String topic = null; + + @Before + public void setUp() { + producer = ProducerFactory.getRMQProducer(nsAddr); + topic = initTopic(); + } + + @After + public void tearDown() { + producer.shutdown(); + } + + /** + * @since version3.4.6 + */ + @Test(expected = org.apache.rocketmq.client.exception.MQBrokerException.class) + public void testSend20kChinaPropMsg() throws Exception { + Message msg = MessageFactory.getRandomMessage(topic); + msg.putUserProperty("key", RandomUtils.getCheseWord(32 * 1024 + 1)); + producer.send(msg); + } + + /** + * @since version3.4.6 + */ + @Test + public void testSend10kChinaPropMsg() { + + Message msg = MessageFactory.getRandomMessage(topic); + msg.putUserProperty("key", RandomUtils.getCheseWord(10 * 1024)); + SendResult sendResult = null; + try { + sendResult = producer.send(msg); + } catch (Exception e) { + } + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java new file mode 100644 index 0000000..6fa8bd9 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.exception.msg; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.factory.MessageFactory; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class MessageExceptionIT extends BaseConf { + private static DefaultMQProducer producer = null; + private static String topic = null; + + @Before + public void setUp() { + producer = ProducerFactory.getRMQProducer(nsAddr); + topic = initTopic(); + } + + @After + public void tearDown() { + producer.shutdown(); + } + + @Test + public void testProducerSmoke() { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + SendResult sendResult = null; + try { + sendResult = producer.send(msg); + } catch (Exception e) { + } + + assertThat(sendResult).isNotEqualTo(null); + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendNullMessage() throws Exception { + producer.send(null); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendNullBodyMessage() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + msg.setBody(null); + producer.send(msg); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendZeroSizeBodyMessage() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + msg.setBody(new byte[0]); + producer.send(msg); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendOutOfSizeBodyMessage() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + msg.setBody(new byte[1024 * 1024 * 4 + 1]); + producer.send(msg); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendNullTopicMessage() throws Exception { + Message msg = new Message(null, RandomUtils.getStringByUUID().getBytes()); + producer.send(msg); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSynSendBlankTopicMessage() throws Exception { + Message msg = new Message("", RandomUtils.getStringByUUID().getBytes()); + producer.send(msg); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSend128kMsg() throws Exception { + Message msg = new Message(topic, + RandomUtils.getStringWithNumber(1024 * 1024 * 4 + 1).getBytes()); + producer.send(msg); + } + + @Test + public void testSendLess128kMsg() { + Message msg = new Message(topic, RandomUtils.getStringWithNumber(128 * 1024).getBytes()); + SendResult sendResult = null; + try { + sendResult = producer.send(msg); + } catch (Exception e) { + } + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + } + + @Test + public void testSendMsgWithUserProperty() { + Message msg = MessageFactory.getRandomMessage(topic); + msg.putUserProperty("key", RandomUtils.getCheseWord(10 * 1024)); + SendResult sendResult = null; + try { + sendResult = producer.send(msg); + } catch (Exception e) { + } + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java new file mode 100644 index 0000000..b5882df --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.exception.msg; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class MessageUserPropIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + /** + * @since version3.4.6 + */ + @Test + public void testSendEnglishUserProp() { + Message msg = MessageFactory.getRandomMessage(topic); + String msgKey = "jueyinKey"; + String msgValue = "jueyinValue"; + msg.putUserProperty(msgKey, msgValue); + + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + producer.send(msg, null); + assertThat(producer.getAllMsgBody().size()).isEqualTo(1); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + Message sendMsg = (Message) producer.getFirstMsg(); + Message recvMsg = (Message) consumer.getListner().getFirstMsg(); + assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey)); + } + + /** + * @since version3.4.6 + */ + @Test + public void testSendChinaUserProp() { + Message msg = MessageFactory.getRandomMessage(topic); + String msgKey = "jueyinKey"; + String msgValue = "jueyinzhi"; + msg.putUserProperty(msgKey, msgValue); + + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + producer.send(msg, null); + assertThat(producer.getAllMsgBody().size()).isEqualTo(1); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + Message sendMsg = (Message) producer.getFirstMsg(); + Message recvMsg = (Message) consumer.getListner().getFirstMsg(); + assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java new file mode 100644 index 0000000..fbaa3c2 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.exception.producer; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class ProducerGroupAndInstanceNameValidityIT extends BaseConf { + private static Logger logger = Logger.getLogger(ProducerGroupAndInstanceNameValidityIT.class); + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + } + + @After + public void tearDown() { + super.shutDown(); + } + + /** + * @since version3.4.6 + */ + @Test + public void testTwoProducerSameGroupAndInstanceName() { + RMQNormalProducer producer1 = getProducer(nsAddr, topic); + assertThat(producer1.isStartSuccess()).isEqualTo(true); + RMQNormalProducer producer2 = getProducer(nsAddr, topic, + producer1.getProducerGroupName(), producer1.getProducerInstanceName()); + assertThat(producer2.isStartSuccess()).isEqualTo(false); + } + + /** + * @since version3.4.6 + */ + @Test + public void testTwoProducerSameGroup() { + RMQNormalProducer producer1 = getProducer(nsAddr, topic); + assertThat(producer1.isStartSuccess()).isEqualTo(true); + RMQNormalProducer producer2 = getProducer(nsAddr, topic, + producer1.getProducerGroupName(), RandomUtils.getStringByUUID()); + assertThat(producer2.isStartSuccess()).isEqualTo(true); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java new file mode 100644 index 0000000..19d8ca5 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.oneway; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.factory.ProducerFactory; +import org.apache.rocketmq.test.util.RandomUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class OneWaySendExceptionIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test(expected = java.lang.NullPointerException.class) + public void testSendMQNull() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + MessageQueue messageQueue = null; + producer.sendOneway(msg, messageQueue); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSendSelectorNull() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + MessageQueueSelector selector = null; + producer.sendOneway(msg, selector, 100); + } + + @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class) + public void testSelectorThrowsException() throws Exception { + Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes()); + DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); + producer.sendOneway(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + String str = null; + return list.get(str.length()); + } + }, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java new file mode 100644 index 0000000..37df4f8 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.oneway; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OneWaySendIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testOneWaySendWithOnlyMsgAsParam() { + int msgSize = 20; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + producer.sendOneWay(msgSize); + producer.waitForResponse(5 * 1000); + assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java new file mode 100644 index 0000000..a2b601b --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.oneway; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OneWaySendWithMQIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testAsyncSendWithMQ() { + int msgSize = 20; + int queueId = 0; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + MessageQueue mq = new MessageQueue(topic, broker1Name, queueId); + + producer.sendOneWay(msgSize, mq); + producer.waitForResponse(5 * 1000); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + producer.clearMsg(); + consumer.clearMsg(); + + mq = new MessageQueue(topic, broker2Name, queueId); + producer.asyncSend(msgSize, mq); + producer.waitForResponse(5 * 1000); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java new file mode 100644 index 0000000..aa70556 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.oneway; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT; +import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OneWaySendWithSelectorIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private static boolean sendFail = false; + private RMQAsyncSendProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("user topic[%s]!", topic)); + producer = getAsyncProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSendWithSelector() { + int msgSize = 20; + final int queueId = 0; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + producer.sendOneWay(msgSize, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + for (MessageQueue mq : list) { + if (mq.getQueueId() == queueId && mq.getBrokerName().equals(broker1Name)) { + return mq; + } + } + return list.get(0); + } + }); + assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + + producer.clearMsg(); + consumer.clearMsg(); + + producer.sendOneWay(msgSize, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> list, Message message, Object o) { + for (MessageQueue mq : list) { + if (mq.getQueueId() == queueId && mq.getBrokerName().equals(broker2Name)) { + return mq; + } + } + return list.get(8); + } + }); + assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + VerifyUtils.verifyMessageQueueId(queueId, consumer.getListner().getAllOriginMsg()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java new file mode 100644 index 0000000..a6520b4 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.order; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener; +import org.apache.rocketmq.test.message.MessageQueueMsg; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OrderMsgDynamicRebalanceIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerAndCrashOne() { + int msgSize = 10; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", + new RMQOrderListener("1")); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQOrderListener("2")); + + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner()); + consumer2.shutdown(); + + mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) + .isEqualTo(true); + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) + .isEqualTo(true); + } + + @Test + public void testThreeConsumerAndCrashOne() { + int msgSize = 10; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", + new RMQOrderListener("1")); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQOrderListener("2")); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQOrderListener("3")); + + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner(), consumer3.getListner()); + consumer3.shutdown(); + + mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + boolean recvAll = MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); + assertThat(recvAll).isEqualTo(true); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) + .isEqualTo(true); + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) + .isEqualTo(true); + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer3.getListner()).getMsgs())) + .isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java new file mode 100644 index 0000000..006aaa1 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.producer.order; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener; +import org.apache.rocketmq.test.message.MessageQueueMsg; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OrderMsgIT extends BaseConf { + private static Logger logger = Logger.getLogger(OrderMsgIT.class); + private RMQNormalProducer producer = null; + private RMQNormalConsumer consumer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener()); + } + + @After + public void tearDown() { + shutDown(); + } + + @Test + public void testOrderMsg() { + int msgSize = 10; + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(mqMsgs.getMsgBodys()); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) + .isEqualTo(true); + } + + @Test + public void testSendOneQueue() { + int msgSize = 20; + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg(MQMessageFactory.getMessageQueues(mqs.get(0)), + msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(mqMsgs.getMsgBodys()); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) + .isEqualTo(true); + } + + @Test + public void testSendRandomQueues() { + int msgSize = 10; + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg( + MQMessageFactory.getMessageQueues(mqs.get(0), mqs.get(1), mqs.get(mqs.size() - 1)), + msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(mqMsgs.getMsgBodys()); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs())) + .isEqualTo(true); + } +}