[HELIX-784] TASK: Fix a bug in getExpiredJobs

getExpiredJobs(), when the job config was null, would simply continue instead of adding the job to expiredJobs so that the job cleanup/purge could be retried. A purge failure could therefore leave many jobs un-purged, with only the job config missing in ZK. This RB fixes that.
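In essence, the fix makes the purge path treat a job with a missing JobConfig as still purgeable rather than skipping it forever. Below is a minimal, hypothetical sketch of the fixed selection logic for illustration only (the class wrapper, method signature, and the normal-expiry check are reconstructed assumptions; the actual one-hunk change is in the TaskUtil.java diff further down):

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.store.HelixPropertyStore;
    import org.apache.helix.task.JobConfig;
    import org.apache.helix.task.JobContext;
    import org.apache.helix.task.TaskState;
    import org.apache.helix.task.TaskUtil;
    import org.apache.helix.task.WorkflowConfig;
    import org.apache.helix.task.WorkflowContext;

    public class GetExpiredJobsSketch {
      // Hypothetical sketch, not the actual patch: shows where the fix lands in the loop.
      static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor,
          HelixPropertyStore<ZNRecord> propertyStore, WorkflowConfig workflowConfig,
          WorkflowContext workflowContext) {
        Set<String> expiredJobs = new HashSet<>();
        for (String job : workflowConfig.getJobDag().getAllNodes()) {
          JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
          JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
          if (jobConfig == null) {
            // The fix: a missing JobConfig usually means a previous purge deleted it but
            // failed part-way. Add the job anyway so the next purge retries the remaining
            // cleanup, instead of leaving the dangling DAG entry behind forever.
            expiredJobs.add(job);
            continue;
          }
          // Normal expiry path (simplified): a completed job whose expiry window has
          // elapsed is eligible for purge.
          if (jobContext != null && workflowContext.getJobState(job) == TaskState.COMPLETED
              && System.currentTimeMillis() >= jobContext.getFinishTime() + jobConfig.getExpiry()) {
            expiredJobs.add(job);
          }
        }
        return expiredJobs;
      }
    }

The key design point is that a missing JobConfig is no longer terminal: the job stays in expiredJobs, so the async purge keeps retrying until the DAG entry, IdealState, and contexts are all gone.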
Changelist:
1. Add the job name to expiredJobs if the job config does not exist in ZK
2. Add a more detailed description in the error log
3. Add an integration test for two task-related stages: TaskPersistDataStage and TaskGarbageCollectionStage in TestTaskStage.java

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/befb1036
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/befb1036
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/befb1036

Branch: refs/heads/master
Commit: befb1036f8d8be2729a800d3dde88fc1362a6489
Parents: 3d9c030
Author: narendly <naren...@gmail.com>
Authored: Thu Nov 1 16:57:33 2018 -0700
Committer: narendly <naren...@gmail.com>
Committed: Thu Nov 1 16:57:33 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskUtil.java    |   6 +-
 .../stages/TestTaskPersistDataStage.java        |  91 ----------
 .../helix/controller/stages/TestTaskStage.java  | 165 +++++++++++++++++++
 3 files changed, 170 insertions(+), 92 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index b587408..8392d89 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -719,7 +719,11 @@ public class TaskUtil {
       JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
       JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
       if (jobConfig == null) {
-        LOG.error(String.format("Job %s exists in JobDAG but JobConfig is missing!", job));
+        LOG.error(String.format(
+            "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+            job, workflowConfig.getWorkflowId()));
+        // Add the job name to expiredJobs so that purge operation will be tried again on this job
+        expiredJobs.add(job);
         continue;
       }
       long expiry = jobConfig.getExpiry();

http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
deleted file mode 100644
index 8fcedfa..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.controller.stages;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.common.caches.TaskDataCache;
-import org.apache.helix.controller.stages.task.TaskPersistDataStage;
-import org.apache.helix.participant.MockZKHelixManager;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestTaskPersistDataStage extends ZkTestBase {
-  private String CLUSTER_NAME = "TestCluster_" + TestHelper.getTestClassName();
-  private HelixManager _helixManager;
-  private TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() {
-    _helixManager = new MockZKHelixManager(CLUSTER_NAME, "MockInstance", InstanceType.ADMINISTRATOR,
-        _gZkClient);
-    _driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
-  }
-
-  @Test
-  public void testTaskContextUpdate() {
-    ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
-    event.addAttribute(AttributeName.helixmanager.name(), _helixManager);
-    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
-
-    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
-    TaskDataCache taskDataCache = cache.getTaskDataCache();
-    String testWorkflow = TestHelper.getTestMethodName();
-    String testJobPrefix = testWorkflow + "_Job_";
-
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(testWorkflow));
-    wfCtx.setJobState(testJobPrefix + "0", TaskState.IN_PROGRESS);
-    wfCtx.setJobState(testJobPrefix + "1", TaskState.COMPLETED);
-    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    wfCtx.setName(testWorkflow);
-    wfCtx.setStartTime(System.currentTimeMillis());
-
-    JobContext jbCtx0 = new JobContext(new ZNRecord(testJobPrefix + "0"));
-    jbCtx0.setName(testJobPrefix + "0");
-    jbCtx0.setStartTime(System.currentTimeMillis());
-    jbCtx0.setPartitionState(0, TaskPartitionState.RUNNING);
-
-    JobContext jbCtx1 = new JobContext((new ZNRecord(testJobPrefix + "1")));
-    jbCtx1.setName(testJobPrefix + "1");
-    jbCtx1.setStartTime(System.currentTimeMillis());
-    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
-
-    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
-    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
-    taskDataCache.updateJobContext(testJobPrefix + "1", jbCtx1);
-
-    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
-    persistDataStage.process(event);
-
-    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
-    wfCtx.setJobState(testJobPrefix + "0", TaskState.FAILED);
-    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
-
-    wfCtx.getJobStates().remove(testJobPrefix + "1");
-    taskDataCache.removeContext(testJobPrefix + "1");
-
-    JobContext jbCtx2 = new JobContext(new ZNRecord(testJobPrefix + "2"));
-    jbCtx2.setName(testJobPrefix + "2");
-    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
-    wfCtx.setJobState(testJobPrefix + "2", TaskState.IN_PROGRESS);
-    taskDataCache.updateJobContext(testJobPrefix + "2", jbCtx2);
-
-    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
-    persistDataStage.process(event);
-
-    Assert.assertEquals(_driver.getWorkflowContext(testWorkflow), wfCtx);
-    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "0"), jbCtx0);
-    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "2"), jbCtx2);
-    Assert.assertNull(_driver.getJobContext(testJobPrefix + "1"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
new file mode 100644
index 0000000..6dea943
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -0,0 +1,165 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskStage extends TaskTestBase {
+  private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+  private PropertyKey.Builder _keyBuilder;
+  private String _testWorkflow = TestHelper.getTestClassName();
+  private String _testJobPrefix = _testWorkflow + "_Job_";
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    // Stop the controller for isolated testing of the stage
+    _controller.syncStop();
+    _keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+  }
+
+  @Test
+  public void testPersistContextData() {
+    _event.addAttribute(AttributeName.helixmanager.name(), _manager);
+
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.setTaskCache(true);
+    TaskDataCache taskDataCache = cache.getTaskDataCache();
+
+    // Build a queue
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(_testWorkflow);
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
+    queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+        .enqueueJob("Job_2", jobBuilder_2);
+
+    _driver.createQueue(queueBuilder.build());
+    // Manually trigger a cache refresh
+    cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor));
+
+    // Create the IdealState ZNode for the jobs
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1,
+        TaskConstants.STATE_MODEL_NAME);
+
+    // Create the context
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+    wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
+    wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
+    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    wfCtx.setName(_testWorkflow);
+    wfCtx.setStartTime(System.currentTimeMillis());
+
+    JobContext jbCtx0 = new JobContext(new ZNRecord(_testJobPrefix + "0"));
+    jbCtx0.setName(_testJobPrefix + "0");
+    jbCtx0.setStartTime(System.currentTimeMillis());
+    jbCtx0.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+    JobContext jbCtx1 = new JobContext((new ZNRecord(_testJobPrefix + "1")));
+    jbCtx1.setName(_testJobPrefix + "1");
+    jbCtx1.setStartTime(System.currentTimeMillis());
+    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+    taskDataCache.updateJobContext(_testJobPrefix + "1", jbCtx1);
+
+    _event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+
+    // Write contexts to ZK first
+    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
+    persistDataStage.process(_event);
+
+    Assert.assertNotNull(_driver.getWorkflowContext(_testWorkflow));
+    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "0"));
+    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "1"));
+
+    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
+    wfCtx.setJobState(_testJobPrefix + "0", TaskState.FAILED);
+    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+
+    wfCtx.getJobStates().remove(_testJobPrefix + "1");
+    taskDataCache.removeContext(_testJobPrefix + "1");
+
+    JobContext jbCtx2 = new JobContext(new ZNRecord(_testJobPrefix + "2"));
+    jbCtx2.setName(_testJobPrefix + "2");
+    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
+    wfCtx.setJobState(_testJobPrefix + "2", TaskState.IN_PROGRESS);
+    taskDataCache.updateJobContext(_testJobPrefix + "2", jbCtx2);
+
+    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+
+    persistDataStage.process(_event);
+
+    Assert.assertEquals(_driver.getWorkflowContext(_testWorkflow), wfCtx);
+    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "0"), jbCtx0);
+    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "2"), jbCtx2);
+    Assert.assertNull(_driver.getJobContext(_testJobPrefix + "1"));
+  }
+
+  /**
+   * Test that if there is a job in the DAG with JobConfig gone (due to ZK delete failure), the
+   * async job purge will try to delete it again.
+   */
+  @Test(dependsOnMethods = "testPersistContextData")
+  public void testPartialDataPurge() {
+    // Manually delete JobConfig
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
+
+    // Then purge jobs
+    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+    garbageCollectionStage.execute(_event);
+
+    // Check that IS and contexts have been purged for the 3 jobs in both old and new ZNode paths
+    // IdealState check
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0");
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1");
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
+  }
+
+  private void deleteJobConfigs(String workflowName, String jobName) {
+    String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
+    String newPath = _manager.getHelixDataAccessor().keyBuilder()
+        .jobConfigZNode(workflowName, jobName).getPath();
+    _baseAccessor.remove(oldPath, AccessOption.PERSISTENT);
+    _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
+  }
+
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+    // IdealState
+    Assert.assertFalse(
+        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
+    // JobContexts in old and new ZNode paths
+    String oldPath =
+        String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
+    String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
+    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+  }
+}
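For anyone verifying the user-visible effect by hand: before this fix, a job whose JobConfig had been lost would linger in the workflow's JobDAG indefinitely; after it, a purge pass removes the job entirely. A hypothetical spot-check in the same spirit as the test above (driver, workflowName, and danglingJob are placeholders, not part of this commit):

    // Hypothetical spot-check: after a purge pass, a job whose JobConfig went
    // missing should be fully gone from the workflow's DAG and from ZK.
    WorkflowConfig wfCfg = driver.getWorkflowConfig(workflowName);
    Assert.assertFalse(wfCfg.getJobDag().getAllNodes().contains(danglingJob));
    Assert.assertNull(driver.getJobConfig(danglingJob));   // config: was already missing
    Assert.assertNull(driver.getJobContext(danglingJob));  // context: removed by the purge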