alirezazamani commented on a change in pull request #1006:
URL: https://github.com/apache/helix/pull/1006#discussion_r426909636



##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
##########
@@ -96,4 +102,60 @@ public void testJobSubmitGenericWorkflows() throws 
InterruptedException {
 
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testQueueParallelJobs() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+        
.setWorkflowId(queueName).setParallelJobs(3).setAllowOverlapJobAssignment(true);
+    
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+    // Add 4 jobs to the queue
+    for (int i = 0; i <= 3; i++) {

Review comment:
       Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
##########
@@ -146,8 +146,13 @@ public boolean finishJob(String job) {
     }
     // Add finished job's successors to ready-list
     if (_isJobQueue) {
-      if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
-        
_readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+      while (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+        String nextJob = _parentsToChildren.get(_lastJob).iterator().next();
+        if (!_readyJobList.contains(nextJob)) {

Review comment:
       I understand you concern about time complexity. However, if you look at 
the whole structure of the code, finish job is only called once and only if the 
job is finished. Also parallel jobs is not usually set to a large value, I 
think we are safe here. Also note that we need that for correctness of TF.

##########
File path: 
helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
##########
@@ -96,4 +102,60 @@ public void testJobSubmitGenericWorkflows() throws 
InterruptedException {
 
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testQueueParallelJobs() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+        
.setWorkflowId(queueName).setParallelJobs(3).setAllowOverlapJobAssignment(true);
+    
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+    // Add 4 jobs to the queue
+    for (int i = 0; i <= 3; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+    for (int i = 0; i <= 3; i++) {
+      _driver.pollForJobState(queueName, 
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+          TaskState.COMPLETED);
+    }
+
+    // Stop the Controller
+    _controller.syncStop();
+
+    // Add 3 more jobs to the queue which should run in parallel after the 
Controller is started
+    for (int i = 4; i <= 6; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // Wait until all of the newly added jobs (Job4 to Job6) are finished
+    for (int i = 4; i <= 6; i++) {
+      _driver.pollForJobState(queueName, 
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+          TaskState.COMPLETED);
+    }
+
+    // Make sure the jobs have been running in parallel by checking the jobs 
start time and finish
+    // time
+    Set<Long> startTimes = new HashSet<>();
+    Set<Long> endTime = new HashSet<>();
+
+    for (int i = 4; i <= 6; i++) {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(queueName, "JOB" 
+ i));
+      startTimes.add(jobContext.getStartTime());
+      endTime.add(jobContext.getFinishTime());
+    }
+    Assert.assertTrue(Collections.min(endTime) > Collections.max(startTimes));

Review comment:
       Changed it to one long and get minimum/maximum in every loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to