alirezazamani commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r516265126
##########
File path: helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
##########
@@ -146,6 +147,20 @@ public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
return taskAssignment;
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
Review comment:
I think it is safe to keep it as it is, because the same rule is applied to the
other methods. For example, getAllTaskPartitions is the same in both classes.
##########
File path: helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
##########
@@ -95,6 +97,20 @@
return placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
+ // Get all partitions existed in the context
+ Set<Integer> deletedPartitions = new HashSet<>();
+ // Check whether the tasks have been deleted from jobConfig
+ for (Integer partition : jobContext.getPartitionSet()) {
+ String partitionID = jobContext.getTaskIdForPartition(partition);
+ if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+ deletedPartitions.add(partition);
+ }
+ }
Review comment:
I will sync up with you on this offline.
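For readers following the getRemovedPartitions() hunk above, here is a minimal standalone sketch of the rule it applies: a partition counts as removed when its task ID is still in the job context but is no longer a key of the job's task config map. It assumes plain collections in place of Helix's JobContext and JobConfig; the class and method names are hypothetical and are not part of the PR.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone version of the getRemovedPartitions() rule. Plain collections
// stand in for Helix's JobContext (partition -> task ID) and JobConfig (task config keys).
final class RemovedPartitionsSketch {
  private RemovedPartitionsSketch() {}

  /**
   * Returns the partitions that still exist in the job context but whose task ID is no
   * longer present in the job config, i.e. tasks deleted after they were scheduled.
   *
   * @param partitionToTaskId stand-in for JobContext.getPartitionSet() combined with
   *     JobContext.getTaskIdForPartition(partition)
   * @param configuredTaskIds stand-in for JobConfig.getTaskConfigMap().keySet()
   */
  static Set<Integer> removedPartitions(
      Map<Integer, String> partitionToTaskId, Set<String> configuredTaskIds) {
    Set<Integer> deletedPartitions = new HashSet<>();
    for (Map.Entry<Integer, String> entry : partitionToTaskId.entrySet()) {
      // A partition whose task ID is missing from the config has been removed.
      if (!configuredTaskIds.contains(entry.getValue())) {
        deletedPartitions.add(entry.getKey());
      }
    }
    return deletedPartitions;
  }
}
```

With partitions 0, 1 and 2 mapped to taskA, taskB and taskC, and only taskA and taskC left in the config, the sketch reports partition 1 as removed.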
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName, TaskConfig taskConfig,
long endTime = System.currentTimeMillis() + timeoutMs;
- validateAddTaskConfigs(workflowName, jobName, taskConfig);
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID already exists!");
+ }
+ }
+
WorkflowContext workflowContext = getWorkflowContext(workflowName);
JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been started. Hence task can
+ // be added to the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task addition. Job State is " + jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, updater);
+
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
if (workflowContext == null || jobContext == null) {
- // Workflow context or job context is null. It means job has not been started. Hence task can
- // be added to the job
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
return;
}
- TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+ return;
+ }
- if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
- throw new HelixException(
- String.format("Job %s is in illegal state to accept new task. Job State is %s",
- nameSpaceJobName, jobState));
+ Thread.sleep(DEFAULT_SLEEP);
+ }
+ throw new TimeoutException("An unexpected issue happened while task being added to the job!");
+ }
+
+ /**
+ * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+ * Timeout for this operation is the default timeout which is 5 minutes.
+ * {@link TaskDriver#DEFAULT_TIMEOUT}
+ * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+ * from the job if the job is in-progress or it has not started yet.
+ * Note2: The tasks can only be deleted from non-targeted jobs.
+ * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+ * has been successfully deleted or not.
+ * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+ * or not.
+ * @param workflowName
+ * @param jobName
+ * @param taskID
+ * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void deleteTask(String workflowName, String jobName, String taskID)
+ throws TimeoutException, InterruptedException {
+ deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet.
+ * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted
+ * from the job if the job is in-progress or it has not started yet.
+ * Note2: The tasks can only be deleted from non-targeted jobs.
+ * Note3: In case of timeout exception, it is the user's responsibility to check whether the task
+ * has been successfully deleted or not.
+ * Note4: timeout is the time that this API checks whether the task has been successfully deleted
+ * or not.
+ * @param workflowName
+ * @param jobName
+ * @param taskID
+ * @param timeoutMs
+ * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void deleteTask(String workflowName, String jobName, String taskID, long timeoutMs)
+ throws TimeoutException, InterruptedException {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ JobConfig jobConfig = getJobConfig(nameSpaceJobName);
+ if (jobConfig == null) {
+ throw new IllegalArgumentException("Job " + nameSpaceJobName + " config does not exist!");
+ }
+
+ TaskConfig taskConfig = null;
+ Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap();
+ for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) {
+ if (entry.getKey().equals(taskID)) {
+ taskConfig = entry.getValue();
+ }
+ }
+
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
+
+ WorkflowContext workflowContext = getWorkflowContext(workflowName);
+ JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been started. Hence task can
+ // be deleted from the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task deletion. Job State is " + jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> taskRemover = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ Map<String, Map<String, String>> taskMap = currentData.getMapFields();
+ if (taskMap == null) {
+ LOG.warn("Could not update the jobConfig: " + jobName + " Znode MapField is null.");
+ return null;
+ }
+ Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>();
+ for (Map.Entry<String, Map<String, String>> entry : taskMap.entrySet()) {
+ if (!entry.getKey().equals(taskID)) {
+ newTaskMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ currentData.setMapFields(newTaskMap);
+ }
+ return currentData;
+ }
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, taskRemover);
+
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
+ if (workflowContext == null || jobContext == null) {
+ return;
}
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
Review comment:
Yeah, I agree. I removed the IN_PROGRESS condition.
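The loop at the end of deleteTask() is a poll-until-deadline check. The following is a minimal sketch of that pattern, assuming a BooleanSupplier in place of re-reading the JobContext through the data accessor; the class and method names are hypothetical, not Helix APIs. With the IN_PROGRESS term dropped, the deletion check reduces to a single condition: the task ID is no longer a key of the context's task-ID-to-partition map.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper mirroring the polling loop in deleteTask()/addTask(): re-check a
// condition until it holds or the deadline passes. The BooleanSupplier is an assumption
// standing in for re-reading the JobContext from ZooKeeper on every iteration.
final class TaskChangeVerifier {
  private TaskChangeVerifier() {}

  static void awaitCondition(BooleanSupplier isApplied, long timeoutMs, long sleepMs)
      throws TimeoutException, InterruptedException {
    long endTime = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() <= endTime) {
      if (isApplied.getAsBoolean()) {
        return; // the controller has reflected the change in the context
      }
      Thread.sleep(sleepMs);
    }
    // The outcome is unknown at this point; the caller has to verify it separately.
    throw new TimeoutException("Task modification could not be verified before the timeout");
  }
}
```

For deletion, the condition would look like `() -> !readTaskIdPartitionMap().containsKey(taskID)`, where `readTaskIdPartitionMap()` is a hypothetical helper that fetches the current job context. One thing the sketch glosses over is that the real loop dereferences a freshly read context, so a null re-read would need handling there.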
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName, TaskConfig taskConfig,
long endTime = System.currentTimeMillis() + timeoutMs;
- validateAddTaskConfigs(workflowName, jobName, taskConfig);
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID already exists!");
+ }
+ }
+
WorkflowContext workflowContext = getWorkflowContext(workflowName);
JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been started. Hence task can
+ // be added to the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task addition. Job State is " + jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, updater);
+
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+
if (workflowContext == null || jobContext == null) {
- // Workflow context or job context is null. It means job has not been started. Hence task can
- // be added to the job
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
return;
}
- TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+ _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName));
+ workflowContext =
+ _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
Review comment:
Added some Javadoc notes. Thanks.
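The addTask() hunk rejects a task whose ID already appears among the job config's map keys and then writes the task's config as a map field keyed by task ID. The deleteTask() updater rebuilds the map fields without that ID. Below is a minimal sketch of both edits, assuming a plain Map in place of the ZNRecord map fields and IllegalStateException in place of HelixException. The class name is hypothetical, and the sketch leaves out the DataUpdater and the ZooKeeper write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the job-config edits made by addTask() and deleteTask(): each
// task's config is a map field keyed by task ID. A plain Map stands in for the ZNRecord
// map fields, and IllegalStateException stands in for HelixException.
final class JobConfigTaskFields {
  private JobConfigTaskFields() {}

  /** Mirrors the duplicate-ID check followed by setMapField(taskId, configMap). */
  static void addTask(Map<String, Map<String, String>> mapFields, String taskId,
      Map<String, String> configMap) {
    // One containsKey lookup answers the same question as looping over the key set.
    if (mapFields.containsKey(taskId)) {
      throw new IllegalStateException(
          "Task cannot be added because another task with the same ID already exists!");
    }
    mapFields.put(taskId, new HashMap<>(configMap));
  }

  /** Mirrors the taskRemover updater; returns true if a task entry was removed. */
  static boolean deleteTask(Map<String, Map<String, String>> mapFields, String taskId) {
    return mapFields.remove(taskId) != null;
  }
}
```

Removing the key in place gives the same resulting map fields as copying every other entry into a new map, which is what the quoted taskRemover does.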
##########
File path: helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
##########
@@ -95,6 +97,20 @@
return placement.computeMapping(jobCfg, jobContext, partitionNums, resourceId);
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
+ // Get all partitions existed in the context
+ Set<Integer> deletedPartitions = new HashSet<>();
+ // Check whether the tasks have been deleted from jobConfig
+ for (Integer partition : jobContext.getPartitionSet()) {
+ String partitionID = jobContext.getTaskIdForPartition(partition);
+ if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+ deletedPartitions.add(partition);
+ }
+ }
Review comment:
Synced offline. Resolved.
##########
File path: helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
##########
@@ -146,6 +147,20 @@ public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
return taskAssignment;
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
Review comment:
As we discussed offline, I will address this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.