Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 52bf10de7 -> 97e29f436


[GOBBLIN-302] Handle stuck Helix workflow

Closes #2157 from arjun4084346/stuckHelixJob


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/97e29f43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/97e29f43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/97e29f43

Branch: refs/heads/master
Commit: 97e29f436db8a5949c9d7b593c80097a8952ab68
Parents: 52bf10d
Author: Arjun <ab...@linkedin.com>
Authored: Thu Feb 22 09:40:51 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Thu Feb 22 09:40:51 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  4 ++
 .../cluster/GobblinHelixJobLauncher.java        | 41 ++++++++++++++++++--
 .../runtime/GobblinMultiTaskAttempt.java        | 12 +++++-
 .../java/org/apache/gobblin/runtime/Task.java   | 30 +++++++++++++-
 4 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d07d740..c80ceaf 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -108,6 +108,10 @@ public class ConfigurationKeys {
   public static final String SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY = 
"scheduler.wait.for.job.completion";
   public static final String DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION = 
Boolean.TRUE.toString();
 
+  public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = 
"job.timeout.enabled";
+  public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
+  public static final String HELIX_JOB_TIMEOUT_SECONDS = "job.timeout.seconds";
+  public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
 
   /**
    * Task executor and state tracker configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index af15469..62c9b3f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -107,6 +107,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final TaskDriver helixTaskDriver;
   private final String helixQueueName;
   private final String jobResourceName;
+  private JobListener jobListener;
 
   private final FileSystem fs;
   private final Path appWorkDir;
@@ -167,6 +168,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     this.taskStateCollectorService = new TaskStateCollectorService(jobProps, 
this.jobContext.getJobState(),
         this.eventBus, this.stateStores.getTaskStateStore(), 
outputTaskStateDir);
+    startCancellationExecutor();
   }
 
   @Override
@@ -290,6 +292,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
   public void launchJob(@Nullable JobListener jobListener)
       throws JobException {
+    this.jobListener = jobListener;
     boolean isLaunched = false;
     this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
     try {
@@ -359,8 +362,14 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     return workUnitFile.toString();
   }
 
-  private void waitForJobCompletion() throws InterruptedException {
-    while (true) {
+  private void waitForJobCompletion()  throws InterruptedException {
+    LOGGER.info("Waiting for job to complete...");
+    boolean timeoutEnabled = 
Boolean.parseBoolean(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+    long timeoutInSeconds = 
Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+        ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
+    while (!timeoutEnabled || System.currentTimeMillis() <= endTime) {
       WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
       if (workflowContext != null) {
         org.apache.helix.task.TaskState helixJobState = 
workflowContext.getJobState(this.jobResourceName);
@@ -370,9 +379,35 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
           return;
         }
       }
-
       Thread.sleep(1000);
     }
+    helixTaskDriverWaitToStop(this.helixQueueName, 10L);
+    try {
+      cancelJob(this.jobListener);
+    } catch (JobException e) {
+      throw new RuntimeException("Unable to cancel job " + 
jobContext.getJobName() + ": ", e);
+    }
+    this.helixTaskDriver.resume(this.helixQueueName);
+    LOGGER.info("stopped the queue, deleted the job");
+  }
+
+  /**
+   * Stops the workflow and polls until it leaves IN_PROGRESS or the timeout elapses, because fix 
https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
+   * is not available in the currently used Helix version 0.6.9.
+   */
+  private void helixTaskDriverWaitToStop(String workflow, long 
timeoutInSeconds) throws InterruptedException {
+    this.helixTaskDriver.stop(workflow);
+    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
+    while (System.currentTimeMillis() <= endTime) {
+      WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName);
+      if (workflowContext == null || workflowContext.getWorkflowState()
+          .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
+        Thread.sleep(1000);
+      } else {
+        LOGGER.info("Successfully stopped the queue");
+        return;
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index aa42121..e5643c0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -202,9 +202,17 @@ public class GobblinMultiTaskAttempt {
       task.shutdown();
     }
 
-    for (Task task: this.tasks) {
+    for (Task task : this.tasks) {
       task.awaitShutdown(1000);
     }
+
+    for (Task task : this.tasks) {
+      if (task.cancel()) {
+        log.info("Task {} cancelled.", task.getTaskId());
+      } else {
+        log.info("Task {} could not be cancelled.", task.getTaskId());
+      }
+    }
   }
 
   private void persistTaskStateStore()
@@ -343,8 +351,8 @@ public class GobblinMultiTaskAttempt {
       // Create a new task from the work unit and submit the task to run
       Task task = createTaskRunnable(workUnitState, countDownLatch);
       this.taskStateTracker.registerNewTask(task);
+      task.setTaskFuture(this.taskExecutor.submit(task));
       tasks.add(task);
-      this.taskExecutor.execute(task);
     }
 
     new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), 
"gobblin.runtime").build()

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/97e29f43/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index c3c1b99..52b0960 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -150,6 +150,7 @@ public class Task implements TaskIFace {
   private final AtomicBoolean shutdownRequested;
   private volatile long shutdownRequestedTime = Long.MAX_VALUE;
   private final CountDownLatch shutdownLatch;
+  private Future<?> taskFuture;
 
   /**
    * Instantiate a new {@link Task}.
@@ -364,8 +365,15 @@ public class Task implements TaskIFace {
     } catch (Throwable t) {
       failTask(t);
     } finally {
-      this.taskStateTracker.onTaskRunCompletion(this);
-      completeShutdown();
+      synchronized (this) {
+        if (this.taskFuture == null || !this.taskFuture.isCancelled()) {
+          this.taskStateTracker.onTaskRunCompletion(this);
+          completeShutdown();
+          this.taskFuture = null;
+        } else {
+          LOG.info("will not decrease count down latch as this task is 
cancelled");
+        }
+      }
     }
   }
 
@@ -952,4 +960,22 @@ public class Task implements TaskIFace {
     }
     return true;
   }
+
+  public synchronized void setTaskFuture(Future<?> taskFuture) {
+    this.taskFuture = taskFuture;
+  }
+
+  /**
+   * Cancels this task's future, if one has been set, interrupting the task if it is running.
+   * @return true if the task is successfully cancelled.
+   */
+  public synchronized boolean cancel() {
+    if (this.taskFuture != null && this.taskFuture.cancel(true)) {
+      this.taskStateTracker.onTaskRunCompletion(this);
+      this.completeShutdown();
+      return true;
+    } else {
+      return false;
+    }
+  }
 }

Reply via email to