jiajunwang commented on a change in pull request #1747:
URL: https://github.com/apache/helix/pull/1747#discussion_r638221054
##########
File path:
helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
##########
@@ -201,6 +201,10 @@ public ZkClient getZkClient() {
return (ZkClient) getRealmAwareZkClient();
}
+ public String getZkAddr() {
Review comment:
I don't think we want to expose the ZkAddr; doing so adds an unnecessary
dependency. In a multi-ZK scenario, the address might be a mock or fake
address, and we may eventually get rid of this field altogether.
The only safe option is this method: public RealmAwareZkClient
getRealmAwareZkClient(). However, there is no way to get an "address" from
the RealmAwareZkClient interface.
We noticed this problem a long time ago, and it seems we are still facing
it.
IMO, the right solution is to keep refactoring the WAGED tool, and even the
WAGED rebalancer, so that they accept a RealmAwareZkClient instead of the raw
address. That way our assumption, that the physical ZK address is hidden
behind the RealmAwareZkClient interface, holds throughout our code.
I'm not sure how much this change will cost, but it seems to be a must.
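One possible shape for that refactoring, as a minimal sketch only. `ZkReader` is a hypothetical stand-in for the RealmAwareZkClient interface (it is not the Helix API), and `AssignmentTool`, `listInstances`, `fakeReader`, and the path layout are made-up names for illustration. The point is just that the tool receives a client and never sees an address.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClientInjectionSketch {

  // Hypothetical stand-in for RealmAwareZkClient: exposes reads, never an address.
  interface ZkReader {
    List<String> getChildren(String path);
  }

  // Hypothetical tool that takes the client rather than a raw ZK address string.
  static final class AssignmentTool {
    private final ZkReader client;

    AssignmentTool(ZkReader client) {
      this.client = client;
    }

    // Assumed path layout, for illustration only.
    List<String> listInstances(String clusterName) {
      return client.getChildren("/" + clusterName + "/INSTANCES");
    }
  }

  // In-memory fake, standing in for a mock or multi-ZK-backed client.
  static ZkReader fakeReader(Map<String, List<String>> tree) {
    return path -> tree.getOrDefault(path, Collections.emptyList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> tree = new HashMap<>();
    tree.put("/myCluster/INSTANCES", Arrays.asList("host_1", "host_2"));
    AssignmentTool tool = new AssignmentTool(fakeReader(tree));
    System.out.println(tool.listInstances("myCluster"));
  }
}
```

Because the tool depends only on the interface, a mock or multi-ZK client can be swapped in without the tool ever learning a physical address.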
##########
File path:
helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java
##########
@@ -0,0 +1,347 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.util.HelixUtil.computeIdealMapping;
+import static org.apache.helix.util.HelixUtil.getIdealAssignmentForFullAuto;
+import static org.apache.helix.util.HelixUtil.getTargetAssignmentForWagedFullAuto;
Review comment:
Please avoid static imports like this. Qualifying the calls with HelixUtil
tells code readers where the methods come from.
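For illustration, a small self-contained sketch of a qualified call. `AssignmentUtil` and `idealReplicaCount` are hypothetical stand-ins, not HelixUtil methods; the call site names the owning class, so no static import is needed.

```java
public class QualifiedCallSketch {

  // Hypothetical utility class standing in for HelixUtil.
  static final class AssignmentUtil {
    private AssignmentUtil() {
    }

    // Made-up helper: caps the requested replica count at the number of live instances.
    static int idealReplicaCount(int requestedReplicas, int liveInstances) {
      return Math.min(requestedReplicas, liveInstances);
    }
  }

  static int replicasFor(int requestedReplicas, int liveInstances) {
    // Qualified call: the reader sees at the call site that the method lives in AssignmentUtil.
    return AssignmentUtil.idealReplicaCount(requestedReplicas, liveInstances);
  }

  public static void main(String[] args) {
    System.out.println(replicasFor(3, 2));
  }
}
```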
##########
File path:
helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizer.java
##########
@@ -0,0 +1,347 @@
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizer extends AbstractHelixResource {
+ private static Logger LOG = LoggerFactory
+ .getLogger(org.apache.helix.rest.server.resources.helix.ClusterAccessor.class.getName());
+
+ private static class InputFields {
+ Set<String> resourceFilter = new HashSet<>();
+ Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+ List<String> newInstances = new ArrayList<>();
+ List<String> instancesToRemove = new ArrayList<>();
+ Set<String> instanceFilter = new HashSet<>();
+ }
+
+ private static class ClusterState {
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ ClusterConfig clusterConfig;
+ List<String> resources = new ArrayList<>();
+ List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove.
+ }
+
+ // Result format: Map of resource -> partition -> instance -> state.
+ private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
+ public AssignmentResult() {
+ super();
+ }
+ }
+
+ @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+ @Timed(name = HttpConstants.WRITE_REQUEST)
+ @POST
+ public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
+ String content) {
+
+ InputFields inputFields;
+ ClusterState clusterState;
+ AssignmentResult result;
+
+ try {
+ // 1. Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, return a REASON String
+ inputFields = readInput(content);
+ // 2. Read cluster status from ZK.
+ clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
+ // 3. Call rebalancer tools for each resource.
+ result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
+ // 4. Serialize result to JSON and return.
+ return JSONRepresentation(result);
+ } catch (InvalidParameterException ex) {
+ return badRequest(ex.getMessage());
+ } catch (JsonProcessingException e) {
+ return badRequest("Invalid input: Input cannot be parsed into a KV map. " + e.getMessage());
+ } catch (OutOfMemoryError e) {
+ LOG.error(
+ "OutOfMemoryError while calling AssignmentResult" + Arrays.toString(e.getStackTrace()));
+ return badRequest(
+ "Response size is too large to serialize. Please query by resources or instance filter");
+ } catch (Exception e) {
+ LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace()));
+ return badRequest("Failed to compute partition assignment: " + e);
+ }
+ }
+
+ private InputFields readInput(String content)
+ throws InvalidParameterException, JsonProcessingException {
+
+ Map<String, Map<String, Object>> customFieldsMap;
+ InputFields inputMap = new InputFields();
+ customFieldsMap =
+ OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() {
+ });
+
+ // Content is given as a KV mapping.
+ for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) {
+ String key = entry.getKey();
+ switch (key) {
+ case "InstanceChange":
Review comment:
All keys and commands should be enums or static String constants whenever
possible. Hard-coded Strings are not good.
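A minimal sketch of the enum approach, under stated assumptions. Only the "InstanceChange" key comes from the diff above; `InputKey`, `fromJsonName`, `describe`, and the "ResourceFilter" key are hypothetical names chosen for illustration, not the PR's actual code.

```java
public class InputKeySketch {

  // "InstanceChange" appears in the diff; "ResourceFilter" is a made-up second key.
  enum InputKey {
    INSTANCE_CHANGE("InstanceChange"),
    RESOURCE_FILTER("ResourceFilter");

    private final String jsonName;

    InputKey(String jsonName) {
      this.jsonName = jsonName;
    }

    // Maps the raw JSON key to its enum constant, rejecting unknown keys in one place.
    static InputKey fromJsonName(String name) {
      for (InputKey key : values()) {
        if (key.jsonName.equals(name)) {
          return key;
        }
      }
      throw new IllegalArgumentException("Unsupported input key: " + name);
    }
  }

  static String describe(String rawKey) {
    // The switch is over enum constants, so no string literals are repeated here.
    switch (InputKey.fromJsonName(rawKey)) {
      case INSTANCE_CHANGE:
        return "instance change";
      case RESOURCE_FILTER:
        return "resource filter";
      default:
        throw new IllegalStateException("Unhandled key: " + rawKey);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe("InstanceChange"));
  }
}
```

With the key names defined once in the enum, a typo in a key becomes a compile error at the use site rather than a silently unmatched case.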