dasahcc commented on a change in pull request #1500:
URL: https://github.com/apache/helix/pull/1500#discussion_r517562369



##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -804,40 +809,67 @@ public void onMessage(String instanceName, List<Message> messages,
         // skip the following operations for the no-op messages.
         continue;
       }
-      // create message handlers, if handlers not found, leave its state as NEW
+
       NotificationContext msgWorkingContext = changeContext.clone();
+      MessageHandler msgHandler = null;
       try {
-        MessageHandler msgHandler = createMessageHandler(message, msgWorkingContext);
-        if (msgHandler == null) {
-          // Failed to create message handler, skip processing this message in this callback.
-          // The same message process will be retried in the next round.
-          continue;
-        }
-        if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
-            .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          if (validateAndProcessStateTransitionMessage(message, instanceName, manager,
-              stateTransitionHandlers, msgHandler)) {
-            // Need future process by triggering state transition
-            String msgTarget =
-                getMessageTarget(message.getResourceName(), message.getPartitionName());
-            stateTransitionHandlers.put(msgTarget, msgHandler);
-            stateTransitionContexts.put(msgTarget, msgWorkingContext);
-          } else {
-            // skip the following operations for the invalid/expired state transition messages.
-            continue;
-          }
+        // create message handlers, if handlers not found but no exception, leave its state as NEW
+        msgHandler = createMessageHandler(message, msgWorkingContext);
+      } catch (Exception ex) {
+        // Failed to create message handler and there is an Exception.
+        int remainingRetryCount = message.getRetryCount();
+        LOG.error(
+            "Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.",
+            message.getMsgId(), remainingRetryCount);
+        // Set the message retry count to avoid infinite retrying.
+        message.setRetryCount(remainingRetryCount - 1);
+        message.setExecuteSessionId(sessionId);
+        // continue processing in the next section where handler object is double-checked.
+      }
+
+      if (msgHandler == null) {
+        if (message.getRetryCount() < 0) {

Review comment:
       NIT: By the definition of the retry count, the retry count check should
happen before another round of handler creation is attempted. If we only give
up once the retry count is less than 0, the message gets one more retry than
the retry count defines. I know this is a minor issue, but logically it breaks
the counting rule. Better to reorganize it.
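
One way to read this comment, sketched with hypothetical names (`RetryCountSketch`, `Msg`, and both methods are illustrations, not Helix code). It assumes handler creation fails on every attempt and that the first attempt counts against the retry budget, which may not match how the retry count is actually defined:

```java
final class RetryCountSketch {
    // Hypothetical stand-in for a message that carries a remaining retry count.
    static final class Msg {
        int retryCount;
        Msg(int retryCount) { this.retryCount = retryCount; }
    }

    // Ordering in the diff: attempt creation, decrement on failure, then give up
    // only once the count has dropped below zero.
    static int attemptsCheckAfter(int configuredRetries) {
        Msg msg = new Msg(configuredRetries);
        int attempts = 0;
        while (true) {
            attempts++;          // a creation attempt that always fails in this sketch
            msg.retryCount--;    // mirrors setRetryCount(remainingRetryCount - 1)
            if (msg.retryCount < 0) {
                return attempts;
            }
        }
    }

    // Ordering suggested in the comment: check the remaining budget first and
    // only attempt creation while budget is left.
    static int attemptsCheckFirst(int configuredRetries) {
        Msg msg = new Msg(configuredRetries);
        int attempts = 0;
        while (msg.retryCount > 0) {
            attempts++;
            msg.retryCount--;
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsCheckAfter(3)); // prints 4
        System.out.println(attemptsCheckFirst(3)); // prints 3
    }
}
```

Under these assumptions the check-after ordering makes one more attempt than the check-first ordering for the same configured count.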

##########
File path: helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -993,67 +1034,78 @@ private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
    * Preprocess the state transition message to validate if the request is valid.
    * If no operation needs to be triggered, discard the the message.
    * @param message
-   * @param instanceName
    * @param manager
    * @param stateTransitionHandlers
    * @param createHandler
    * @return True if the requested state transition is valid, and need to schedule the transition.
    *         False if no more operation is required.
    */
-  private boolean validateAndProcessStateTransitionMessage(Message message, String instanceName,
-      HelixManager manager, Map<String, MessageHandler> stateTransitionHandlers,
-      MessageHandler createHandler) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
+  private boolean validateAndProcessStateTransitionMessage(Message message, HelixManager manager,
+      Map<String, MessageHandler> stateTransitionHandlers, MessageHandler createHandler) {
     String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
-    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-        && isStateTransitionInProgress(messageTarget)) {
-      String taskId = _messageTaskMap.get(messageTarget);
-      Message msg = _taskMap.get(taskId).getTask().getMessage();
-      // If there is another state transition for same partition is going on,
-      // discard the message. Controller will resend if this is a valid message
-      String errMsg = String.format(
-          "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-          message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
-          msg.isRelayMessage(), msg.getReadTimeStamp(), System.currentTimeMillis(),
-          message.getFromState(), message.getToState());
-      handleUnprocessableMessage(message, null /* exception */, errMsg, accessor, instanceName,
-          manager);
-      return false;
-    }
-    if (createHandler instanceof HelixStateTransitionHandler) {
-      // We only check to state if there is no ST task scheduled/executing.
-      HelixStateTransitionHandler.StaleMessageValidateResult result =
-          ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
-      if (!result.isValid) {
-        handleUnprocessableMessage(message, null /* exception */, result.exception.getMessage(),
-            accessor, instanceName, manager);
+
+    try {
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+          && isStateTransitionInProgress(messageTarget)) {
+        String taskId = _messageTaskMap.get(messageTarget);
+        Message msg = _taskMap.get(taskId).getTask().getMessage();
+        // If there is another state transition for same partition is going on,
+        // discard the message. Controller will resend if this is a valid message
+        String errMsg = String.format(
+            "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+            message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+            msg.isRelayMessage(), msg.getReadTimeStamp(), System.currentTimeMillis(),
+            message.getFromState(), message.getToState());
+        updateUnprocessableMessage(message, null /* exception */, errMsg, manager);

Review comment:
       This changes the behavior. If the message is not for the exact state we
are targeting, it must be for an intermediate state, and we should throw the
message away instead of leaving it hanging there. Otherwise, we will face a lot
of operations that require human involvement.
   
   For example, when a p2p message arrives but has timed out, the message is
discarded. But if you leave it hanging as unprocessable, the partition will be
stuck.
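
A toy model of the concern, with hypothetical names throughout (`StaleMessageSketch`, `Policy`, `send`, `handleStale` are not Helix APIs). It assumes a sender will not deliver a new message to a partition while an earlier one is still pending there, which is a simplification made for this sketch:

```java
import java.util.HashMap;
import java.util.Map;

final class StaleMessageSketch {
    enum Policy { DISCARD, LEAVE_AS_UNPROCESSABLE }

    // Toy inbox: at most one pending message per partition (assumption of this sketch).
    private final Map<String, String> pendingByPartition = new HashMap<>();

    // Delivers a message only when the partition has no pending message.
    boolean send(String partition, String msgId) {
        return pendingByPartition.putIfAbsent(partition, msgId) == null;
    }

    // Handles a message found to be stale, for example one that timed out.
    void handleStale(String partition, Policy policy) {
        if (policy == Policy.DISCARD) {
            pendingByPartition.remove(partition);
        }
        // LEAVE_AS_UNPROCESSABLE: the entry stays and keeps occupying the slot.
    }

    public static void main(String[] args) {
        StaleMessageSketch discarded = new StaleMessageSketch();
        discarded.send("db_0", "m1");
        discarded.handleStale("db_0", Policy.DISCARD);
        System.out.println(discarded.send("db_0", "m2")); // prints true

        StaleMessageSketch left = new StaleMessageSketch();
        left.send("db_0", "m1");
        left.handleStale("db_0", Policy.LEAVE_AS_UNPROCESSABLE);
        System.out.println(left.send("db_0", "m2")); // prints false
    }
}
```

In this model the partition accepts a follow-up message only under the discard policy; under the other policy someone has to clear the stale entry by hand.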



