zhangmeng916 commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r488206119
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType
type) {
}
+ // Verify the fromState and current state of the stateModel.
+ public Exception isMessageStaled(boolean inSchedulerCheck) {
+ String fromState = _message.getFromState();
+ String toState = _message.getToState();
+ String partitionName = _message.getPartitionName();
+
+ // state in _currentStateDelta uses current state from state model. It has
the
+ // most up-to-date. current state. In case currentState in stateModel is
null,
+ // partition is in initial state and we using it as current state.
+ // Defined in HelixStateMachineEngine.
+ String state = _currentStateDelta.getState(partitionName);
+
+ Exception err = null;
+ if (toState.equalsIgnoreCase(state)) {
+ // To state equals current state, we can just ignore the message
+ err = new HelixDuplicatedStateTransitionException(String
+ .format("Partition %s current state is same as toState (%s->%s) from
message.",
+ partitionName, fromState, toState));
+ } else if (!inSchedulerCheck && fromState != null &&
!fromState.equals("*") && !fromState
Review comment:
Besides the case where current_state equals to_state, is this also a case
that we can move to the scheduling phase instead of the execution phase? That
could further reduce thread usage.
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
##########
@@ -463,6 +442,35 @@ public void onError(Exception e, ErrorCode code, ErrorType
type) {
}
+ // Verify the fromState and current state of the stateModel.
+ public Exception isMessageStaled(boolean inSchedulerCheck) {
Review comment:
minor: rename isMessageStaled to isMessageStale. "Staled" suggests that we
deliberately made the message stale, which is not the case.
##########
File path:
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws
InterruptedException {
System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
}
+ @Test()
+ public void testStaledMessage() throws InterruptedException {
+ System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new
TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
1000);
+
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs = 1;
+ String instanceName = manager.getInstanceName();
+ for (int i = 0; i < nMsgs; i++) {
+ Message msg =
+ new Message(Message.MessageType.STATE_TRANSITION.name(),
UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setCreateTimeStamp((long) i);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setPartitionName("Partition");
+ msg.setResourceName("testStaledMessageResource");
+ msg.setStateModelDef("DummyMasterSlave");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
+ dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+ msgList.add(msg);
+ }
+
+
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
true).size(),
+ nMsgs);
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(instanceName, msgList, changeContext);
+
+ Thread.sleep(200);
+
+ // The message should be ignored since toState is the same as current
state.
+
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
true).size(),
Review comment:
How soon does the message get deleted from the instance?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message>
messages,
// discard the message. Controller will resend if this is a valid
message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg:
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(),
msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(),
message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(),
message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task
scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler)
createHandler).isMessageStaled(true /*inSchedulerCheck*/);
Review comment:
The return type is confusing: the name `isMessageStaled` implies a boolean
return value. You could rename the function to something like
validateStaleMessage, or change the return type.
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -980,9 +980,10 @@ public TaskState pollForJobState(String workflowName,
String jobName, long timeo
&& System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
- throw new HelixException(
- String.format("Workflow \"%s\" context is null or job \"%s\" is not
in states: %s",
- workflowName, jobName, allowedStates));
+ String cur = ctx == null ? "null" : ctx.getJobState(jobName).toString();
+ throw new HelixException(String.format(
+ "Workflow \"%s\" context is null or job \"%s\" is not in states: %s,
cur state is: %s",
+ workflowName, jobName, allowedStates, cur));
Review comment:
Is this just a log improvement? The logic is not impacted by this PR's
change, right?
##########
File path:
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -377,6 +414,55 @@ public void testDuplicatedMessage() throws
InterruptedException {
System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
}
+ @Test()
+ public void testStaledMessage() throws InterruptedException {
+ System.out.println("START TestHelixTaskExecutor.testStaledMessage()");
+ HelixTaskExecutor executor = new HelixTaskExecutor();
+ HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new
TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
1000);
+
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+ List<Message> msgList = new ArrayList<Message>();
+
+ int nMsgs = 1;
+ String instanceName = manager.getInstanceName();
+ for (int i = 0; i < nMsgs; i++) {
+ Message msg =
+ new Message(Message.MessageType.STATE_TRANSITION.name(),
UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setCreateTimeStamp((long) i);
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setPartitionName("Partition");
+ msg.setResourceName("testStaledMessageResource");
+ msg.setStateModelDef("DummyMasterSlave");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
+ dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+ msgList.add(msg);
+ }
+
+
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
true).size(),
+ nMsgs);
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(instanceName, msgList, changeContext);
+
+ Thread.sleep(200);
+
+ // The message should be ignored since toState is the same as current
state.
+
Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName),
true).size(),
+ 0);
+
+ System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
Review comment:
The test name in this log line is wrong: it should be testStaledMessage, not testDuplicatedMessage.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]