xyuanlu commented on a change in pull request #1362:
URL: https://github.com/apache/helix/pull/1362#discussion_r492320893
##########
File path:
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -212,18 +213,33 @@ public TestStateTransitionHandlerFactory(String msgType, long delay) {
_delay = delay;
}
- class TestStateTransitionMessageHandler extends MessageHandler {
+ class TestStateTransitionMessageHandler extends HelixStateTransitionHandler {
+ boolean _testIsMessageStaled;
+
public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
- super(message, context);
+ super(null, null, message, context, null);
+ _testIsMessageStaled = false;
+ }
+
+ public TestStateTransitionMessageHandler(Message message, NotificationContext context,
+ CurrentState currentStateDelta) {
+ super(null, null, message, context, currentStateDelta);
+ _testIsMessageStaled = true;
+
+
}
@Override
- public HelixTaskResult handleMessage() throws InterruptedException {
+ public HelixTaskResult handleMessage() {
HelixTaskResult result = new HelixTaskResult();
_processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
if (_delay > 0) {
System.out.println("Sleeping..." + _delay);
- Thread.sleep(_delay);
+ try{
+ Thread.sleep(_delay);
+ } catch (Exception e) {
+ assert (false);
Review comment:
I think the reason is that the base handleMessage does not declare any
exception, so the override cannot declare one either. The try-catch block here
only handles exceptions raised in the test.
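
To illustrate the point above: in Java, an overriding method may not declare a checked exception that the overridden method does not declare, so a Thread.sleep inside such an override has to be wrapped. The sketch below uses hypothetical stand-in classes (BaseHandler, SleepingHandler), not the Helix types. It also swaps the catch (Exception e) / assert (false) from the diff for a narrower catch that restores the interrupt flag and rethrows an unchecked exception; that is one possible alternative, not what the PR does.

```java
final class SleepOverrideSketch {
    // Hypothetical stand-in for a base handler whose handleMessage()
    // declares no checked exception.
    abstract static class BaseHandler {
        abstract String handleMessage();
    }

    // Hypothetical test handler that sleeps before returning.
    static final class SleepingHandler extends BaseHandler {
        private final long delayMs;

        SleepingHandler(long delayMs) {
            this.delayMs = delayMs;
        }

        // Adding "throws InterruptedException" here would be a compile
        // error, because BaseHandler.handleMessage() does not declare it.
        @Override
        String handleMessage() {
            if (delayMs > 0) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and fail loudly instead of
                    // relying on "assert", which is a no-op without -ea.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("sleep interrupted in test", e);
                }
            }
            return "done";
        }
    }

    public static void main(String[] args) {
        System.out.println(new SleepingHandler(10).handleMessage());
    }
}
```

Rethrowing an unchecked exception keeps the failure visible even when JVM assertions are disabled.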
##########
File path:
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
##########
@@ -232,11 +248,29 @@ public HelixTaskResult handleMessage() throws InterruptedException {
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
}
+
+ @Override
+ public void precheckForStaleMessage() throws Exception {
+ if (_testIsMessageStaled) {
+ super.precheckForStaleMessage();
+ }
+ }
}
@Override
public MessageHandler createHandler(Message message, NotificationContext context) {
- return new TestStateTransitionMessageHandler(message, context);
+ if (message.getResourceName()!="testStaledMessageResource") {
Review comment:
TFTR. Updated.
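
For reference, the quoted line compares strings with !=, which in Java tests reference identity rather than content. What was actually updated is not shown in this thread. A minimal sketch of a content-based comparison, with a hypothetical helper name (isStaleTestResource) that is not part of the PR, could look like this:

```java
final class ResourceNameCheckSketch {
    static final String STALE_RESOURCE = "testStaledMessageResource";

    // Hypothetical helper: compares by content and tolerates a null name.
    static boolean isStaleTestResource(String resourceName) {
        return STALE_RESOURCE.equals(resourceName);
    }

    public static void main(String[] args) {
        // A distinct String object with the same characters.
        String built = new String("testStaledMessageResource");
        System.out.println(built != STALE_RESOURCE);     // true: different references
        System.out.println(isStaleTestResource(built));  // true: same content
        System.out.println(isStaleTestResource(null));   // false
    }
}
```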
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -919,9 +912,25 @@ public void onMessage(String instanceName, List<Message> messages,
// discard the message. Controller will resend if this is a valid message
throw new HelixException(String.format(
"Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(), msg.getMsgId(), String.valueOf(msg.isRelayMessage()),
- msg.getReadTimeStamp(), System.currentTimeMillis(), message.getFromState(),
- message.getToState()));
+ message.getResourceName(), message.getPartitionName(), msg.getMsgId(),
+ String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
+ System.currentTimeMillis(), message.getFromState(), message.getToState()));
+ }
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ Exception err = ((HelixStateTransitionHandler) createHandler).validateStaleMessage(true /*inSchedulerCheck*/);
+ if (err != null) {
+ throw err;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
Review comment:
Yes. We first filter based on the current state, and if the message is stale,
we delete it directly. (We still throw an exception, but no longer at the
execution stage as before.)
Although no existing bug report complains about two valid new messages arriving
at the same time, I think we should not assume there will be no duplicate
messages, so we should still keep the 3rd check.
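
The ordering described above (stale filter first, duplicate check kept afterwards) can be sketched roughly as follows. This is a simplified model, not the HelixTaskExecutor code: the class, maps, and Decision enum are hypothetical; "stale" is assumed to mean that the current state already equals the message's to-state; and the sketch returns a decision where the real code throws an exception.

```java
import java.util.HashMap;
import java.util.Map;

final class StaleThenDuplicateSketch {
    enum Decision { ACCEPTED, DISCARDED_STALE, DISCARDED_DUPLICATE }

    // target (resource:partition) -> current state; hypothetical bookkeeping.
    private final Map<String, String> currentState = new HashMap<>();
    // target -> id of the message already accepted for that target.
    private final Map<String, String> pendingByTarget = new HashMap<>();

    void setCurrentState(String target, String state) {
        currentState.put(target, state);
    }

    Decision offer(String msgId, String target, String toState) {
        // Filter on current state first: a message whose to-state is already
        // reached is treated as stale and dropped (assumed definition).
        if (toState.equals(currentState.get(target))) {
            return Decision.DISCARDED_STALE;
        }
        // Still guard against a second non-stale message for the same target.
        if (pendingByTarget.containsKey(target)) {
            return Decision.DISCARDED_DUPLICATE;
        }
        pendingByTarget.put(target, msgId);
        return Decision.ACCEPTED;
    }

    public static void main(String[] args) {
        StaleThenDuplicateSketch s = new StaleThenDuplicateSketch();
        s.setCurrentState("db_0", "SLAVE");
        System.out.println(s.offer("m1", "db_0", "SLAVE"));  // DISCARDED_STALE
        System.out.println(s.offer("m2", "db_0", "MASTER")); // ACCEPTED
        System.out.println(s.offer("m3", "db_0", "MASTER")); // DISCARDED_DUPLICATE
    }
}
```

Keeping the duplicate guard after the stale filter means two non-stale messages for the same target are still caught, even if no such case has been reported so far.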