xyuanlu commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r488225516
##########
File path:
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws InterruptedException {
System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
}
+ @Test()
+ public void testStaledMessage() throws InterruptedException {
+ System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000);
+ executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs = 1;
+ String instanceName = manager.getInstanceName();
+ for (int i = 0; i < nMsgs; i++) {
+ Message msg =
+ new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setCreateTimeStamp((long) i);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setPartitionName("Partition");
+ msg.setResourceName("testStaledMessageResource");
+ msg.setStateModelDef("DummyMasterSlave");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
+ dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+ msgList.add(msg);
+ }
+
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
+ nMsgs);
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(instanceName, msgList, changeContext);
+
+ Thread.sleep(200);
+
+ // The message should be ignored since toState is the same as current state.
+ Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(),
Review comment:
In the scheduling phase, HelixUtil.removeMessageFromZK is called as soon as we
hit HelixDuplicatedStateTransitionException. There is no thread queueing in
between, so I think the delay here should just be the IO delay time.
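
If the wait only has to cover IO delay, one option is a bounded poll rather than a fixed sleep. Below is a minimal sketch of that idea, not Helix code: `PollUntil` and `waitFor` are hypothetical names introduced only for illustration, and the timeout and interval values are assumptions to be tuned.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Helix): polls a condition with a deadline
// instead of sleeping for a fixed amount of time.
public final class PollUntil {
  private PollUntil() {}

  /**
   * Checks the condition every intervalMs until it returns true or
   * timeoutMs has elapsed. Returns true if the condition was met in time.
   */
  public static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (!condition.getAsBoolean()) {
      // Subtraction keeps the comparison safe if nanoTime wraps around.
      if (System.nanoTime() - deadline >= 0) {
        // One last check so a late success is not reported as a timeout.
        return condition.getAsBoolean();
      }
      Thread.sleep(intervalMs);
    }
    return true;
  }
}
```

In the test, the condition would compare `dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size()` against whatever count the assertion expects, so the test returns as soon as the message has been removed and only waits the full timeout on failure.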