xyuanlu commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r493192875



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,36 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  private void validateStaleMessage (boolean isPreCheck) throws Exception {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();
+
+    // state in _currentStateDelta uses current state from state model. It has the
+    // most up-to-date. current state. In case currentState in stateModel is null,
+    // partition is in initial state and we using it as current state.
+    // Defined in HelixStateMachineEngine.
+    String state = _currentStateDelta.getState(partitionName);
+
+    if (toState.equalsIgnoreCase(state)) {
+      // To state equals current state, we can just ignore the message
+      throw new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState));
+    } else if (!isPreCheck && fromState != null && !fromState.equals("*") && !fromState

Review comment:
       I had an offline sync with Ali. He has submitted a PR (#1390) to fix this issue, and I have updated my PR accordingly.
   My PR currently includes his patch; I will rebase once his PR is merged.
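The duplicate-transition part of the check in the hunk above can be sketched in isolation. A message whose toState already matches the partition's current state is rejected, and a partition with no recorded state is treated as being in its initial state. This is a minimal sketch, not the Helix implementation:

- `StaleMessageValidator` and `DuplicatedTransitionException` are hypothetical names.
- A plain map stands in for `_currentStateDelta`.
- The fromState branch is left out, because the hunk is truncated at that point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Helix's HelixDuplicatedStateTransitionException.
class DuplicatedTransitionException extends Exception {
  DuplicatedTransitionException(String message) {
    super(message);
  }
}

// Simplified model of the check: the map plays the role of _currentStateDelta.
class StaleMessageValidator {
  private final Map<String, String> currentStates = new HashMap<>();
  private final String initialState;

  StaleMessageValidator(String initialState) {
    this.initialState = initialState;
  }

  void setCurrentState(String partition, String state) {
    currentStates.put(partition, state);
  }

  // Throws when the message would move the partition into the state it already holds.
  void validateStaleMessage(String partition, String fromState, String toState)
      throws DuplicatedTransitionException {
    // A partition with no recorded state is treated as being in the initial state.
    String state = currentStates.getOrDefault(partition, initialState);
    // Case-insensitive comparison, as in the hunk.
    if (toState.equalsIgnoreCase(state)) {
      throw new DuplicatedTransitionException(String.format(
          "Partition %s current state is same as toState (%s->%s) from message.",
          partition, fromState, toState));
    }
  }
}
```

Passing an initial state such as `OFFLINE` (an assumption here) to the constructor mirrors the comment that a null current state means the partition is in its initial state.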

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  public Exception isMessageStaled(boolean inSchedulerCheck) {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();
+
+    // state in _currentStateDelta uses current state from state model. It has the
+    // most up-to-date. current state. In case currentState in stateModel is null,
+    // partition is in initial state and we using it as current state.
+    // Defined in HelixStateMachineEngine.
+    String state = _currentStateDelta.getState(partitionName);
+
+    Exception err = null;
+    if (toState.equalsIgnoreCase(state)) {
+      // To state equals current state, we can just ignore the message
+      err = new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState));
+    } else if (!inSchedulerCheck && fromState != null && !fromState.equals("*") && !fromState

Review comment:
       I had an offline sync with Ali. He has submitted a PR (#1390) to fix this issue, and I have updated my PR accordingly. We now run the same validation in both the scheduling stage and the execution stage.
   My PR currently includes his patch; I will rebase once his PR is merged.
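This revision returns the exception instead of throwing it, so the caller decides when to throw. A minimal sketch of that shape shows why running the same check at both stages matters: a message that was valid when scheduled can become stale before it executes. The sketch assumes:

- `TransitionHandler`, `Stages` and `StaleMessageException` are hypothetical names, not Helix classes.
- A plain map stands in for `_currentStateDelta`.
- Only the toState check is modeled.

```java
import java.util.Map;

// Hypothetical exception type standing in for the Helix stale-message exceptions.
class StaleMessageException extends Exception {
  StaleMessageException(String message) {
    super(message);
  }
}

// Hypothetical handler: the shared map plays the role of _currentStateDelta.
class TransitionHandler {
  private final Map<String, String> currentStates;
  final String partition;
  final String fromState;
  final String toState;

  TransitionHandler(Map<String, String> currentStates, String partition,
      String fromState, String toState) {
    this.currentStates = currentStates;
    this.partition = partition;
    this.fromState = fromState;
    this.toState = toState;
  }

  // Returns null if the message is still valid, otherwise the error describing why it is stale.
  Exception isMessageStaled() {
    String state = currentStates.get(partition);
    if (toState.equalsIgnoreCase(state)) {
      return new StaleMessageException(String.format(
          "Partition %s current state is same as toState (%s->%s) from message.",
          partition, fromState, toState));
    }
    return null;
  }
}

class Stages {
  // Shared helper: both stages run exactly the same check.
  private static void rejectIfStale(TransitionHandler handler) throws Exception {
    Exception err = handler.isMessageStaled();
    if (err != null) {
      throw err;
    }
  }

  // Scheduling stage: reject a stale message before a task is created (task creation omitted).
  static void schedule(TransitionHandler handler) throws Exception {
    rejectIfStale(handler);
  }

  // Execution stage: check again, because the state may have changed while the task was queued.
  static void execute(TransitionHandler handler) throws Exception {
    rejectIfStale(handler);
  }
}
```

Returning `null` for "not stale" keeps the check free of side effects. The scheduler and the executor can therefore share it, and each decides how to surface the error.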

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
             // discard the message. Controller will resend if this is a valid message
             throw new HelixException(String.format(
                 "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), message.getToState()));
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task scheduled/executing.
+            Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+            if (err != null) {
+              throw err;
+            }
+          }
+          if (stateTransitionHandlers.containsKey(messageTarget)) {
+            // If there are 2 messages in same batch about same partition's state transition,
+            // the later one is discarded
+            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+            throw new HelixException(String.format(

Review comment:
       Updated.
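The batch rule in the hunk above is: when two messages in the same batch target the same partition, the later one is discarded. It can be sketched as a first-wins map keyed by target. The sketch assumes:

- `Msg` and `BatchDeduper` are hypothetical types for illustration.
- The `resource:partition` key format is an assumption; the hunk does not show how `messageTarget` is built.
- The real code throws a `HelixException` for the later message, whereas this sketch collects its ID instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified message carrying only what the target key needs.
class Msg {
  final String id;
  final String resource;
  final String partition;

  Msg(String id, String resource, String partition) {
    this.id = id;
    this.resource = resource;
    this.partition = partition;
  }

  // Assumed key format: one state-transition target per resource/partition pair.
  String target() {
    return resource + ":" + partition;
  }
}

class BatchDeduper {
  // Keeps the first message seen for each target, in batch order, and records
  // the IDs of later messages for the same target as discarded.
  static List<Msg> dedupe(List<Msg> batch, List<String> discardedIds) {
    Map<String, Msg> accepted = new LinkedHashMap<>();
    for (Msg m : batch) {
      if (accepted.containsKey(m.target())) {
        discardedIds.add(m.id);
      } else {
        accepted.put(m.target(), m);
      }
    }
    return new ArrayList<>(accepted.values());
  }
}
```

`LinkedHashMap` preserves batch order, so the surviving messages are handled in the order the controller sent them.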

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,36 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
 
   }
 
+  // Verify the fromState and current state of the stateModel.
+  private void validateStaleMessage (boolean isPreCheck) throws Exception {
+    String fromState = _message.getFromState();
+    String toState = _message.getToState();
+    String partitionName = _message.getPartitionName();
+
+    // state in _currentStateDelta uses current state from state model. It has the
+    // most up-to-date. current state. In case currentState in stateModel is null,
+    // partition is in initial state and we using it as current state.
+    // Defined in HelixStateMachineEngine.
+    String state = _currentStateDelta.getState(partitionName);
+
+    if (toState.equalsIgnoreCase(state)) {
+      // To state equals current state, we can just ignore the message
+      throw new HelixDuplicatedStateTransitionException(String
+          .format("Partition %s current state is same as toState (%s->%s) from message.",
+              partitionName, fromState, toState));
+    } else if (!isPreCheck && fromState != null && !fromState.equals("*") && !fromState

Review comment:
       I had an offline sync with Ali. He has submitted a PR (#1390) to fix this issue, and I have updated my PR accordingly. We now run the same validation in both the scheduling stage and the execution stage.
   My PR currently includes his patch; I will rebase once his PR is merged.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
             // discard the message. Controller will resend if this is a valid message
             throw new HelixException(String.format(
                 "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), message.getToState()));
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task scheduled/executing.
+            Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);

Review comment:
       I had an offline sync with Ali. He has submitted a PR (#1390) to fix this issue, and I have updated my PR accordingly. We now run the same validation in both the scheduling stage and the execution stage.
   My PR currently includes his patch; I will rebase once his PR is merged.

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
             // discard the message. Controller will resend if this is a valid message
             throw new HelixException(String.format(
                 "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-                message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
-                msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
-                message.getToState()));
+                message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+                String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+                System.currentTimeMillis(), message.getFromState(), message.getToState()));
+          }
+          if (createHandler instanceof HelixStateTransitionHandler) {
+            // We only check to state if there is no ST task scheduled/executing.
+            Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+            if (err != null) {
+              throw err;
+            }
+          }
+          if (stateTransitionHandlers.containsKey(messageTarget)) {
+            // If there are 2 messages in same batch about same partition's state transition,
+            // the later one is discarded
+            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+            throw new HelixException(String.format(

Review comment:
       Updated. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
