Fix task assignment in instance group tag check 1. Fixed task assignment with instance group. When the target state is not set, it does not check the state of the instance. 2. Add a new test for task assignment. The instance that the task would be assigned to has been disabled, so the task should hang there.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9a30df46 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9a30df46 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9a30df46 Branch: refs/heads/helix-0.6.x Commit: 9a30df46bc054514a708b5070f166ee5aa2e7cd1 Parents: 9508a1a Author: Junkai Xue <j...@linkedin.com> Authored: Fri May 6 12:05:30 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 16:19:11 2016 -0700 ---------------------------------------------------------------------- .../FixedTargetTaskAssignmentCalculator.java | 3 +- .../integration/task/TestTaskAssignment.java | 62 ++++++++++++++++++++ .../task/TestTaskWithInstanceDisabled.java | 4 +- 3 files changed, 65 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java index 0a2e8c5..09db616 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -168,8 +168,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato String s = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance); - String state = (s == null ? 
null : s.toString()); - if (tgtStates == null || tgtStates.contains(state)) { + if (s != null && (tgtStates == null || tgtStates.contains(s))) { result.get(instance).add(pId); } } http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java new file mode 100644 index 0000000..df976b1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java @@ -0,0 +1,62 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.TestHelper; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestTaskAssignment extends TaskTestBase { + + @BeforeClass + public void beforeClass() throws Exception { + _numDbs = 1; + _numNodes = 2; + _numParitions = 1; + _numReplicas = 1; + _instanceGroupTag = true; + super.beforeClass(); + } + + @Test + public void testTaskAssignment() throws InterruptedException { + _setupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (_startPort + 0), false); + String jobResource = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB); + + Workflow flow = + WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); + _driver.start(flow); + + // Wait 1 sec. The task should not be complete since it is not assigned. + Thread.sleep(1000L); + + // The task is not assigned so the task state should be null in this case. 
+ Assert.assertNull( + _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)).getPartitionState(0)); + + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/9a30df46/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java index 1c5bd36..84e5168 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java @@ -36,7 +36,7 @@ public class TestTaskWithInstanceDisabled extends TaskTestBase { _numDbs = 1; _numNodes = 2; _numParitions = 1; - _numReplicas = 1; + _numReplicas = 2; _partitionVary = false; super.beforeClass(); } @@ -51,7 +51,7 @@ public class TestTaskWithInstanceDisabled extends TaskTestBase { WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build(); _driver.start(flow); - TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED); JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)); Assert.assertEquals(ctx.getAssignedParticipant(0), PARTICIPANT_PREFIX + "_" + (_startPort + 1)); }