alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428931451
##########
File path:
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+ pId, pName, instance);
+ return currentState;
+ }
+
+ /**
+ * Based on the CurrentState of this task and Context information, the task information in the job
+ * context gets updated.
+ * @param currentStateOutput
+ * @param jobResource
+ * @param currentState
+ * @param jobCtx
+ * @param pId
+ * @param pName
+ * @param instance
+ */
+ private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+ String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+ String pName, String instance) {
+ // The assignedParticipant field needs to be updated regardless of the current state and context
+ // information because it will prevent controller to assign the task to the wrong participant
+ // for targeted tasks when two CurrentStates exist for one task.
+ // In the updatePreviousAssignedTasksStatus, we check
+ // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+ // not equal to job context's AssignedParticipant for this pId.
jobCtx.setAssignedParticipant(pId, instance);
- jobCtx.setPartitionState(pId, currentState);
- String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
- if (taskMsg != null) {
- jobCtx.setPartitionInfo(pId, taskMsg);
+ // If job context needs to be updated with new state, update it accordingly
+ // This check is necessary because we are relying on current state and we do not want to update
+ // context as long as current state existed. We just want to update context information
+ // (specially finish time) once.
+ // This condition checks whether jobContext's state is out of date or not.
+ if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+ jobCtx.setPartitionState(pId, currentState);
+ String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+ if (taskMsg != null) {
+ jobCtx.setPartitionInfo(pId, taskMsg);
+ }
Review comment:
The participant reads the message, handles the state change, and then deletes
the message, so the message is deleted from ZK after the ST is completed. We
should be safe here.
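
For illustration, here is a minimal sketch of the guard quoted in the diff above: the assigned participant is always refreshed, while the partition state is written only when it differs from what the context already holds. The names ContextUpdateSketch, SketchContext, update, and stateWrites are hypothetical stand-ins, not Helix classes or APIs; the sketch only models the check, not the controller pipeline.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextUpdateSketch {
  // Simplified stand-in for task partition states (assumption, not the Helix enum).
  enum State { INIT, RUNNING, COMPLETED }

  // Hypothetical stand-in for the job context; not the Helix JobContext class.
  static class SketchContext {
    final Map<Integer, State> states = new HashMap<>();
    final Map<Integer, String> assigned = new HashMap<>();
    // Counts state writes per partition so the effect of the guard is observable.
    final Map<Integer, Integer> stateWrites = new HashMap<>();

    void setPartitionState(int pId, State state) {
      states.put(pId, state);
      stateWrites.merge(pId, 1, Integer::sum);
    }
  }

  static void update(SketchContext ctx, int pId, String instance, State current) {
    // Always refresh the assigned participant, regardless of state.
    ctx.assigned.put(pId, instance);
    // Write the state only when the context is out of date.
    if (!current.equals(ctx.states.get(pId))) {
      ctx.setPartitionState(pId, current);
    }
  }

  public static void main(String[] args) {
    SketchContext ctx = new SketchContext();
    // The same current state is observed in two consecutive passes.
    update(ctx, 0, "host_1", State.COMPLETED);
    update(ctx, 0, "host_2", State.COMPLETED);
    System.out.println("state writes: " + ctx.stateWrites.get(0));
    System.out.println("assigned: " + ctx.assigned.get(0));
  }
}
```

In this sketch the second pass leaves the state untouched but still updates the assigned participant, which mirrors the ordering of the two statements in the diff.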