jiajunwang commented on a change in pull request #1489:
URL: https://github.com/apache/helix/pull/1489#discussion_r513811045
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -810,164 +799,52 @@ public void onMessage(String instanceName, List<Message>
messages,
Set<String> createCurStateNames = new HashSet<>();
for (Message message : messages) {
- try {
- // nop messages are simply removed. It is used to trigger onMessage()
in
- // situations such as register a new message handler factory
- if
(message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
- LOG.info(
- "Dropping NO-OP message. mid: " + message.getId() + ", from: " +
message.getMsgSrc());
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- continue;
- }
-
- String tgtSessionId = message.getTgtSessionId();
- // sessionId mismatch normally means message comes from expired
session, just remove it
- if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
- String warningMessage =
- "SessionId does NOT match. expected sessionId: " + sessionId
- + ", tgtSessionId in message: " + tgtSessionId + ",
messageId: "
- + message.getMsgId();
- LOG.warn(warningMessage);
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class,
warningMessage, manager);
-
- // Proactively send a session sync message from participant to
controller
- // upon session mismatch after a new session is established
- if (manager.getInstanceType() == InstanceType.PARTICIPANT
- || manager.getInstanceType() ==
InstanceType.CONTROLLER_PARTICIPANT) {
- if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
- syncSessionToController(manager);
- }
- }
- continue;
- }
-
- if ((manager.getInstanceType() == InstanceType.CONTROLLER
- || manager.getInstanceType() ==
InstanceType.CONTROLLER_PARTICIPANT)
- &&
MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
- LOG.info(String.format("Controller received
PARTICIPANT_SESSION_CHANGE msg from src: %s",
- message.getMsgSrc()));
- PropertyKey key = new
Builder(manager.getClusterName()).liveInstances();
- List<LiveInstance> liveInstances =
- manager.getHelixDataAccessor().getChildValues(key, true);
- _controller.onLiveInstanceChange(liveInstances, changeContext);
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.COMPLETED);
- continue;
- }
-
- // don't process message that is of READ or UNPROCESSABLE state
- if (MessageState.NEW != message.getMsgState()) {
- // It happens because we don't delete message right after
- // read. Instead we keep it until the current state is updated.
- // We will read the message again if there is a new message but we
- // check for the status and ignore if its already read
- if (LOG.isTraceEnabled()) {
- LOG.trace("Message already read. msgId: " + message.getMsgId());
- }
- continue;
- }
-
- if (message.isExpired()) {
- LOG.info(
- "Dropping expired message. mid: " + message.getId() + ", from: "
+ message.getMsgSrc()
- + " relayed from: " + message.getRelaySrcHost());
- reportAndRemoveMessage(message, accessor, instanceName,
ProcessedMessageState.DISCARDED);
- continue;
- }
-
- // State Transition Cancellation
- if
(message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name()))
{
- boolean success = cancelNotStartedStateTransition(message,
stateTransitionHandlers, accessor, instanceName);
- if (success) {
- continue;
- }
- }
-
- _monitor.reportReceivedMessage(message);
- } catch (Exception e) {
- LOG.error("Failed to process the message {}. Deleting the message from
ZK. Exception: {}",
- message, e);
- removeMessageFromTaskAndFutureMap(message);
- removeMessageFromZK(accessor, message, instanceName);
+ if (checkForNoOpMessage(message, instanceName, changeContext, manager,
sessionId,
+ stateTransitionHandlers)) {
+ // skip the following operations for the no-op messages.
continue;
}
-
// create message handlers, if handlers not found, leave its state as NEW
NotificationContext msgWorkingContext = changeContext.clone();
try {
- MessageHandler createHandler = createMessageHandler(message,
msgWorkingContext);
- if (createHandler == null) {
+ MessageHandler msgHandler = createMessageHandler(message,
msgWorkingContext);
+ if (msgHandler == null) {
+ // Failed to create message handler, skip processing this message in
this callback.
+ // The same message process will be retried in the next round.
continue;
}
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
|| message.getMsgType()
.equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
- String messageTarget =
- getMessageTarget(message.getResourceName(),
message.getPartitionName());
-
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
- && isStateTransitionInProgress(messageTarget)) {
-
- String taskId = _messageTaskMap.get(messageTarget);
- Message msg = _taskMap.get(taskId).getTask().getMessage();
-
- // If there is another state transition for same partition is
going on,
- // discard the message. Controller will resend if this is a valid
message
- String errMsg = String.format(
- "Another state transition for %s:%s is in progress with msg:
%s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
- String.valueOf(msg.isRelayMessage()), msg.getReadTimeStamp(),
- System.currentTimeMillis(), message.getFromState(),
message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor,
- instanceName, manager);
- continue;
- }
- if (createHandler instanceof HelixStateTransitionHandler) {
- // We only check to state if there is no ST task
scheduled/executing.
- HelixStateTransitionHandler.StaleMessageValidateResult result =
- ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
- if (!result.isValid) {
- handleUnprocessableMessage(message, null /* exception */,
- result.exception.getMessage(), accessor, instanceName,
manager);
- continue;
- }
- }
- if (stateTransitionHandlers.containsKey(messageTarget)) {
- // If there are 2 messages in same batch about same partition's
state transition,
- // the later one is discarded
- Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
- String errMsg = String.format(
- "Duplicated state transition message: %s. Existing: %s->%s;
New (Discarded): %s->%s",
- message.getMsgId(), duplicatedMessage.getFromState(),
- duplicatedMessage.getToState(), message.getFromState(),
message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor,
- instanceName, manager);
+ if (validateStateTransitionMessage(message, instanceName, manager,
+ stateTransitionHandlers, msgHandler)) {
+ // Need future process by triggering state transition
+ String msgTarget =
+ getMessageTarget(message.getResourceName(),
message.getPartitionName());
+ stateTransitionHandlers.put(msgTarget, msgHandler);
+ stateTransitionContexts.put(msgTarget, msgWorkingContext);
+ } else {
+ // skip the following operations for the invalid/expired state
transition messages.
continue;
}
-
- stateTransitionHandlers
- .put(getMessageTarget(message.getResourceName(),
message.getPartitionName()),
- createHandler);
- stateTransitionContexts
- .put(getMessageTarget(message.getResourceName(),
message.getPartitionName()),
- msgWorkingContext);
} else {
- nonStateTransitionHandlers.add(createHandler);
+ // Need future process non state transition messages by triggering
the handler
+ nonStateTransitionHandlers.add(msgHandler);
nonStateTransitionContexts.add(msgWorkingContext);
}
} catch (Exception e) {
handleUnprocessableMessage(message, e, e.getMessage(), accessor,
instanceName, manager);
continue;
}
- markReadMessage(message, msgWorkingContext, manager);
- readMsgs.add(message);
-
Review comment:
Nothing is being removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]