Add a new job option that allows a job to continue even if a job it directly depends on fails.
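
For reference, a minimal sketch of how a client opts into the new behavior via the
builder methods introduced below (the command and target resource names here are
hypothetical, and a fully populated builder and cluster setup are assumed):

    // Per-job: keep this job schedulable even if a direct parent job fails.
    JobConfig.Builder jobCfg = new JobConfig.Builder()
        .setCommand("MyTaskCommand")          // hypothetical task command
        .setTargetResource("MyDB")            // hypothetical target resource
        .setIgnoreDependentJobFailure(true);  // new option, defaults to false

    // Per-workflow: tolerate up to 3 failed jobs before failing the workflow.
    WorkflowConfig.Builder wfCfg = new WorkflowConfig.Builder()
        .setFailureThreshold(3);              // new option, defaults to 0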


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/be660245
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/be660245
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/be660245

Branch: refs/heads/helix-0.6.x
Commit: be660245fc1a9f4b22fba58c4b25a1af19555066
Parents: 579d82f
Author: Lei Xia <l...@linkedin.com>
Authored: Wed Jan 27 10:10:31 2016 -0800
Committer: Lei Xia <l...@linkedin.com>
Committed: Tue Jul 5 14:44:56 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  26 +-
 .../org/apache/helix/task/JobRebalancer.java    |  26 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  79 +++++-
 .../org/apache/helix/task/WorkflowConfig.java   |  23 +-
 .../apache/helix/task/WorkflowRebalancer.java   |  28 +-
 .../apache/helix/integration/task/MockTask.java |   2 +-
 .../helix/integration/task/TaskTestUtil.java    |   7 +-
 .../task/TestJobFailureDependence.java          | 283 +++++++++++++++++++
 .../task/TestRunJobsWithMissingTarget.java      |  41 ++-
 9 files changed, 469 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 37a2f35..65a9caf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -96,6 +96,11 @@ public class JobConfig {
     TASK_RETRY_DELAY("TaskRetryDelay"),
 
     /**
+     * Whether this job should ignore the failure of the jobs it directly depends on;
+     * if true, the job can still be scheduled after a direct parent has failed.
+     */
+    IGNORE_DEPENDENT_JOB_FAILURE("IgnoreDependentJobFailure"),
+
+    /**
      * The individual task configurations, if any *
      */
     TASK_CONFIGS("TaskConfigs"),
@@ -124,6 +129,7 @@ public class JobConfig {
   public static final int DEFAULT_FAILURE_THRESHOLD = 0;
   public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0;
   public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false;
+  public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false;
 
   private final String _workflow;
   private final String _targetResource;
@@ -138,13 +144,14 @@ public class JobConfig {
   private final int _failureThreshold;
   private final long _retryDelay;
   private final boolean _disableExternalView;
+  private final boolean _ignoreDependentJobFailure;
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
       Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
-      boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) {
+      boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -158,6 +165,7 @@ public class JobConfig {
     _failureThreshold = failureThreshold;
     _retryDelay = retryDelay;
     _disableExternalView = disableExternalView;
+    _ignoreDependentJobFailure = ignoreDependentJobFailure;
     if (taskConfigMap != null) {
       _taskConfigMap = taskConfigMap;
     } else {
@@ -217,6 +225,8 @@ public class JobConfig {
     return _disableExternalView;
   }
 
+  public boolean isIgnoreDependentJobFailure() { return _ignoreDependentJobFailure; }
+
   public Map<String, TaskConfig> getTaskConfigMap() {
     return _taskConfigMap;
   }
@@ -260,6 +270,8 @@ public class JobConfig {
         Boolean.toString(_disableExternalView));
     cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(),
         "" + _numConcurrentTasksPerInstance);
+    cfgMap.put(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value(),
+        Boolean.toString(_ignoreDependentJobFailure));
     return cfgMap;
   }
 
@@ -281,6 +293,7 @@ public class JobConfig {
     private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
     private long _retryDelay = DEFAULT_TASK_RETRY_DELAY;
     private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW;
+    private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
 
     public JobConfig build() {
       validate();
@@ -288,7 +301,7 @@ public class JobConfig {
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
-          _disableExternalView, _taskConfigMap);
+          _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap);
     }
 
     /**
@@ -346,6 +359,10 @@ public class JobConfig {
        b.setDisableExternalView(
            Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value())));
      }
+      if (cfg.containsKey(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())) {
+        b.setIgnoreDependentJobFailure(
+            Boolean.valueOf(cfg.get(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())));
+      }
       return b;
     }
 
@@ -414,6 +431,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setIgnoreDependentJobFailure(boolean ignoreDependentJobFailure) {
+      _ignoreDependentJobFailure = ignoreDependentJobFailure;
+      return this;
+    }
+
     public Builder addTaskConfigs(List<TaskConfig> taskConfigs) {
       if (taskConfigs != null) {
         for (TaskConfig taskConfig : taskConfigs) {

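The new flag is persisted like the other boolean job options; a sketch of the
round-trip implied by the hunks above (the map-based plumbing around it is
assumed, since the enclosing method names are not fully visible in this diff):

    Map<String, String> cfg = new HashMap<String, String>();
    // Write side: the boolean is stored as a string under the property key.
    cfg.put("IgnoreDependentJobFailure", Boolean.toString(true));
    // Read side: an absent key falls back to the default (false).
    boolean ignore = cfg.containsKey("IgnoreDependentJobFailure")
        ? Boolean.valueOf(cfg.get("IgnoreDependentJobFailure"))
        : JobConfig.DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE;
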
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 7eeafc7..5b41773 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -86,22 +86,28 @@ public class JobRebalancer extends TaskRebalancer {
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
+    // Stop current run of the job if workflow or job is already in final state (failed or completed)
+    TaskState workflowState = workflowCtx.getWorkflowState();
     TaskState jobState = workflowCtx.getJobState(jobName);
     // The job is already in a final state (completed/failed).
-    if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
-      LOG.info("Job " + jobName + " is failed or already completed, clean up IS.");
+    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED ||
+        jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
+      LOG.info(String.format(
+          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.",
+          workflowResource, jobName, workflowState, jobState));
       cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
     if (!isWorkflowReadyForSchedule(workflowCfg)) {
-      LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName);
+      LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
-    if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) {
-      LOG.info("Job is not ready to be scheduled " + jobName);
+    if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
+        workflowCtx)) {
+      LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -429,16 +435,6 @@ public class JobRebalancer extends TaskRebalancer {
     return ra;
   }
 
-  private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
-    long currentTime = System.currentTimeMillis();
-    workflowContext.setJobState(jobName, TaskState.FAILED);
-    jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig)) {
-      workflowContext.setFinishTime(currentTime);
-    }
-  }
-
   private void markJobComplete(String jobName, JobContext jobContext,
       WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
     long currentTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index b006efc..6aaeb5f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -74,13 +74,17 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) {
     boolean incomplete = false;
+    int failedJobs = 0;
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
       if (jobState == TaskState.FAILED) {
-        ctx.setWorkflowState(TaskState.FAILED);
-        return true;
+        failedJobs++;
+        if (failedJobs > cfg.getFailureThreshold()) {
+          ctx.setWorkflowState(TaskState.FAILED);
+          return true;
+        }
       }
-      if (jobState != TaskState.COMPLETED) {
+      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED) {
         incomplete = true;
       }
     }
@@ -136,31 +140,78 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
       WorkflowContext workflowCtx) {
     int notStartedCount = 0;
-    int inCompleteCount = 0;
     int failedCount = 0;
 
-    for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
-      TaskState jobState = workflowCtx.getJobState(ancestor);
+    for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
+      TaskState jobState = workflowCtx.getJobState(parent);
       if (jobState == null || jobState == TaskState.NOT_STARTED) {
         ++notStartedCount;
-      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
-        ++inCompleteCount;
-      } else if (jobState == TaskState.FAILED) {
+      }
+      if (jobState == TaskState.FAILED) {
         ++failedCount;
       }
     }
 
-    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()
-        || failedCount > 0) {
-      LOG.debug(String.format(
-          "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.",
-          job, notStartedCount, inCompleteCount, failedCount));
+    if (notStartedCount > 0) {
+      LOG.debug(String
+          .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
+      return false;
+    }
+
+    JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
+    if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
+      markJobFailed(job, null, workflowCfg, workflowCtx);
+      LOG.debug(
+          String.format("Job %s is not ready to start, failedCount(s)=%d.", job, failedCount));
+      return false;
+    }
+
+    int inCompleteCount = getInCompleteJobCount(workflowCfg, workflowCtx);
+    if (inCompleteCount >= workflowCfg.getParallelJobs()) {
+      LOG.debug(String
+          .format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, inCompleteCount));
       return false;
     }
 
     return true;
   }
 
+  protected boolean isJobStarted(String job, WorkflowContext workflowContext) {
+    TaskState jobState = workflowContext.getJobState(job);
+    return (jobState != null && jobState != TaskState.NOT_STARTED);
+  }
+
+  /**
+   * Count the number of jobs in a workflow that are currently in progress (or stopped mid-run).
+   *
+   * @param workflowCfg the workflow config that holds the job DAG
+   * @param workflowCtx the workflow context that holds per-job runtime states
+   * @return the number of jobs in the IN_PROGRESS or STOPPED state
+   */
+  protected int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+    int inCompleteCount = 0;
+    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState jobState = workflowCtx.getJobState(jobName);
+      if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
+      }
+    }
+
+    return inCompleteCount;
+  }
+
+  protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.FAILED);
+    if (jobContext != null) {
+      jobContext.setFinishTime(currentTime);
+    }
+    if (isWorkflowFinished(workflowContext, workflowConfig)) {
+      workflowContext.setFinishTime(currentTime);
+    }
+  }
+
   /**
    * Check if a workflow is ready to schedule.
    *

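Taken together, the changes above alter the workflow's completion rule: a FAILED
job is now terminal rather than immediately fatal, and the workflow fails only
once the failure count exceeds the configured threshold. A self-contained
restatement of that rule (the Outcome enum and method are illustrative, not part
of the commit; TaskState is org.apache.helix.task.TaskState):

    enum Outcome { RUNNING, COMPLETED, FAILED }

    // Failed jobs count as "done" for completion purposes; the workflow itself
    // fails only when failures exceed the workflow-level threshold.
    static Outcome workflowOutcome(java.util.List<TaskState> jobStates, int failureThreshold) {
      int failed = 0;
      boolean incomplete = false;
      for (TaskState state : jobStates) {
        if (state == TaskState.FAILED && ++failed > failureThreshold) {
          return Outcome.FAILED;
        }
        if (state != TaskState.COMPLETED && state != TaskState.FAILED) {
          incomplete = true;
        }
      }
      return incomplete ? Outcome.RUNNING : Outcome.COMPLETED;
    }
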
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 4c81654..955cb77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -40,9 +40,11 @@ public class WorkflowConfig {
   public static final String RECURRENCE_UNIT = "RecurrenceUnit";
   public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
   public static final String TERMINABLE = "Terminable";
+  public static final String FAILURE_THRESHOLD = "FailureThreshold";
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
+  public static final int DEFAULT_FAILURE_THRESHOLD = 0;
 
   /* Member variables */
   // TODO: jobDag should not be in the workflowConfig.
@@ -56,13 +58,15 @@ public class WorkflowConfig {
   private final long _expiry;
   private final boolean _terminable;
   private final ScheduleConfig _scheduleConfig;
+  private final int _failureThreshold;
 
   protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
-      boolean terminable, ScheduleConfig scheduleConfig) {
+      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig) {
     _jobDag = jobDag;
     _parallelJobs = parallelJobs;
     _targetState = targetState;
     _expiry = expiry;
+    _failureThreshold = failureThreshold;
     _terminable = terminable;
     _scheduleConfig = scheduleConfig;
   }
@@ -83,6 +87,10 @@ public class WorkflowConfig {
     return _expiry;
   }
 
+  public int getFailureThreshold() {
+    return _failureThreshold;
+  }
+
   public boolean isTerminable() {
     return _terminable;
   }
@@ -128,6 +136,7 @@ public class WorkflowConfig {
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
     cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
+    cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(getFailureThreshold()));
 
     // Populate schedule if present
     ScheduleConfig scheduleConfig = getScheduleConfig();
@@ -151,13 +160,15 @@ public class WorkflowConfig {
     private int _parallelJobs = 1;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
+    private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD;
     private boolean _isTerminable = true;
     private ScheduleConfig _scheduleConfig;
 
     public WorkflowConfig build() {
       validate();
 
-      return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig);
+      return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
+          _isTerminable, _scheduleConfig);
     }
 
     public Builder() {}
@@ -191,6 +202,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setFailureThreshold(int failureThreshold) {
+      _failureThreshold = failureThreshold;
+      return this;
+    }
+
     public Builder setTerminable(boolean isTerminable) {
       _isTerminable = isTerminable;
       return this;
@@ -211,6 +227,9 @@ public class WorkflowConfig {
       if (cfg.containsKey(EXPIRY)) {
         b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
       }
+      if (cfg.containsKey(FAILURE_THRESHOLD)) {
+        b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD)));
+      }
       if (cfg.containsKey(DAG)) {
         b.setJobDag(JobDag.fromJson(cfg.get(DAG)));
       }

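The threshold can also be changed on an existing queue by copying its config,
which is how one of the new tests below exercises it; a sketch assuming a
connected TaskDriver and a queue named "myQueue" (both hypothetical here):

    WorkflowConfig current = driver.getWorkflowConfig("myQueue");
    WorkflowConfig.Builder builder = new WorkflowConfig.Builder(current);
    builder.setFailureThreshold(0);  // 0 = fail the workflow on the first failed job
    driver.updateWorkflow("myQueue", builder.build());
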
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 05b6dc6..682ac77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -30,7 +30,13 @@ import org.apache.log4j.Logger;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
 
 /**
  * Custom rebalancer implementation for the {@code Workflow} in task state model.
@@ -63,7 +69,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
-      cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+      cleanupWorkflow(workflow, workflowCfg);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -91,7 +97,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       // Check if this workflow has been finished past its expiry.
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
         LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
-        cleanupWorkflow(workflow, workflowCfg, workflowCtx);
+        cleanupWorkflow(workflow, workflowCfg);
       } else {
         // schedule future cleanup work
         long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -113,7 +119,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
         scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx);
     if (isReady) {
       // Schedule jobs from this workflow.
-      scheduleJobs(workflowCfg, workflowCtx);
+      scheduleJobs(workflow, workflowCfg, workflowCtx);
     } else {
       LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
     }
@@ -126,23 +132,32 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * Figure out whether the jobs in the workflow should be run,
    * and if it's ready, then just schedule it
    */
-  private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
+  private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) {
     ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
     if (scheduleConfig != null && scheduleConfig.isRecurring()) {
       LOG.debug("Jobs from recurring workflow are not schedule-able");
       return;
     }
 
+    int scheduledJobs = 0;
     for (String job : workflowCfg.getJobDag().getAllNodes()) {
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
         LOG.debug("Job " + job + " is already started or completed.");
         continue;
       }
+
+      if (scheduledJobs >= workflowCfg.getParallelJobs()) {
+        LOG.debug(String.format("Workflow %s already has enough jobs in progress, "
+                + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
+        break;
+      }
+
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
         JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
         scheduleSingleJob(job, jobConfig);
+        scheduledJobs++;
       }
     }
   }
@@ -382,8 +397,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
    * Cleans up workflow configs and workflow contexts associated with this workflow,
    * including all job-level configs and context, plus workflow-level information.
    */
-  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg,
-      WorkflowContext workflowCtx) {
+  private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) {
     LOG.info("Cleaning up workflow: " + workflow);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 71fa12d..dad9949 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -39,7 +39,7 @@ public class MockTask implements Task {
     if (cfg == null) {
       cfg = Collections.emptyMap();
     }
-    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 100L;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 9796497..011f532 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -258,7 +258,7 @@ public class TaskTestUtil {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
 
-  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) {
+  public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, int failureThreshold) {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
     Calendar cal = Calendar.getInstance();
@@ -267,10 +267,13 @@ public class TaskTestUtil {
     cal.set(Calendar.MILLISECOND, 0);
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    if (failureThreshold > 0) {
+      cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(failureThreshold));
+    }
     return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
   public static JobQueue.Builder buildJobQueue(String jobQueueName) {
-    return buildJobQueue(jobQueueName, 0);
+    return buildJobQueue(jobQueueName, 0, 0);
   }
 }

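The new tests below rely on the added failureThreshold parameter; for example
(the queue name here is hypothetical):

    // Workflow that tolerates up to 100 failed jobs before failing outright.
    JobQueue.Builder tolerant = TaskTestUtil.buildJobQueue("testQueue", 0, 100);
    // The one-argument overload keeps the old strict behavior (threshold 0).
    JobQueue.Builder strict = TaskTestUtil.buildJobQueue("testQueue");
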
http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
new file mode 100644
index 0000000..9e2456c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
@@ -0,0 +1,283 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestJobFailureDependence extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestJobFailureDependence.class);
+  private static final int num_nodes = 5;
+  private static final int num_dbs = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+  private ClusterControllerManager _controller;
+  private ClusterSetup _setupTool;
+
+  private List<String> _test_dbs = new ArrayList<String>();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < num_nodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target dbs
+    for (int i = 0; i < num_dbs; i++) {
+      String db = "TestDB" + i;
+      _setupTool
+          .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
+              IdealState.RebalanceMode.FULL_AUTO.toString());
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+      _test_dbs.add(db);
+    }
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < num_nodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    _controller.syncStop();
+    for (int i = 0; i < num_nodes; i++) {
+      _participants[i].syncStop();
+    }
+  }
+
+  @Test
+  public void testJobDependantFailure() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+    // all jobs after failed job should fail too.
+    for (int i = 2; i < num_dbs; i++) {
+      String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
+      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
+    }
+  }
+
+  @Test
+  public void testJobDependantWorkflowFailure() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+  }
+
+  @Test
+  public void testIgnoreJobDependantFailure() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+    String namedSpaceJob2 = String.format("%s_%s", queueName, currentJobNames.get(2));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob2, TaskState.FAILED);
+
+    // all jobs after failed job should complete.
+    for (int i = 3; i < num_dbs; i++) {
+      String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
+      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+    }
+  }
+
+  @Test
+  public void testWorkflowFailureJobThreshold() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    String lastJob =
+        String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+    TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+
+    _driver.flushQueue(queueName);
+
+    WorkflowConfig currentWorkflowConfig = _driver.getWorkflowConfig(queueName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
+
+    configBuilder.setFailureThreshold(0);
+    _driver.updateWorkflow(queueName, configBuilder.build());
+    _driver.stop(queueName);
+
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      _driver.enqueueJob(queueName, jobName, jobConfig);
+    }
+
+    _driver.resume(queueName);
+
+    namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 101604b..7eeb3f4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -140,6 +140,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     for (int i = 0; i < num_nodes; i++) {
       _participants[i].syncStop();
     }
+    _setupTool.deleteCluster(CLUSTER_NAME);
   }
 
   @Test
@@ -161,12 +162,44 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
       currentJobNames.add(jobName);
     }
 
-    _driver.start(queueBuilder.build());
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+    _driver.start(queueBuilder.build());
 
-    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(2));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
     TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+
+    _driver.delete(queueName);
+  }
+
+  @Test
+  public void testJobContinueUponParentJobFailure() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
+    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    String lastJob =
+        String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
+    TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+
+    _driver.delete(queueName);
   }
 
   @Test
@@ -193,5 +226,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
     TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
     TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+
+    _driver.delete(queueName);
   }
 }
