dasahcc commented on a change in pull request #1514:
URL: https://github.com/apache/helix/pull/1514#discussion_r520811867
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
- Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
- /**
- * We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
- * If there is no message at this path, meaning the message is removed
so we do not write the message
- */
- updaters.add(currentData -> {
- if (currentData == null) {
- LOG.warn(
- "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
- msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
- });
+ if (!msgsToBeUpdated.isEmpty()) {
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> updateMsgPaths = new ArrayList<>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+ /**
+ * We use the updater to avoid race condition between writing message
to zk as READ state and removing message after ST is done
+ * If there is no message at this path, meaning the message is removed
so we do not write the message
+ */
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it
is set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
+ }
+ return msg.getRecord();
+ });
+ }
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+ }
+
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+
+ // if a message in "NEW" state is updated, then we might need to process
it soon.
+ boolean isNewMessageUpdated = false;
+ for (Message msg : msgsToBeUpdated) {
+ if (msg.getMsgState().equals(MessageState.NEW)) {
+ isNewMessageUpdated = true;
+ // If a message is still "NEW", it is not a known message. The message
may not be able to
Review comment:
nit: an known
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
- Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
- /**
- * We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
- * If there is no message at this path, meaning the message is removed
so we do not write the message
- */
- updaters.add(currentData -> {
- if (currentData == null) {
- LOG.warn(
- "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
- msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
- });
+ if (!msgsToBeUpdated.isEmpty()) {
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> updateMsgPaths = new ArrayList<>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+ /**
+ * We use the updater to avoid race condition between writing message
to zk as READ state and removing message after ST is done
+ * If there is no message at this path, meaning the message is removed
so we do not write the message
+ */
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it
is set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
+ }
+ return msg.getRecord();
+ });
+ }
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+ }
+
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+
+ // if a message in "NEW" state is updated, then we might need to process
it soon.
+ boolean isNewMessageUpdated = false;
+ for (Message msg : msgsToBeUpdated) {
Review comment:
Why we dont move this check in the previous block? Since it already does
the looping of the msgsToBeUpdated.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -70,6 +70,8 @@
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60
* 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
+ // TODO: Make the message retry count configurable through the Cluster
Config or IdealStates.
+ public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
Review comment:
Is this the same thing as the message attribute of retrying? Otherwise,
we need an accurate name since it is the retry for state model creation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]