zhangmeng916 commented on a change in pull request #1514:
URL: https://github.com/apache/helix/pull/1514#discussion_r520777177
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1288,6 +1308,19 @@ private void removeMessageFromZK(HelixDataAccessor
accessor, Message message,
}
}
+ private void sendNopMessage(HelixDataAccessor accessor, String instanceName)
{
Review comment:
This function will trigger another event and retry the message, right?
Why does the PR description say the message will not be automatically retried?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1288,6 +1308,19 @@ private void removeMessageFromZK(HelixDataAccessor
accessor, Message message,
}
}
+ private void sendNopMessage(HelixDataAccessor accessor, String instanceName)
{
+ try {
+ Message nopMsg = new Message(MessageType.NO_OP,
UUID.randomUUID().toString());
+ nopMsg.setSrcName(instanceName);
+ nopMsg.setTgtName(instanceName);
+ accessor
+ .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(),
nopMsg.getId()), nopMsg);
+ LOG.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: " +
nopMsg.getId());
+ } catch (Exception e) {
+ LOG.error(e.toString());
Review comment:
Can we have a more detailed error message here?
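One possible shape for a more detailed message is sketched below. It is only an illustration: the class and the helper name describeSendFailure are hypothetical and not part of HelixTaskExecutor, and plain String.format stands in for the logger call so that the snippet is self-contained. The idea is to include the target instance and the message id, and in the real code to pass the exception itself to the logger so the stack trace is kept.

```java
final class NopSendFailureSketch {
  // Hypothetical helper: builds an error message that names the target
  // instance and the message id instead of logging only e.toString().
  static String describeSendFailure(String instanceName, String msgId, Exception e) {
    return String.format(
        "Failed to send NO_OP message %s to instance %s. Cause: %s",
        msgId, instanceName, e);
  }

  public static void main(String[] args) {
    Exception cause = new IllegalStateException("ZK write failed");
    // In the real method this string would go to LOG.error together with
    // the exception object, so the stack trace is not lost.
    System.out.println(describeSendFailure("host_1", "msg-1234", cause));
  }
}
```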
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -821,32 +843,29 @@ public void onMessage(String instanceName, List<Message>
messages,
LOG.error(
"Exception happens when creating Message Handler for message {}.
Current remaining retry count is {}.",
message.getMsgId(), remainingRetryCount);
- // Set the message retry count to avoid infinite retrying.
+ // Reduce the message retry count to avoid infinite retrying.
message.setRetryCount(remainingRetryCount - 1);
message.setExecuteSessionId(sessionId);
- // continue processing in the next section where handler object is
double-checked.
- }
-
- if (msgHandler == null) {
// Note that we are re-using the retry count of Message that was
original designed to control
// timeout retries. So it is not checked before the first try in order
to ensure consistent
// behavior. It is possible that we introduce a new behavior for this
method. But it requires
// us to split the configuration item so as to avoid confusion.
- if (message.getRetryCount() < 0) {
+ if (message.getRetryCount() <= 0) {
// If no more retry count remains, then mark the message to be
UNPROCESSABLE.
- String errorMsg = String
- .format("No available message Handler found!"
- + " Stop processing message %s since it has a negative
remaining retry count %d!",
- message.getMsgId(), message.getRetryCount());
+ String errorMsg = String.format("No available message Handler found!"
+ + " Stop processing message %s since it has negative
remaining retry count %d!",
Review comment:
"negative" doesn't match the if condition: the check is now "<= 0", so a
retry count of zero also reaches this branch.
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
- Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
- /**
- * We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
- * If there is no message at this path, meaning the message is removed
so we do not write the message
- */
- updaters.add(currentData -> {
- if (currentData == null) {
- LOG.warn(
- "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
- msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
- });
+ if (!msgsToBeUpdated.isEmpty()) {
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> updateMsgPaths = new ArrayList<>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+ /**
+ * We use the updater to avoid race condition between writing message
to zk as READ state and removing message after ST is done
+ * If there is no message at this path, meaning the message is removed
so we do not write the message
+ */
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it
is set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
+ }
+ return msg.getRecord();
+ });
+ }
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+ }
+
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+
+ // if a message in "NEW" state is updated, then we might need to process
it soon.
+ boolean isNewMessageUpdated = false;
+ for (Message msg : msgsToBeUpdated) {
+ if (msg.getMsgState().equals(MessageState.NEW)) {
+ isNewMessageUpdated = true;
+ // If a message is still "NEW", it is not a known message. The message
may not be able to
+ // processed now in an expected way.
+ } else {
+ // else, cache the known messages.
+ _knownMessageIds.add(msg.getId());
+ }
+ }
+ if (isNewMessageUpdated) {
+ // Sending a NO-OP message to trigger another message callback to
re-process the New and
+ // updated messsages.
Review comment:
Same here: what does "New and updated" mean?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -521,29 +523,51 @@ public void finishTask(MessageTask task) {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
- Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
- /**
- * We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
- * If there is no message at this path, meaning the message is removed
so we do not write the message
- */
- updaters.add(currentData -> {
- if (currentData == null) {
- LOG.warn(
- "Message {} targets at {} has already been removed before it is
set as READ on instance {}",
- msg.getId(), msg.getTgtName(), instanceName);
- return null;
- }
- return msg.getRecord();
- });
+ if (!msgsToBeUpdated.isEmpty()) {
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> updateMsgPaths = new ArrayList<>();
+ List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
+ /**
+ * We use the updater to avoid race condition between writing message
to zk as READ state and removing message after ST is done
+ * If there is no message at this path, meaning the message is removed
so we do not write the message
+ */
+ updaters.add(currentData -> {
+ if (currentData == null) {
+ LOG.warn(
+ "Message {} targets at {} has already been removed before it
is set as READ on instance {}",
+ msg.getId(), msg.getTgtName(), instanceName);
+ return null;
+ }
+ return msg.getRecord();
+ });
+ }
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+ }
+
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+
+ // if a message in "NEW" state is updated, then we might need to process
it soon.
+ boolean isNewMessageUpdated = false;
Review comment:
The naming here is a bit confusing to me. This message is updated as NEW
because it cannot be processed, right? Maybe call it MessageUpdatedAsNew or
something similar.
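To make the naming suggestion concrete, here is a minimal, self-contained sketch of the caching loop with the flag renamed. Everything in it is an assumption for illustration: the class, the simplified MessageState enum, and the map of message id to state stand in for the real Message objects, and isMessageUpdatedAsNew is just one variant of the suggested name.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class KnownMessageCacheSketch {
  // Simplified stand-in for the real message state enum.
  enum MessageState { NEW, READ }

  // Caches the ids of messages that left the NEW state and returns true
  // if at least one message was written back as NEW.
  static boolean cacheKnownMessages(
      Map<String, MessageState> updatedMsgs, Set<String> knownMessageIds) {
    boolean isMessageUpdatedAsNew = false;
    for (Map.Entry<String, MessageState> entry : updatedMsgs.entrySet()) {
      if (entry.getValue() == MessageState.NEW) {
        // Still NEW: not a known message yet, so it is not cached.
        isMessageUpdatedAsNew = true;
      } else {
        knownMessageIds.add(entry.getKey());
      }
    }
    return isMessageUpdatedAsNew;
  }

  public static void main(String[] args) {
    Map<String, MessageState> msgs = new LinkedHashMap<>();
    msgs.put("msg-1", MessageState.READ);
    msgs.put("msg-2", MessageState.NEW);
    Set<String> known = new HashSet<>();
    boolean needsNop = cacheKnownMessages(msgs, known);
    System.out.println("known=" + known + ", needsNop=" + needsNop);
  }
}
```

With a name like this, the condition guarding the NO_OP send reads as "some message was written back as NEW" rather than "a new message was updated".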