zhangmeng916 commented on a change in pull request #1500:
URL: https://github.com/apache/helix/pull/1500#discussion_r516897658
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -804,40 +809,67 @@ public void onMessage(String instanceName, List<Message>
messages,
// skip the following operations for the no-op messages.
continue;
}
- // create message handlers, if handlers not found, leave its state as NEW
+
NotificationContext msgWorkingContext = changeContext.clone();
+ MessageHandler msgHandler = null;
try {
- MessageHandler msgHandler = createMessageHandler(message,
msgWorkingContext);
- if (msgHandler == null) {
- // Failed to create message handler, skip processing this message in
this callback.
- // The same message process will be retried in the next round.
- continue;
- }
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
|| message.getMsgType()
- .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
- if (validateAndProcessStateTransitionMessage(message, instanceName,
manager,
- stateTransitionHandlers, msgHandler)) {
- // Need future process by triggering state transition
- String msgTarget =
- getMessageTarget(message.getResourceName(),
message.getPartitionName());
- stateTransitionHandlers.put(msgTarget, msgHandler);
- stateTransitionContexts.put(msgTarget, msgWorkingContext);
- } else {
- // skip the following operations for the invalid/expired state
transition messages.
- continue;
- }
+ // create message handlers, if handlers not found but no exception,
leave its state as NEW
+ msgHandler = createMessageHandler(message, msgWorkingContext);
+ } catch (Exception ex) {
+ // Failed to create message handler and there is an Exception.
+ int remainingRetryCount = message.getRetryCount();
+ LOG.error(
+ "Exception happens when creating Message Handler for message {}.
Current remaining retry count is {}.",
+ message.getMsgId(), remainingRetryCount);
+ // Set the message retry count to avoid infinite retrying.
+ message.setRetryCount(remainingRetryCount - 1);
+ message.setExecuteSessionId(sessionId);
+ // continue processing in the next section where handler object is
double-checked.
+ }
+
+ if (msgHandler == null) {
+ if (message.getRetryCount() < 0) {
+ // If no more retry count remains, then mark the message to be
UNPROCESSABLE.
+ String errorMsg = String
+ .format("No available message Handler found!"
+ + " Stop processing message %s since it has a negative
remaining retry count %d!",
+ message.getMsgId(), message.getRetryCount());
+ updateUnprocessableMessage(message, null, errorMsg, manager);
Review comment:
It seems that even when no exception is thrown, you will still mark the message as
UNPROCESSABLE once the retries are exhausted, instead of leaving it as NEW?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -613,7 +613,12 @@ void reset() {
MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
if (item.factory() != null) {
- item.factory().reset();
+ try {
+ item.factory().reset();
+ } catch (Exception ex) {
Review comment:
I don't quite understand why we swallow the exception here. This exception
means the user's reset implementation has an issue; can we just let the error propagate?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -993,67 +1034,78 @@ private boolean checkAndProcessNoOpMessage(Message
message, String instanceName,
* Preprocess the state transition message to validate if the request is
valid.
* If no operation needs to be triggered, discard the the message.
* @param message
- * @param instanceName
* @param manager
* @param stateTransitionHandlers
* @param createHandler
* @return True if the requested state transition is valid, and need to
schedule the transition.
* False if no more operation is required.
*/
- private boolean validateAndProcessStateTransitionMessage(Message message,
String instanceName,
- HelixManager manager, Map<String, MessageHandler>
stateTransitionHandlers,
- MessageHandler createHandler) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
+ private boolean validateAndProcessStateTransitionMessage(Message message,
HelixManager manager,
+ Map<String, MessageHandler> stateTransitionHandlers, MessageHandler
createHandler) {
String messageTarget = getMessageTarget(message.getResourceName(),
message.getPartitionName());
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
- && isStateTransitionInProgress(messageTarget)) {
- String taskId = _messageTaskMap.get(messageTarget);
- Message msg = _taskMap.get(taskId).getTask().getMessage();
- // If there is another state transition for same partition is going on,
- // discard the message. Controller will resend if this is a valid message
- String errMsg = String.format(
- "Another state transition for %s:%s is in progress with msg: %s,
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
- msg.isRelayMessage(), msg.getReadTimeStamp(),
System.currentTimeMillis(),
- message.getFromState(), message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
- manager);
- return false;
- }
- if (createHandler instanceof HelixStateTransitionHandler) {
- // We only check to state if there is no ST task scheduled/executing.
- HelixStateTransitionHandler.StaleMessageValidateResult result =
- ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
- if (!result.isValid) {
- handleUnprocessableMessage(message, null /* exception */,
result.exception.getMessage(),
- accessor, instanceName, manager);
+
+ try {
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+ && isStateTransitionInProgress(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ Message msg = _taskMap.get(taskId).getTask().getMessage();
+ // If there is another state transition for same partition is going on,
+ // discard the message. Controller will resend if this is a valid
message
+ String errMsg = String.format(
+ "Another state transition for %s:%s is in progress with msg: %s,
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+ message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
+ msg.isRelayMessage(), msg.getReadTimeStamp(),
System.currentTimeMillis(),
+ message.getFromState(), message.getToState());
+ updateUnprocessableMessage(message, null /* exception */, errMsg,
manager);
return false;
}
- }
- if (stateTransitionHandlers.containsKey(messageTarget)) {
- // If there are 2 messages in same batch about same partition's state
transition,
- // the later one is discarded
- Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
- String errMsg = String.format(
- "Duplicated state transition message: %s. Existing: %s->%s; New
(Discarded): %s->%s",
- message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(),
- message.getFromState(), message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ HelixStateTransitionHandler.StaleMessageValidateResult result =
+ ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
+ if (!result.isValid) {
+ updateUnprocessableMessage(message, null /* exception */,
result.exception.getMessage(),
+ manager);
+ return false;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state
transition,
+ // the later one is discarded
+ Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
+ String errMsg = String.format(
+ "Duplicated state transition message: %s. Existing: %s->%s; New
(Discarded): %s->%s",
+ message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(),
+ message.getFromState(), message.getToState());
+ updateUnprocessableMessage(message, null /* exception */, errMsg,
manager);
Review comment:
I think we changed the original behavior: we no longer remove messages in
this case, which no longer matches the code comment ("the later one is discarded").
Is this what we want, i.e. not removing the duplicated message but only marking it?
It seems we would leave many unnecessary messages on ZK.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]