Repository: helix
Updated Branches:
  refs/heads/master 0beeb8fa2 -> 0c251bbf6


[HELIX-772] add TaskDriver.addUserContent() api and related tests


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0c251bbf
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0c251bbf
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0c251bbf

Branch: refs/heads/master
Commit: 0c251bbf640206729755301c3dda734eea78343f
Parents: 0beeb8f
Author: Harry Zhang <hrzh...@linkedin.com>
Authored: Tue Oct 30 16:25:12 2018 -0700
Committer: Harry Zhang <hrzh...@linkedin.com>
Committed: Wed Oct 31 13:47:52 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  46 ++++-
 .../java/org/apache/helix/task/TaskUtil.java    |  52 +++--
 .../task/TestIndependentTaskRebalancer.java     |  43 ++--
 .../helix/task/TestGetSetUserContentStore.java  | 206 +++++++++++++++++++
 .../helix/task/TestGetUserContentStore.java     | 144 -------------
 5 files changed, 309 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index ea529e8..e6256ed 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -19,8 +19,26 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.*;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -35,8 +53,6 @@ import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -1003,6 +1019,35 @@ public class TaskDriver {
   }
 
   /**
+   * Set user content defined by the given key and string at the given scope.
+   * @param key content key
+   * @param value content value
+   * @param workflowName name of the workflow - must provide when scope is WORKFLOW
+   * @param jobName name of the job - must provide when scope is JOB or TASK
+   * @param taskName name of the task - must provide when scope is TASK
+   * @param scope scope of the content
+   * @throws IllegalArgumentException if a name required by the given scope is null
+   * @throws HelixException if the scope is not one of WORKFLOW, JOB or TASK
+   */
+  public void addUserContent(String key, String value, String workflowName, String jobName, String taskName,
+      UserContentStore.Scope scope) {
+    switch (scope) {
+    case WORKFLOW:
+      TaskUtil.addWorkflowJobUserContent(_propertyStore, workflowName, key, value);
+      break;
+    case JOB:
+      TaskUtil.addWorkflowJobUserContent(_propertyStore, jobName, key, value);
+      break;
+    case TASK:
+      TaskUtil.addTaskUserContent(_propertyStore, jobName, taskName, key, value);
+      break;
+    default:
+      // Reject unknown scopes explicitly, as TaskUtil.getUserContent() does
+      throw new HelixException("Invalid scope : " + scope.name());
+    }
+  }
+
+  /**
    * Throw Exception if children nodes will exceed limitation after adding newNodesCount children.
    * @param newConfigNodeCount
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1ce448c..3461233 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,7 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,12 +40,13 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 
 /**
  * Static utility methods.
@@ -334,10 +334,19 @@ public class TaskUtil {
    */
   protected static void addWorkflowJobUserContent(final HelixManager manager,
       String workflowJobResource, final String key, final String value) {
-    String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, 
workflowJobResource,
-        USER_CONTENT_NODE);
+   addWorkflowJobUserContent(manager.getHelixPropertyStore(), 
workflowJobResource, key, value);
+  }
+
+  /* package */
+  static void addWorkflowJobUserContent(final HelixPropertyStore<ZNRecord> 
propertyStore,
+      String workflowJobResource, final String key, final String value) {
+    if (workflowJobResource == null) {
+      throw new IllegalArgumentException("workflowJobResource must be not null 
when adding workflow / job user content");
+    }
+    String path = Joiner.on("/")
+        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, 
USER_CONTENT_NODE);
 
-    manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
+    propertyStore.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord znRecord) {
         znRecord.setSimpleField(key, value);
@@ -372,10 +381,19 @@ public class TaskUtil {
    */
   protected static void addTaskUserContent(final HelixManager manager, String 
job,
       final String task, final String key, final String value) {
+    addTaskUserContent(manager.getHelixPropertyStore(), job, task, key, value);
+  }
+
+  /* package */
+  static void addTaskUserContent(final HelixPropertyStore<ZNRecord> 
propertyStore,
+      final String job, final String task, final String key, final String 
value) {
+    if (job == null || task == null) {
+      throw new IllegalArgumentException("job and task must be not null when 
adding task user content");
+    }
     String path =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, 
USER_CONTENT_NODE);
 
-    manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() {
+    propertyStore.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord znRecord) {
         if (znRecord.getMapField(task) == null) {
@@ -400,14 +418,14 @@ public class TaskUtil {
   protected static String getUserContent(HelixPropertyStore propertyStore, 
String key,
       UserContentStore.Scope scope, String workflowName, String jobName, 
String taskName) {
     switch (scope) {
-      case WORKFLOW:
-        return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, 
key);
-      case JOB:
-        return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key);
-      case TASK:
-        return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, 
key);
-      default:
-        throw new HelixException("Invalid scope : " + scope.name());
+    case WORKFLOW:
+      return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, 
key);
+    case JOB:
+      return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key);
+    case TASK:
+      return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, 
key);
+    default:
+      throw new HelixException("Invalid scope : " + scope.name());
     }
   }
 
@@ -838,9 +856,7 @@ public class TaskUtil {
 
   /**
    * Check whether tasks are just started or still running
-   *
    * @param jobContext The job context
-   *
    * @return False if still tasks not in final state. Otherwise return true
    */
   public static boolean checkJobStopped(JobContext jobContext) {
@@ -853,10 +869,8 @@ public class TaskUtil {
     return true;
   }
 
-
   /**
    * Count the number of jobs in a workflow that are not in final state.
-   *
    * @param workflowCfg
    * @param workflowCtx
    * @return

http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 5509858..1b068a0 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
@@ -56,6 +57,7 @@ import org.testng.collections.Sets;
 public class TestIndependentTaskRebalancer extends TaskTestBase {
   private Set<String> _invokedClasses = Sets.newHashSet();
   private Map<String, Integer> _runCounts = Maps.newHashMap();
+  private static final AtomicBoolean _failureCtl = new AtomicBoolean(true);
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -85,6 +87,25 @@ public class TestIndependentTaskRebalancer extends 
TaskTestBase {
           return new TaskTwo(context, instanceName);
         }
       });
+      taskFactoryReg.put("ControllableFailTask", new TaskFactory() {
+        @Override public Task createNewTask(TaskCallbackContext context) {
+          return new Task() {
+            @Override
+            public TaskResult run() {
+              if (_failureCtl.get()) {
+                return new TaskResult(Status.FAILED, null);
+              } else {
+                return new TaskResult(Status.COMPLETED, null);
+              }
+            }
+
+            @Override
+            public void cancel() {
+
+            }
+          };
+        }
+      });
       taskFactoryReg.put("SingleFailTask", new TaskFactory() {
         @Override
         public Task createNewTask(TaskCallbackContext context) {
@@ -179,15 +200,7 @@ public class TestIndependentTaskRebalancer extends 
TaskTestBase {
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
 
-    // This is error prone as ThreadCountBasedTaskAssigner will always be 
re-assign
-    // task to same instance given we only have 1 task to assign and the order 
or
-    // iterating all nodes during assignment is always the same. Rarely some 
change
-    // will alter the order of iteration debug assignment so we need to change
-    // this instance name to keep on testing this functionality.
-    final String failInstance = "localhost_12919";
-    Map<String, String> taskConfigMap = 
Maps.newHashMap(ImmutableMap.of("fail", "" + true,
-        "failInstance", failInstance));
-    TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
+    TaskConfig taskConfig1 = new TaskConfig("ControllableFailTask", new 
HashMap<String, String>());
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
@@ -212,17 +225,15 @@ public class TestIndependentTaskRebalancer extends 
TaskTestBase {
     }
 
     if (trial == 1000) {
+      // Fail if no re-attempts
       Assert.fail("Job " + jobName + " is not retried");
     }
 
-    // disable failed instance
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
failInstance, false);
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+    // Signal the next retry to be successful
+    _failureCtl.set(false);
 
-    // Ensure that the class was invoked
-    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-    
Assert.assertNotSame(_driver.getJobContext(jobName).getAssignedParticipant(0), 
failInstance);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
failInstance, true);
+    // Verify that retry will go on and the workflow will finally complete
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
new file mode 100644
index 0000000..d4ba29a
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java
@@ -0,0 +1,206 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test verifying that user content added through TaskDriver.addUserContent() at
+ * WORKFLOW, JOB and TASK scope can be read back by running tasks via UserContentStore.
+ */
+public class TestGetSetUserContentStore extends TaskTestBase {
+  private static final String JOB_COMMAND = "DummyCommand";
+  private static final int NUM_JOB = 5;
+  private final Map<String, String> _jobCommandMap = new HashMap<>();
+
+  // Counted down once by every WriteTask when it starts running
+  private final CountDownLatch allTasksReady = new CountDownLatch(NUM_JOB);
+  // Released by the test once all user content has been added through the driver
+  private final CountDownLatch adminReady = new CountDownLatch(1);
+
+  private enum TaskDumpResultKey { WorkflowContent, JobContent, TaskContent }
+
+  private static class TaskRecord {
+    final String workflowName;
+    final String jobName;
+    final String taskName;
+    TaskRecord(String workflow, String job, String task) {
+      workflowName = workflow;
+      jobName = job;
+      taskName = task;
+    }
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _participants = new MockParticipantManager[_numNodes];
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+      // Set task callbacks
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+      taskFactoryReg.put("WriteTask", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new WriteTask(context);
+        }
+      });
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  /**
+   * Adds user content via the driver while tasks are blocked, then verifies what the tasks read.
+   */
+  @Test
+  public void testGetUserContentStore() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+    configBuilder.setAllowOverlapJobAssignment(true);
+    workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+    Map<String, TaskRecord> recordMap = new HashMap<>();
+    // Create NUM_JOB jobs with 1 WriteTask each
+    for (int i = 0; i < NUM_JOB; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBuilder = new JobConfig.Builder().setCommand(JOB_COMMAND)
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
+      String jobSuffix = "JOB" + i;
+      String jobName = workflowName + "_" + jobSuffix;
+      String taskName = jobName + "_0";
+      workflowBuilder.addJob(jobSuffix, jobConfigBuilder);
+      recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskName));
+    }
+
+    // Start the workflow and wait for all tasks started
+    _driver.start(workflowBuilder.build());
+    allTasksReady.await();
+
+    // add "workflow":"workflow" to the workflow's user content
+    _driver.addUserContent(workflowName, workflowName, workflowName, null, null, UserContentStore.Scope.WORKFLOW);
+    for (TaskRecord rec : recordMap.values()) {
+      // add "job":"job" to the job's user content
+      _driver.addUserContent(rec.jobName, rec.jobName, null, rec.jobName, null, UserContentStore.Scope.JOB);
+      // add "task":"task" to the task's user content
+      _driver.addUserContent(rec.taskName, rec.taskName, null, rec.jobName, rec.taskName, UserContentStore.Scope.TASK);
+    }
+    // Unblock the tasks so they can read the content added above
+    adminReady.countDown();
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Verify the values each task read from the store and published under its result keys
+    for (TaskRecord rec : recordMap.values()) {
+      Assert.assertEquals(_driver
+              .getUserContent(TaskDumpResultKey.WorkflowContent.name(), UserContentStore.Scope.WORKFLOW,
+                  rec.workflowName, rec.jobName, rec.taskName),
+          constructContentStoreResultString(rec.workflowName, rec.workflowName));
+      Assert.assertEquals(_driver
+              .getUserContent(TaskDumpResultKey.JobContent.name(), UserContentStore.Scope.JOB,
+                  rec.workflowName, rec.jobName, rec.taskName),
+          constructContentStoreResultString(rec.jobName, rec.jobName));
+      Assert.assertEquals(_driver
+              .getUserContent(TaskDumpResultKey.TaskContent.name(), UserContentStore.Scope.TASK,
+                  rec.workflowName, rec.jobName, rec.taskName),
+          constructContentStoreResultString(rec.taskName, rec.taskName));
+    }
+  }
+
+  /**
+   * Mock task that reads the user content added by the test and re-publishes it under result keys.
+   */
+  private class WriteTask extends MockTask {
+    public WriteTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      allTasksReady.countDown();
+      try {
+        adminReady.await();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the executing thread still sees the interruption
+        Thread.currentThread().interrupt();
+        return new TaskResult(TaskResult.Status.FATAL_FAILED, e.getMessage());
+      }
+      String workflowStoreContent = constructContentStoreResultString(_workflowName, getUserContent(_workflowName, Scope.WORKFLOW));
+      String jobStoreContent = constructContentStoreResultString(_jobName, getUserContent(_jobName, Scope.JOB));
+      String taskStoreContent = constructContentStoreResultString(_taskName, getUserContent(_taskName, Scope.TASK));
+      putUserContent(TaskDumpResultKey.WorkflowContent.name(), workflowStoreContent, Scope.WORKFLOW);
+      putUserContent(TaskDumpResultKey.JobContent.name(), jobStoreContent, Scope.JOB);
+      putUserContent(TaskDumpResultKey.TaskContent.name(), taskStoreContent, Scope.TASK);
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
+  }
+
+  private static String constructContentStoreResultString(String key, String value) {
+    return String.format("%s::%s", key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java 
b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
deleted file mode 100644
index 392c278..0000000
--- 
a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.MockTask;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.tools.ClusterSetup;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestGetUserContentStore extends TaskTestBase {
-  private static final String JOB_COMMAND = "DummyCommand";
-  private Map<String, String> _jobCommandMap;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    _participants = new MockParticipantManager[_numNodes];
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-
-    // Setup cluster and instances
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < _numNodes; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    // start dummy participants
-    for (int i = 0; i < _numNodes; i++) {
-      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-
-      // Set task callbacks
-      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
-      TaskFactory shortTaskFactory = new TaskFactory() {
-        @Override
-        public Task createNewTask(TaskCallbackContext context) {
-          return new WriteTask(context);
-        }
-      };
-      taskFactoryReg.put("WriteTask", shortTaskFactory);
-
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = 
_participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-      _participants[i].syncStart();
-    }
-
-    // Start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
-    _controller.syncStart();
-
-    // Start an admin connection
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
-
-    _jobCommandMap = new HashMap<>();
-  }
-
-  @Test
-  public void testGetUserContentStore() throws InterruptedException {
-    String workflowName = TestHelper.getTestMethodName();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
-    WorkflowConfig.Builder configBuilder = new 
WorkflowConfig.Builder(workflowName);
-    configBuilder.setAllowOverlapJobAssignment(true);
-    workflowBuilder.setWorkflowConfig(configBuilder.build());
-
-    List<String> jobsThatRan = new ArrayList<>();
-    // Create 5 jobs with 1 WriteTask each
-    for (int i = 0; i < 5; i++) {
-      List<TaskConfig> taskConfigs = new ArrayList<>();
-      taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, 
String>()));
-      JobConfig.Builder jobConfigBulider = new 
JobConfig.Builder().setCommand(JOB_COMMAND)
-          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap);
-      workflowBuilder.addJob("JOB" + i, jobConfigBulider);
-      jobsThatRan.add(workflowName + "_JOB" + i);
-    }
-
-    // Start the workflow and wait until completion
-    _driver.start(workflowBuilder.build());
-    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
-
-    // Aggregate key-value mappings in UserContentStore
-    int runCount = 0;
-    for (String jobName : jobsThatRan) {
-      String value = _driver.getUserContent(jobName, 
UserContentStore.Scope.WORKFLOW, workflowName,
-          jobName, null);
-      runCount += Integer.parseInt(value);
-    }
-    Assert.assertEquals(runCount, 5);
-  }
-
-  /**
-   * A mock task that writes to UserContentStore. MockTask extends 
UserContentStore.
-   */
-  private class WriteTask extends MockTask {
-
-    public WriteTask(TaskCallbackContext context) {
-      super(context);
-    }
-
-    @Override
-    public TaskResult run() {
-      putUserContent(_jobName, Integer.toString(1), Scope.WORKFLOW);
-      return new TaskResult(TaskResult.Status.COMPLETED, "");
-    }
-  }
-}

Reply via email to