jiajunwang commented on a change in pull request #1500:
URL: https://github.com/apache/helix/pull/1500#discussion_r516994162



##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -993,67 +1034,78 @@ private boolean checkAndProcessNoOpMessage(Message 
message, String instanceName,
    * Preprocess the state transition message to validate if the request is 
valid.
    * If no operation needs to be triggered, discard the the message.
    * @param message
-   * @param instanceName
    * @param manager
    * @param stateTransitionHandlers
    * @param createHandler
    * @return True if the requested state transition is valid, and need to 
schedule the transition.
    *         False if no more operation is required.
    */
-  private boolean validateAndProcessStateTransitionMessage(Message message, 
String instanceName,
-      HelixManager manager, Map<String, MessageHandler> 
stateTransitionHandlers,
-      MessageHandler createHandler) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
+  private boolean validateAndProcessStateTransitionMessage(Message message, 
HelixManager manager,
+      Map<String, MessageHandler> stateTransitionHandlers, MessageHandler 
createHandler) {
     String messageTarget = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
-    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-        && isStateTransitionInProgress(messageTarget)) {
-      String taskId = _messageTaskMap.get(messageTarget);
-      Message msg = _taskMap.get(taskId).getTask().getMessage();
-      // If there is another state transition for same partition is going on,
-      // discard the message. Controller will resend if this is a valid message
-      String errMsg = String.format(
-          "Another state transition for %s:%s is in progress with msg: %s, 
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-          message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
-          msg.isRelayMessage(), msg.getReadTimeStamp(), 
System.currentTimeMillis(),
-          message.getFromState(), message.getToState());
-      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
-          manager);
-      return false;
-    }
-    if (createHandler instanceof HelixStateTransitionHandler) {
-      // We only check to state if there is no ST task scheduled/executing.
-      HelixStateTransitionHandler.StaleMessageValidateResult result =
-          ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
-      if (!result.isValid) {
-        handleUnprocessableMessage(message, null /* exception */, 
result.exception.getMessage(),
-            accessor, instanceName, manager);
+
+    try {
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+          && isStateTransitionInProgress(messageTarget)) {
+        String taskId = _messageTaskMap.get(messageTarget);
+        Message msg = _taskMap.get(taskId).getTask().getMessage();
+        // If there is another state transition for same partition is going on,
+        // discard the message. Controller will resend if this is a valid 
message
+        String errMsg = String.format(
+            "Another state transition for %s:%s is in progress with msg: %s, 
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+            message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
+            msg.isRelayMessage(), msg.getReadTimeStamp(), 
System.currentTimeMillis(),
+            message.getFromState(), message.getToState());
+        updateUnprocessableMessage(message, null /* exception */, errMsg, 
manager);
         return false;
       }
-    }
-    if (stateTransitionHandlers.containsKey(messageTarget)) {
-      // If there are 2 messages in same batch about same partition's state 
transition,
-      // the later one is discarded
-      Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
-      String errMsg = String.format(
-          "Duplicated state transition message: %s. Existing: %s->%s; New 
(Discarded): %s->%s",
-          message.getMsgId(), duplicatedMessage.getFromState(), 
duplicatedMessage.getToState(),
-          message.getFromState(), message.getToState());
-      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
+      if (createHandler instanceof HelixStateTransitionHandler) {
+        // We only check to state if there is no ST task scheduled/executing.
+        HelixStateTransitionHandler.StaleMessageValidateResult result =
+            ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
+        if (!result.isValid) {
+          updateUnprocessableMessage(message, null /* exception */, 
result.exception.getMessage(),
+              manager);
+          return false;
+        }
+      }
+      if (stateTransitionHandlers.containsKey(messageTarget)) {
+        // If there are 2 messages in same batch about same partition's state 
transition,
+        // the later one is discarded
+        Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
+        String errMsg = String.format(
+            "Duplicated state transition message: %s. Existing: %s->%s; New 
(Discarded): %s->%s",
+            message.getMsgId(), duplicatedMessage.getFromState(), 
duplicatedMessage.getToState(),
+            message.getFromState(), message.getToState());
+        updateUnprocessableMessage(message, null /* exception */, errMsg, 
manager);

Review comment:
       I moved the removing message logic to the caller. So for this specific 
case, it will keep the original behavior.
   
   This change is for reusing the updateUnprocessableMessage in the other 
conditions. In addition, I also want to avoid silently removing a message in a 
very deep private call. So the new logic puts all the message processing 
(update or remove) in a smaller scope.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to