jiajunwang commented on a change in pull request #1514:
URL: https://github.com/apache/helix/pull/1514#discussion_r520853964
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
- Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
- /**
- * We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
- * If there is no message at this path, meaning the message is removed
so we do not write the message
- */
- updaters.add(currentData -> {
- if (currentData == null) {
- LOG.warn(
- "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
- msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
- });
+ if (!msgsToBeUpdated.isEmpty()) {
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> updateMsgPaths = new ArrayList<>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+ /**
+ * We use the updater to avoid race condition between writing message
to zk as READ state and removing message after ST is done
+ * If there is no message at this path, meaning the message is removed
so we do not write the message
+ */
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it
is set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
+ }
+ return msg.getRecord();
+ });
+ }
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+ }
+
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+
+ // if a message in "NEW" state is updated, then we might need to process
it soon.
+ boolean isNewMessageUpdated = false;
+ for (Message msg : msgsToBeUpdated) {
Review comment:
It turns out that I can move the check to the loop above, but I cannot
remove the loop here after the update call. For the reason, please check the
comment in code. Paste it here as well.
> // Note that only cache the known message Ids after the update to ZK
is successfully done.
> // This is to avoid inconsistent cache.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]