pkuwm commented on a change in pull request #1006:
URL: https://github.com/apache/helix/pull/1006#discussion_r426842209
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
##########
@@ -96,4 +102,60 @@ public void testJobSubmitGenericWorkflows() throws
InterruptedException {
_driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
}
-}
\ No newline at end of file
+
+ @Test
+ public void testQueueParallelJobs() throws InterruptedException {
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+ WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+
.setWorkflowId(queueName).setParallelJobs(3).setAllowOverlapJobAssignment(true);
+
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+ JobConfig.Builder jobBuilder =
+ new
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+ // Add 4 jobs to the queue
+ for (int i = 0; i <= 3; i++) {
+ _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+ }
+
+ // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+ for (int i = 0; i <= 3; i++) {
+ _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+ TaskState.COMPLETED);
+ }
+
+ // Stop the Controller
+ _controller.syncStop();
+
+ // Add 3 more jobs to the queue which should run in parallel after the
Controller is started
+ for (int i = 4; i <= 6; i++) {
+ _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+ }
+
+ // Start the Controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ // Wait until all of the newly added jobs (Job4 to Job6) are finished
+ for (int i = 4; i <= 6; i++) {
+ _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+ TaskState.COMPLETED);
+ }
+
+ // Make sure the jobs have been running in parallel by checking the jobs
start time and finish
+ // time
+ Set<Long> startTimes = new HashSet<>();
+ Set<Long> endTime = new HashSet<>();
+
+ for (int i = 4; i <= 6; i++) {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(queueName, "JOB"
+ i));
+ startTimes.add(jobContext.getStartTime());
+ endTime.add(jobContext.getFinishTime());
+ }
+ Assert.assertTrue(Collections.min(endTime) > Collections.max(startTimes));
Review comment:
Could you help me understand: if you are only checking one condition,
minEndTime > maxStartTime, why are HashSets necessary?
##########
File path: helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
##########
@@ -146,8 +146,13 @@ public boolean finishJob(String job) {
}
// Add finished job's successors to ready-list
if (_isJobQueue) {
- if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
-
_readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+ while (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+ String nextJob = _parentsToChildren.get(_lastJob).iterator().next();
+ if (!_readyJobList.contains(nextJob)) {
Review comment:
What's the time complexity of this `_readyJobList.contains(nextJob)` check?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]