This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new 7c4301b11 Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer (#2776) 7c4301b11 is described below commit 7c4301b115bbad44a09b199a84abcc2c238858a0 Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Wed Mar 13 20:00:32 2024 -0700 Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer (#2776) * Fix BestPossibleExternalViewVerifier to use a ZkClient that has the serializer set to ByteArraySerializer so it can read the best possible state from the assignment metadata store. Fix BestPossibleExternalViewVerifier to actually calculate BEST_POSSIBLE instead of returning the last assignment persisted to ZK, because we now need to consider that handleDelayedRebalanceMinActiveReplica results are not persisted to ZK (#2447). Fix handleDelayedRebalanceMinActiveReplica modifying in-memory _bestPossibleState in the _assignmen [...] --- .../rebalancer/waged/AssignmentManager.java | 8 ++++++- .../BestPossibleExternalViewVerifier.java | 27 +++++++++------------- .../rebalancer/TestInstanceOperation.java | 5 +--- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java index 475e8aad1..8cb089cb9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ResourceAssignment; @@ -86,7 +88,11 @@ class AssignmentManager { if 
(assignmentMetadataStore != null) { try { _stateReadLatency.startMeasuringLatency(); - currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment()); + currentBestAssignment = + assignmentMetadataStore.getBestPossibleAssignment().entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, + entry -> new ResourceAssignment(entry.getValue().getRecord()))); + ; _stateReadLatency.endMeasuringLatency(); } catch (Exception ex) { throw new HelixRebalanceException( diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 1997bea06..0b0926dda 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -30,19 +30,16 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixDefinedState; -import org.apache.helix.HelixRebalanceException; import org.apache.helix.PropertyKey; import org.apache.helix.controller.common.PartitionStateMap; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer; -import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateComputationStage; -import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.ResourceComputationStage; import 
org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.ClusterConfig; @@ -50,7 +47,6 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.task.TaskConstants; import org.apache.helix.util.RebalanceUtil; @@ -59,8 +55,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * verifier that the ExternalViews of given resources (or all resources in the cluster) - * match its best possible mapping states. + * Verify that the ExternalViews of given resources (or all resources in the cluster) + * match its best possible mapping states. The best possible mapping states are computed + * by running the BestPossibleStateCalc stage with the same inputs that the controller would + * use to calculate the best possible state. The mappings produced by this stage are compared + * to the external view to ensure that they match. When they match, the cluster has converged. + * Note: The best possible state compared to the external view includes the non-persisted state + * mappings generated when handling MIN_ACTIVE replicas. 
*/ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { private static Logger LOG = LoggerFactory.getLogger(BestPossibleExternalViewVerifier.class); @@ -433,7 +434,10 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { RebalanceUtil.runStage(event, new CurrentStateComputationStage()); // Note the readOnlyWagedRebalancer is just for one time usage - try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient); + try ( + // Pass the zkAddress to constructor to ensure the correct ZkClient is created with ByteArraySerializer + ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor( + _zkClient.getServers()); DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor, cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) { event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer); @@ -462,14 +466,5 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) { super(zkBucketDataAccessor, clusterName, preferences); } - - @Override - protected Map<String, ResourceAssignment> computeBestPossibleAssignment( - ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap, - Set<String> activeNodes, CurrentStateOutput currentStateOutput, - RebalanceAlgorithm algorithm) throws HelixRebalanceException { - return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput, - resourceMap.keySet()); - } } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 010153e64..1fc3a3e20 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -1070,7 +1070,6 @@ public class TestInstanceOperation extends ZkTestBase { InstanceConstants.InstanceOperation.EVACUATE); // Validate that the assignment has not changed since setting the InstanceOperation to EVACUATE - Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, Collections.emptySet(), Collections.emptySet()); @@ -1105,7 +1104,7 @@ public class TestInstanceOperation extends ZkTestBase { Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); } - @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWithSwapOutInstanceOffline") + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testSwapEvacuateAdd") public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws Exception { System.out.println( "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at " @@ -1326,8 +1325,6 @@ public class TestInstanceOperation extends ZkTestBase { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate)); } - Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); - // exit MM _gSetupTool.getClusterManagementTool() .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);