dasahcc commented on a change in pull request #1468:
URL: https://github.com/apache/helix/pull/1468#discussion_r516205706
##########
File path:
helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
##########
@@ -95,6 +97,20 @@
return placement.computeMapping(jobCfg, jobContext, partitionNums,
resourceId);
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext
jobContext, Set<Integer> allPartitions) {
+ // Get all partitions existed in the context
+ Set<Integer> deletedPartitions = new HashSet<>();
+ // Check whether the tasks have been deleted from jobConfig
+ for (Integer partition : jobContext.getPartitionSet()) {
+ String partitionID = jobContext.getTaskIdForPartition(partition);
+ if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
+ deletedPartitions.add(partition);
+ }
+ }
Review comment:
We can replace the loop with a set operation, for example:
Set<Integer> deletedPartitions = new HashSet<>(jobContext.getPartitionSet());
deletedPartitions.removeAll(jobConfig.getTaskConfigMap().keySet());
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName,
TaskConfig taskConfig,
long endTime = System.currentTimeMillis() + timeoutMs;
- validateAddTaskConfigs(workflowName, jobName, taskConfig);
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName,
jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID
already exists!");
+ }
+ }
+
WorkflowContext workflowContext = getWorkflowContext(workflowName);
JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been
started. Hence task can
+ // be added to the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null &&
ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task addition. Job State is " +
jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.error("JobConfig DataUpdater: Fails to update JobConfig.
CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, updater);
+
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+
if (workflowContext == null || jobContext == null) {
- // Workflow context or job context is null. It means job has not been
started. Hence task can
- // be added to the job
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
return;
}
- TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) ==
TaskState.IN_PROGRESS) {
Review comment:
Please make sure users understand this. If they have delayed scheduling
between tasks, or if there are dependencies, this add may fail. Also, we do not
"roll back" added tasks.
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -585,36 +585,195 @@ public void addTask(String workflowName, String jobName,
TaskConfig taskConfig,
long endTime = System.currentTimeMillis() + timeoutMs;
- validateAddTaskConfigs(workflowName, jobName, taskConfig);
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName,
jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID
already exists!");
+ }
+ }
+
WorkflowContext workflowContext = getWorkflowContext(workflowName);
JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been
started. Hence task can
+ // be added to the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null &&
ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task addition. Job State is " +
jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.error("JobConfig DataUpdater: Fails to update JobConfig.
CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, updater);
+
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+
if (workflowContext == null || jobContext == null) {
- // Workflow context or job context is null. It means job has not been
started. Hence task can
- // be added to the job
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
return;
}
- TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) ==
TaskState.IN_PROGRESS) {
+ return;
+ }
- if (ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
- throw new HelixException(
- String.format("Job %s is in illegal state to accept new task. Job
State is %s",
- nameSpaceJobName, jobState));
+ Thread.sleep(DEFAULT_SLEEP);
+ }
+ throw new TimeoutException("An unexpected issue happened while task being
added to the job!");
+ }
+
+ /**
+ * Delete an existing task from a running (IN-PROGRESS) job or a job which
has not started yet.
+ * Timeout for this operation is the default timeout which is 5 minutes.
+ * {@link TaskDriver#DEFAULT_TIMEOUT}
+ * Note1: Task cannot be deleted from the job which is in an illegal state.
Task can be deleted
+ * from the job if the job is in-progress or it has not started yet.
+ * Note2: The tasks can only be deleted from non-targeted jobs.
+ * Note3: In case of timeout exception, it is the user's responsibility to
check whether the task
+ * has been successfully deleted or not.
+ * Note4: timeout is the time that this API checks whether the task has been
successfully deleted
+ * or not.
+ * @param workflowName
+ * @param jobName
+ * @param taskID
+ * @throws TimeoutException if the outcome of the task deletion is unknown
and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new
task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void deleteTask(String workflowName, String jobName, String taskID)
+ throws TimeoutException, InterruptedException {
+ deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Delete an existing task from a running (IN-PROGRESS) job or a job which
has not started yet.
+ * Note1: Task cannot be deleted from the job which is in an illegal state.
Task can be deleted
+ * from the job if the job is in-progress or it has not started yet.
+ * Note2: The tasks can only be deleted from non-targeted jobs.
+ * Note3: In case of timeout exception, it is the user's responsibility to
check whether the task
+ * has been successfully deleted or not.
+ * Note4: timeout is the time that this API checks whether the task has been
successfully deleted
+ * or not.
+ * @param workflowName
+ * @param jobName
+ * @param taskID
+ * @param timeoutMs
+ * @throws TimeoutException if the outcome of the task deletion is unknown
and cannot be verified
+ * @throws IllegalArgumentException if the inputs are invalid
+ * @throws HelixException if the job is not in the states to accept a new
task or if there is any
+ * issue in updating jobConfig.
+ */
+ public void deleteTask(String workflowName, String jobName, String taskID,
long timeoutMs)
+ throws TimeoutException, InterruptedException {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName,
jobName);
+ JobConfig jobConfig = getJobConfig(nameSpaceJobName);
+ if (jobConfig == null) {
+ throw new IllegalArgumentException("Job " + nameSpaceJobName + " config
does not exist!");
+ }
+
+ TaskConfig taskConfig = null;
+ Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap();
+ for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) {
+ if (entry.getKey().equals(taskID)) {
+ taskConfig = entry.getValue();
+ }
+ }
+
+ validateConfigsForTaskModifications(workflowName, jobName, taskConfig);
+
+ WorkflowContext workflowContext = getWorkflowContext(workflowName);
+ JobContext jobContext = getJobContext(nameSpaceJobName);
+ // If workflow context or job context is null. It means job has not been
started. Hence task can
+ // be deleted from the job
+ if (workflowContext != null && jobContext != null) {
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+ if (jobState != null &&
ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state for task deletion. Job State is " +
jobState);
+ }
+ }
+
+ DataUpdater<ZNRecord> taskRemover = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData != null) {
+ Map<String, Map<String, String>> taskMap =
currentData.getMapFields();
+ if (taskMap == null) {
+ LOG.warn("Could not update the jobConfig: " + jobName + " Znode
MapField is null.");
+ return null;
+ }
+ Map<String, Map<String, String>> newTaskMap = new HashMap<String,
Map<String, String>>();
+ for (Map.Entry<String, Map<String, String>> entry :
taskMap.entrySet()) {
+ if (!entry.getKey().equals(taskID)) {
+ newTaskMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ currentData.setMapFields(newTaskMap);
+ }
+ return currentData;
+ }
+ };
+
+ updateTaskInJobConfig(workflowName, jobName, taskRemover);
+
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+
+ if (workflowContext == null || jobContext == null) {
+ return;
}
- addTaskToJobConfig(workflowName, jobName, taskConfig, endTime);
+
+ while (System.currentTimeMillis() <= endTime) {
+ jobContext =
+
_accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName,
jobName));
+ workflowContext =
+
_accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName));
+ if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)
+ && workflowContext.getJobState(nameSpaceJobName) ==
TaskState.IN_PROGRESS) {
Review comment:
This condition may not be correct. If this is the only task, the job may
already be completed (or in some other state) rather than IN_PROGRESS.
##########
File path:
helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
##########
@@ -146,6 +147,20 @@ public
ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
return taskAssignment;
}
+ @Override
+ public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext
jobContext, Set<Integer> allPartitions) {
Review comment:
Since this is the same implementation as the generic one, can we make this
implementation the default implementation in the abstract class? Then, if we
need a different implementation, we can override it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]