Add StrictMatchExternalViewVerifier, which verifies whether the ExternalViews of the given resources (or of all resources in the cluster) exactly match their ideal mappings (as defined in the IdealState).
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/981d0e29 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/981d0e29 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/981d0e29 Branch: refs/heads/helix-0.6.x Commit: 981d0e295e01cd0839c8d5d0e54350f794ad52f7 Parents: 04495e7 Author: Lei Xia <l...@linkedin.com> Authored: Thu Jul 21 11:29:02 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Sun Feb 5 18:57:09 2017 -0800 ---------------------------------------------------------------------- .../BestPossibleExternalViewVerifier.java | 18 +- .../StrictMatchExternalViewVerifier.java | 331 +++++++++++++++++++ .../apache/helix/tools/TestClusterVerifier.java | 82 +++-- 3 files changed, 391 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java index b350d91..295f5f8 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java @@ -82,6 +82,10 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { private String _zkAddr; private ZkClient _zkClient; + public Builder(String clusterName) { + _clusterName = clusterName; + } + public BestPossibleExternalViewVerifier build() { if (_clusterName == null || (_zkAddr == null && _zkClient == null)) { throw new IllegalArgumentException("Cluster name 
or zookeeper info is missing!"); @@ -99,11 +103,6 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { return _clusterName; } - public Builder setClusterName(String clusterName) { - _clusterName = clusterName; - return this; - } - public Map<String, Map<String, String>> getErrStates() { return _errStates; } @@ -333,6 +332,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { runStage(event, new ResourceComputationStage()); runStage(event, new CurrentStateComputationStage()); + + // TODO: be caution here, should be handled statelessly. runStage(event, new BestPossibleStateCalcStage()); BestPossibleStateOutput output = @@ -351,9 +352,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { @Override public String toString() { - String verifierName = getClass().getName(); - verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length()); - return verifierName + "(" + _clusterName + "@" + _zkClient.getServers() + "@resources[" - + _resources != null ? Arrays.toString(_resources.toArray()) : "" + "])"; + String verifierName = getClass().getSimpleName(); + return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources[" + + _resources != null ? 
Arrays.toString(_resources.toArray()) : "" + "])"; } } http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java new file mode 100644 index 0000000..8b5bb77 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java @@ -0,0 +1,331 @@ +package org.apache.helix.tools.ClusterStateVerifier; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.PropertyKey; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.task.TaskConstants; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Verifier that verifies whether the ExternalViews of given resources (or all resources in the cluster) + * match exactly as its ideal mapping (in idealstate). + */ +public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier { + private static Logger LOG = Logger.getLogger(StrictMatchExternalViewVerifier.class); + + private final Set<String> _resources; + private final Set<String> _expectLiveInstances; + + public StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources, + Set<String> expectLiveInstances) { + super(zkAddr, clusterName); + _resources = resources; + _expectLiveInstances = expectLiveInstances; + } + + public StrictMatchExternalViewVerifier(ZkClient zkClient, String clusterName, + Set<String> resources, Set<String> expectLiveInstances) { + super(zkClient, clusterName); + _resources = resources; + _expectLiveInstances = expectLiveInstances; + } + + public static class Builder { + private String _clusterName; + private Set<String> _resources; + private Set<String> _expectLiveInstances; + private String _zkAddr; + private ZkClient _zkClient; + + public StrictMatchExternalViewVerifier build() { + if (_clusterName == null || (_zkAddr == null && _zkClient == null)) { + throw new 
IllegalArgumentException("Cluster name or zookeeper info is missing!"); + } + + if (_zkClient != null) { + return new StrictMatchExternalViewVerifier(_zkClient, _clusterName, _resources, + _expectLiveInstances); + } + return new StrictMatchExternalViewVerifier(_zkAddr, _clusterName, _resources, + _expectLiveInstances); + } + + public Builder(String clusterName) { + _clusterName = clusterName; + } + + public String getClusterName() { + return _clusterName; + } + + public Set<String> getResources() { + return _resources; + } + + public Builder setResources(Set<String> resources) { + _resources = resources; + return this; + } + + public Set<String> getExpectLiveInstances() { + return _expectLiveInstances; + } + + public Builder setExpectLiveInstances(Set<String> expectLiveInstances) { + _expectLiveInstances = expectLiveInstances; + return this; + } + + public String getZkAddr() { + return _zkAddr; + } + + public Builder setZkAddr(String zkAddr) { + _zkAddr = zkAddr; + return this; + } + + public ZkClient getZkClient() { + return _zkClient; + } + + public Builder setZkClient(ZkClient zkClient) { + _zkClient = zkClient; + return this; + } + } + + @Override + public boolean verify(long timeout) { + return verifyByZkCallback(timeout); + } + + @Override + public boolean verifyByZkCallback(long timeout) { + List<ClusterVerifyTrigger> triggers = new ArrayList<ClusterVerifyTrigger>(); + + // setup triggers + if (_resources != null && !_resources.isEmpty()) { + for (String resource : _resources) { + triggers + .add(new ClusterVerifyTrigger(_keyBuilder.idealStates(resource), true, false, false)); + triggers + .add(new ClusterVerifyTrigger(_keyBuilder.externalView(resource), true, false, false)); + } + + } else { + triggers.add(new ClusterVerifyTrigger(_keyBuilder.idealStates(), false, true, true)); + triggers.add(new ClusterVerifyTrigger(_keyBuilder.externalViews(), false, true, true)); + } + + return verifyByCallback(timeout, triggers); + } + + @Override + protected boolean 
verifyState() { + try { + PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); + // read cluster once and do verification + ClusterDataCache cache = new ClusterDataCache(); + cache.refresh(_accessor); + + Map<String, IdealState> idealStates = cache.getIdealStates(); + if (idealStates == null) { + // ideal state is null because ideal state is dropped + idealStates = Collections.emptyMap(); + } + + // filter out all resources that use Task state model + Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, IdealState> pair = it.next(); + if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + it.remove(); + } + } + + // verify live instances. + if (_expectLiveInstances != null && !_expectLiveInstances.isEmpty()) { + Set<String> actualLiveNodes = cache.getLiveInstances().keySet(); + if (!_expectLiveInstances.equals(actualLiveNodes)) { + return false; + } + } + + Map<String, ExternalView> extViews = _accessor.getChildValuesMap(keyBuilder.externalViews()); + if (extViews == null) { + extViews = Collections.emptyMap(); + } + + // Filter resources if requested + if (_resources != null && !_resources.isEmpty()) { + idealStates.keySet().retainAll(_resources); + extViews.keySet().retainAll(_resources); + } + + // if externalView is not empty and idealState doesn't exist + // add empty idealState for the resource + for (String resource : extViews.keySet()) { + if (!idealStates.containsKey(resource)) { + idealStates.put(resource, new IdealState(resource)); + } + } + + for (String resourceName : idealStates.keySet()) { + ExternalView extView = extViews.get(resourceName); + IdealState idealState = idealStates.get(resourceName); + if (extView == null) { + if (idealState.isExternalViewDisabled()) { + continue; + } else { + LOG.debug("externalView for " + resourceName + " is not available"); + return false; + } + } + + boolean result = verifyExternalView(cache, extView, 
idealState); + if (!result) { + return false; + } + } + return true; + } catch (Exception e) { + LOG.error("exception in verification", e); + return false; + } + } + + private boolean verifyExternalView(ClusterDataCache dataCache, ExternalView externalView, + IdealState idealState) { + Map<String, Map<String, String>> mappingInExtview = externalView.getRecord().getMapFields(); + Map<String, Map<String, String>> idealPartitionState; + + switch (idealState.getRebalanceMode()) { + case FULL_AUTO: + case SEMI_AUTO: + case USER_DEFINED: + idealPartitionState = computeIdealPartitionState(dataCache, idealState); + break; + case CUSTOMIZED: + idealPartitionState = idealState.getRecord().getMapFields(); + break; + case TASK: + // ignore jobs + default: + return true; + } + + return mappingInExtview.equals(idealPartitionState); + } + + private Map<String, Map<String, String>> computeIdealPartitionState(ClusterDataCache cache, + IdealState idealState) { + String stateModelDefName = idealState.getStateModelDefRef(); + StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName); + + Map<String, Map<String, String>> idealPartitionState = + new HashMap<String, Map<String, String>>(); + + Set<String> liveEnabledInstances = new HashSet<String>(cache.getLiveInstances().keySet()); + liveEnabledInstances.removeAll(cache.getDisabledInstances()); + + for (String partition : idealState.getPartitionSet()) { + List<String> preferenceList = ConstraintBasedAssignment + .getPreferenceList(cache, new Partition(partition), idealState, stateModelDef); + Map<String, String> idealMapping = + computeIdealMapping(preferenceList, stateModelDef, liveEnabledInstances); + idealPartitionState.put(partition, idealMapping); + } + + return idealPartitionState; + } + + /** + * compute the ideal mapping for resource in SEMI-AUTO based on its preference list + */ + private Map<String, String> computeIdealMapping(List<String> instancePreferenceList, + StateModelDefinition stateModelDef, 
Set<String> liveEnabledInstances) { + Map<String, String> instanceStateMap = new HashMap<String, String>(); + + if (instancePreferenceList == null) { + return instanceStateMap; + } + + List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); + boolean assigned[] = new boolean[instancePreferenceList.size()]; + + for (String state : statesPriorityList) { + String num = stateModelDef.getNumInstancesPerState(state); + int stateCount = -1; + if ("N".equals(num)) { + stateCount = liveEnabledInstances.size(); + } else if ("R".equals(num)) { + stateCount = instancePreferenceList.size(); + } else { + try { + stateCount = Integer.parseInt(num); + } catch (Exception e) { + LOG.error("Invalid count for state:" + state + " ,count=" + num); + } + } + if (stateCount > 0) { + int count = 0; + for (int i = 0; i < instancePreferenceList.size(); i++) { + String instanceName = instancePreferenceList.get(i); + + if (!assigned[i]) { + instanceStateMap.put(instanceName, state); + count = count + 1; + assigned[i] = true; + if (count == stateCount) { + break; + } + } + } + } + } + + return instanceStateMap; + } + + @Override + public String toString() { + String verifierName = getClass().getSimpleName(); + return verifierName + "(" + _clusterName + "@" + _zkClient.getServers() + "@resources[" + + _resources != null ? 
Arrays.toString(_resources.toArray()) : "" + "])"; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java index 92d8640..5e251c8 100644 --- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java +++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java @@ -30,6 +30,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier; +import org.apache.helix.tools.ClusterStateVerifier.StrictMatchExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -39,48 +40,50 @@ import java.util.Arrays; public class TestClusterVerifier extends ZkUnitTestBase { final String[] RESOURCES = { - "resource0", "resource1" + "resource0", "resource1", "resource2", "resource3" }; private HelixAdmin _admin; private MockParticipantManager[] _participants; private ClusterControllerManager _controller; private String _clusterName; + private ClusterSetup _setupTool; @BeforeMethod public void beforeMethod() throws InterruptedException { - final int NUM_PARTITIONS = 1; - final int NUM_REPLICAS = 1; + final int NUM_PARTITIONS = 10; + final int NUM_REPLICAS = 3; // Cluster and resource setup String className = TestHelper.getTestClassName(); String methodName = TestHelper.getTestMethodName(); _clusterName = className + "_" + methodName; - ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); - _admin = setupTool.getClusterManagementTool(); - setupTool.addCluster(_clusterName, 
true); - setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS, + _setupTool = new ClusterSetup(ZK_ADDR); + _admin = _setupTool.getClusterManagementTool(); + _setupTool.addCluster(_clusterName, true); + _setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.toString()); - setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS, + _setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS, BuiltInStateModelDefinitions.OnlineOffline.name(), RebalanceMode.SEMI_AUTO.toString()); + _setupTool.addResourceToCluster(_clusterName, RESOURCES[2], NUM_PARTITIONS, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.toString()); + _setupTool.addResourceToCluster(_clusterName, RESOURCES[3], NUM_PARTITIONS, + BuiltInStateModelDefinitions.OnlineOffline.name(), RebalanceMode.FULL_AUTO.toString()); + // Configure and start the participants _participants = new MockParticipantManager[RESOURCES.length]; for (int i = 0; i < _participants.length; i++) { String host = "localhost"; int port = 12918 + i; String id = host + '_' + port; - setupTool.addInstanceToCluster(_clusterName, id); + _setupTool.addInstanceToCluster(_clusterName, id); _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id); _participants[i].syncStart(); } // Rebalance the resources for (int i = 0; i < RESOURCES.length; i++) { - IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]); - idealState.setReplicas(Integer.toString(NUM_REPLICAS)); - idealState.getRecord().setListField(RESOURCES[i] + "_0", - Arrays.asList(_participants[i].getInstanceName())); - _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState); + _setupTool.rebalanceResource(_clusterName, RESOURCES[i], NUM_REPLICAS); } // Start the controller @@ -99,41 +102,58 @@ public class TestClusterVerifier extends ZkUnitTestBase { 
_admin.dropCluster(_clusterName); } - @Test public void testEntireCluster() { + @Test + public void testEntireCluster() throws InterruptedException { // Just ensure that the entire cluster passes // ensure that the external view coalesces - HelixClusterVerifier verifier = - new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName) - .setZkClient(_gZkClient).build(); + HelixClusterVerifier bestPossibleVerifier = + new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); + Assert.assertTrue(bestPossibleVerifier.verify(10000)); - boolean result = verifier.verify(10000); - Assert.assertTrue(result); + HelixClusterVerifier strictMatchVerifier = + new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); + Assert.assertTrue(strictMatchVerifier.verify(10000)); + + _participants[0].syncStop(); + Thread.sleep(1000); + Assert.assertFalse(strictMatchVerifier.verify(10000)); } - @Test - public void testResourceSubset() throws InterruptedException { + @Test public void testResourceSubset() throws InterruptedException { + String testDB = "resource-testDB"; + _setupTool.addResourceToCluster(_clusterName, testDB, 1, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.toString()); + + IdealState idealState = _admin.getResourceIdealState(_clusterName, testDB); + idealState.setReplicas(Integer.toString(2)); + idealState.getRecord().setListField(testDB + "_0", + Arrays.asList(_participants[1].getInstanceName(), _participants[2].getInstanceName())); + _admin.setResourceIdealState(_clusterName, testDB, idealState); + // Ensure that this passes even when one resource is down _admin.enableInstance(_clusterName, "localhost_12918", false); Thread.sleep(1000); _admin.enableCluster(_clusterName, false); _admin.enableInstance(_clusterName, "localhost_12918", true); - HelixClusterVerifier verifier = - new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName) - 
.setZkClient(_gZkClient).setResources(Sets.newHashSet(RESOURCES[1])).build(); + new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(testDB)).build(); + Assert.assertTrue(verifier.verify()); - boolean result = verifier.verify(); - Assert.assertTrue(result); + verifier = new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient) + .setResources(Sets.newHashSet(testDB)).build(); + Assert.assertTrue(verifier.verify()); // But the full cluster verification should fail + verifier = + new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); + Assert.assertFalse(verifier.verify()); verifier = - new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName) - .setZkClient(_gZkClient).build(); + new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build(); + Assert.assertFalse(verifier.verify()); - result = verifier.verify(); - Assert.assertFalse(result); _admin.enableCluster(_clusterName, true); } }