lei-xia commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r500479505
##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final
List<String> jobs,
}
}
+ /**
+ * Add task to a running (IN-PROGRESS) job with default timeout
+ * @param taskConfig
+ * @param workflowName
+ * @param jobName
+ * @throws Exception
+ */
+ public void addTask(TaskConfig taskConfig, String workflowName, String
jobName) throws Exception {
+ addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+ }
+
+ /**
+ * Add task to a running (IN-PROGRESS) job
+ * @param taskConfig
+ * @param workflowName
+ * @param jobName
+ * @param timeout
+ * @throws Exception
+ */
+ public void addTask(TaskConfig taskConfig, String workflowName, String
jobName, long timeout)
+ throws Exception {
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor,
workflowName);
+ long endTime = System.currentTimeMillis() + timeout;
+
+ if (workflowConfig == null) {
+ throw new HelixException("Workflow " + workflowName + " config does not
exist!");
+ }
+
+ String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName,
jobName);
+ JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+ if (jobConfig == null) {
+ throw new HelixException("Job " + nameSpaceJobName + " config does not
exist!");
+ }
+
+ if (jobConfig.getTargetResource() != null) {
+ throw new HelixException(
+ "Job " + nameSpaceJobName + " is a targeted job. New task cannot be
added to this job!");
+ }
+
+ WorkflowContext workflowContext = getWorkflowContext(workflowName);
+ if (workflowContext == null) {
+ throw new HelixException("Workflow " + workflowName + " context does not
exist!");
+ }
+
+ if (taskConfig == null) {
+ throw new IllegalArgumentException("Task cannot be added because
taskConfig is null!");
+ }
+
+ if (taskConfig.getId() == null) {
+ throw new HelixException(
+ "Task cannot be added because taskID is null!");
+ }
+
+ if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+ throw new HelixException(
+ "Task cannot be added because both of the job and task have null
command!");
+ }
+
+ if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+ throw new HelixException(
+ "Task cannot be added because command existed for both of job and
task!");
+ }
+
+ for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+ if (taskEntry.equals(taskConfig.getId())) {
+ throw new HelixException(
+ "Task cannot be added because another task with the same ID
already exists!");
+ }
+ }
+
+ Set<TaskState> illegalJobStateForTaskAddition =
+ new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT,
TaskState.FAILING,
+ TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED,
TaskState.STOPPED,
+ TaskState.STOPPING));
+
+ TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+ if (jobState == null) {
+ throw new HelixException(
+ "Task cannot be added because JobState " + nameSpaceJobName + " is
null!");
+ }
+
+ if (illegalJobStateForTaskAddition.contains(jobState)) {
+ throw new HelixException("Job " + nameSpaceJobName
+ + " is in illegal state to accept new task. Job State is " +
jobState);
+ }
+
+ DataUpdater<ZNRecord> updater = currentData -> {
+ if (currentData != null) {
+ currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+ } else {
+ LOG.warn("JobConfig DataUpdater: Fails to update JobConfig.
CurrentData is null.");
+ }
+ return currentData;
+ };
+
+ String path =
_accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+ boolean status = _accessor.getBaseDataAccessor().update(path, updater,
AccessOption.PERSISTENT);
+ if (!status) {
+ LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+ throw new HelixException("Failed to add task to the job");
+ }
+
+ String taskID = taskConfig.getId();
+ while (System.currentTimeMillis() <= endTime) {
+ JobContext jobContext = getJobContext(nameSpaceJobName);
+ for (Map.Entry<String, Integer> entry :
jobContext.getTaskIdPartitionMap().entrySet()) {
+ if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+ .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+ return;
+ }
+ }
+ Thread.sleep(1000L);
+ }
+ throw new HelixException("An unexpected issue happened while task being
added to the job!");
Review comment:
Should we throw a specific TimeoutException instead of a general
exception. What is the caller expected to do if it is timeout, call another
method to verify whether the task has been successfully added, or retry the
addTask()?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]