alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r426807350



##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -99,16 +99,15 @@ public void updatePreviousAssignedTasksStatus(
       }
 
       // If not an excluded instance, we must instantiate its entry in 
assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already 
made an assignment in
       // paMap above for them to be dropped. The following does this.
       if (tasksToDrop.containsKey(instance)) {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: 
COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in one the INIT or DROPPED 
states.

Review comment:
       Yes. Done.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the 
context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the 
instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       The reason is that this PR changes the behavior. We already have an 
assignment for the task (it is being dropped), so it goes into 
assignedPartitions. Once it is dropped from the participant, we no longer 
consider it for a new assignment.
   

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the 
context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the 
instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));

Review comment:
       You are right, I had the same concern. But note that we do not change 
the JobContext to DROPPED. Say the controller sends a COMPLETED-to-DROPPED 
message to the participant. The current state stays COMPLETED until the 
participant processes the message. Once it is processed, the current state 
becomes null and the task is not considered again. So, to answer your 
question, it depends entirely on the participant's behavior. If the 
participant set the current state to DROPPED, which is not the case here, your 
concern would be valid. Since the participant removes the task's current state 
instead, we do not need to worry about that case.
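
A minimal sketch of the flow described above, assuming a plain map stands in 
for one participant's current states. The class and method names 
(DropAfterCompletionSketch, controllerDecision, participantProcessesDrop) and 
the "IGNORE" marker are hypothetical and are not part of Helix; only the state 
names COMPLETED and DROPPED come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: not Helix code. A map of pId -> state name stands in for
// the current states reported by a single participant.
final class DropAfterCompletionSketch {

  // Controller side: a task is only examined while it still has a current state.
  static String controllerDecision(Map<Integer, String> currentStates, int pId) {
    String state = currentStates.get(pId);
    if (state == null) {
      return "IGNORE"; // hypothetical marker: no current state, nothing to do
    }
    if (state.equals("COMPLETED")) {
      return "DROPPED"; // ask the participant to drop the finished task
    }
    return state; // any other state is left as it is in this sketch
  }

  // Participant side: processing the drop removes the entry instead of
  // writing DROPPED into the current state.
  static void participantProcessesDrop(Map<Integer, String> currentStates, int pId) {
    currentStates.remove(pId);
  }

  public static void main(String[] args) {
    Map<Integer, String> currentStates = new HashMap<>();
    currentStates.put(0, "COMPLETED");

    // Before the participant handles the message the state is still COMPLETED.
    System.out.println(controllerDecision(currentStates, 0)); // prints DROPPED

    participantProcessesDrop(currentStates, 0);

    // Afterwards there is no current state, so the task is not looked at again.
    System.out.println(controllerDecision(currentStates, 0)); // prints IGNORE
  }
}
```

If the participant wrote DROPPED into the map instead of removing the entry, 
the controller would keep seeing a state for the task, which is the case the 
reviewer was worried about.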

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -263,7 +251,11 @@ public void updatePreviousAssignedTasksStatus(
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
+          // First make this task which is in terminal state to be dropped.
+          // Later on, in next pipeline in handleAdditionalAssignments, the 
task will be retried if possible.
+          // (meaning it is not ABORTED and max number of attempts has not 
been reached yet)
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));

Review comment:
       The answer to this question is similar to the COMPLETED case :). Please 
see it above.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context

Review comment:
       Fixed. Thanks.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context
+    // information because it will prevent controller to stuck in race 
condition while there is two

Review comment:
       The scenario I am referring to as a race condition is the one where a 
task has two current states on different participants. Since we have only one 
slot for each task in paMap, we want to make sure that the task is DROPPED from 
the wrong instance and assigned to the correct one. I am basically talking 
about #922 and #461. Let me rephrase this comment to make it clear.
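
A rough sketch of the single-slot problem, assuming a map keyed by pId plays 
the role of paMap and a string plays the role of the job context's assigned 
participant. SingleSlotSketch, Assignment and build are hypothetical names; 
the equality check mirrors the instance.equals(jobCtx.getAssignedParticipant(pId)) 
check mentioned in the code comment under review. How the stale copy is then 
dropped is outside this sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: not Helix code.
final class SingleSlotSketch {

  // Hypothetical pair of (instance, state), one per task.
  static final class Assignment {
    final String instance;
    final String state;

    Assignment(String instance, String state) {
      this.instance = instance;
      this.state = state;
    }
  }

  // Builds the one-slot-per-task map for a single pId that has a current
  // state on several instances.
  static Map<Integer, Assignment> build(
      int pId, Map<String, String> stateByInstance, String assignedParticipant) {
    Map<Integer, Assignment> paMap = new HashMap<>();
    for (Map.Entry<String, String> entry : stateByInstance.entrySet()) {
      String instance = entry.getKey();
      if (!instance.equals(assignedParticipant)) {
        // Bypass: without this check a later instance would overwrite the slot.
        continue;
      }
      paMap.put(pId, new Assignment(instance, entry.getValue()));
    }
    return paMap;
  }

  public static void main(String[] args) {
    Map<String, String> stateByInstance = new LinkedHashMap<>();
    stateByInstance.put("node1", "RUNNING"); // the copy the context knows about
    stateByInstance.put("node2", "INIT");    // a second copy on another node

    Assignment slot = build(7, stateByInstance, "node1").get(7);
    System.out.println(slot.instance + " " + slot.state); // prints node1 RUNNING
  }
}
```

Because the slot is written only for the instance named in the context, the 
result does not depend on the order in which instances are visited.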

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context
+    // information because it will prevent controller to stuck in race 
condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.

Review comment:
       Exactly. We added this check in PR #923 to help resolve this scenario. 
Maybe "race condition" is not the right term here. I rephrased the comments.

##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState 
updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current 
state and context
+    // information because it will prevent controller to stuck in race 
condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       Exactly. I added a comment about it.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, 
Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be 
dropped
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName,
+  protected static Map<String, SortedSet<Integer>> 
getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, 
String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task 
Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = 
prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source 
of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be 
dropped (after a
-    // copy-over, the Controller will send a message to drop the state 
currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as 
source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing 
tasks with current state
+    // in updatePreviousAssignedTasksStatus method

Review comment:
       Fixed. Thanks.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, 
Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be 
dropped
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName,
+  protected static Map<String, SortedSet<Integer>> 
getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, 
String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task 
Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = 
prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source 
of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be 
dropped (after a
-    // copy-over, the Controller will send a message to drop the state 
currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as 
source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing 
tasks with current state
+    // in updatePreviousAssignedTasksStatus method
     Map<Partition, Map<String, String>> partitions = 
currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : 
partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
       for (Map.Entry<String, String> instanceToCurrState : 
entry.getValue().entrySet()) {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), 
instance);
-        TaskPartitionState currState = 
TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping 
transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here 
because old
-          // Participants, upon connection reset, set tasks' currentStates to 
INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          // We must add all pIds back here

Review comment:
       I agree. Removed the comment. Basically, we have explained everything in 
the comments already.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, 
Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be 
dropped
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName,
+  protected static Map<String, SortedSet<Integer>> 
getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, 
String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task 
Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = 
prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source 
of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be 
dropped (after a
-    // copy-over, the Controller will send a message to drop the state 
currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as 
source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing 
tasks with current state
+    // in updatePreviousAssignedTasksStatus method
     Map<Partition, Map<String, String>> partitions = 
currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : 
partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
       for (Map.Entry<String, String> instanceToCurrState : 
entry.getValue().entrySet()) {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), 
instance);
-        TaskPartitionState currState = 
TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping 
transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here 
because old
-          // Participants, upon connection reset, set tasks' currentStates to 
INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          // We must add all pIds back here
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to 
tasksToDrop no matter
-          // what its current state is so that it will be dropped
+          // what its current state is so that it will be dropped.

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -462,10 +427,10 @@ private static boolean isJobComplete(JobContext ctx, 
Set<Integer> allPartitions,
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous 
assignment is
    * deleted) it is added from context. Otherwise, the context won't be 
updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from 
previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from 
current assignment

Review comment:
       Done.






