jiajunwang commented on a change in pull request #1439:
URL: https://github.com/apache/helix/pull/1439#discussion_r502646181



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -77,6 +76,11 @@
   /** Default time out for monitoring workflow or job state */
   private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */
 
+  /** The illegal job states for the jobs to accept new task */
+  private static Set<TaskState> illegalJobStatesForTaskAddition = new 
HashSet<>(

Review comment:
       nit, private final static... please
   Also, in this case, the var name should be 
ILLEGAL_JOB_STATES_FOR_TASK_ADDITION (or TASKS_MODIFICATION?)

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final 
List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started 
yet. Timeout for this
+   * operation is default timeout

Review comment:
       "default timeout" is not clear enough.
   Please mention the real timeout here. Maybe use java doc feature to link 
with the variable would be a good idea. But I'm not sure if it works fine. 
please have a try.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +530,163 @@ public void enqueueJobs(final String queue, final 
List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started 
yet. Timeout for this
+   * operation is default timeout
+   * Note1: Task cannot be added if the job is in an illegal state. The states 
that job can accept
+   * new task is if the job is in progress or the job has not started yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws 
and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check 
whether the task has
+   * been successfully added ot not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @throws Exception
+   */
+  public void addTask(String workflowName, String jobName, TaskConfig 
taskConfig) throws Exception {
+    addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job or the job which is not started 
yet
+   * Note1: Task may cannot be added if the job is in an illegal state. The 
states that job can
+   * accept new task is if the job is in progress or the job has not started 
yet.
+   * Note2: The job can only be added to non-targeted jobs.
+   * Note3: The taskID for new task should be unique. If not, this API throws 
and exception.
+   * Note4: In case of timeout exception, it is user's responsibility to check 
whether the task has
+   * been successfully added ot not.
+   * Note5: timeout is the time that this API checks whether the task has been 
successfully added or
+   * not.
+   * @param workflowName
+   * @param jobName
+   * @param taskConfig
+   * @param timeout

Review comment:
       unit

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
##########
@@ -526,6 +524,123 @@ public void enqueueJobs(final String queue, final 
List<String> jobs,
     }
   }
 
+  /**
+   * Add task to a running (IN-PROGRESS) job with default timeout
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String 
jobName) throws Exception {
+    addTask(taskConfig, workflowName, jobName, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Add task to a running (IN-PROGRESS) job
+   * @param taskConfig
+   * @param workflowName
+   * @param jobName
+   * @param timeout
+   * @throws Exception
+   */
+  public void addTask(TaskConfig taskConfig, String workflowName, String 
jobName, long timeout)
+      throws Exception {
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, 
workflowName);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    if (workflowConfig == null) {
+      throw new HelixException("Workflow " + workflowName + " config does not 
exist!");
+    }
+
+    String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, 
jobName);
+    JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName);
+    if (jobConfig == null) {
+      throw new HelixException("Job " + nameSpaceJobName + " config does not 
exist!");
+    }
+
+    if (jobConfig.getTargetResource() != null) {
+      throw new HelixException(
+          "Job " + nameSpaceJobName + " is a targeted job. New task cannot be 
added to this job!");
+    }
+
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      throw new HelixException("Workflow " + workflowName + " context does not 
exist!");
+    }
+
+    if (taskConfig == null) {
+      throw new IllegalArgumentException("Task cannot be added because 
taskConfig is null!");
+    }
+
+    if (taskConfig.getId() == null) {
+      throw new HelixException(
+          "Task cannot be added because taskID is null!");
+    }
+
+    if (taskConfig.getCommand() == null && jobConfig.getCommand() == null) {
+      throw new HelixException(
+          "Task cannot be added because both of the job and task have null 
command!");
+    }
+
+    if (taskConfig.getCommand() != null && jobConfig.getCommand() != null) {
+      throw new HelixException(
+          "Task cannot be added because command existed for both of job and 
task!");
+    }
+
+    for (String taskEntry : jobConfig.getMapConfigs().keySet()) {
+      if (taskEntry.equals(taskConfig.getId())) {
+        throw new HelixException(
+            "Task cannot be added because another task with the same ID 
already exists!");
+      }
+    }
+
+    Set<TaskState> illegalJobStateForTaskAddition =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, 
TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED, TaskState.COMPLETED, 
TaskState.STOPPED,
+            TaskState.STOPPING));
+
+    TaskState jobState = workflowContext.getJobState(nameSpaceJobName);
+
+    if (jobState == null) {
+      throw new HelixException(
+          "Task cannot be added because JobState " + nameSpaceJobName + " is 
null!");
+    }
+
+    if (illegalJobStateForTaskAddition.contains(jobState)) {
+      throw new HelixException("Job " + nameSpaceJobName
+          + " is in illegal state to accept new task. Job State is " + 
jobState);
+    }
+
+    DataUpdater<ZNRecord> updater = currentData -> {
+      if (currentData != null) {
+        currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      } else {
+        LOG.warn("JobConfig DataUpdater: Fails to update JobConfig. 
CurrentData is null.");
+      }
+      return currentData;
+    };
+
+    String path = 
_accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath();
+    boolean status = _accessor.getBaseDataAccessor().update(path, updater, 
AccessOption.PERSISTENT);
+    if (!status) {
+      LOG.error("Failed to add task to the job {}", nameSpaceJobName);
+      throw new HelixException("Failed to add task to the job");
+    }
+
+    String taskID = taskConfig.getId();
+    while (System.currentTimeMillis() <= endTime) {
+      JobContext jobContext = getJobContext(nameSpaceJobName);
+      for (Map.Entry<String, Integer> entry : 
jobContext.getTaskIdPartitionMap().entrySet()) {
+        if (entry.getKey().equals(taskID) && getWorkflowContext(workflowName)
+            .getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) {
+          return;
+        }
+      }
+      Thread.sleep(1000L);

Review comment:
       If there are other places, then please don't hardcode the time.
   In addition, IMO, the timeout less than 1000ms does not make sense, so we 
shall reject the request. Obviously, we shall mention in the public API that 
timeout less than 1000ms means no wait, or just being rejected.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to