jiajunwang commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r488302199
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
}
+ // Verify the fromState and current state of the stateModel.
+ public Exception validateStaleMessage(boolean inSchedulerCheck) {
Review comment:
Our validate methods usually follow one convention: they either return a
boolean or return void (in which case they throw an exception when the input
is invalid). I suggest we follow the same convention here instead of
returning an Exception.
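A minimal sketch of the two flavors, to make the convention concrete. The class name, fields, and the fromState/currentState comparison are assumptions for illustration, and IllegalStateException stands in for whatever exception type the real method would throw:

```java
import java.util.Objects;

// Hypothetical stand-in for the handler. Only validateStaleMessage is a name
// from this PR; everything else here is illustrative.
class StaleMessageValidator {
  private final String fromState;
  private final String currentState;

  StaleMessageValidator(String fromState, String currentState) {
    this.fromState = fromState;
    this.currentState = currentState;
  }

  // Boolean flavor: reports the result and never throws.
  boolean isStaleMessage() {
    return !Objects.equals(fromState, currentState);
  }

  // Void flavor: returns normally when valid, throws when invalid.
  // IllegalStateException keeps this sketch free of Helix dependencies.
  void validateStaleMessage() {
    if (isStaleMessage()) {
      throw new IllegalStateException(String.format(
          "Stale message: fromState %s does not match current state %s",
          fromState, currentState));
    }
  }
}
```

Either flavor lets the caller drop the "if (err != null) throw err;" dance that an Exception return value forces.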
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+ if (err != null) {
+ throw err;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state transition,
+ // the later one is discarded
+ Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+ throw new HelixException(String.format(
Review comment:
Same for this one. This is existing logic, but the exception is handled by
the catch block that follows, which records the error as "Failed to create
message handler...". That description is inaccurate for this case.
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
Review comment:
What would go wrong if we called validateStaleMessage(false) here? If that
works, we can share one common code path, right?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
}
+ // Verify the fromState and current state of the stateModel.
+ public Exception validateStaleMessage(boolean inSchedulerCheck) {
Review comment:
An extra boolean parameter makes this method awkward for callers to use. I
suggest doing the following:
1. Create a private method "private void validateStaleMessage(boolean
checkFromState) {...}".
2. Create a public method "public void precheckForStaleMessage()
{validateStaleMessage(true);}".
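Roughly what I have in mind, as a sketch only. The class name, the fields, and what the flag actually gates are assumptions for illustration; IllegalStateException stands in for the exception type the real code would use:

```java
import java.util.Objects;

// Hypothetical, dependency-free stand-in for HelixStateTransitionHandler.
class StateTransitionHandlerSketch {
  private final String fromState;
  private final String currentState;

  StateTransitionHandlerSketch(String fromState, String currentState) {
    this.fromState = fromState;
    this.currentState = currentState;
  }

  // Public entry point for the precheck: callers never see the boolean flag.
  public void precheckForStaleMessage() {
    validateStaleMessage(true);
  }

  // Private worker. The check gated by the flag is an assumption made for
  // this sketch, not the logic in the PR.
  private void validateStaleMessage(boolean checkFromState) {
    if (checkFromState && !Objects.equals(fromState, currentState)) {
      throw new IllegalStateException(String.format(
          "Stale message: fromState %s does not match current state %s",
          fromState, currentState));
    }
  }
}
```

The call site then reads as "handler.precheckForStaleMessage()" with no "true /*inSchedulerCheck*/" comment needed to explain the argument.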
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+ if (err != null) {
+ throw err;
Review comment:
I think we want to "continue" here instead of throwing the Exception, so
that the stale message is handled more gracefully.
We can do this:
"reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);"
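A simplified sketch of the control flow I mean. Msg, the staleness check, and the two-argument reportAndRemoveMessage are hypothetical stand-ins so the example compiles on its own; the real method takes the accessor and instance name as well:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the onMessage loop.
class DiscardInsteadOfThrowSketch {
  enum ProcessedMessageState { DISCARDED }

  static final class Msg {
    final String id;
    final String fromState;

    Msg(String id, String fromState) {
      this.id = id;
      this.fromState = fromState;
    }
  }

  final List<String> discardedIds = new ArrayList<>();

  // Stand-in for reportAndRemoveMessage(message, accessor, instanceName, state).
  void reportAndRemoveMessage(Msg message, ProcessedMessageState state) {
    if (state == ProcessedMessageState.DISCARDED) {
      discardedIds.add(message.id);
    }
  }

  // Returns the ids of the messages that would be scheduled.
  List<String> onMessage(List<Msg> messages, String currentState) {
    List<String> scheduled = new ArrayList<>();
    for (Msg message : messages) {
      // Assumed staleness rule for this sketch: fromState must match.
      if (!message.fromState.equals(currentState)) {
        reportAndRemoveMessage(message, ProcessedMessageState.DISCARDED);
        continue; // skip this message, keep processing the rest of the batch
      }
      scheduled.add(message.id);
    }
    return scheduled;
  }
}
```

With this shape a stale message is reported as discarded and the loop moves on, rather than falling into the catch block that logs a handler-creation failure.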
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+ if (err != null) {
+ throw err;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
Review comment:
Just curious, why do we want this check to happen after the other two?