junkaixue commented on code in PR #2846: URL: https://github.com/apache/helix/pull/2846#discussion_r1688967238
########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java: ########## @@ -0,0 +1,179 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.condition.RebalanceCondition; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> { + private static final Logger LOG = 
LoggerFactory.getLogger(ConditionBasedRebalancer.class); + private final RebalanceCondition _rebalanceCondition; + + public ConditionBasedRebalancer(RebalanceCondition rebalanceCondition) { + this._rebalanceCondition = rebalanceCondition; + } + + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) { + if (!this._rebalanceCondition.evaluate()) { + ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); + if (cachedIdealState != null) { + return new IdealState(cachedIdealState); + } + // In theory, the cache should be populated already if no rebalance is needed + LOG.warn(String.format( Review Comment: Better to use parameterized logging with '{}' placeholders instead of String.format for string construction. If we have time, we should refactor the legacy code as well. ########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java: ########## @@ -0,0 +1,179 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.condition.RebalanceCondition; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> { + private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class); + private final RebalanceCondition _rebalanceCondition; + + public ConditionBasedRebalancer(RebalanceCondition rebalanceCondition) { Review Comment: Let's make it generic? We can allow adding a list of rebalance conditions. ########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java: ########## @@ -0,0 +1,5 @@ +package org.apache.helix.controller.rebalancer.condition; + +public interface RebalanceCondition { + boolean evaluate(); Review Comment: Maybe we call it performRebalance? ########## helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java: ########## @@ -0,0 +1,179 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.condition.RebalanceCondition; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> { + private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class); + private final RebalanceCondition _rebalanceCondition; + + public ConditionBasedRebalancer(RebalanceCondition rebalanceCondition) { + this._rebalanceCondition = rebalanceCondition; + } + + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, 
ResourceControllerDataProvider clusterData) { + if (!this._rebalanceCondition.evaluate()) { + ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); + if (cachedIdealState != null) { + return new IdealState(cachedIdealState); + } + // In theory, the cache should be populated already if no rebalance is needed + LOG.warn(String.format( + "Cannot fetch the cached Ideal State for resource: %s, will recompute the Ideal State", + resourceName)); + } + + LOG.info("Computing IdealState for " + resourceName); + + List<String> partitions = getStablePartitionList(clusterData, currentIdealState); + String stateModelName = currentIdealState.getStateModelDefRef(); + StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName); + if (stateModelDef == null) { + LOG.error("State Model Definition null for resource: " + resourceName); + throw new HelixException("State Model Definition null for resource: " + resourceName); + } + Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances(); + int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size()); + + LinkedHashMap<String, Integer> stateCountMap = + stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas); + List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet()); + List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances()); + assignableNodes.removeAll(clusterData.getDisabledInstances()); + assignableLiveNodes.retainAll(assignableNodes); + + Map<String, Map<String, String>> currentMapping = + currentMapping(currentStateOutput, resourceName, partitions, stateCountMap); + + // If there are nodes tagged with resource name, use only those nodes + Set<String> taggedNodes = new HashSet<String>(); + Set<String> taggedLiveNodes = new HashSet<String>(); + if (currentIdealState.getInstanceGroupTag() != null) { + for (String instanceName : assignableNodes) { + if 
(clusterData.getAssignableInstanceConfigMap().get(instanceName) + .containsTag(currentIdealState.getInstanceGroupTag())) { + taggedNodes.add(instanceName); + if (assignableLiveInstance.containsKey(instanceName)) { + taggedLiveNodes.add(instanceName); + } + } + } + if (!taggedLiveNodes.isEmpty()) { + // live nodes exist that have this tag + if (LOG.isInfoEnabled()) { + LOG.info( + "found the following participants with tag " + currentIdealState.getInstanceGroupTag() + + " for " + resourceName + ": " + taggedLiveNodes); + } + } else if (taggedNodes.isEmpty()) { + // no live nodes and no configured nodes have this tag + LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + + " but no configured participants have this tag"); + } else { + // configured nodes have this tag, but no live nodes have this tag + LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + + " but no live participants have this tag"); + } + assignableNodes = new ArrayList<>(taggedNodes); + assignableLiveNodes = new ArrayList<>(taggedLiveNodes); + } + + // sort node lists to ensure consistent preferred assignments + Collections.sort(assignableNodes); + Collections.sort(assignableLiveNodes); + + int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); + _rebalanceStrategy = + getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName, + stateCountMap, maxPartition); + ZNRecord newMapping = + _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes, + currentMapping, clusterData); + + LOG.debug("currentMapping: {}", currentMapping); + LOG.debug("stateCountMap: {}", stateCountMap); + LOG.debug("assignableLiveNodes: {}", assignableLiveNodes); + LOG.debug("assignableNodes: {}", assignableNodes); + LOG.debug("maxPartition: {}", maxPartition); + LOG.debug("newMapping: {}", newMapping); Review Comment: Usually we have an if statement around debug logging. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org