jiajunwang commented on a change in pull request #1500:
URL: https://github.com/apache/helix/pull/1500#discussion_r514904986
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -804,40 +809,66 @@ public void onMessage(String instanceName, List<Message>
messages,
// skip the following operations for the no-op messages.
continue;
}
- // create message handlers, if handlers not found, leave its state as NEW
+
NotificationContext msgWorkingContext = changeContext.clone();
+ MessageHandler msgHandler = null;
try {
- MessageHandler msgHandler = createMessageHandler(message,
msgWorkingContext);
- if (msgHandler == null) {
- // Failed to create message handler, skip processing this message in
this callback.
- // The same message process will be retried in the next round.
- continue;
- }
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
|| message.getMsgType()
- .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
- if (validateAndProcessStateTransitionMessage(message, instanceName,
manager,
- stateTransitionHandlers, msgHandler)) {
- // Need future process by triggering state transition
- String msgTarget =
- getMessageTarget(message.getResourceName(),
message.getPartitionName());
- stateTransitionHandlers.put(msgTarget, msgHandler);
- stateTransitionContexts.put(msgTarget, msgWorkingContext);
- } else {
- // skip the following operations for the invalid/expired state
transition messages.
- continue;
- }
+ // create message handlers, if handlers not found but no exception,
leave its state as NEW
+ msgHandler = createMessageHandler(message, msgWorkingContext);
+ } catch (Exception ex) {
+ // Failed to create message handler and there is an Exception.
+ int remainingRetryCount = message.getRetryCount();
+ LOG.error(
+ "Exception happens when creating Message Handler for message {}.
Current remaining retry count is {}.",
+ message.getMsgId(), remainingRetryCount);
+ // Set the message retry count to avoid infinite retrying.
+ message.setRetryCount(remainingRetryCount - 1);
+ message.setExecuteSessionId(sessionId);
+ // continue processing in the next section where handler object is
double-checked.
+ }
+
+ if (msgHandler == null) {
+ if (message.getRetryCount() < 0) {
+ // If no more retry count remains, then mark the message to be
UNPROCESSABLE.
+ String errorMsg = String
+ .format("Message %s has a negative remaining retry count %d.
Stop processing it!",
Review comment:
Not true for "this message has been retried for "retryCount" times".
Since at this place, we don't know how many times this message has been
retried. It is not possible to print this information out.
I can add the information that the handler is not available in the error
message to make it self-containing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]