pkuwm commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r512294311
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -478,4 +485,52 @@ private TaskAssignmentCalculator
getAssignmentCalculator(JobConfig jobConfig,
}
return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
}
+
+ /**
+ * Add the removed task to tasksToDrop to drop its current state. If task's
currentState and
+ * pending message have been removed, delete the task from job context.
+ * @param jobName
Review comment:
The method is private, we may not bother to add the `param` with empty
descriptions? I suggest we just remove them.
##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -478,4 +485,52 @@ private TaskAssignmentCalculator
getAssignmentCalculator(JobConfig jobConfig,
}
return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
}
+
+ /**
+ * Add the removed task to tasksToDrop to drop its current state. If task's
currentState and
+ * pending message have been removed, delete the task from job context.
+ * @param jobName
+ * @param jobContext
+ * @param currentInstanceToTaskAssignments
+ * @param tasksToDrop
+ * @param currStateOutput
+ * @param allPartitions
+ */
+ private void handleDeletedTasks(String jobName, JobContext jobContext,
+ Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
+ Map<String, Set<Integer>> tasksToDrop, CurrentStateOutput
currStateOutput,
+ Set<Integer> allPartitions, Set<Integer> removedPartitions) {
+ for (Integer partition : removedPartitions) {
+ boolean hasCurrentState = false;
+ for (Map.Entry<String, SortedSet<Integer>> instanceToPartitions :
currentInstanceToTaskAssignments
+ .entrySet()) {
+ String instance = instanceToPartitions.getKey();
+ if (instanceToPartitions.getValue().contains(partition)) {
+ LOG.info(
+ "Task {} should be removed from job {}. Current State should be
removed from instance {} as well!",
+ partition, jobName, instance);
+ if (!tasksToDrop.containsKey(instance)) {
+ tasksToDrop.put(instance, new HashSet<>());
+ }
+ tasksToDrop.get(instance).add(partition);
+
+ // If current state or pending message have not been removed yet, we
should not
+ // delete the context and leave unclean currentState
+ String pName = pName(jobName, partition);
+ if (currStateOutput.getCurrentState(jobName, new Partition(pName),
instance) != null
Review comment:
If `!hasCurrentState` is also added to this if condition, when it is
true, we don't need to `new Partition()` for later checks?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]