[HELIX-784] TASK: Fix a bug in getExpiredJobs

When a job's JobConfig was null, getExpiredJobs() would simply continue instead 
of adding the job to expiredJobs so that the job cleanup/purge could be 
retried. As a result, a failed purge could leave many jobs un-purged, with only 
the JobConfig missing in ZK. This RB fixes this.
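
For context, here is a minimal sketch of the fixed loop body in 
TaskUtil.getExpiredJobs(). This is simplified for illustration: the iteration 
over jobDag.getAllNodes() and the surrounding variables are paraphrased from 
the diff below, not copied verbatim.

    for (String job : jobDag.getAllNodes()) {
      JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
      if (jobConfig == null) {
        // Before this fix: log the error and continue, so a job whose
        // JobConfig was already deleted could never be purged again.
        // After this fix: queue the job so the next purge retries its cleanup.
        expiredJobs.add(job);
        continue;
      }
      // Jobs with an intact JobConfig are still purged based on expiry.
      long expiry = jobConfig.getExpiry();
      // ... expiry checks add genuinely expired jobs to expiredJobs ...
    }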

Changelist:
1. Add the job name to expiredJobs if the job config does not exist in ZK
2. Add a more detailed description in the error log
3. Add an integration test, TestTaskStage.java, covering two task-related 
stages: TaskPersistDataStage and TaskGarbageCollectionStage


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/befb1036
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/befb1036
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/befb1036

Branch: refs/heads/master
Commit: befb1036f8d8be2729a800d3dde88fc1362a6489
Parents: 3d9c030
Author: narendly <naren...@gmail.com>
Authored: Thu Nov 1 16:57:33 2018 -0700
Committer: narendly <naren...@gmail.com>
Committed: Thu Nov 1 16:57:33 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskUtil.java    |   6 +-
 .../stages/TestTaskPersistDataStage.java        |  91 ----------
 .../helix/controller/stages/TestTaskStage.java  | 165 +++++++++++++++++++
 3 files changed, 170 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index b587408..8392d89 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -719,7 +719,11 @@ public class TaskUtil {
         JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
         JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
         if (jobConfig == null) {
-          LOG.error(String.format("Job %s exists in JobDAG but JobConfig is 
missing!", job));
+          LOG.error(String.format(
+              "Job %s exists in JobDAG but JobConfig is missing! Job might 
have been deleted manually from the JobQueue: %s, or left in the DAG due to a 
failed clean-up attempt from last purge.",
+              job, workflowConfig.getWorkflowId()));
+          // Add the job name to expiredJobs so that purge operation will be 
tried again on this job
+          expiredJobs.add(job);
           continue;
         }
         long expiry = jobConfig.getExpiry();

http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
deleted file mode 100644
index 8fcedfa..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.controller.stages;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.common.caches.TaskDataCache;
-import org.apache.helix.controller.stages.task.TaskPersistDataStage;
-import org.apache.helix.participant.MockZKHelixManager;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestTaskPersistDataStage extends ZkTestBase {
-  private String CLUSTER_NAME = "TestCluster_" + TestHelper.getTestClassName();
-  private HelixManager _helixManager;
-  private TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() {
-    _helixManager = new MockZKHelixManager(CLUSTER_NAME, "MockInstance", InstanceType.ADMINISTRATOR,
-        _gZkClient);
-    _driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
-  }
-
-  @Test
-  public void testTaskContextUpdate() {
-    ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
-    event.addAttribute(AttributeName.helixmanager.name(), _helixManager);
-    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
-
-    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
-    TaskDataCache taskDataCache = cache.getTaskDataCache();
-    String testWorkflow = TestHelper.getTestMethodName();
-    String testJobPrefix = testWorkflow + "_Job_";
-
-    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(testWorkflow));
-    wfCtx.setJobState(testJobPrefix + "0", TaskState.IN_PROGRESS);
-    wfCtx.setJobState(testJobPrefix + "1", TaskState.COMPLETED);
-    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
-    wfCtx.setName(testWorkflow);
-    wfCtx.setStartTime(System.currentTimeMillis());
-
-    JobContext jbCtx0 = new JobContext(new ZNRecord(testJobPrefix + "0"));
-    jbCtx0.setName(testJobPrefix + "0");
-    jbCtx0.setStartTime(System.currentTimeMillis());
-    jbCtx0.setPartitionState(0, TaskPartitionState.RUNNING);
-
-    JobContext jbCtx1 = new JobContext((new ZNRecord(testJobPrefix + "1")));
-    jbCtx1.setName(testJobPrefix + "1");
-    jbCtx1.setStartTime(System.currentTimeMillis());
-    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
-
-    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
-    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
-    taskDataCache.updateJobContext(testJobPrefix + "1", jbCtx1);
-
-    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
-    persistDataStage.process(event);
-
-    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
-    wfCtx.setJobState(testJobPrefix + "0", TaskState.FAILED);
-    taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
-
-    wfCtx.getJobStates().remove(testJobPrefix + "1");
-    taskDataCache.removeContext(testJobPrefix + "1");
-
-    JobContext jbCtx2 = new JobContext(new ZNRecord(testJobPrefix + "2"));
-    jbCtx2.setName(testJobPrefix + "2");
-    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
-    wfCtx.setJobState(testJobPrefix + "2", TaskState.IN_PROGRESS);
-    taskDataCache.updateJobContext(testJobPrefix + "2", jbCtx2);
-
-    taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
-    persistDataStage.process(event);
-
-    Assert.assertEquals(_driver.getWorkflowContext(testWorkflow), wfCtx);
-    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "0"), jbCtx0);
-    Assert.assertEquals(_driver.getJobContext(testJobPrefix + "2"), jbCtx2);
-    Assert.assertNull(_driver.getJobContext(testJobPrefix + "1"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
new file mode 100644
index 0000000..6dea943
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -0,0 +1,165 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskStage extends TaskTestBase {
+  private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+  private PropertyKey.Builder _keyBuilder;
+  private String _testWorkflow = TestHelper.getTestClassName();
+  private String _testJobPrefix = _testWorkflow + "_Job_";
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    // Stop the controller for isolated testing of the stage
+    _controller.syncStop();
+    _keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+  }
+
+  @Test
+  public void testPersistContextData() {
+    _event.addAttribute(AttributeName.helixmanager.name(), _manager);
+
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.setTaskCache(true);
+    TaskDataCache taskDataCache = cache.getTaskDataCache();
+
+    // Build a queue
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(_testWorkflow);
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1");
+    queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+        .enqueueJob("Job_2", jobBuilder_2);
+
+    _driver.createQueue(queueBuilder.build());
+    // Manually trigger a cache refresh
+    cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor));
+
+    // Create the IdealState ZNode for the jobs
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1,
+        TaskConstants.STATE_MODEL_NAME);
+
+    // Create the context
+    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+    wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
+    wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
+    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    wfCtx.setName(_testWorkflow);
+    wfCtx.setStartTime(System.currentTimeMillis());
+
+    JobContext jbCtx0 = new JobContext(new ZNRecord(_testJobPrefix + "0"));
+    jbCtx0.setName(_testJobPrefix + "0");
+    jbCtx0.setStartTime(System.currentTimeMillis());
+    jbCtx0.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+    JobContext jbCtx1 = new JobContext(new ZNRecord(_testJobPrefix + "1"));
+    jbCtx1.setName(_testJobPrefix + "1");
+    jbCtx1.setStartTime(System.currentTimeMillis());
+    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+    taskDataCache.updateJobContext(_testJobPrefix + "1", jbCtx1);
+
+    _event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+
+    // Write contexts to ZK first
+    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
+    persistDataStage.process(_event);
+
+    Assert.assertNotNull(_driver.getWorkflowContext(_testWorkflow));
+    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "0"));
+    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "1"));
+
+    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
+    wfCtx.setJobState(_testJobPrefix + "0", TaskState.FAILED);
+    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+
+    wfCtx.getJobStates().remove(_testJobPrefix + "1");
+    taskDataCache.removeContext(_testJobPrefix + "1");
+
+    JobContext jbCtx2 = new JobContext(new ZNRecord(_testJobPrefix + "2"));
+    jbCtx2.setName(_testJobPrefix + "2");
+    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
+    wfCtx.setJobState(_testJobPrefix + "2", TaskState.IN_PROGRESS);
+    taskDataCache.updateJobContext(_testJobPrefix + "2", jbCtx2);
+
+    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+
+    persistDataStage.process(_event);
+
+    Assert.assertEquals(_driver.getWorkflowContext(_testWorkflow), wfCtx);
+    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "0"), jbCtx0);
+    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "2"), jbCtx2);
+    Assert.assertNull(_driver.getJobContext(_testJobPrefix + "1"));
+  }
+
+  /**
+   * Test that if there is a job in the DAG with JobConfig gone (due to ZK delete failure), the
+   * async job purge will try to delete it again.
+   */
+  @Test(dependsOnMethods = "testPersistContextData")
+  public void testPartialDataPurge() {
+    // Manually delete JobConfig
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
+    deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
+
+    // Then purge jobs
+    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+    garbageCollectionStage.execute(_event);
+
+    // Check that IdealStates and contexts have been purged for the 3 jobs in both old and new ZNode paths
+    // IdealState check
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0");
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1");
+    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
+  }
+
+  private void deleteJobConfigs(String workflowName, String jobName) {
+    String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
+    String newPath = _manager.getHelixDataAccessor().keyBuilder()
+        .jobConfigZNode(workflowName, jobName).getPath();
+    _baseAccessor.remove(oldPath, AccessOption.PERSISTENT);
+    _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
+  }
+
+  private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+    // IdealState
+    Assert.assertFalse(
+        _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
+    // JobContexts in old ZNode path
+    String oldPath =
+        String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", 
CLUSTER_NAME, job);
+    String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
+    Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+    Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+  }
+}
