zhangmeng916 commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r488206119



##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType 
type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  public Exception isMessageStaled(boolean inSchedulerCheck) {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();
+
+    // state in _currentStateDelta uses current state from state model. It has 
the
+    // most up-to-date. current state. In case currentState in stateModel is 
null,
+    // partition is in initial state and we using it as current state.
+    // Defined in HelixStateMachineEngine.
+    String state = _currentStateDelta.getState(partitionName);
+
+    Exception err = null;
+    if (toState.equalsIgnoreCase(state)) {
+      // To state equals current state, we can just ignore the message
+      err = new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from 
message.",
+              partitionName, fromState, toState));
+    } else if (!inSchedulerCheck && fromState != null && 
!fromState.equals("*") && !fromState

Review comment:
       Besides the case where current_state equals to_state, is this also a case 
that we can move to the scheduling phase instead of the execution phase? That 
could help reduce thread usage even further. 

##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType 
type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  public Exception isMessageStaled(boolean inSchedulerCheck) {

Review comment:
       minor: rename isMessageStaled to isMessageStale. "Staled" implies that we 
made the message stale on purpose, which we did not. 

##########
File path: 
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws 
InterruptedException {
     System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
   }
 
+  @Test()
+  public void testStaledMessage() throws InterruptedException {
+    System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new 
TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 
1000);
+    
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 1;
+    String instanceName = manager.getInstanceName();
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), 
UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("testStaledMessageResource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
 true).size(),
+            nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(200);
+
+    // The message should be ignored since toState is the same as current 
state.
+    
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
 true).size(),

Review comment:
       How soon does the message get deleted from the instance?

##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> 
messages,
             // discard the message. Controller will resend if this is a valid 
message
             throw new HelixException(String.format(
                 "Another state transition for %s:%s is in progress with msg: 
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), 
message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), 
message.getToState()));
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task 
scheduled/executing.
+            Exception err = ((HelixStateTransitionHandler) 
createHandler).isMessageStaled(true /*inSchedulerCheck*/);

Review comment:
       The return type is confusing. `isMessageStaled` implies a boolean return 
value. You can either change the function name to something like 
`validateStaleMessage`, or change the return type.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -980,9 +980,10 @@ public TaskState pollForJobState(String workflowName, 
String jobName, long timeo
         && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
-      throw new HelixException(
-          String.format("Workflow \"%s\" context is null or job \"%s\" is not 
in states: %s",
-              workflowName, jobName, allowedStates));
+      String cur = ctx == null ? "null" : ctx.getJobState(jobName).toString();
+      throw new HelixException(String.format(
+          "Workflow \"%s\" context is null or job \"%s\" is not in states: %s, 
cur state is: %s",
+          workflowName, jobName, allowedStates, cur));

Review comment:
       Is this just a log improvement? The logic is not impacted by this PR's 
change, right?

##########
File path: 
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws 
InterruptedException {
     System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
   }
 
+  @Test()
+  public void testStaledMessage() throws InterruptedException {
+    System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory =
+        new 
TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 
1000);
+    
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+        stateTransitionFactory);
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 1;
+    String instanceName = manager.getInstanceName();
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), 
UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setCreateTimeStamp((long) i);
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("testStaledMessageResource");
+      msg.setStateModelDef("DummyMasterSlave");
+      msg.setFromState("SLAVE");
+      msg.setToState("MASTER");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
 true).size(),
+            nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(200);
+
+    // The message should be ignored since toState is the same as current 
state.
+    
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
 true).size(),
+        0);
+
+    System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");

Review comment:
       The test name in this log line is wrong: this is testStaledMessage, but 
the END message prints testDuplicatedMessage().




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to