Repository: incubator-gobblin Updated Branches: refs/heads/master 52bf10de7 -> 97e29f436
[GOBBLIN-302] Handle stuck Helix workflow Closes #2157 from arjun4084346/stuckHelixJob Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/97e29f43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/97e29f43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/97e29f43 Branch: refs/heads/master Commit: 97e29f436db8a5949c9d7b593c80097a8952ab68 Parents: 52bf10d Author: Arjun <ab...@linkedin.com> Authored: Thu Feb 22 09:40:51 2018 -0800 Committer: Hung Tran <hut...@linkedin.com> Committed: Thu Feb 22 09:40:51 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 4 ++ .../cluster/GobblinHelixJobLauncher.java | 41 ++++++++++++++++++-- .../runtime/GobblinMultiTaskAttempt.java | 12 +++++- .../java/org/apache/gobblin/runtime/Task.java | 30 +++++++++++++- 4 files changed, 80 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index d07d740..c80ceaf 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -108,6 +108,10 @@ public class ConfigurationKeys { public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = "scheduler.wait.for.job.completion"; public static final String DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = Boolean.TRUE.toString(); + public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "job.timeout.enabled"; + public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false"; + public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds"; + public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800"; /** * Task executor and state tracker configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index af15469..62c9b3f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -107,6 +107,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final TaskDriver helixTaskDriver; private final String helixQueueName; private final String jobResourceName; + private JobListener jobListener; private final FileSystem fs; private final Path appWorkDir; @@ -167,6 +168,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir); + startCancellationExecutor(); } @Override @@ -290,6 +292,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { public void launchJob(@Nullable JobListener jobListener) throws JobException { + this.jobListener = jobListener; boolean isLaunched = false; this.runningMap.putIfAbsent(this.jobContext.getJobName(), false); try { @@ -359,8 +362,14 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { return workUnitFile.toString(); } - private void waitForJobCompletion() throws InterruptedException { - while (true) { + private void waitForJobCompletion() throws InterruptedException { + LOGGER.info("Waiting for job to complete..."); + boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); + long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); + long endTime = System.currentTimeMillis() + timeoutInSeconds*1000; + while (!timeoutEnabled || System.currentTimeMillis() <= endTime) { WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName); if (workflowContext != null) { org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(this.jobResourceName); @@ -370,9 +379,35 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { return; } } - Thread.sleep(1000); } + helixTaskDriverWaitToStop(this.helixQueueName, 10L); + try { + cancelJob(this.jobListener); + } catch (JobException e) { + throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e); + } + this.helixTaskDriver.resume(this.helixQueueName); + LOGGER.info("stopped the queue, deleted the job"); + } + + /** + * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4 + * is not available in currently used version 0.6.9 + */ + private void helixTaskDriverWaitToStop(String workflow, long timeoutInSeconds) throws InterruptedException { + this.helixTaskDriver.stop(workflow); + long endTime = System.currentTimeMillis() + timeoutInSeconds*1000; + while (System.currentTimeMillis() <= endTime) { + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName); + if (workflowContext == null || workflowContext.getWorkflowState() + .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) { + Thread.sleep(1000); + } else { + LOGGER.info("Successfully stopped the queue"); + return; + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java index aa42121..e5643c0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java @@ -202,9 +202,17 @@ public class GobblinMultiTaskAttempt { task.shutdown(); } - for (Task task: this.tasks) { + for (Task task : this.tasks) { task.awaitShutdown(1000); } + + for (Task task : this.tasks) { + if (task.cancel()) { + log.info("Task {} cancelled.", task.getTaskId()); + } else { + log.info("Task {} could not be cancelled.", task.getTaskId()); + } + } } private void persistTaskStateStore() @@ -343,8 +351,8 @@ public class GobblinMultiTaskAttempt { // Create a new task from the work unit and submit the task to run Task task = createTaskRunnable(workUnitState, countDownLatch); this.taskStateTracker.registerNewTask(task); + task.setTaskFuture(this.taskExecutor.submit(task)); tasks.add(task); - this.taskExecutor.execute(task); } new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build() http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index c3c1b99..52b0960 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -150,6 +150,7 @@ public class Task implements TaskIFace { private final AtomicBoolean shutdownRequested; private volatile long shutdownRequestedTime = Long.MAX_VALUE; private final CountDownLatch shutdownLatch; + private Future<?> taskFuture; /** * Instantiate a new {@link Task}. @@ -364,8 +365,15 @@ public class Task implements TaskIFace { } catch (Throwable t) { failTask(t); } finally { - this.taskStateTracker.onTaskRunCompletion(this); - completeShutdown(); + synchronized (this) { + if (this.taskFuture == null || !this.taskFuture.isCancelled()) { + this.taskStateTracker.onTaskRunCompletion(this); + completeShutdown(); + this.taskFuture = null; + } else { + LOG.info("will not decrease count down latch as this task is cancelled"); + } + } } } @@ -952,4 +960,22 @@ public class Task implements TaskIFace { } return true; } + + public synchronized void setTaskFuture(Future<?> taskFuture) { + this.taskFuture = taskFuture; + } + + /** + * return true if the task is successfully cancelled. + * @return + */ + public synchronized boolean cancel() { + if (this.taskFuture != null && this.taskFuture.cancel(true)) { + this.taskStateTracker.onTaskRunCompletion(this); + this.completeShutdown(); + return true; + } else { + return false; + } + } }