zhangmeng916 commented on a change in pull request #1747: URL: https://github.com/apache/helix/pull/1747#discussion_r637432144
########## File path: helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizer.java ########## @@ -0,0 +1,201 @@ +package org.apache.helix.rest.server; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.IdealState; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestResourceAssignmentOptimizer extends AbstractTestClass { + + String cluster = "TestCluster_2"; + String instance1 = cluster + "dummyInstance_localhost_12930"; + String swapNewInstance = "swapNewInstance"; + String urlBase = "clusters/TestCluster_2/partitionAssignment/"; + String swapOldInstance, toRemoveInstance; + HelixDataAccessor helixDataAccessor; + List<String> 
resources; + List<String> liveInstances; + + @BeforeClass + public void beforeClass() { + helixDataAccessor = new ZKHelixDataAccessor(cluster, _baseAccessor); + _gSetupTool.addInstanceToCluster(cluster, instance1); + resources = _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster); + liveInstances = helixDataAccessor.getChildNames(helixDataAccessor.keyBuilder().liveInstances()); + Assert.assertFalse(resources.isEmpty() || liveInstances.isEmpty()); + + toRemoveInstance = liveInstances.get(0); + swapOldInstance = liveInstances.get(1); + + } + + @Test + public void testComputePartitionAssignment() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + // set all resource to FULL_AUTO except one + for (int i = 0; i < resources.size() - 1; ++i) { + String resource = resources.get(i); + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); Review comment: SEMI_AUTO case is not tested. Shall we also test that case? ########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static 
org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { + private static Logger LOG = LoggerFactory + .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName()); Review comment: We should log this class's name here. ########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { + private static Logger LOG = LoggerFactory + .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName()); + + private static class InputFields { + Set<String> resourceFilter = new HashSet<>(); + Map<String, String> nodeSwap = new 
HashMap<>(); // old instance -> new instance. + List<String> newInstances = new ArrayList<>(); + List<String> instancesToRemove = new ArrayList<>(); + Set<String> instanceFilter = new HashSet<>(); + } + + private static class ClusterState { + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + ClusterConfig clusterConfig; + List<String> resources = new ArrayList<>(); + List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove. + } + + // Result format: Map of resource -> partition -> instance -> state. + private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> { + public AssignmentResult() { + super(); + } + } + + @ResponseMetered(name = HttpConstants.WRITE_REQUEST) + @Timed(name = HttpConstants.WRITE_REQUEST) + @POST + public Response computePotentialAssignment(@PathParam("clusterId") String clusterId, + String content) { + + InputFields inputFields; + ClusterState clusterState; + AssignmentResult result; + + try { + // 1. Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, return a REASON String + inputFields = readInput(content); + // 2. Read cluster status from ZK. + clusterState = readClusterStateAndValidateInput(clusterId, inputFields); + // 3. Call rebalancer tools for each resource. + result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId); + // 4. Serialize result to JSON and return. + return JSONRepresentation(result); + } catch (InvalidParameterException ex) { + return badRequest(ex.getMessage()); + } catch (JsonProcessingException e) { + return badRequest("Invalid input: Input can but be parsed into a KV map." + e.getMessage()); + } catch (OutOfMemoryError e) { + LOG.error( + "OutOfMemoryError while calling AssignmentResult" + Arrays.toString(e.getStackTrace())); + return badRequest( + "Response size is too large to serialize. 
Please query by resources or instance filter"); + } catch (Exception e) { + LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace())); + return badRequest("Failed to compute partition assignment: " + e); + } + } + + private InputFields readInput(String content) + throws InvalidParameterException, JsonProcessingException { + + Map<String, Map<String, Object>> customFieldsMap; + InputFields inputMap = new InputFields(); + customFieldsMap = + OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() { + }); + + // Content is given as a KV mapping. + for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) { + String key = entry.getKey(); + switch (key) { + case "InstanceChange": Review comment: I'd suggest to put the acceptable commands in an enum so that it is easy to track and validate. You can check the `Command` enum in `AbstractResource` ########## File path: helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizer.java ########## @@ -0,0 +1,201 @@ +package org.apache.helix.rest.server; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.IdealState; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestResourceAssignmentOptimizer extends AbstractTestClass { + + String cluster = "TestCluster_2"; + String instance1 = cluster + "dummyInstance_localhost_12930"; + String swapNewInstance = "swapNewInstance"; + String urlBase = "clusters/TestCluster_2/partitionAssignment/"; + String swapOldInstance, toRemoveInstance; + HelixDataAccessor helixDataAccessor; + List<String> resources; + List<String> liveInstances; + + @BeforeClass + public void beforeClass() { + helixDataAccessor = new ZKHelixDataAccessor(cluster, _baseAccessor); + _gSetupTool.addInstanceToCluster(cluster, instance1); + resources = _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster); + liveInstances = helixDataAccessor.getChildNames(helixDataAccessor.keyBuilder().liveInstances()); + Assert.assertFalse(resources.isEmpty() || liveInstances.isEmpty()); + + toRemoveInstance = liveInstances.get(0); + swapOldInstance = liveInstances.get(1); + Review comment: NIT: There're some blank lines in some files, and some typos in comments. Maybe you can double check and reformat. 
########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { + private static Logger LOG = LoggerFactory + .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName()); + + private static class InputFields { + Set<String> resourceFilter = new HashSet<>(); + Map<String, String> nodeSwap = new 
HashMap<>(); // old instance -> new instance. + List<String> newInstances = new ArrayList<>(); + List<String> instancesToRemove = new ArrayList<>(); + Set<String> instanceFilter = new HashSet<>(); Review comment: Is there a reason that some fields are list, while others are set? Keep them consistent is better. ########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { + private static Logger LOG = LoggerFactory + .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName()); + + private static class InputFields { + Set<String> resourceFilter = new HashSet<>(); + Map<String, String> nodeSwap = new 
HashMap<>(); // old instance -> new instance. + List<String> newInstances = new ArrayList<>(); + List<String> instancesToRemove = new ArrayList<>(); + Set<String> instanceFilter = new HashSet<>(); + } + + private static class ClusterState { + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + ClusterConfig clusterConfig; + List<String> resources = new ArrayList<>(); + List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove. + } + + // Result format: Map of resource -> partition -> instance -> state. + private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> { + public AssignmentResult() { + super(); + } + } + + @ResponseMetered(name = HttpConstants.WRITE_REQUEST) + @Timed(name = HttpConstants.WRITE_REQUEST) + @POST + public Response computePotentialAssignment(@PathParam("clusterId") String clusterId, + String content) { + + InputFields inputFields; + ClusterState clusterState; + AssignmentResult result; + + try { + // 1. Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, return a REASON String + inputFields = readInput(content); + // 2. Read cluster status from ZK. + clusterState = readClusterStateAndValidateInput(clusterId, inputFields); + // 3. Call rebalancer tools for each resource. + result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId); + // 4. Serialize result to JSON and return. + return JSONRepresentation(result); + } catch (InvalidParameterException ex) { + return badRequest(ex.getMessage()); + } catch (JsonProcessingException e) { + return badRequest("Invalid input: Input can but be parsed into a KV map." + e.getMessage()); + } catch (OutOfMemoryError e) { + LOG.error( + "OutOfMemoryError while calling AssignmentResult" + Arrays.toString(e.getStackTrace())); + return badRequest( + "Response size is too large to serialize. 
Please query by resources or instance filter"); + } catch (Exception e) { + LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace())); + return badRequest("Failed to compute partition assignment: " + e); + } + } + + private InputFields readInput(String content) + throws InvalidParameterException, JsonProcessingException { + + Map<String, Map<String, Object>> customFieldsMap; + InputFields inputMap = new InputFields(); + customFieldsMap = + OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() { + }); + + // Content is given as a KV mapping. + for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) { + String key = entry.getKey(); + switch (key) { + case "InstanceChange": + for (Map.Entry<String, Object> instanceChange : entry.getValue().entrySet()) { + String instanceChangeKey = instanceChange.getKey(); + if (instanceChangeKey.equals("AddInstances") && (instanceChange + .getValue() instanceof List)) { + inputMap.newInstances.addAll((List<String>) instanceChange.getValue()); + } else if (instanceChangeKey.equals("RemoveInstances") && (instanceChange + .getValue() instanceof List)) { + inputMap.instancesToRemove.addAll((List<String>) instanceChange.getValue()); + } else if (instanceChangeKey.equals("SwapInstances") && (instanceChange + .getValue() instanceof Map)) { + for (Map.Entry<String, String> swapPair : ((Map<String, String>) instanceChange + .getValue()).entrySet()) { + inputMap.nodeSwap.put(swapPair.getKey(), swapPair.getValue()); + } + } else { + throw new InvalidParameterException( + "Unsupported command or invalid format for InstanceChange : " + + instanceChangeKey); + } + } + break; + case "Options": + for (Map.Entry<String, Object> option : entry.getValue().entrySet()) { + String optionKey = option.getKey(); + if (optionKey.equals("ResourceFilter") && (option.getValue() instanceof List)) { + inputMap.resourceFilter.addAll((List<String>) option.getValue()); + } 
else if (optionKey.equals("InstanceFilter") && (option.getValue() instanceof List)) { + inputMap.instanceFilter.addAll((List<String>) option.getValue()); + } else { + throw new InvalidParameterException( + "Unsupported command for invalid format for Options : " + option); + } + } + break; + default: + throw new InvalidParameterException( + "Unsupported command for partitionAssignment : " + key); + } + } + return inputMap; + } + + private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields) + throws InvalidParameterException { + + ClusterState clusterState = new ClusterState(); + ConfigAccessor cfgAccessor = getConfigAccessor(); + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates()); + // Add existing live instance and new instance from user input to instances list + clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()); + clusterState.instances.addAll(inputFields.newInstances); + + // Check if to be removed instances and old instances in swap node exist in live instance + if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) { + Set<String> liveInstanceSet = new HashSet<>(clusterState.instances); + for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) { + if (!liveInstanceSet.contains(nodeSwapPair.getKey())) { + throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey() + + "] in SwapInstances does not exist in cluster."); + } + } + for (String instanceToRemove : inputFields.instancesToRemove) { + if (!liveInstanceSet.contains(instanceToRemove)) { + throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove + + "] in RemoveInstances does not exist in cluster."); + } + } + if (!inputFields.instancesToRemove.isEmpty()) { + 
clusterState.instances.removeIf(inputFields.instancesToRemove::contains); + } + } + + // Read instance and cluster config. + // It will throw exception if there is no instanceConfig for newly added instance. + for (String instance : clusterState.instances) { + InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance); + clusterState.instanceConfigs.add(config); + } + clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId); + return clusterState; + } + + private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields, + ClusterState clusterState, String clusterId) throws Exception { + + AssignmentResult result = new AssignmentResult(); + // Iterate through resources, read resource level info and get potential assignment. + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + List<IdealState> wagedResourceIdealState = new ArrayList<>(); + + for (String resource : clusterState.resources) { + IdealState idealState = + dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resource)); + // Compute all Waged resources in a batch later. + if (idealState.getRebalancerClassName() != null && idealState.getRebalancerClassName() + .equals(WagedRebalancer.class.getName())) { + wagedResourceIdealState.add(idealState); + continue; + } + // For non Waged resources, we don't compute resources not in white list. + if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) { + continue; + } + // Use getIdealAssignmentForFullAuto for FULL_AUTO resource. 
+ if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { + String rebalanceStrategy = idealState.getRebalanceStrategy(); + if (rebalanceStrategy == null || rebalanceStrategy + .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) { + rebalanceStrategy = AutoRebalanceStrategy.class.getName(); + } + Map<String, Map<String, String>> partitionAssignments = new TreeMap<>( + getIdealAssignmentForFullAuto(clusterState.clusterConfig, clusterState.instanceConfigs, + clusterState.instances, idealState, new ArrayList<>(idealState.getPartitionSet()), + rebalanceStrategy)); + instanceSwapAndFilter(inputFields, partitionAssignments, resource, result); Review comment: Also this function can be called once instead of three times as it's used by each case. ########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { + private static Logger LOG = LoggerFactory + .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName()); + + private static class InputFields { + Set<String> resourceFilter = new HashSet<>(); + Map<String, String> nodeSwap = new 
HashMap<>(); // old instance -> new instance. + List<String> newInstances = new ArrayList<>(); + List<String> instancesToRemove = new ArrayList<>(); + Set<String> instanceFilter = new HashSet<>(); + } + + private static class ClusterState { + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + ClusterConfig clusterConfig; + List<String> resources = new ArrayList<>(); + List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove. + } + + // Result format: Map of resource -> partition -> instance -> state. + private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> { + public AssignmentResult() { + super(); + } + } + + @ResponseMetered(name = HttpConstants.WRITE_REQUEST) + @Timed(name = HttpConstants.WRITE_REQUEST) + @POST + public Response computePotentialAssignment(@PathParam("clusterId") String clusterId, + String content) { + + InputFields inputFields; + ClusterState clusterState; + AssignmentResult result; + + try { + // 1. Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, return a REASON String + inputFields = readInput(content); + // 2. Read cluster status from ZK. + clusterState = readClusterStateAndValidateInput(clusterId, inputFields); + // 3. Call rebalancer tools for each resource. + result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId); + // 4. Serialize result to JSON and return. + return JSONRepresentation(result); + } catch (InvalidParameterException ex) { + return badRequest(ex.getMessage()); + } catch (JsonProcessingException e) { + return badRequest("Invalid input: Input can but be parsed into a KV map." + e.getMessage()); + } catch (OutOfMemoryError e) { + LOG.error( + "OutOfMemoryError while calling AssignmentResult" + Arrays.toString(e.getStackTrace())); + return badRequest( + "Response size is too large to serialize. 
Please query by resources or instance filter"); + } catch (Exception e) { + LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace())); + return badRequest("Failed to compute partition assignment: " + e); + } + } + + private InputFields readInput(String content) + throws InvalidParameterException, JsonProcessingException { + + Map<String, Map<String, Object>> customFieldsMap; + InputFields inputMap = new InputFields(); + customFieldsMap = + OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() { + }); + + // Content is given as a KV mapping. + for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) { + String key = entry.getKey(); + switch (key) { + case "InstanceChange": + for (Map.Entry<String, Object> instanceChange : entry.getValue().entrySet()) { + String instanceChangeKey = instanceChange.getKey(); + if (instanceChangeKey.equals("AddInstances") && (instanceChange + .getValue() instanceof List)) { + inputMap.newInstances.addAll((List<String>) instanceChange.getValue()); + } else if (instanceChangeKey.equals("RemoveInstances") && (instanceChange + .getValue() instanceof List)) { + inputMap.instancesToRemove.addAll((List<String>) instanceChange.getValue()); + } else if (instanceChangeKey.equals("SwapInstances") && (instanceChange + .getValue() instanceof Map)) { + for (Map.Entry<String, String> swapPair : ((Map<String, String>) instanceChange + .getValue()).entrySet()) { + inputMap.nodeSwap.put(swapPair.getKey(), swapPair.getValue()); + } + } else { + throw new InvalidParameterException( + "Unsupported command or invalid format for InstanceChange : " + + instanceChangeKey); + } + } + break; + case "Options": + for (Map.Entry<String, Object> option : entry.getValue().entrySet()) { + String optionKey = option.getKey(); + if (optionKey.equals("ResourceFilter") && (option.getValue() instanceof List)) { + inputMap.resourceFilter.addAll((List<String>) option.getValue()); + } 
else if (optionKey.equals("InstanceFilter") && (option.getValue() instanceof List)) { + inputMap.instanceFilter.addAll((List<String>) option.getValue()); + } else { + throw new InvalidParameterException( + "Unsupported command for invalid format for Options : " + option); + } + } + break; + default: + throw new InvalidParameterException( + "Unsupported command for partitionAssignment : " + key); + } + } + return inputMap; + } + + private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields) + throws InvalidParameterException { + + ClusterState clusterState = new ClusterState(); + ConfigAccessor cfgAccessor = getConfigAccessor(); + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates()); + // Add existing live instance and new instance from user input to instances list + clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()); + clusterState.instances.addAll(inputFields.newInstances); + + // Check if to be removed instances and old instances in swap node exist in live instance + if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) { + Set<String> liveInstanceSet = new HashSet<>(clusterState.instances); + for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) { + if (!liveInstanceSet.contains(nodeSwapPair.getKey())) { + throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey() + + "] in SwapInstances does not exist in cluster."); + } + } + for (String instanceToRemove : inputFields.instancesToRemove) { + if (!liveInstanceSet.contains(instanceToRemove)) { + throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove + + "] in RemoveInstances does not exist in cluster."); + } + } + if (!inputFields.instancesToRemove.isEmpty()) { + 
clusterState.instances.removeIf(inputFields.instancesToRemove::contains); + } + } + + // Read instance and cluster config. + // It will throw exception if there is no instanceConfig for newly added instance. + for (String instance : clusterState.instances) { + InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance); + clusterState.instanceConfigs.add(config); + } + clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId); + return clusterState; + } + + private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields, + ClusterState clusterState, String clusterId) throws Exception { + + AssignmentResult result = new AssignmentResult(); + // Iterate through resources, read resource level info and get potential assignment. + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + List<IdealState> wagedResourceIdealState = new ArrayList<>(); + + for (String resource : clusterState.resources) { + IdealState idealState = + dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resource)); + // Compute all Waged resources in a batch later. + if (idealState.getRebalancerClassName() != null && idealState.getRebalancerClassName() + .equals(WagedRebalancer.class.getName())) { + wagedResourceIdealState.add(idealState); + continue; + } + // For non Waged resources, we don't compute resources not in white list. + if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) { + continue; + } + // Use getIdealAssignmentForFullAuto for FULL_AUTO resource. + if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { Review comment: It's worth having private functions that implement the FULL_AUTO and SEMI_AUTO scenarios. Then this compute function would be much clearer and better organized. 
########## File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java ########## @@ -0,0 +1,347 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.rest.common.HttpConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.util.HelixUtil.computeIdealMapping; +import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto; +import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto; + + +@Path("/clusters/{clusterId}/partitionAssignment") +public class ResourceAssignmentOptimizer extends AbstractHelixResource { Review comment: For the class naming, I think we can follow the naming convention for REST classes, i.e. "***Accessor", so that we can easily identify it as a REST resource. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
