jiajunwang commented on a change in pull request #1031:
URL: https://github.com/apache/helix/pull/1031#discussion_r431322627



##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
##########
@@ -0,0 +1,88 @@
+package org.apache.helix.controller.rebalancer.waged;

Review comment:
       Let's put this in the util or tool package. It should not be used as a 
rebalancer.

##########
File path: helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
##########
@@ -140,6 +160,81 @@ public static String serializeByComma(List<String> objects) {
     }
   }
 
+  /**
+   * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+   * calculated using the read-only WAGED rebalancer.
+   * @param metadataStoreAddress
+   * @param clusterConfig
+   * @param instanceConfigs
+   * @param liveInstances
+   * @param newIdealStates
+   * @param newResourceConfigs
+   * @return
+   */
+  public static Map<String, ResourceAssignment> getIdealAssignmentForWagedFullAuto(

Review comment:
       I feel RebalanceUtil is a better place for this method, although the 
existing method is already in HelixUtil. What do you think?

##########
File path: helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
##########
@@ -140,6 +160,81 @@ public static String serializeByComma(List<String> objects) {
     }
   }
 
+  /**
+   * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+   * calculated using the read-only WAGED rebalancer.
+   * @param metadataStoreAddress
+   * @param clusterConfig
+   * @param instanceConfigs
+   * @param liveInstances
+   * @param newIdealStates
+   * @param newResourceConfigs
+   * @return
+   */
+  public static Map<String, ResourceAssignment> getIdealAssignmentForWagedFullAuto(
+      String metadataStoreAddress, ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> newIdealStates, List<ResourceConfig> newResourceConfigs) {
+    // Prepare a data accessor for a dataProvider (cache) refresh
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+    HelixDataAccessor helixDataAccessor =
+        new ZKHelixDataAccessor(clusterConfig.getClusterName(), baseDataAccessor);
+
+    // Obtain a refreshed dataProvider (cache) and overwrite cluster parameters with the given parameters
+    ResourceControllerDataProvider dataProvider =
+        new ResourceControllerDataProvider(clusterConfig.getClusterName());
+    dataProvider.requireFullRefresh();
+    dataProvider.refresh(helixDataAccessor);
+    dataProvider.setClusterConfig(clusterConfig);
+    dataProvider.setInstanceConfigMap(instanceConfigs.stream()

Review comment:
       The WAGED rebalancer considers all resources in the cluster. If 
"newResourceConfigs" contains only the user-specified items and we overwrite 
the cached map with it, the result may differ from one computed over the full 
cluster. The other list/map parameters have the same concern.
   1. Can we merge the input into the cached data instead of overwriting it? 
This would serve users who want to add or modify some items.
   2. How should we handle requests to remove some items? I don't have an 
answer yet. I will update if I come up with a good idea.
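
To illustrate point 1, here is a rough sketch of merging rather than overwriting. It uses plain maps and strings instead of the real Helix types; `OverrideMergeSketch`, `mergeOverrides`, and the `name:version` sample values are hypothetical names made up for this example, not part of the PR.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class OverrideMergeSketch {
  /**
   * Returns a copy of the cached map in which every override either replaces
   * the cached entry with the same key or is added as a new entry.
   * Cached entries that have no override are kept unchanged.
   */
  static <K, V> Map<K, V> mergeOverrides(
      Map<K, V> cached, Collection<V> overrides, Function<? super V, ? extends K> keyOf) {
    Map<K, V> merged = new LinkedHashMap<>(cached);
    for (V override : overrides) {
      merged.put(keyOf.apply(override), override);
    }
    return merged;
  }

  public static void main(String[] args) {
    // Stand-in for the configs already present in the refreshed cache.
    Map<String, String> cached = new LinkedHashMap<>();
    cached.put("db1", "db1:v1");
    cached.put("db2", "db2:v1");

    // Stand-in for the user input: one modified item and one new item.
    List<String> overrides = Arrays.asList("db2:v2", "db3:v1");

    Map<String, String> merged =
        mergeOverrides(cached, overrides, s -> s.substring(0, s.indexOf(':')));
    System.out.println(merged);
  }
}
```

Note that this only covers add and modify. Removal (point 2) would still need a separate signal, since an item missing from the input is treated as "keep the cached value".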

##########
File path: helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
##########
@@ -140,6 +160,81 @@ public static String serializeByComma(List<String> objects) {
     }
   }
 
+  /**
+   * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+   * calculated using the read-only WAGED rebalancer.
+   * @param metadataStoreAddress
+   * @param clusterConfig
+   * @param instanceConfigs
+   * @param liveInstances
+   * @param newIdealStates
+   * @param newResourceConfigs
+   * @return
+   */
+  public static Map<String, ResourceAssignment> getIdealAssignmentForWagedFullAuto(
+      String metadataStoreAddress, ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> newIdealStates, List<ResourceConfig> newResourceConfigs) {
+    // Prepare a data accessor for a dataProvider (cache) refresh
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+    HelixDataAccessor helixDataAccessor =
+        new ZKHelixDataAccessor(clusterConfig.getClusterName(), baseDataAccessor);
+
+    // Obtain a refreshed dataProvider (cache) and overwrite cluster parameters with the given parameters
+    ResourceControllerDataProvider dataProvider =
+        new ResourceControllerDataProvider(clusterConfig.getClusterName());
+    dataProvider.requireFullRefresh();
+    dataProvider.refresh(helixDataAccessor);
+    dataProvider.setClusterConfig(clusterConfig);
+    dataProvider.setInstanceConfigMap(instanceConfigs.stream()
+        .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
+    dataProvider.setLiveInstances(
+        liveInstances.stream().map(LiveInstance::new).collect(Collectors.toList()));
+    dataProvider.setIdealStates(newIdealStates);
+    dataProvider.setResourceConfigMap(newResourceConfigs.stream()
+        .collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));
+
+    // Create an instance of read-only WAGED rebalancer
+    ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
+        new ReadOnlyWagedRebalancer(metadataStoreAddress, clusterConfig.getClusterName(),
+            clusterConfig.getGlobalRebalancePreference());
+
+    // Use a dummy event to run the required stages for BestPossibleState calculation
+    // Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
+    ClusterEvent event = new ClusterEvent(clusterConfig.getClusterName(), ClusterEventType.Unknown);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
+    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
+
+    try {
+      // Run the required stages to obtain the BestPossibleOutput
+      RebalanceUtil.runStage(event, new ResourceComputationStage());
+      RebalanceUtil.runStage(event, new CurrentStateComputationStage());
+      RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
+    } catch (Exception e) {
+      LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
+    } finally {
+      // Close all ZK connections
+      baseDataAccessor.close();

Review comment:
       If the process fails before the runStage calls, for example while 
refreshing the cache, this accessor connection will be leaked, because that 
code runs outside the try block.
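
One possible shape for the fix is to open the try block right after the accessor is created, so the refresh is covered by the same finally. The sketch below only demonstrates the control flow; `AccessorCleanupSketch`, `FakeAccessor`, and `compute` are hypothetical stand-ins, not Helix classes.

```java
final class AccessorCleanupSketch {
  /** Hypothetical stand-in for the ZK-backed accessor; only close() matters here. */
  static final class FakeAccessor {
    boolean closed = false;

    void close() {
      closed = true;
    }
  }

  /**
   * Everything that can fail after the accessor exists, including the cache
   * refresh, runs inside the try block, so close() is reached on every path.
   */
  static String compute(FakeAccessor accessor, boolean failDuringRefresh) {
    try {
      // Stand-in for dataProvider.refresh(...), which may throw.
      if (failDuringRefresh) {
        throw new IllegalStateException("refresh failed");
      }
      // Stand-in for running the stages and collecting the result.
      return "assignment";
    } finally {
      accessor.close();
    }
  }

  public static void main(String[] args) {
    FakeAccessor accessor = new FakeAccessor();
    try {
      compute(accessor, true);
    } catch (IllegalStateException e) {
      System.out.println("compute failed: " + e.getMessage());
    }
    System.out.println("accessor closed: " + accessor.closed);
  }
}
```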

##########
File path: helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
##########
@@ -164,4 +167,25 @@ public static void scheduleOnDemandPipeline(String clusterName, long delay,
           clusterName);
     }
   }
+
+  /**
+   * runStage allows the run of individual stages. It can be used to mock a part of the Controller
+   * pipeline run.
+   *
+   * An example usage is as follows:
+   *       runStage(event, new ResourceComputationStage());
+   *       runStage(event, new CurrentStateComputationStage());
+   *       runStage(event, new BestPossibleStateCalcStage());
+   * By running these stages, we are able to obtain BestPossibleStateOutput in the event object.
+   * @param event
+   * @param stage
+   * @throws Exception
+   */
+  public static void runStage(ClusterEvent event, Stage stage) throws Exception {

Review comment:
       It might be better to have a runStage**s** utility method that takes 
multiple stages. It would be easier to use and more generic.
   Of course, we would have to refactor calcBestPossState() in the verifier to 
make the change.
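
Roughly what I have in mind, as a sketch only. The `Stage` interface and the list-based event here are simplified stand-ins so the example compiles on its own; they are not the Helix `Stage` or `ClusterEvent` types, and `RunStagesSketch` is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;

final class RunStagesSketch {
  /** Hypothetical, simplified stand-in for a pipeline stage. */
  interface Stage {
    void process(List<String> event) throws Exception;
  }

  /**
   * Runs the given stages in order against the same event.
   * The first exception stops the run and propagates to the caller.
   */
  static void runStages(List<String> event, Stage... stages) throws Exception {
    for (Stage stage : stages) {
      stage.process(event);
    }
  }

  public static void main(String[] args) throws Exception {
    // The event is modeled as a list that records which stages have run.
    List<String> event = new ArrayList<>();
    runStages(event,
        e -> e.add("resource computation"),
        e -> e.add("current state computation"),
        e -> e.add("best possible state calculation"));
    System.out.println(event);
  }
}
```

With varargs, a caller that needs only one stage can still pass a single argument, so a separate single-stage method may not be needed.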






