alirezazamani commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r513066579
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -478,4 +482,61 @@ private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
}
return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
}
+
+ /**
+ * Find the tasks that have been removed from job config, add them to tasksToDrop. If task's
+ * currentState and pending message have been removed, delete the task from job context.
+ * @param jobName
+ * @param jobConfig
+ * @param jobContext
+ * @param currentInstanceToTaskAssignments
+ * @param tasksToDrop
+ * @param currStateOutput
+ * @param allPartitions
+ */
+ private void handleDeletedTasks(String jobName, JobConfig jobConfig, JobContext jobContext,
+ Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+ Map<String, Set<Integer>> tasksToDrop, CurrentStateOutput currStateOutput,
+ Set<Integer> allPartitions) {
+ if (TaskUtil.isGenericTaskJob(jobConfig)) {
+ // Get all partitions existed in the context
+ Set<Integer> contextPartitions = jobContext.getPartitionSet();
+ // Check whether the tasks have been deleted from jobConfig
+ for (Integer partition : contextPartitions) {
+ String partitionID = jobContext.getTaskIdForPartition(partition);
+ if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
Review comment:
I moved this logic to the TaskAssignmentCalculator class. However, we cannot
avoid some of the loops, because we need to get each partition's task ID using
jobContext.getTaskIdForPartition(partition) and find the ones that have been
removed from the config.
Basically, for each partition in the context we look up its task ID and check
whether that task ID still exists in the config.
```
@Override
public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext,
    Set<Integer> allPartitions) {
  // Partitions that exist in the context but whose task is gone from the config
  Set<Integer> deletedPartitions = new HashSet<>();
  // Check each partition in the context against the task IDs in jobConfig
  for (Integer partition : jobContext.getPartitionSet()) {
    String partitionID = jobContext.getTaskIdForPartition(partition);
    if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
      deletedPartitions.add(partition);
    }
  }
  return deletedPartitions;
}
```
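To illustrate the check in isolation, here is a minimal, self-contained sketch that uses plain collections instead of the Helix types. Everything in it is a hypothetical stand-in, not Helix API: `partitionToTaskId` plays the role of the job context (partition number to task ID), and `configuredTaskIds` plays the role of the key set of `jobConfig.getTaskConfigMap()`. Treating a partition with no task ID as removed is an assumption of the sketch.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the calculator logic; not part of Helix.
class RemovedPartitionsSketch {

  /**
   * Returns the partitions whose task ID is no longer present in the config.
   *
   * @param partitionToTaskId stand-in for the job context: partition -> task ID
   * @param configuredTaskIds stand-in for the task IDs currently in the job config
   */
  static Set<Integer> findRemovedPartitions(Map<Integer, String> partitionToTaskId,
      Set<String> configuredTaskIds) {
    // TreeSet only to make the result order deterministic for readers.
    Set<Integer> removed = new TreeSet<>();
    for (Map.Entry<Integer, String> entry : partitionToTaskId.entrySet()) {
      String taskId = entry.getValue();
      // Assumption: a partition without a task ID is treated as removed.
      if (taskId == null || !configuredTaskIds.contains(taskId)) {
        removed.add(entry.getKey());
      }
    }
    return removed;
  }
}
```

For example, with partitions 0, 1, 2 mapped to `task_a`, `task_b`, `task_c` and only `task_a` and `task_c` left in the config, the method returns a set containing just partition 1. One loop over the context is still required, since the partition-to-task-ID mapping lives only there.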