This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4301b11 Fix BestPossibleExternalViewVerifier to use a ZkClient that 
has the serializer set to ByteArraySerializer (#2776)
7c4301b11 is described below

commit 7c4301b115bbad44a09b199a84abcc2c238858a0
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Wed Mar 13 20:00:32 2024 -0700

    Fix BestPossibleExternalViewVerifier to use a ZkClient that has the 
serializer set to ByteArraySerializer (#2776)
    
    * Fix BestPossibleExternalViewVerifier to use a ZkClient that has the 
serializer set to ByteArraySerializer, so that it can read the best possible 
state from the assignment metadata store. Fix BestPossibleExternalViewVerifier 
to actually calculate the BEST_POSSIBLE state instead of returning the last 
state persisted to ZK; this is now necessary because the mappings produced by 
handleDelayedRebalanceMinActiveReplica are not persisted to ZK (#2447). Fix 
handleDelayedRebalanceMinActiveReplica modifying the in-memory 
_bestPossibleState in the _assignmen [...]
---
 .../rebalancer/waged/AssignmentManager.java        |  8 ++++++-
 .../BestPossibleExternalViewVerifier.java          | 27 +++++++++-------------
 .../rebalancer/TestInstanceOperation.java          |  5 +---
 3 files changed, 19 insertions(+), 21 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
index 475e8aad1..8cb089cb9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ResourceAssignment;
@@ -86,7 +88,11 @@ class AssignmentManager {
     if (assignmentMetadataStore != null) {
       try {
         _stateReadLatency.startMeasuringLatency();
-        currentBestAssignment = new 
HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
+        currentBestAssignment =
+            
assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect(
+                Collectors.toMap(Map.Entry::getKey,
+                    entry -> new 
ResourceAssignment(entry.getValue().getRecord())));
+        ;
         _stateReadLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException(
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 1997bea06..0b0926dda 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -30,19 +30,16 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.common.PartitionStateMap;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ClusterConfig;
@@ -50,7 +47,6 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.RebalanceUtil;
@@ -59,8 +55,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * verifier that the ExternalViews of given resources (or all resources in the 
cluster)
- * match its best possible mapping states.
+ * Verify that the ExternalViews of given resources (or all resources in the 
cluster)
+ * match its best possible mapping states. The best possible mapping states 
are computed
+ * by running the BestPossibleStateCalc stage with the same inputs that the 
controller would
+ * use to calculate the best possible state. The mappings produced by this 
stage are compared
+ * to the external view to ensure that they match. When they match, the 
cluster has converged.
+ * Note: The best possible state compared to the external view includes the 
non-persisted state
+ * mappings generated when handling MIN_ACTIVE replicas.
  */
 public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
   private static Logger LOG = 
LoggerFactory.getLogger(BestPossibleExternalViewVerifier.class);
@@ -433,7 +434,10 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
     RebalanceUtil.runStage(event, new CurrentStateComputationStage());
     // Note the readOnlyWagedRebalancer is just for one time usage
 
-    try (ZkBucketDataAccessor zkBucketDataAccessor = new 
ZkBucketDataAccessor(_zkClient);
+    try (
+        // Pass the zkAddress to constructor to ensure the correct ZkClient is 
created with ByteArraySerializer
+        ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(
+            _zkClient.getServers());
         DryrunWagedRebalancer dryrunWagedRebalancer = new 
DryrunWagedRebalancer(zkBucketDataAccessor,
             cache.getClusterName(), 
cache.getClusterConfig().getGlobalRebalancePreference())) {
       event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), 
dryrunWagedRebalancer);
@@ -462,14 +466,5 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
       super(zkBucketDataAccessor, clusterName, preferences);
     }
-
-    @Override
-    protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
-        ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
-        Set<String> activeNodes, CurrentStateOutput currentStateOutput,
-        RebalanceAlgorithm algorithm) throws HelixRebalanceException {
-      return getBestPossibleAssignment(getAssignmentMetadataStore(), 
currentStateOutput,
-          resourceMap.keySet());
-    }
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 010153e64..1fc3a3e20 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -1070,7 +1070,6 @@ public class TestInstanceOperation extends ZkTestBase {
         InstanceConstants.InstanceOperation.EVACUATE);
 
     // Validate that the assignment has not changed since setting the 
InstanceOperation to EVACUATE
-    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
@@ -1105,7 +1104,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), 
TIMEOUT);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapWithSwapOutInstanceOffline")
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testSwapEvacuateAdd")
   public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws 
Exception {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
@@ -1326,8 +1325,6 @@ public class TestInstanceOperation extends ZkTestBase {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
     }
 
-    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
-
     // exit MM
     _gSetupTool.getClusterManagementTool()
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);

Reply via email to