Fix task assignment in instance group tag check

1. Fixed task assignment with instacnce group. When target state is not set, it 
ont check the state of instance.
2. Add a new test to assign task. The instance will be assign to has been 
disabled. So the task should be hanging there.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9a30df46
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9a30df46
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9a30df46

Branch: refs/heads/helix-0.6.x
Commit: 9a30df46bc054514a708b5070f166ee5aa2e7cd1
Parents: 9508a1a
Author: Junkai Xue <j...@linkedin.com>
Authored: Fri May 6 12:05:30 2016 -0700
Committer: Lei Xia <l...@linkedin.com>
Committed: Tue Jul 5 16:19:11 2016 -0700

----------------------------------------------------------------------
 .../FixedTargetTaskAssignmentCalculator.java    |  3 +-
 .../integration/task/TestTaskAssignment.java    | 62 ++++++++++++++++++++
 .../task/TestTaskWithInstanceDisabled.java      |  4 +-
 3 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 0a2e8c5..09db616 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -168,8 +168,7 @@ public class FixedTargetTaskAssignmentCalculator extends 
TaskAssignmentCalculato
           String s =
               currStateOutput.getCurrentState(tgtIs.getResourceName(), new 
Partition(pName),
                   instance);
-          String state = (s == null ? null : s.toString());
-          if (tgtStates == null || tgtStates.contains(state)) {
+          if (s != null && (tgtStates == null || tgtStates.contains(s))) {
             result.get(instance).add(pId);
           }
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
new file mode 100644
index 0000000..df976b1
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
@@ -0,0 +1,62 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskAssignment extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numNodes = 2;
+    _numParitions = 1;
+    _numReplicas = 1;
+    _instanceGroupTag = true;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testTaskAssignment() throws InterruptedException {
+    _setupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + 
0), false);
+    String jobResource = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new 
JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB);
+
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, 
jobBuilder).build();
+    _driver.start(flow);
+
+    // Wait 1 sec. The task should not be complete since it is not assigned.
+    Thread.sleep(1000L);
+
+    // The task is not assigned so the task state should be null in this case.
+    Assert.assertNull(
+        
_driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)).getPartitionState(0));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
index 1c5bd36..84e5168 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
@@ -36,7 +36,7 @@ public class TestTaskWithInstanceDisabled extends 
TaskTestBase {
     _numDbs = 1;
     _numNodes = 2;
     _numParitions = 1;
-    _numReplicas = 1;
+    _numReplicas = 2;
     _partitionVary = false;
     super.beforeClass();
   }
@@ -51,7 +51,7 @@ public class TestTaskWithInstanceDisabled extends 
TaskTestBase {
         WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, 
jobBuilder).build();
     _driver.start(flow);
 
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, 
TaskState.COMPLETED, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, 
TaskState.COMPLETED);
     JobContext ctx = 
_driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     Assert.assertEquals(ctx.getAssignedParticipant(0), PARTICIPANT_PREFIX + 
"_" + (_startPort + 1));
   }

Reply via email to