Add a new job option to allow a job to continue even if the job it directly depends on fails.
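For reference, a minimal usage sketch based on the builder APIs added in this commit. The queue, job, resource names and the "Reindex" task command are illustrative placeholders, not part of the commit: each job sets IgnoreDependentJobFailure so it still runs after a parent job fails, and the workflow-level FailureThreshold bounds how many failed jobs the workflow tolerates before it is marked FAILED.

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.HelixManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

import com.google.common.collect.Sets;

public class IgnoreDependentJobFailureExample {
  // Sketch only: "MyQueue", "MyDB*" and "Reindex" are placeholder names.
  public static void submit(HelixManager manager) {
    // Workflow-level knob added in this commit: tolerate up to 2 failed jobs
    // before the whole workflow is marked FAILED.
    Map<String, String> workflowCfg = new HashMap<String, String>();
    workflowCfg.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(2));
    JobQueue.Builder queueBuilder = new JobQueue.Builder("MyQueue").fromMap(workflowCfg);

    for (int i = 0; i < 3; i++) {
      // Job-level knob added in this commit: keep scheduling this job even if
      // the job it directly depends on (the previous job in the queue) failed.
      JobConfig.Builder jobConfig = new JobConfig.Builder()
          .setCommand("Reindex")
          .setTargetResource("MyDB" + i)
          .setTargetPartitionStates(Sets.newHashSet("SLAVE"))
          .setIgnoreDependentJobFailure(true);
      queueBuilder.enqueueJob("job" + i, jobConfig);
    }

    new TaskDriver(manager).start(queueBuilder.build());
  }
}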
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/be660245 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/be660245 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/be660245 Branch: refs/heads/helix-0.6.x Commit: be660245fc1a9f4b22fba58c4b25a1af19555066 Parents: 579d82f Author: Lei Xia <l...@linkedin.com> Authored: Wed Jan 27 10:10:31 2016 -0800 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 14:44:56 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 26 +- .../org/apache/helix/task/JobRebalancer.java | 26 +- .../org/apache/helix/task/TaskRebalancer.java | 79 +++++- .../org/apache/helix/task/WorkflowConfig.java | 23 +- .../apache/helix/task/WorkflowRebalancer.java | 28 +- .../apache/helix/integration/task/MockTask.java | 2 +- .../helix/integration/task/TaskTestUtil.java | 7 +- .../task/TestJobFailureDependence.java | 283 +++++++++++++++++++ .../task/TestRunJobsWithMissingTarget.java | 41 ++- 9 files changed, 469 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 37a2f35..65a9caf 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -96,6 +96,11 @@ public class JobConfig { TASK_RETRY_DELAY("TaskRetryDelay"), /** + * Whether failure of directly dependent jobs should fail this job. 
+ */ + IGNORE_DEPENDENT_JOB_FAILURE("IgnoreDependentJobFailure"), + + /** * The individual task configurations, if any * */ TASK_CONFIGS("TaskConfigs"), @@ -124,6 +129,7 @@ public class JobConfig { public static final int DEFAULT_FAILURE_THRESHOLD = 0; public static final int DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK = 0; public static final boolean DEFAULT_DISABLE_EXTERNALVIEW = false; + public static final boolean DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE = false; private final String _workflow; private final String _targetResource; @@ -138,13 +144,14 @@ public class JobConfig { private final int _failureThreshold; private final long _retryDelay; private final boolean _disableExternalView; + private final boolean _ignoreDependentJobFailure; private final Map<String, TaskConfig> _taskConfigMap; private JobConfig(String workflow, String targetResource, List<String> targetPartitions, Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, - boolean disableExternalView, Map<String, TaskConfig> taskConfigMap) { + boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap) { _workflow = workflow; _targetResource = targetResource; _targetPartitions = targetPartitions; @@ -158,6 +165,7 @@ public class JobConfig { _failureThreshold = failureThreshold; _retryDelay = retryDelay; _disableExternalView = disableExternalView; + _ignoreDependentJobFailure = ignoreDependentJobFailure; if (taskConfigMap != null) { _taskConfigMap = taskConfigMap; } else { @@ -217,6 +225,8 @@ public class JobConfig { return _disableExternalView; } + public boolean isIgnoreDependentJobFailure() { return _ignoreDependentJobFailure; } + public Map<String, TaskConfig> getTaskConfigMap() { return _taskConfigMap; } @@ -260,6 +270,8 @@ public class JobConfig { Boolean.toString(_disableExternalView)); cfgMap.put(JobConfigProperty.NUM_CONCURRENT_TASKS_PER_INSTANCE.value(), "" + _numConcurrentTasksPerInstance); + cfgMap.put(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value(), + Boolean.toString(_ignoreDependentJobFailure)); return cfgMap; } @@ -281,6 +293,7 @@ public class JobConfig { private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD; private long _retryDelay = DEFAULT_TASK_RETRY_DELAY; private boolean _disableExternalView = DEFAULT_DISABLE_EXTERNALVIEW; + private boolean _ignoreDependentJobFailure = DEFAULT_IGNORE_DEPENDENT_JOB_FAILURE; public JobConfig build() { validate(); @@ -288,7 +301,7 @@ public class JobConfig { return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, - _disableExternalView, _taskConfigMap); + _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap); } /** @@ -346,6 +359,10 @@ public class JobConfig { b.setDisableExternalView( Boolean.valueOf(cfg.get(JobConfigProperty.DISABLE_EXTERNALVIEW.value()))); } + if (cfg.containsKey(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value())) { + b.setIgnoreDependentJobFailure( + Boolean.valueOf(cfg.get(JobConfigProperty.IGNORE_DEPENDENT_JOB_FAILURE.value()))); + } return b; } @@ -414,6 +431,11 @@ public class JobConfig { return this; } + public Builder setIgnoreDependentJobFailure(boolean ignoreDependentJobFailure) { + 
_ignoreDependentJobFailure = ignoreDependentJobFailure; + return this; + } + public Builder addTaskConfigs(List<TaskConfig> taskConfigs) { if (taskConfigs != null) { for (TaskConfig taskConfig : taskConfigs) { http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 7eeafc7..5b41773 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -86,22 +86,28 @@ public class JobRebalancer extends TaskRebalancer { return buildEmptyAssignment(jobName, currStateOutput); } + // Stop current run of the job if workflow or job is already in final state (failed or completed) + TaskState workflowState = workflowCtx.getWorkflowState(); TaskState jobState = workflowCtx.getJobState(jobName); // The job is already in a final state (completed/failed). - if (jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { - LOG.info("Job " + jobName + " is failed or already completed, clean up IS."); + if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED || + jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { + LOG.info(String.format( + "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.", + workflowResource, jobName, workflowState, jobState)); cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); _scheduledRebalancer.removeScheduledRebalance(jobName); return buildEmptyAssignment(jobName, currStateOutput); } if (!isWorkflowReadyForSchedule(workflowCfg)) { - LOG.info("Job is not ready to be scheduled since workflow is not ready " + jobName); + LOG.info("Job is not ready to be run since workflow is not ready " + jobName); return buildEmptyAssignment(jobName, currStateOutput); } - if (!isJobReadyToSchedule(jobName, workflowCfg, workflowCtx)) { - LOG.info("Job is not ready to be scheduled " + jobName); + if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, + workflowCtx)) { + LOG.info("Job is not ready to run " + jobName); return buildEmptyAssignment(jobName, currStateOutput); } @@ -429,16 +435,6 @@ public class JobRebalancer extends TaskRebalancer { return ra; } - private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, - WorkflowContext workflowContext) { - long currentTime = System.currentTimeMillis(); - workflowContext.setJobState(jobName, TaskState.FAILED); - jobContext.setFinishTime(currentTime); - if (isWorkflowFinished(workflowContext, workflowConfig)) { - workflowContext.setFinishTime(currentTime); - } - } - private void markJobComplete(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, WorkflowContext workflowContext) { long currentTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index b006efc..6aaeb5f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -74,13 +74,17 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { */ protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) { boolean incomplete = false; + int failedJobs = 0; for (String job : cfg.getJobDag().getAllNodes()) { TaskState jobState = ctx.getJobState(job); if (jobState == TaskState.FAILED) { - ctx.setWorkflowState(TaskState.FAILED); - return true; + failedJobs ++; + if (failedJobs > cfg.getFailureThreshold()) { + ctx.setWorkflowState(TaskState.FAILED); + return true; + } } - if (jobState != TaskState.COMPLETED) { + if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED) { incomplete = true; } } @@ -136,31 +140,78 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { int notStartedCount = 0; - int inCompleteCount = 0; int failedCount = 0; - for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) { - TaskState jobState = workflowCtx.getJobState(ancestor); + for (String parent : workflowCfg.getJobDag().getDirectParents(job)) { + TaskState jobState = workflowCtx.getJobState(parent); if (jobState == null || jobState == TaskState.NOT_STARTED) { ++notStartedCount; - } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { - ++inCompleteCount; - } else if (jobState == TaskState.FAILED) { + } + if (jobState == TaskState.FAILED) { ++failedCount; } } - if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs() - || failedCount > 0) { - LOG.debug(String.format( - "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.", - job, notStartedCount, inCompleteCount, failedCount)); + if (notStartedCount > 0) { + LOG.debug(String + .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount)); + return false; + } + + JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job); + if (failedCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) { + markJobFailed(job, null, workflowCfg, workflowCtx); + LOG.debug( + String.format("Job %s is not ready to start, failedCount(s)=%d.", job, failedCount)); + return false; + } + + int inCompleteCount = getInCompleteJobCount(workflowCfg, workflowCtx); + if (inCompleteCount >= workflowCfg.getParallelJobs()) { + LOG.debug(String + .format("Job %s is not ready to schedule, inCompleteJobs(s)=%d.", job, inCompleteCount)); return false; } return true; } + protected boolean isJobStarted(String job, WorkflowContext workflowContext) { + TaskState jobState = workflowContext.getJobState(job); + return (jobState != null && jobState != TaskState.NOT_STARTED); + } + + /** + * Count the number of jobs in a workflow that are in progress. 
+ * + * @param workflowCfg + * @param workflowCtx + * @return + */ + protected int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { + int inCompleteCount = 0; + for (String jobName : workflowCfg.getJobDag().getAllNodes()) { + TaskState jobState = workflowCtx.getJobState(jobName); + if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { + ++inCompleteCount; + } + } + + return inCompleteCount; + } + + protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig, + WorkflowContext workflowContext) { + long currentTime = System.currentTimeMillis(); + workflowContext.setJobState(jobName, TaskState.FAILED); + if (jobContext != null) { + jobContext.setFinishTime(currentTime); + } + if (isWorkflowFinished(workflowContext, workflowConfig)) { + workflowContext.setFinishTime(currentTime); + } + } + /** * Check if a workflow is ready to schedule. * http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index 4c81654..955cb77 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -40,9 +40,11 @@ public class WorkflowConfig { public static final String RECURRENCE_UNIT = "RecurrenceUnit"; public static final String RECURRENCE_INTERVAL = "RecurrenceInterval"; public static final String TERMINABLE = "Terminable"; + public static final String FAILURE_THRESHOLD = "FailureThreshold"; /* Default values */ public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000; + public static final int DEFAULT_FAILURE_THRESHOLD = 0; /* Member variables */ // TODO: jobDag should not be in the workflowConfig. 
@@ -56,13 +58,15 @@ public class WorkflowConfig { private final long _expiry; private final boolean _terminable; private final ScheduleConfig _scheduleConfig; + private final int _failureThreshold; protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry, - boolean terminable, ScheduleConfig scheduleConfig) { + int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig) { _jobDag = jobDag; _parallelJobs = parallelJobs; _targetState = targetState; _expiry = expiry; + _failureThreshold = failureThreshold; _terminable = terminable; _scheduleConfig = scheduleConfig; } @@ -83,6 +87,10 @@ public class WorkflowConfig { return _expiry; } + public int getFailureThreshold() { + return _failureThreshold; + } + public boolean isTerminable() { return _terminable; } @@ -128,6 +136,7 @@ public class WorkflowConfig { cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry())); cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name()); cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable())); + cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(getFailureThreshold())); // Populate schedule if present ScheduleConfig scheduleConfig = getScheduleConfig(); @@ -151,13 +160,15 @@ public class WorkflowConfig { private int _parallelJobs = 1; private TargetState _targetState = TargetState.START; private long _expiry = DEFAULT_EXPIRY; + private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD; private boolean _isTerminable = true; private ScheduleConfig _scheduleConfig; public WorkflowConfig build() { validate(); - return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig); + return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold, + _isTerminable, _scheduleConfig); } public Builder() {} @@ -191,6 +202,11 @@ public class WorkflowConfig { return this; } + public Builder setFailureThreshold(int failureThreshold) { + _failureThreshold = failureThreshold; + return this; + } + public Builder setTerminable(boolean isTerminable) { _isTerminable = isTerminable; return this; @@ -211,6 +227,9 @@ public class WorkflowConfig { if (cfg.containsKey(EXPIRY)) { b.setExpiry(Long.parseLong(cfg.get(EXPIRY))); } + if (cfg.containsKey(FAILURE_THRESHOLD)) { + b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD))); + } if (cfg.containsKey(DAG)) { b.setJobDag(JobDag.fromJson(cfg.get(DAG))); } http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 05b6dc6..682ac77 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -30,7 +30,13 @@ import org.apache.log4j.Logger; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; /** * Custom rebalancer implementation for the {@code Workflow} in task state model. 
@@ -63,7 +69,7 @@ public class WorkflowRebalancer extends TaskRebalancer { TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context."); - cleanupWorkflow(workflow, workflowCfg, workflowCtx); + cleanupWorkflow(workflow, workflowCfg); return buildEmptyAssignment(workflow, currStateOutput); } @@ -91,7 +97,7 @@ public class WorkflowRebalancer extends TaskRebalancer { // Check if this workflow has been finished past its expiry. if (workflowCtx.getFinishTime() + expiryTime <= currentTime) { LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context."); - cleanupWorkflow(workflow, workflowCfg, workflowCtx); + cleanupWorkflow(workflow, workflowCfg); } else { // schedule future cleanup work long cleanupTime = workflowCtx.getFinishTime() + expiryTime; @@ -113,7 +119,7 @@ public class WorkflowRebalancer extends TaskRebalancer { scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx); if (isReady) { // Schedule jobs from this workflow. - scheduleJobs(workflowCfg, workflowCtx); + scheduleJobs(workflow, workflowCfg, workflowCtx); } else { LOG.debug("Workflow " + workflow + " is not ready to be scheduled."); } @@ -126,23 +132,32 @@ public class WorkflowRebalancer extends TaskRebalancer { * Figure out whether the jobs in the workflow should be run, * and if it's ready, then just schedule it */ - private void scheduleJobs(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { + private void scheduleJobs(String workflow, WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); if (scheduleConfig != null && scheduleConfig.isRecurring()) { LOG.debug("Jobs from recurring workflow are not schedule-able"); return; } + int scheduledJobs = 0; for (String job : workflowCfg.getJobDag().getAllNodes()) { TaskState jobState = workflowCtx.getJobState(job); if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) { LOG.debug("Job " + job + " is already started or completed."); continue; } + + if (scheduledJobs >= workflowCfg.getParallelJobs()) { + LOG.debug(String.format("Workflow %s already have enough job in progress, " + + "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs)); + break; + } + // check ancestor job status if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) { JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job); scheduleSingleJob(job, jobConfig); + scheduledJobs++; } } } @@ -382,8 +397,7 @@ public class WorkflowRebalancer extends TaskRebalancer { * Cleans up workflow configs and workflow contexts associated with this workflow, * including all job-level configs and context, plus workflow-level information. 
*/ - private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg, - WorkflowContext workflowCtx) { + private void cleanupWorkflow(String workflow, WorkflowConfig workflowcfg) { LOG.info("Cleaning up workflow: " + workflow); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java index 71fa12d..dad9949 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java @@ -39,7 +39,7 @@ public class MockTask implements Task { if (cfg == null) { cfg = Collections.emptyMap(); } - _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L; + _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 100L; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index 9796497..011f532 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -258,7 +258,7 @@ public class TaskTestUtil { return buildRecurrentJobQueue(jobQueueName, 0); } - public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) { + public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart, int failureThreshold) { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000)); Calendar cal = Calendar.getInstance(); @@ -267,10 +267,13 @@ public class TaskTestUtil { cal.set(Calendar.MILLISECOND, 0); cfgMap.put(WorkflowConfig.START_TIME, WorkflowConfig.getDefaultDateFormat().format(cal.getTime())); + if (failureThreshold > 0) { + cfgMap.put(WorkflowConfig.FAILURE_THRESHOLD, String.valueOf(failureThreshold)); + } return new JobQueue.Builder(jobQueueName).fromMap(cfgMap); } public static JobQueue.Builder buildJobQueue(String jobQueueName) { - return buildJobQueue(jobQueueName, 0); + return buildJobQueue(jobQueueName, 0, 0); } } http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java new file mode 100644 index 0000000..9e2456c --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java @@ -0,0 +1,283 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Sets; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestJobFailureDependence extends ZkIntegrationTestBase { + private static final Logger LOG = Logger.getLogger(TestJobFailureDependence.class); + private static final int num_nodes = 5; + private static final int num_dbs = 5; + private static final int START_PORT = 12918; + private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private static final int NUM_PARTITIONS = 20; + private static final int NUM_REPLICAS = 3; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes]; + private ClusterControllerManager _controller; + private ClusterSetup _setupTool; + + private List<String> _test_dbs = new ArrayList<String>(); + + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < num_nodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // Set up target dbs + for (int i = 0; i < num_dbs; i++) { + String db = "TestDB" + i; + _setupTool + .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL, + IdealState.RebalanceMode.FULL_AUTO.toString()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS); + 
_test_dbs.add(db); + } + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() { + @Override public Task createNewTask(TaskCallbackContext context) { + return new MockTask(context); + } + }); + + // start dummy participants + for (int i = 0; i < num_nodes; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + + _driver = new TaskDriver(_manager); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _manager.disconnect(); + _controller.syncStop(); + for (int i = 0; i < num_nodes; i++) { + _participants[i].syncStop(); + } + } + + @Test + public void testJobDependantFailure() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100); + // Create and Enqueue jobs + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2)); + + // all jobs after failed job should fail too. 
+ for (int i = 2; i < num_dbs; i++) { + String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED); + } + } + + @Test + public void testJobDependantWorkflowFailure() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName); + // Create and Enqueue jobs + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2)); + + String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); + } + + @Test + public void testIgnoreJobDependantFailure() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100); + // Create and Enqueue jobs + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2)); + String namedSpaceJob2 = String.format("%s_%s", queueName, currentJobNames.get(2)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob2, TaskState.FAILED); + + // all jobs after failed job should complete. 
+ for (int i = 3; i < num_dbs; i++) { + String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED); + } + } + + @Test + public void testWorkflowFailureJobThreshold() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3); + // Create and Enqueue jobs + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1)); + + String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + String lastJob = + String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1)); + TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED); + + _driver.flushQueue(queueName); + + WorkflowConfig currentWorkflowConfig = _driver.getWorkflowConfig(queueName); + WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig); + + configBuilder.setFailureThreshold(0); + _driver.updateWorkflow(queueName, configBuilder.build()); + _driver.stop(queueName); + + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + _driver.enqueueJob(queueName, jobName, jobConfig); + } + + _driver.resume(queueName); + + namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/be660245/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java index 101604b..7eeb3f4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java @@ -140,6 +140,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { for (int i = 0; i < num_nodes; i++) { _participants[i].syncStop(); } + _setupTool.deleteCluster(CLUSTER_NAME); } @Test @@ -161,12 +162,44 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { currentJobNames.add(jobName); } - _driver.start(queueBuilder.build()); _setupTool.dropResourceFromCluster(CLUSTER_NAME, 
_test_dbs.get(2)); + _driver.start(queueBuilder.build()); - String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2)); - TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(2)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED); TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); + + _driver.delete(queueName); + } + + @Test + public void testJobContinueUponParentJobFailure() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3); + // Create and Enqueue jobs + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(1)); + + String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1)); + TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); + String lastJob = + String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1)); + TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED); + + _driver.delete(queueName); } @Test @@ -193,5 +226,7 @@ public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0)); TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED); TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED); + + _driver.delete(queueName); } }
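The new testWorkflowFailureJobThreshold test also exercises changing the threshold on an existing queue at runtime; a condensed sketch of that pattern, using the same TaskDriver calls as the test ("MyQueue" is a placeholder):

import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

public class UpdateFailureThresholdExample {
  // Condensed from testWorkflowFailureJobThreshold in this commit.
  public static void tightenThreshold(TaskDriver driver, String queueName) throws Exception {
    // Clear finished jobs before reconfiguring the queue.
    driver.flushQueue(queueName);

    // Rebuild the workflow config with a stricter threshold: a value of 0
    // (the default) means any failed job fails the whole workflow.
    WorkflowConfig current = driver.getWorkflowConfig(queueName);
    WorkflowConfig.Builder builder = new WorkflowConfig.Builder(current);
    builder.setFailureThreshold(0);

    driver.updateWorkflow(queueName, builder.build());
  }
}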