Repository: incubator-rocketmq
Updated Branches:
  refs/heads/TravisCI_Test [created] 68278b37f


[ROCKETMQ-52] Resolve infinite loop issue in rocketmq-client UT


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/68278b37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/68278b37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/68278b37

Branch: refs/heads/TravisCI_Test
Commit: 68278b37f5c302f460e6ab43b3751e40554a34a1
Parents: a3aff81
Author: yukon <yu...@apache.org>
Authored: Mon Jan 23 11:05:01 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Mon Jan 23 11:05:01 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPushConsumerTest.java     | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/68278b37/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 048e456..9c639df 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import org.apache.rocketmq.client.ClientConfig;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -35,7 +35,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
 import 
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -58,7 +57,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Spy;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
@@ -69,17 +67,16 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DefaultMQPushConsumerTest {
     private String consumerGroup;
     private String topic = "FooBar";
     private String brokerName = "BrokerA";
-    @Spy
-    private MQClientInstance mQClientFactory = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientInstance mQClientFactory;
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
     private PullAPIWrapper pullAPIWrapper;
@@ -89,7 +86,6 @@ public class DefaultMQPushConsumerTest {
     @Before
     public void init() throws Exception {
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
-        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
@@ -110,6 +106,7 @@ public class DefaultMQPushConsumerTest {
         pushConsumer.subscribe(topic, "*");
         pushConsumer.start();
 
+        mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
         field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
@@ -119,15 +116,17 @@ public class DefaultMQPushConsumerTest {
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
 
+        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
         field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
         field.setAccessible(true);
         field.set(pushConsumerImpl, pullAPIWrapper);
 
         
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
         mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
-        mQClientFactory.start();
 
-        doAnswer(new Answer() {
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
+            anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+            .thenAnswer(new Answer<Object>() {
             @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
                 PullMessageRequestHeader requestHeader = mock.getArgument(1);
                 MessageClientExt messageClientExt = new MessageClientExt();
@@ -140,9 +139,9 @@ public class DefaultMQPushConsumerTest {
                 messageClientExt.setStoreHost(new InetSocketAddress(8080));
                 PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
                 ((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
-                return null;
+                return pullResult;
             }
-        }).when(mQClientAPIImpl).pullMessage(anyString(), 
any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class));
+        });
 
         doReturn(new FindBrokerResult("127.0.0.1:10911", 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean());
         
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(),
 anyString());
@@ -191,12 +190,12 @@ public class DefaultMQPushConsumerTest {
                 return null;
             }
         }));
-        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
 
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
         PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
-        pullMessageService.executePullRequestImmediately(createPullRequest());
+        pullMessageService.executePullRequestLater(createPullRequest(), 100);
 
-        countDownLatch.await();
+        countDownLatch.await(10, TimeUnit.SECONDS);
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }

Reply via email to