alirezazamani commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r513066579



##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -478,4 +482,61 @@ private TaskAssignmentCalculator 
getAssignmentCalculator(JobConfig jobConfig,
     }
     return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
   }
+
+  /**
+   * Find the tasks that have been removed from job config, add them to 
tasksToDrop. If task's
+   * currentState and pending message have been removed, delete the task from 
job context.
+   * @param jobName
+   * @param jobConfig
+   * @param jobContext
+   * @param currentInstanceToTaskAssignments
+   * @param tasksToDrop
+   * @param currStateOutput
+   * @param allPartitions
+   */
+  private void handleDeletedTasks(String jobName, JobConfig jobConfig, 
JobContext jobContext,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+      Map<String, Set<Integer>> tasksToDrop, CurrentStateOutput 
currStateOutput,
+      Set<Integer> allPartitions) {
+    if (TaskUtil.isGenericTaskJob(jobConfig)) {
+      // Get all partitions existed in the context
+      Set<Integer> contextPartitions = jobContext.getPartitionSet();
+      // Check whether the tasks have been deleted from jobConfig
+      for (Integer partition : contextPartitions) {
+        String partitionID = jobContext.getTaskIdForPartition(partition);
+        if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {

Review comment:
       I moved this logic to taskAssignmentCalculator class. However, we cannot 
avoid some of the loops because we need to get the partitionID using 
jobContext.getTaskIdForPartition(partition) and find the one which has been 
removed from the config.
   We basically find the partitionID in the context and see if such taskID 
existed in the config it not now.
   ```
     @Override
     public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext 
jobContext, Set<Integer> allPartitions) {
       // Get all partitions existed in the context
       Set<Integer> deletedPartitions = new HashSet<>();
       // Check whether the tasks have been deleted from jobConfig
       for (Integer partition : jobContext.getPartitionSet()) {
         String partitionID = jobContext.getTaskIdForPartition(partition);
         if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
           deletedPartitions.add(partition);
         }
       }
       return deletedPartitions;
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to