alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r426807892
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState
updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT :
stateFromContext;
}
TaskPartitionState currentState =
TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource,
currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task
information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput
currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx,
Integer pId,
+ String pName, String instance) {
+ // The assigned participant needs to be updated regardless of the current
state and context
+ // information because it will prevent controller to stuck in race
condition while there is two
+ // current states. In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the
assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new
Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we
do not want to update
+ // context as long as current state existed. We just want to update
context information
Review comment:
Here we check whether the current state has changed and no longer matches
the state stored in the context. This check is necessary because we do not
need to update the context in every pipeline run. More importantly, it
protects fields such as finish time. Let's say a task has a delay retry time.
That functionality depends on the finish time stored in the context. If we
updated the finish time (which we set to the current time) on every run in
which a current state exists, the finish time would keep moving forward, the
retry delay measured from it would never elapse, and the task would never be
rescheduled. That is why we update the context only the first time the
current state changes.
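To illustrate the point about finish time, here is a minimal, self-contained
sketch. It is not the Helix implementation: `Ctx`, `State`, `updateIfChanged`
and `readyForRetry` are hypothetical names made up for this example, and the
timestamps are plain longs passed in by the caller. It only shows why the
context is updated when the observed state differs from the stored one.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextUpdateSketch {
  // Hypothetical, reduced set of task states for illustration only.
  enum State { INIT, RUNNING, TASK_ERROR, COMPLETED }

  // Hypothetical stand-in for the per-partition data kept in a job context.
  static final class Ctx {
    final Map<Integer, State> states = new HashMap<>();
    final Map<Integer, Long> finishTimes = new HashMap<>();
  }

  // Updates the context only when the observed state differs from the stored
  // state. Returns true if the context was modified.
  static boolean updateIfChanged(Ctx ctx, int pId, State current, long now) {
    if (current == ctx.states.get(pId)) {
      return false; // same state as the last pipeline run: keep finish time
    }
    ctx.states.put(pId, current);
    if (current == State.TASK_ERROR || current == State.COMPLETED) {
      ctx.finishTimes.put(pId, now); // recorded once, on the transition
    }
    return true;
  }

  // A delayed retry is due once the delay has elapsed since the finish time.
  static boolean readyForRetry(Ctx ctx, int pId, long retryDelayMs, long now) {
    Long finish = ctx.finishTimes.get(pId);
    return finish != null && now >= finish + retryDelayMs;
  }

  public static void main(String[] args) {
    Ctx ctx = new Ctx();
    long retryDelayMs = 1000;
    // Three pipeline runs observe the same TASK_ERROR current state.
    updateIfChanged(ctx, 0, State.TASK_ERROR, 0);
    updateIfChanged(ctx, 0, State.TASK_ERROR, 600);
    updateIfChanged(ctx, 0, State.TASK_ERROR, 1200);
    // Finish time stayed at 0, so the retry is due at time 1200.
    System.out.println(readyForRetry(ctx, 0, retryDelayMs, 1200));
  }
}
```

Without the equality check, the third call would move the finish time to
1200, and the retry would not be due until 2200; every further pipeline run
would push it out again.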
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]