alirezazamani commented on a change in pull request #923: Fix the scheduling 
decision for multiple currentStates
URL: https://github.com/apache/helix/pull/923#discussion_r402011463
 
 

 ##########
 File path: 
helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
 ##########
 @@ -0,0 +1,348 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.common.caches.TaskDataCache;
+import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTargetedTaskStateChange {
+  private static final String CLUSTER_NAME = "TestCluster";
+  private static final String INSTANCE_PREFIX = "Instance_";
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final String WORKFLOW_NAME = "TestWorkflow";
+  private static final String JOB_NAME = "TestJob";
+  private static final String PARTITION_NAME = "0";
+  private static final String TARGET_RESOURCES = "TestDB";
+  private static final int NUM_TASKS = 1;
+  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigs;
+  private ClusterConfig _clusterConfig;
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + this.getClass().getSimpleName() + " at " + new 
Date(System.currentTimeMillis()));
+    // Populate live instances and their corresponding instance configs
+    _liveInstances = new HashMap<>();
+    _instanceConfigs = new HashMap<>();
+    _clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = INSTANCE_PREFIX + i;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      _liveInstances.put(instanceName, liveInstance);
+      _instanceConfigs.put(instanceName, instanceConfig);
+    }
+    _assignableInstanceManager = new AssignableInstanceManager();
+  }
+
+  /**
+   * Checks the behaviour of the controller when a task has two current states on two different
+   * instances.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Running
+   * CurrentState: Instance0: Running, Instance1: Running
+   * Expected paMap: Instance0 -> Dropped
+   */
+  @Test
+  public void testTwoRunningCurrentStates() {
+    MockTestInformation info = new MockTestInformation();
+
+    // Stub the data provider so that the dispatcher sees the prepared workflow/job fixtures.
+    when(info.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(info._workflowConfig);
+    when(info.cache.getJobConfig(JOB_NAME)).thenReturn(info._jobConfig);
+    when(info.cache.getTaskDataCache()).thenReturn(info._taskDataCache);
+    when(info.cache.getJobContext(JOB_NAME)).thenReturn(info._jobContext);
+    when(info.cache.getIdealStates()).thenReturn(info._idealStates);
+    when(info.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(info.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    // getTaskDataCache() is already stubbed above, so this stubs the task data cache itself.
+    when(info.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(info._resourceAssignment);
+    when(info.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(info._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(info._runtimeJobDag);
+
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, info._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(info.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(info.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+
+    // The test job is the only in-flight job of the runtime DAG.
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(info._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(info.cache);
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, info._workflowConfig,
+        info._workflowContext, info._currentStateOutput, bestPossibleStateOutput);
+
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    // TestNG's assertEquals takes (actual, expected); keeping that order makes a failure
+    // message report the two values correctly.
+    Assert.assertEquals(
+        bestPossibleStateOutput.getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition)
+            .get(INSTANCE_PREFIX + "0"),
+        TaskPartitionState.DROPPED.name());
+  }
+
+  /**
+   * Checks the behaviour of the controller when there is one current state and it differs from
+   * the previous assignment information.
+   * Scenario:
+   * Instance0: Slave, Instance1: Master, Instance2: Slave
+   * PreviousAssignment of Task: Instance0: Dropped
+   * CurrentState: Instance0: Running
+   * Expected paMap: Instance1 -> Running
+   */
+  @Test
+  public void testOneRunningOneNull() {
+    MockTestInformation info = new MockTestInformation();
+
+    // Stub the data provider so that the dispatcher sees the prepared workflow/job fixtures.
+    when(info.cache.getWorkflowConfig(WORKFLOW_NAME)).thenReturn(info._workflowConfig);
+    when(info.cache.getJobConfig(JOB_NAME)).thenReturn(info._jobConfig);
+    when(info.cache.getTaskDataCache()).thenReturn(info._taskDataCache);
+    when(info.cache.getJobContext(JOB_NAME)).thenReturn(info._jobContext);
+    when(info.cache.getIdealStates()).thenReturn(info._idealStates);
+    when(info.cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(info.cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    // getTaskDataCache() is already stubbed above, so this stubs the task data cache itself.
+    when(info.cache.getTaskDataCache().getPreviousAssignment(JOB_NAME))
+        .thenReturn(info._resourceAssignment2);
+    when(info.cache.getClusterConfig()).thenReturn(_clusterConfig);
+    when(info._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(info._runtimeJobDag);
+
+    _assignableInstanceManager.buildAssignableInstances(_clusterConfig, info._taskDataCache,
+        _liveInstances, _instanceConfigs);
+    when(info.cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
+    when(info.cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+
+    // The test job is the only in-flight job of the runtime DAG.
+    Set<String> inflightJobDag = new HashSet<>();
+    inflightJobDag.add(JOB_NAME);
+    when(info._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
+        .thenReturn(inflightJobDag);
+
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    WorkflowDispatcher workflowDispatcher = new WorkflowDispatcher();
+    workflowDispatcher.updateCache(info.cache);
+    workflowDispatcher.updateWorkflowStatus(WORKFLOW_NAME, info._workflowConfig,
+        info._workflowContext, info._currentStateOutput2, bestPossibleStateOutput);
+
+    Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
+    // TestNG's assertEquals takes (actual, expected); keeping that order makes a failure
+    // message report the two values correctly.
+    Assert.assertEquals(
+        bestPossibleStateOutput.getPartitionStateMap(JOB_NAME).getPartitionMap(taskPartition)
+            .get(INSTANCE_PREFIX + "1"),
+        TaskPartitionState.RUNNING.name());
+  }
+
+  /**
+   * Creates the configuration of the test workflow: a non-terminable job queue with target state
+   * START whose job DAG contains the single node {@code JOB_NAME}.
+   */
+  private WorkflowConfig prepareWorkflowConfig() {
+    // The DAG only holds the one test job; no edges are added.
+    JobDag singleJobDag = new JobDag();
+    singleJobDag.addNode(JOB_NAME);
+
+    WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
+    builder.setWorkflowId(WORKFLOW_NAME);
+    builder.setTerminable(false);
+    builder.setTargetState(TargetState.START);
+    builder.setJobQueue(true);
+    builder.setJobDag(singleJobDag);
+    return builder.build();
+  }
+
+  /**
+   * Creates the configuration of the test job: it belongs to the test workflow, runs
+   * "TestCommand", targets partition {@code TARGET_RESOURCES + "_0"} of the target resource in
+   * state "MASTER", and contains a single task with id "0".
+   */
+  private JobConfig prepareJobConfig() {
+    // Inputs for the builder: one target partition, one target state, one task.
+    List<String> partitions = new ArrayList<>();
+    partitions.add(TARGET_RESOURCES + "_0");
+
+    Set<String> partitionStates = new HashSet<>();
+    partitionStates.add("MASTER");
+
+    TaskConfig.Builder taskBuilder = new TaskConfig.Builder();
+    taskBuilder.setTaskId("0");
+    List<TaskConfig> tasks = new ArrayList<>();
+    tasks.add(taskBuilder.build());
+
+    JobConfig.Builder builder = new JobConfig.Builder();
+    builder.setWorkflow(WORKFLOW_NAME);
+    builder.setCommand("TestCommand");
+    builder.setTargetResource(TARGET_RESOURCES);
+    builder.setJobId(JOB_NAME);
+    builder.setTargetPartitions(partitions);
+    builder.setTargetPartitionStates(partitionStates);
+    builder.addTaskConfigs(tasks);
+    return builder.build();
+  }
+
+  /**
+   * Creates the context of the test workflow, backed by a {@link ZNRecord} in which the start
+   * time is "0", the workflow state is IN_PROGRESS and the test job's state is IN_PROGRESS.
+   */
+  private WorkflowContext prepareWorkflowContext() {
+    // Per-job states: the single test job is in progress.
+    Map<String, String> jobStates = new HashMap<>();
+    jobStates.put(JOB_NAME, TaskState.IN_PROGRESS.name());
+
+    ZNRecord contextRecord = new ZNRecord(WORKFLOW_NAME);
+    contextRecord.setSimpleField(
+        WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
+    contextRecord.setSimpleField(
+        WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
+    contextRecord.setSimpleField(
+        WorkflowContext.WorkflowContextProperties.STATE.name(), TaskState.IN_PROGRESS.name());
+    contextRecord.setMapField(
+        WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobStates);
+    return new WorkflowContext(contextRecord);
+  }
+
+  private JobContext prepareJobContext(String instance) {
+    Set<Integer> _taskPartitionSet;
+    Map<Integer, TaskPartitionState> _taskPartitionStateMap;
+    Map<Integer, String> _partitionToTaskIDMap;
+    Map<Integer, String> _taskToInstanceMap;
+    _taskPartitionSet = new HashSet<>();
+    _taskPartitionStateMap = new HashMap<>();
+    _partitionToTaskIDMap = new HashMap<>();
+    _taskToInstanceMap = new HashMap<>();
 
 Review comment:
   I removed them after realizing that we do not need them.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to