zhangmeng916 commented on a change in pull request #1500:
URL: https://github.com/apache/helix/pull/1500#discussion_r516897658



##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -804,40 +809,67 @@ public void onMessage(String instanceName, List<Message> 
messages,
         // skip the following operations for the no-op messages.
         continue;
       }
-      // create message handlers, if handlers not found, leave its state as NEW
+
       NotificationContext msgWorkingContext = changeContext.clone();
+      MessageHandler msgHandler = null;
       try {
-        MessageHandler msgHandler = createMessageHandler(message, 
msgWorkingContext);
-        if (msgHandler == null) {
-          // Failed to create message handler, skip processing this message in 
this callback.
-          // The same message process will be retried in the next round.
-          continue;
-        }
-        if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) 
|| message.getMsgType()
-            .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
-          if (validateAndProcessStateTransitionMessage(message, instanceName, 
manager,
-              stateTransitionHandlers, msgHandler)) {
-            // Need future process by triggering state transition
-            String msgTarget =
-                getMessageTarget(message.getResourceName(), 
message.getPartitionName());
-            stateTransitionHandlers.put(msgTarget, msgHandler);
-            stateTransitionContexts.put(msgTarget, msgWorkingContext);
-          } else {
-            // skip the following operations for the invalid/expired state 
transition messages.
-            continue;
-          }
+        // create message handlers, if handlers not found but no exception, 
leave its state as NEW
+        msgHandler = createMessageHandler(message, msgWorkingContext);
+      } catch (Exception ex) {
+        // Failed to create message handler and there is an Exception.
+        int remainingRetryCount = message.getRetryCount();
+        LOG.error(
+            "Exception happens when creating Message Handler for message {}. 
Current remaining retry count is {}.",
+            message.getMsgId(), remainingRetryCount);
+        // Set the message retry count to avoid infinite retrying.
+        message.setRetryCount(remainingRetryCount - 1);
+        message.setExecuteSessionId(sessionId);
+        // continue processing in the next section where handler object is 
double-checked.
+      }
+
+      if (msgHandler == null) {
+        if (message.getRetryCount() < 0) {
+          // If no more retry count remains, then mark the message to be 
UNPROCESSABLE.
+          String errorMsg = String
+              .format("No available message Handler found!"
+                      + " Stop processing message %s since it has a negative 
remaining retry count %d!",
+                  message.getMsgId(), message.getRetryCount());
+          updateUnprocessableMessage(message, null, errorMsg, manager);

Review comment:
       It seems that even when there is no exception (i.e., the handler is simply
not found), you will still mark the message as unprocessable once the retry count
is exhausted, instead of leaving it as NEW. Is that intended?

##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -613,7 +613,12 @@ void reset() {
 
       MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
       if (item.factory() != null) {
-        item.factory().reset();
+        try {
+          item.factory().reset();
+        } catch (Exception ex) {

Review comment:
       I don't quite understand why we swallow the exception here. This exception
means the user's reset implementation has an issue, so can we just let the error
propagate?

##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -993,67 +1034,78 @@ private boolean checkAndProcessNoOpMessage(Message 
message, String instanceName,
    * Preprocess the state transition message to validate if the request is 
valid.
    * If no operation needs to be triggered, discard the the message.
    * @param message
-   * @param instanceName
    * @param manager
    * @param stateTransitionHandlers
    * @param createHandler
    * @return True if the requested state transition is valid, and need to 
schedule the transition.
    *         False if no more operation is required.
    */
-  private boolean validateAndProcessStateTransitionMessage(Message message, 
String instanceName,
-      HelixManager manager, Map<String, MessageHandler> 
stateTransitionHandlers,
-      MessageHandler createHandler) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
+  private boolean validateAndProcessStateTransitionMessage(Message message, 
HelixManager manager,
+      Map<String, MessageHandler> stateTransitionHandlers, MessageHandler 
createHandler) {
     String messageTarget = getMessageTarget(message.getResourceName(), 
message.getPartitionName());
-    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
-        && isStateTransitionInProgress(messageTarget)) {
-      String taskId = _messageTaskMap.get(messageTarget);
-      Message msg = _taskMap.get(taskId).getTask().getMessage();
-      // If there is another state transition for same partition is going on,
-      // discard the message. Controller will resend if this is a valid message
-      String errMsg = String.format(
-          "Another state transition for %s:%s is in progress with msg: %s, 
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
-          message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
-          msg.isRelayMessage(), msg.getReadTimeStamp(), 
System.currentTimeMillis(),
-          message.getFromState(), message.getToState());
-      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
-          manager);
-      return false;
-    }
-    if (createHandler instanceof HelixStateTransitionHandler) {
-      // We only check to state if there is no ST task scheduled/executing.
-      HelixStateTransitionHandler.StaleMessageValidateResult result =
-          ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
-      if (!result.isValid) {
-        handleUnprocessableMessage(message, null /* exception */, 
result.exception.getMessage(),
-            accessor, instanceName, manager);
+
+    try {
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+          && isStateTransitionInProgress(messageTarget)) {
+        String taskId = _messageTaskMap.get(messageTarget);
+        Message msg = _taskMap.get(taskId).getTask().getMessage();
+        // If there is another state transition for same partition is going on,
+        // discard the message. Controller will resend if this is a valid 
message
+        String errMsg = String.format(
+            "Another state transition for %s:%s is in progress with msg: %s, 
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+            message.getResourceName(), message.getPartitionName(), 
msg.getMsgId(),
+            msg.isRelayMessage(), msg.getReadTimeStamp(), 
System.currentTimeMillis(),
+            message.getFromState(), message.getToState());
+        updateUnprocessableMessage(message, null /* exception */, errMsg, 
manager);
         return false;
       }
-    }
-    if (stateTransitionHandlers.containsKey(messageTarget)) {
-      // If there are 2 messages in same batch about same partition's state 
transition,
-      // the later one is discarded
-      Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
-      String errMsg = String.format(
-          "Duplicated state transition message: %s. Existing: %s->%s; New 
(Discarded): %s->%s",
-          message.getMsgId(), duplicatedMessage.getFromState(), 
duplicatedMessage.getToState(),
-          message.getFromState(), message.getToState());
-      handleUnprocessableMessage(message, null /* exception */, errMsg, 
accessor, instanceName,
+      if (createHandler instanceof HelixStateTransitionHandler) {
+        // We only check to state if there is no ST task scheduled/executing.
+        HelixStateTransitionHandler.StaleMessageValidateResult result =
+            ((HelixStateTransitionHandler) 
createHandler).staleMessageValidator();
+        if (!result.isValid) {
+          updateUnprocessableMessage(message, null /* exception */, 
result.exception.getMessage(),
+              manager);
+          return false;
+        }
+      }
+      if (stateTransitionHandlers.containsKey(messageTarget)) {
+        // If there are 2 messages in same batch about same partition's state 
transition,
+        // the later one is discarded
+        Message duplicatedMessage = 
stateTransitionHandlers.get(messageTarget)._message;
+        String errMsg = String.format(
+            "Duplicated state transition message: %s. Existing: %s->%s; New 
(Discarded): %s->%s",
+            message.getMsgId(), duplicatedMessage.getFromState(), 
duplicatedMessage.getToState(),
+            message.getFromState(), message.getToState());
+        updateUnprocessableMessage(message, null /* exception */, errMsg, 
manager);

Review comment:
       I think this changes the original behavior: we no longer remove messages in
this case, which also no longer matches the code comment ("the later one is
discarded"). Is this what we want, i.e., not removing the duplicated message but
only marking it? It seems we would leave many unnecessary messages on ZK.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to