http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java
new file mode 100644
index 0000000..995bf41
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithMulConsumerIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.tag;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.factory.TagMessage;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class TagMessageWithMulConsumerIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        String consumerId = initConsumerGroup();
+        logger.info(String.format("use topic: %s; consumerId: %s !", topic, 
consumerId));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSendTwoTag() {
+        String tag1 = "jueyin1";
+        String tag2 = "jueyin2";
+        int msgSize = 10;
+        RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tag1,
+            new RMQNormalListner());
+        RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tag2,
+            new RMQNormalListner());
+
+        List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, 
msgSize);
+        producer.send(tag1Msgs);
+        Assert.assertEquals("Not all are sent", msgSize, 
producer.getAllUndupMsgBody().size());
+        List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, 
msgSize);
+        producer.send(tag2Msgs);
+        Assert.assertEquals("Not all are sent", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+
+        
consumerTag1.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag1Msgs),
+            consumeTime);
+        
consumerTag2.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
+            consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag1.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag1Msgs));
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag2.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
+    }
+
+    @Test
+    public void testSendMessagesWithTwoTag() {
+        String tags[] = {"jueyin1", "jueyin2"};
+        int msgSize = 10;
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, tags[0],
+            new RMQNormalListner());
+        RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, tags[1],
+            new RMQNormalListner());
+
+        List<Object> tagMsgs = tagMessage.getMixedTagMessages();
+        producer.send(tagMsgs);
+        Assert.assertEquals("Not all are sent", msgSize * tags.length,
+            producer.getAllUndupMsgBody().size());
+
+        
consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
+            consumeTime);
+        
consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[1]),
+            consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag1.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag2.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[1]));
+    }
+
+    @Test
+    public void testTwoConsumerOneMatchOneOtherMatchAll() {
+        String tags[] = {"jueyin1", "jueyin2"};
+        String sub1 = String.format("%s||%s", tags[0], tags[1]);
+        String sub2 = String.format("%s|| noExist", tags[0]);
+        int msgSize = 10;
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        RMQNormalConsumer consumerTag1 = getConsumer(nsAddr, topic, sub1,
+            new RMQNormalListner());
+        RMQNormalConsumer consumerTag2 = getConsumer(nsAddr, topic, sub2,
+            new RMQNormalListner());
+
+        List<Object> tagMsgs = tagMessage.getMixedTagMessages();
+        producer.send(tagMsgs);
+        Assert.assertEquals("Not all are sent", msgSize * tags.length,
+            producer.getAllUndupMsgBody().size());
+
+        
consumerTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags),
+            consumeTime);
+        
consumerTag2.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
+            consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerTag2.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
+    }
+
+    @Test
+    public void testSubKindsOf() {
+        String tags[] = {"jueyin1", "jueyin2"};
+        String sub1 = String.format("%s||%s", tags[0], tags[1]);
+        String sub2 = String.format("%s|| noExist", tags[0]);
+        String sub3 = tags[0];
+        String sub4 = "*";
+        int msgSize = 10;
+
+        RMQNormalConsumer consumerSubTwoMatchAll = getConsumer(nsAddr, topic, 
sub1,
+            new RMQNormalListner());
+        RMQNormalConsumer consumerSubTwoMachieOne = getConsumer(nsAddr, topic, 
sub2,
+            new RMQNormalListner());
+        RMQNormalConsumer consumerSubTag1 = getConsumer(nsAddr, topic, sub3,
+            new RMQNormalListner());
+        RMQNormalConsumer consumerSubAll = getConsumer(nsAddr, topic, sub4,
+            new RMQNormalListner());
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, 
producer.getAllUndupMsgBody().size());
+        Collection<Object> msgsWithNoTag = producer.getMsgBodysCopy();
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        List<Object> tagMsgs = tagMessage.getMixedTagMessages();
+        producer.send(tagMsgs);
+        Assert.assertEquals("Not all are sent", msgSize * 3, 
producer.getAllUndupMsgBody().size());
+
+        consumerSubTwoMatchAll.getListner()
+            .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags), 
consumeTime);
+        consumerSubTwoMachieOne.getListner()
+            .waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]), 
consumeTime);
+        
consumerSubTag1.getListner().waitForMessageConsume(tagMessage.getMessageBodyByTag(tags[0]),
+            consumeTime);
+        consumerSubAll.getListner().waitForMessageConsume(
+            MQMessageFactory.getMessage(msgsWithNoTag, 
tagMessage.getAllTagMessageBody()),
+            consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerSubTwoMatchAll.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerSubTwoMachieOne.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerSubTag1.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(tagMessage.getMessageBodyByTag(tags[0]));
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumerSubAll.getListner().getAllMsgBody()))
+            
.containsExactlyElementsIn(MQMessageFactory.getMessage(msgsWithNoTag,
+                tagMessage.getAllTagMessageBody()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
new file mode 100644
index 0000000..03e81eb
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.tag;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class TagMessageWithSameGroupConsumerIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testTwoConsumerWithSameGroup() {
+        String tag = "jueyin";
+        int msgSize = 20;
+        String originMsgDCName = RandomUtils.getStringByUUID();
+        String msgBodyDCName = RandomUtils.getStringByUUID();
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, 
producer.getAllUndupMsgBody().size());
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    @Test
+    public void testConsumerStartWithInterval() {
+        String tag = "jueyin";
+        int msgSize = 100;
+        String originMsgDCName = RandomUtils.getStringByUUID();
+        String msgBodyDCName = RandomUtils.getStringByUUID();
+
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+        producer.send(tag, msgSize, 100);
+        TestUtils.waitForMonment(5);
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+        TestUtils.waitForMonment(5);
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    @Test
+    public void testConsumerStartTwoAndCrashOnsAfterWhile() {
+        String tag = "jueyin";
+        int msgSize = 100;
+        String originMsgDCName = RandomUtils.getStringByUUID();
+        String msgBodyDCName = RandomUtils.getStringByUUID();
+
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), tag,
+            new RMQNormalListner(originMsgDCName, msgBodyDCName));
+
+        producer.send(tag, msgSize, 100);
+        TestUtils.waitForMonment(5);
+        consumer2.shutdown();
+        mqClients.remove(1);
+        TestUtils.waitForMonment(5);
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java
new file mode 100644
index 0000000..98d858b
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/MulConsumerMulTopicIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.topic;
+
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class MulConsumerMulTopicIT extends BaseConf {
+    private RMQNormalProducer producer = null;
+
+    @Before
+    public void setUp() {
+        producer = getProducer(nsAddr, null);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSynSendMessage() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer1.subscribe(topic2, "*");
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), topic1,
+            "*", new RMQNormalListner());
+        consumer2.subscribe(topic2, "*");
+
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize));
+        Assert.assertEquals("Not all sent succeeded", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, 
producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+
+    @Test
+    public void testConsumeWithDiffTag() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer1.subscribe(topic2, tag);
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), topic1,
+            "*", new RMQNormalListner());
+        consumer2.subscribe(topic2, tag);
+
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag));
+        Assert.assertEquals("Not all sent succeeded", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, 
producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+
+    @Test
+    public void testConsumeWithDiffTagAndFilter() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer1.subscribe(topic2, tag1);
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer2.subscribe(topic2, tag1);
+
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
+        producer.clearMsg();
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1));
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, 
producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java
new file mode 100644
index 0000000..969fa79
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/topic/OneConsumerMulTopicIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.topic;
+
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OneConsumerMulTopicIT extends BaseConf {
+    private RMQNormalProducer producer = null;
+
+    @Before
+    public void setUp() {
+        producer = getProducer(nsAddr, null);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSynSendMessage() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer.subscribe(topic2, "*");
+
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize));
+
+        Assert.assertEquals("Not all are sent", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    @Test
+    public void testConsumeWithDiffTag() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer.subscribe(topic2, tag);
+
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag));
+
+        Assert.assertEquals("Not all are sent", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    @Test
+    public void testConsumeWithDiffTagAndFilter() {
+        int msgSize = 10;
+        String topic1 = initTopic();
+        String topic2 = initTopic();
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic1, "*", new 
RMQNormalListner());
+        consumer.subscribe(topic2, tag1);
+
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag2));
+        producer.clearMsg();
+        producer.send(MQMessageFactory.getMsg(topic1, msgSize));
+        producer.send(MQMessageFactory.getMsg(topic2, msgSize, tag1));
+
+        Assert.assertEquals("Not all are sent", msgSize * 2, 
producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
new file mode 100644
index 0000000..4125433
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.async;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.factory.ProducerFactory;
+import org.apache.rocketmq.test.factory.SendCallBackFactory;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class AsyncSendExceptionIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSendCallBackNull() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        SendCallback sendCallback = null;
+        producer.send(msg, sendCallback);
+    }
+
+    @Test(expected = java.lang.NullPointerException.class)
+    public void testSendMQNull() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        MessageQueue messageQueue = null;
+        producer.send(msg, messageQueue, 
SendCallBackFactory.getSendCallBack());
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSendSelectorNull() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        MessageQueueSelector selector = null;
+        producer.send(msg, selector, 100, 
SendCallBackFactory.getSendCallBack());
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSelectorThrowsException() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        producer.send(msg, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                String str = null;
+                return list.get(str.length());
+            }
+        }, null, SendCallBackFactory.getSendCallBack());
+    }
+
+    @Test
+    public void testQueueIdBigThanQueueNum() throws Exception {
+        int queueId = 100;
+        sendFail = false;
+        MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+
+        producer.send(msg, mq, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+            }
+
+            @Override
+            public void onException(Throwable throwable) {
+                sendFail = true;
+            }
+        });
+
+        int checkNum = 50;
+        while (!sendFail && checkNum > 0) {
+            checkNum--;
+            TestUtils.waitForMonment(100);
+        }
+        producer.shutdown();
+        assertThat(sendFail).isEqualTo(true);
+    }
+
+    @Test
+    public void testQueueIdSmallZero() throws Exception {
+        int queueId = -100;
+        sendFail = true;
+        MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+
+        producer.send(msg, mq, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                sendFail = false;
+            }
+
+            @Override
+            public void onException(Throwable throwable) {
+                sendFail = true;
+            }
+        });
+
+        int checkNum = 50;
+        while (sendFail && checkNum > 0) {
+            checkNum--;
+            TestUtils.waitForMonment(100);
+        }
+        producer.shutdown();
+        assertThat(sendFail).isEqualTo(false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java
new file mode 100644
index 0000000..53a992c
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.async;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class AsyncSendWithMessageQueueIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testAsyncSendWithMQ() {
+        int msgSize = 20;
+        int queueId = 0;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+        MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
+
+        producer.asyncSend(msgSize, mq);
+        producer.waitForResponse(5 * 1000);
+        assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+
+        producer.clearMsg();
+        consumer.clearMsg();
+
+        mq = new MessageQueue(topic, broker2Name, queueId);
+        producer.asyncSend(msgSize, mq);
+        producer.waitForResponse(5 * 1000);
+        assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java
new file mode 100644
index 0000000..68c2b0e
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithMessageQueueSelectorIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.async;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class AsyncSendWithMessageQueueSelectorIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSendWithSelector() {
+        int msgSize = 20;
+        final int queueId = 0;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+
+        producer.asyncSend(msgSize, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                for (MessageQueue mq : list) {
+                    if (mq.getQueueId() == queueId && 
mq.getBrokerName().equals(broker1Name)) {
+                        return mq;
+                    }
+                }
+                return list.get(0);
+            }
+        });
+        producer.waitForResponse(5 * 1000);
+        assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+
+        producer.clearMsg();
+        consumer.clearMsg();
+
+        producer.asyncSend(msgSize, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                for (MessageQueue mq : list) {
+                    if (mq.getQueueId() == queueId && 
mq.getBrokerName().equals(broker2Name)) {
+                        return mq;
+                    }
+                }
+                return list.get(8);
+            }
+        });
+        producer.waitForResponse(5 * 1000);
+        assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java
new file mode 100644
index 0000000..51aeef4
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendWithOnlySendCallBackIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.async;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class AsyncSendWithOnlySendCallBackIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSendWithOnlyCallBack() {
+        int msgSize = 20;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+        producer.asyncSend(msgSize);
+        producer.waitForResponse(10 * 1000);
+        assertThat(producer.getSuccessMsgCount()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java
new file mode 100644
index 0000000..e524fb3
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/ChinaPropIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.exception.msg;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.factory.MessageFactory;
+import org.apache.rocketmq.test.factory.ProducerFactory;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class ChinaPropIT extends BaseConf {
+    private static DefaultMQProducer producer = null;
+    private static String topic = null;
+
+    @Before
+    public void setUp() {
+        producer = ProducerFactory.getRMQProducer(nsAddr);
+        topic = initTopic();
+    }
+
+    @After
+    public void tearDown() {
+        producer.shutdown();
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQBrokerException.class)
+    public void testSend20kChinaPropMsg() throws Exception {
+        Message msg = MessageFactory.getRandomMessage(topic);
+        msg.putUserProperty("key", RandomUtils.getCheseWord(32 * 1024 + 1));
+        producer.send(msg);
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test
+    public void testSend10kChinaPropMsg() {
+
+        Message msg = MessageFactory.getRandomMessage(topic);
+        msg.putUserProperty("key", RandomUtils.getCheseWord(10 * 1024));
+        SendResult sendResult = null;
+        try {
+            sendResult = producer.send(msg);
+        } catch (Exception e) {
+        }
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
new file mode 100644
index 0000000..6fa8bd9
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageExceptionIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.exception.msg;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.factory.MessageFactory;
+import org.apache.rocketmq.test.factory.ProducerFactory;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class MessageExceptionIT extends BaseConf {
+    private static DefaultMQProducer producer = null;
+    private static String topic = null;
+
+    @Before
+    public void setUp() {
+        producer = ProducerFactory.getRMQProducer(nsAddr);
+        topic = initTopic();
+    }
+
+    @After
+    public void tearDown() {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testProducerSmoke() {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        SendResult sendResult = null;
+        try {
+            sendResult = producer.send(msg);
+        } catch (Exception e) {
+        }
+
+        assertThat(sendResult).isNotEqualTo(null);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendNullMessage() throws Exception {
+        producer.send(null);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendNullBodyMessage() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        msg.setBody(null);
+        producer.send(msg);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendZeroSizeBodyMessage() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        msg.setBody(new byte[0]);
+        producer.send(msg);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendOutOfSizeBodyMessage() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        msg.setBody(new byte[1024 * 1024 * 4 + 1]);
+        producer.send(msg);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendNullTopicMessage() throws Exception {
+        Message msg = new Message(null, 
RandomUtils.getStringByUUID().getBytes());
+        producer.send(msg);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSynSendBlankTopicMessage() throws Exception {
+        Message msg = new Message("", 
RandomUtils.getStringByUUID().getBytes());
+        producer.send(msg);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSend128kMsg() throws Exception {
+        Message msg = new Message(topic,
+            RandomUtils.getStringWithNumber(1024 * 1024 * 4 + 1).getBytes());
+        producer.send(msg);
+    }
+
+    @Test
+    public void testSendLess128kMsg() {
+        Message msg = new Message(topic, RandomUtils.getStringWithNumber(128 * 
1024).getBytes());
+        SendResult sendResult = null;
+        try {
+            sendResult = producer.send(msg);
+        } catch (Exception e) {
+        }
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+    }
+
+    @Test
+    public void testSendMsgWithUserProperty() {
+        Message msg = MessageFactory.getRandomMessage(topic);
+        msg.putUserProperty("key", RandomUtils.getCheseWord(10 * 1024));
+        SendResult sendResult = null;
+        try {
+            sendResult = producer.send(msg);
+        } catch (Exception e) {
+        }
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java
new file mode 100644
index 0000000..b5882df
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/msg/MessageUserPropIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.exception.msg;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.test.base.BaseConf;
+import 
org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class MessageUserPropIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(NormalMsgStaticBalanceIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test
+    public void testSendEnglishUserProp() {
+        Message msg = MessageFactory.getRandomMessage(topic);
+        String msgKey = "jueyinKey";
+        String msgValue = "jueyinValue";
+        msg.putUserProperty(msgKey, msgValue);
+
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+
+        producer.send(msg, null);
+        assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        Message sendMsg = (Message) producer.getFirstMsg();
+        Message recvMsg = (Message) consumer.getListner().getFirstMsg();
+        
assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey));
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test
+    public void testSendChinaUserProp() {
+        Message msg = MessageFactory.getRandomMessage(topic);
+        String msgKey = "jueyinKey";
+        String msgValue = "jueyinzhi";
+        msg.putUserProperty(msgKey, msgValue);
+
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+
+        producer.send(msg, null);
+        assertThat(producer.getAllMsgBody().size()).isEqualTo(1);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        Message sendMsg = (Message) producer.getFirstMsg();
+        Message recvMsg = (Message) consumer.getListner().getFirstMsg();
+        
assertThat(recvMsg.getUserProperty(msgKey)).isEqualTo(sendMsg.getUserProperty(msgKey));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java
new file mode 100644
index 0000000..fbaa3c2
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/exception/producer/ProducerGroupAndInstanceNameValidityIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.exception.producer;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class ProducerGroupAndInstanceNameValidityIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(ProducerGroupAndInstanceNameValidityIT.class);
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test
+    public void testTwoProducerSameGroupAndInstanceName() {
+        RMQNormalProducer producer1 = getProducer(nsAddr, topic);
+        assertThat(producer1.isStartSuccess()).isEqualTo(true);
+        RMQNormalProducer producer2 = getProducer(nsAddr, topic,
+            producer1.getProducerGroupName(), 
producer1.getProducerInstanceName());
+        assertThat(producer2.isStartSuccess()).isEqualTo(false);
+    }
+
+    /**
+     * @since version3.4.6
+     */
+    @Test
+    public void testTwoProducerSameGroup() {
+        RMQNormalProducer producer1 = getProducer(nsAddr, topic);
+        assertThat(producer1.isStartSuccess()).isEqualTo(true);
+        RMQNormalProducer producer2 = getProducer(nsAddr, topic,
+            producer1.getProducerGroupName(), RandomUtils.getStringByUUID());
+        assertThat(producer2.isStartSuccess()).isEqualTo(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java
new file mode 100644
index 0000000..19d8ca5
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendExceptionIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.oneway;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.factory.ProducerFactory;
+import org.apache.rocketmq.test.util.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OneWaySendExceptionIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test(expected = java.lang.NullPointerException.class)
+    public void testSendMQNull() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        MessageQueue messageQueue = null;
+        producer.sendOneway(msg, messageQueue);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSendSelectorNull() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        MessageQueueSelector selector = null;
+        producer.sendOneway(msg, selector, 100);
+    }
+
+    @Test(expected = 
org.apache.rocketmq.client.exception.MQClientException.class)
+    public void testSelectorThrowsException() throws Exception {
+        Message msg = new Message(topic, 
RandomUtils.getStringByUUID().getBytes());
+        DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
+        producer.sendOneway(msg, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                String str = null;
+                return list.get(str.length());
+            }
+        }, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java
new file mode 100644
index 0000000..37df4f8
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.oneway;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OneWaySendIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testOneWaySendWithOnlyMsgAsParam() {
+        int msgSize = 20;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+
+        producer.sendOneWay(msgSize);
+        producer.waitForResponse(5 * 1000);
+        assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java
new file mode 100644
index 0000000..a2b601b
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithMQIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.oneway;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OneWaySendWithMQIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testAsyncSendWithMQ() {
+        int msgSize = 20;
+        int queueId = 0;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+        MessageQueue mq = new MessageQueue(topic, broker1Name, queueId);
+
+        producer.sendOneWay(msgSize, mq);
+        producer.waitForResponse(5 * 1000);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        producer.clearMsg();
+        consumer.clearMsg();
+
+        mq = new MessageQueue(topic, broker2Name, queueId);
+        producer.asyncSend(msgSize, mq);
+        producer.waitForResponse(5 * 1000);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java
new file mode 100644
index 0000000..aa70556
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/oneway/OneWaySendWithSelectorIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.oneway;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OneWaySendWithSelectorIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private static boolean sendFail = false;
+    private RMQAsyncSendProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("user topic[%s]!", topic));
+        producer = getAsyncProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testSendWithSelector() {
+        int msgSize = 20;
+        final int queueId = 0;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListner());
+
+        producer.sendOneWay(msgSize, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                for (MessageQueue mq : list) {
+                    if (mq.getQueueId() == queueId && 
mq.getBrokerName().equals(broker1Name)) {
+                        return mq;
+                    }
+                }
+                return list.get(0);
+            }
+        });
+        assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+
+        producer.clearMsg();
+        consumer.clearMsg();
+
+        producer.sendOneWay(msgSize, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> list, Message 
message, Object o) {
+                for (MessageQueue mq : list) {
+                    if (mq.getQueueId() == queueId && 
mq.getBrokerName().equals(broker2Name)) {
+                        return mq;
+                    }
+                }
+                return list.get(8);
+            }
+        });
+        assertThat(producer.getAllMsgBody().size()).isEqualTo(msgSize);
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        VerifyUtils.verifyMessageQueueId(queueId, 
consumer.getListner().getAllOriginMsg());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java
new file mode 100644
index 0000000..a6520b4
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgDynamicRebalanceIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import 
org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OrderMsgDynamicRebalanceIT extends BaseConf {
+    private static Logger logger = 
Logger.getLogger(NormalMsgStaticBalanceIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testTwoConsumerAndCrashOne() {
+        int msgSize = 10;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*",
+            new RMQOrderListener("1"));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener("2"));
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        MQWait.waitConsumeAll(30 * 1000, producer.getAllMsgBody(), 
consumer1.getListner(),
+            consumer2.getListner());
+        consumer2.shutdown();
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, 
producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testThreeConsumerAndCrashOne() {
+        int msgSize = 10;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*",
+            new RMQOrderListener("1"));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener("2"));
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, 
consumer1.getConsumerGroup(), topic,
+            "*", new RMQOrderListener("3"));
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), 
consumer1.getListner(),
+            consumer2.getListner(), consumer3.getListner());
+        consumer3.shutdown();
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(30 * 1000, 
producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner(), 
consumer3.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer3.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java
new file mode 100644
index 0000000..006aaa1
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class OrderMsgIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(OrderMsgIT.class);
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+    }
+
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    @Test
+    public void testOrderMsg() {
+        int msgSize = 10;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testSendOneQueue() {
+        int msgSize = 20;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new 
MessageQueueMsg(MQMessageFactory.getMessageQueues(mqs.get(0)),
+            msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    @Test
+    public void testSendRandomQueues() {
+        int msgSize = 10;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(
+            MQMessageFactory.getMessageQueues(mqs.get(0), mqs.get(1), 
mqs.get(mqs.size() - 1)),
+            msgSize);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), 
consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) 
consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+}

Reply via email to