NealSun96 commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r427611990



##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -238,21 +238,20 @@ private ResourceAssignment computeResourceMapping(String 
jobResource,
     // These dropping transitions will be prioritized above all task state 
transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, 
prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, 
jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, 
prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, 
currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + 
excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + 
excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, 
excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, 
excludedInstances, jobResource,

Review comment:
       Should this function be renamed? 

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -121,17 +120,6 @@ public void updatePreviousAssignedTasksStatus(
               instance, pId);
           continue;
         }

Review comment:
       Would it be a problem that this skipping check is now done after 
markPartitionCompleted/markPartitionError? 

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }

Review comment:
       Is there a possibility that a message exists without state changes? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to