dasahcc commented on a change in pull request #1514:
URL: https://github.com/apache/helix/pull/1514#discussion_r520811867



##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
     }
   }
 
-  private void updateMessageState(List<Message> readMsgs, HelixDataAccessor 
accessor,
+  private void updateMessageState(Collection<Message> msgsToBeUpdated, 
HelixDataAccessor accessor,
       String instanceName) {
-    Builder keyBuilder = accessor.keyBuilder();
-    List<String> readMsgPaths = new ArrayList<>();
-    List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
-    for (Message msg : readMsgs) {
-      readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
-      _knownMessageIds.add(msg.getId());
-      /**
-       * We use the updater to avoid race condition between writing message to 
zk as READ state and removing message after ST is done
-       * If there is no message at this path, meaning the message is removed 
so we do not write the message
-       */
-      updaters.add(currentData -> {
-        if (currentData == null) {
-          LOG.warn(
-              "Message {} targets at {} has already been removed before it is 
set as READ on instance {}",
-              msg.getId(), msg.getTgtName(), instanceName);
-          return null;
-        }
-        return msg.getRecord();
-      });
+    if (!msgsToBeUpdated.isEmpty()) {
+      Builder keyBuilder = accessor.keyBuilder();
+      List<String> updateMsgPaths = new ArrayList<>();
+      List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+      for (Message msg : msgsToBeUpdated) {
+        updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+        /**
+         * We use the updater to avoid race condition between writing message 
to zk as READ state and removing message after ST is done
+         * If there is no message at this path, meaning the message is removed 
so we do not write the message
+         */
+        updaters.add(currentData -> {
+          if (currentData == null) {
+            LOG.warn(
+                "Message {} targets at {} has already been removed before it 
is set as READ on instance {}",
+                msg.getId(), msg.getTgtName(), instanceName);
+            return null;
+          }
+          return msg.getRecord();
+        });
+      }
+      accessor.updateChildren(updateMsgPaths, updaters, 
AccessOption.PERSISTENT);
+    }
+
+    // Note that only cache the known message Ids after the update to ZK is 
successfully done.
+    // This is to avoid inconsistent cache.
+
+    // if a message in "NEW" state is updated, then we might need to process 
it soon.
+    boolean isNewMessageUpdated = false;
+    for (Message msg : msgsToBeUpdated) {
+      if (msg.getMsgState().equals(MessageState.NEW)) {
+        isNewMessageUpdated = true;
+        // If a message is still "NEW", it is not a known message. The message 
may not be able to

Review comment:
       nit: an known

##########
File path: 
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
     }
   }
 
-  private void updateMessageState(List<Message> readMsgs, HelixDataAccessor 
accessor,
+  private void updateMessageState(Collection<Message> msgsToBeUpdated, 
HelixDataAccessor accessor,
       String instanceName) {
-    Builder keyBuilder = accessor.keyBuilder();
-    List<String> readMsgPaths = new ArrayList<>();
-    List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
-    for (Message msg : readMsgs) {
-      readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
-      _knownMessageIds.add(msg.getId());
-      /**
-       * We use the updater to avoid race condition between writing message to 
zk as READ state and removing message after ST is done
-       * If there is no message at this path, meaning the message is removed 
so we do not write the message
-       */
-      updaters.add(currentData -> {
-        if (currentData == null) {
-          LOG.warn(
-              "Message {} targets at {} has already been removed before it is 
set as READ on instance {}",
-              msg.getId(), msg.getTgtName(), instanceName);
-          return null;
-        }
-        return msg.getRecord();
-      });
+    if (!msgsToBeUpdated.isEmpty()) {
+      Builder keyBuilder = accessor.keyBuilder();
+      List<String> updateMsgPaths = new ArrayList<>();
+      List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+      for (Message msg : msgsToBeUpdated) {
+        updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+        /**
+         * We use the updater to avoid race condition between writing message 
to zk as READ state and removing message after ST is done
+         * If there is no message at this path, meaning the message is removed 
so we do not write the message
+         */
+        updaters.add(currentData -> {
+          if (currentData == null) {
+            LOG.warn(
+                "Message {} targets at {} has already been removed before it 
is set as READ on instance {}",
+                msg.getId(), msg.getTgtName(), instanceName);
+            return null;
+          }
+          return msg.getRecord();
+        });
+      }
+      accessor.updateChildren(updateMsgPaths, updaters, 
AccessOption.PERSISTENT);
+    }
+
+    // Note that only cache the known message Ids after the update to ZK is 
successfully done.
+    // This is to avoid inconsistent cache.
+
+    // if a message in "NEW" state is updated, then we might need to process 
it soon.
+    boolean isNewMessageUpdated = false;
+    for (Message msg : msgsToBeUpdated) {

Review comment:
       Why don't we move this check into the previous block, since it already 
loops over msgsToBeUpdated?

##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -70,6 +70,8 @@
       
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 
* 1000);
   private final static String PENDING_MESSAGE = "pending message";
   private final static String STALE_MESSAGE = "stale message";
+  // TODO: Make the message retry count configurable through the Cluster 
Config or IdealStates.
+  public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;

Review comment:
       Is this the same thing as the message's retry attribute? If not, we need 
a more accurate name, since this is the retry count for state model creation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to