Repository: incubator-rocketmq Updated Branches: refs/heads/TravisCI_Test [created] 68278b37f
[ROCKETMQ-52] Resolve infinite loop issue in rocketmq-client UT Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/68278b37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/68278b37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/68278b37 Branch: refs/heads/TravisCI_Test Commit: 68278b37f5c302f460e6ab43b3751e40554a34a1 Parents: a3aff81 Author: yukon <yu...@apache.org> Authored: Mon Jan 23 11:05:01 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Mon Jan 23 11:05:01 2017 +0800 ---------------------------------------------------------------------- .../consumer/DefaultMQPushConsumerTest.java | 27 ++++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/68278b37/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 048e456..9c639df 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -24,7 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.apache.rocketmq.client.ClientConfig; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; @@ -35,7 +35,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientAPIImpl; -import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -58,7 +57,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -69,17 +67,16 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; private String brokerName = "BrokerA"; - @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory; @Mock private MQClientAPIImpl mQClientAPIImpl; private PullAPIWrapper pullAPIWrapper; @@ -89,7 +86,6 @@ public class DefaultMQPushConsumerTest { @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); @@ -110,6 +106,7 @@ public class DefaultMQPushConsumerTest { pushConsumer.subscribe(topic, "*"); pushConsumer.start(); + mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(pushConsumerImpl, mQClientFactory); @@ -119,15 +116,17 @@ public class DefaultMQPushConsumerTest { field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); + pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); field.setAccessible(true); field.set(pushConsumerImpl, pullAPIWrapper); pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); - mQClientFactory.start(); - doAnswer(new Answer() { + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock mock) throws Throwable { PullMessageRequestHeader requestHeader = mock.getArgument(1); MessageClientExt messageClientExt = new MessageClientExt(); @@ -140,9 +139,9 @@ public class DefaultMQPushConsumerTest { messageClientExt.setStoreHost(new InetSocketAddress(8080)); PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt)); ((PullCallback)mock.getArgument(4)).onSuccess(pullResult); - return null; + return pullResult; } - }).when(mQClientAPIImpl).pullMessage(anyString(), any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)); + }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); @@ -191,12 +190,12 @@ public class DefaultMQPushConsumerTest { return null; } })); - pushConsumer.getDefaultMQPushConsumerImpl().doRebalance(); + pushConsumer.getDefaultMQPushConsumerImpl().doRebalance(); PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); - pullMessageService.executePullRequestImmediately(createPullRequest()); + pullMessageService.executePullRequestLater(createPullRequest(), 100); - countDownLatch.await(); + countDownLatch.await(10, TimeUnit.SECONDS); assertThat(messageExts[0].getTopic()).isEqualTo(topic); assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); }