[ROCKETMQ-51] Add unit tests for PullMessageProcessor

Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f17da4f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f17da4f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f17da4f4

Branch: refs/heads/ROCKETMQ-54
Commit: f17da4f4f8c0d10deff9a51075f366ff3e141319
Parents: 264a056
Author: yukon <yu...@apache.org>
Authored: Sat Jan 21 21:12:51 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sat Jan 21 23:21:27 2017 +0800

----------------------------------------------------------------------
 .../processor/PullMessageProcessorTest.java     | 197 +++++++++++++++++++
 .../processor/SendMessageProcessorTest.java     |  12 +-
 2 files changed, 199 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f17da4f4/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
new file mode 100644
index 0000000..9c3ec67
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullMessageProcessorTest {
+    private PullMessageProcessor pullMessageProcessor;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Mock
+    private MessageStore messageStore;
+    private ClientChannelInfo clientChannelInfo;
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
+
+    @Before
+    public void init() {
+        brokerController.setMessageStore(messageStore);
+        pullMessageProcessor = new PullMessageProcessor(brokerController);
+        Channel mockChannel = mock(Channel.class);
+        when(mockChannel.remoteAddress()).thenReturn(new 
InetSocketAddress(1024));
+        when(handlerContext.channel()).thenReturn(mockChannel);
+        
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
+        clientChannelInfo = new ClientChannelInfo(mockChannel);
+        ConsumerData consumerData = createConsumerData();
+        brokerController.getConsumerManager().registerConsumer(
+            consumerData.getGroupName(),
+            clientChannelInfo,
+            consumerData.getConsumeType(),
+            consumerData.getMessageModel(),
+            consumerData.getConsumeFromWhere(),
+            consumerData.getSubscriptionDataSet(),
+            false);
+    }
+
+    @Test
+    public void testProcessRequest_TopicNotExist() throws 
RemotingCommandException {
+        
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+        assertThat(response.getRemark()).contains("topic[" + topic + "] not 
exist");
+    }
+
+    @Test
+    public void testProcessRequest_SubNotExist() throws 
RemotingCommandException {
+        brokerController.getConsumerManager().unregisterConsumer(group, 
clientChannelInfo, false);
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+        assertThat(response.getRemark()).contains("consumer's group info not 
exist");
+    }
+
+    @Test
+    public void testProcessRequest_SubNotLatest() throws 
RemotingCommandException {
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        request.addExtField("subVersion", String.valueOf(101));
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+        assertThat(response.getRemark()).contains("subscription not latest");
+    }
+
+    @Test
+    public void testProcessRequest_Found() throws RemotingCommandException {
+        GetMessageResult getMessageResult = createGetMessageResult();
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testProcessRequest_MsgWasRemoving() throws 
RemotingCommandException {
+        GetMessageResult getMessageResult = createGetMessageResult();
+        getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY);
+    }
+
+    @Test
+    public void testProcessRequest_NoMsgInQueue() throws 
RemotingCommandException {
+        GetMessageResult getMessageResult = createGetMessageResult();
+        getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+
+        final RemotingCommand request = 
createPullMsgCommand(RequestCode.PULL_MESSAGE);
+        RemotingCommand response = 
pullMessageProcessor.processRequest(handlerContext, request);
+        assertThat(response).isNotNull();
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED);
+    }
+
+    private RemotingCommand createPullMsgCommand(int requestCode) {
+        PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
+        requestHeader.setCommitOffset(123L);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setMaxMsgNums(100);
+        requestHeader.setQueueId(1);
+        requestHeader.setQueueOffset(456L);
+        requestHeader.setSubscription("*");
+        requestHeader.setTopic(topic);
+        requestHeader.setSysFlag(0);
+        requestHeader.setSubVersion(100L);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(requestCode, requestHeader);
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+
+    private ConsumerData createConsumerData() {
+        ConsumerData consumerData = new ConsumerData();
+        
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerData.setGroupName(group);
+        consumerData.setMessageModel(MessageModel.CLUSTERING);
+        Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setSubString("*");
+        subscriptionData.setSubVersion(100L);
+        subscriptionDataSet.add(subscriptionData);
+        consumerData.setSubscriptionDataSet(subscriptionDataSet);
+        return consumerData;
+    }
+
+    private GetMessageResult createGetMessageResult() {
+        GetMessageResult getMessageResult = new GetMessageResult();
+        getMessageResult.setStatus(GetMessageStatus.FOUND);
+        getMessageResult.setMinOffset(100);
+        getMessageResult.setMaxOffset(1024);
+        getMessageResult.setNextBeginOffset(516);
+        return getMessageResult;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f17da4f4/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index d41d03a..2cf8d45 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -153,13 +153,7 @@ public class SendMessageProcessorTest {
         requestHeader.setReconsumeTimes(0);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(requestCode, requestHeader);
-        request.addExtField("queueId", 
String.valueOf(requestHeader.getQueueId()));
-        request.addExtField("topic", String.valueOf(requestHeader.getTopic()));
-        request.addExtField("defaultTopicQueueNums", 
String.valueOf(requestHeader.getDefaultTopicQueueNums()));
-        request.addExtField("defaultTopic", requestHeader.getDefaultTopic());
-        request.addExtField("sysFlag", 
String.valueOf(requestHeader.getSysFlag()));
-        request.addExtField("flag", String.valueOf(requestHeader.getFlag()));
-        request.addExtField("bornTimestamp", 
String.valueOf(requestHeader.getBornTimestamp()));
+        request.makeCustomHeaderToNet();
         return request;
     }
 
@@ -172,9 +166,7 @@ public class SendMessageProcessorTest {
         requestHeader.setOffset(123L);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(requestCode, requestHeader);
-        request.addExtField("group", requestHeader.getGroup());
-        request.addExtField("offset", 
String.valueOf(requestHeader.getOffset()));
-        request.addExtField("delayLevel", 
String.valueOf(requestHeader.getDelayLevel()));
+        request.makeCustomHeaderToNet();
         return request;
     }
 

Reply via email to