sergeyuttsel commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1132316122


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,718 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when dataNodesAutoAdjustScaleUp
+ * or dataNodesAutoAdjustScaleDown equals to 0.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        VaultManager vaultManager = mock(VaultManager.class);
+
+        when(vaultManager.get(any())).thenReturn(completedFuture(null));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), 
zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        
when(metaStorageManager.invoke(any())).thenReturn(completedFuture(StatementResultImpl.builder().result(new
 byte[] {0}).build()));
+
+        vaultManager.start();
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);
+
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterCfgMgr.start();
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultManager,
+                "node"
+        );
+
+        mockCmgLocalNodes();
+
+        distributionZoneManager.start();
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(0).dataNodesAutoAdjustScaleDown(0).build())
+                .get(3, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        clusterStateStorage.stop();
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws 
Exception {
+        LOG.info("Topology with added and removed nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleUpFutures().size() == 1, 3_000));
+
+        
assertTrue(testData.dataNodesMeta.revisionScaleDownFutures().isEmpty());
+
+        CompletableFuture<Void> revisionUpFut = 
testData.dataNodesMeta.revisionScaleUpFutures().get((long) 
testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionUpFut.isDone(), 3000));
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleDownFutures().size() == 1, 3_000));
+
+        CompletableFuture<Void> revisionDownFut = 
testData.dataNodesMeta.revisionScaleDownFutures()
+                .get((long) testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        assertFalse(testData.dataNodesUpFut3.isDone());
+        assertFalse(testData.dataNodesDownFut3.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 2);
+
+        assertTrue(waitForCondition(() -> revisionDownFut.isDone(), 3000));
+
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUp() throws Exception {
+        LOG.info("Topology with added nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleUpFutures().size() == 1, 3_000));
+
+        
assertTrue(testData.dataNodesMeta.revisionScaleDownFutures().isEmpty());
+
+        CompletableFuture<Void> revisionUpFut = 
testData.dataNodesMeta.revisionScaleUpFutures().get((long) 
testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionUpFut.isDone(), 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleDown() throws Exception 
{
+        LOG.info("Topology with removed nodes.");
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = 
testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        Set<String> dataNodes = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, 
testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodesMeta0 = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodesMeta0 == null) {
+                return false;
+            }
+
+            return dataNodesMeta0.revisionScaleDownFutures().size() == 1;
+        },
+                3_000));
+
+        assertTrue(testData.dataNodesMeta.revisionScaleUpFutures().isEmpty());
+
+        CompletableFuture<Void> revisionDownFut = 
testData.dataNodesMeta.revisionScaleDownFutures()
+                .get((long) testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, 
testData.dataNodesRevision2, testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(() -> revisionDownFut.isDone(), 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, 
TimeUnit.SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, 
TimeUnit.SECONDS));
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    private static class TestSeveralScaleUpAndSeveralScaleDownDataObject {
+        private long topVer2;
+        private long dataNodesRevision2;
+        private CompletableFuture<Void> topVerUpFut2;
+        private CompletableFuture<Void> topVerDownFut2;
+        private DistributionZoneManager.DataNodes dataNodesMeta;
+        private CompletableFuture<Set<String>> dataNodesUpFut3;
+        private CompletableFuture<Set<String>> dataNodesDownFut3;
+
+        public TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                long topVer2,
+                long dataNodesRevision2,
+                CompletableFuture<Void> topVerUpFut2,
+                CompletableFuture<Void> topVerDownFut2,
+                DistributionZoneManager.DataNodes dataNodesMeta,
+                CompletableFuture<Set<String>> dataNodesUpFut3,
+                CompletableFuture<Set<String>> dataNodesDownFut3) {
+            this.topVer2 = topVer2;
+            this.dataNodesRevision2 = dataNodesRevision2;
+            this.topVerUpFut2 = topVerUpFut2;
+            this.topVerDownFut2 = topVerDownFut2;
+            this.dataNodesMeta = dataNodesMeta;
+            this.dataNodesUpFut3 = dataNodesUpFut3;
+            this.dataNodesDownFut3 = dataNodesDownFut3;
+        }
+    }
+
+    private TestSeveralScaleUpAndSeveralScaleDownDataObject 
testSeveralScaleUpAndSeveralScaleDownGeneral() throws Exception {
+        LOG.info("Topology with added nodes.");
+
+        CompletableFuture<Set<String>> dataNodesUpFut0 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut1 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut2 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 2);
+        CompletableFuture<Set<String>> dataNodesUpFut3 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 11);
+
+        assertTrue(waitForCondition(() -> 
distributionZoneManager.topVerFutures().size() == 3,
+                3_000));
+
+        int topVer0 = 2;
+
+        int dataNodesRevision0 = 2;
+
+        CompletableFuture<Void> topVerUpFut0 = 
distributionZoneManager.topVerFutures().get(1L);
+        CompletableFuture<Void> topVerUpFut1 = 
distributionZoneManager.topVerFutures().get(2L);
+        CompletableFuture<Void> topVerUpFut2 = 
distributionZoneManager.topVerFutures().get(11L);
+
+        assertFalse(topVerUpFut0.isDone());
+        assertFalse(topVerUpFut1.isDone());
+        assertFalse(topVerUpFut2.isDone());
+
+        Set<String> threeNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(threeNodes, topVer0, dataNodesRevision0);
+
+        assertTrue(waitForCondition(() -> topVerUpFut0.isDone(), 3_000));
+        assertTrue(waitForCondition(() -> topVerUpFut1.isDone(), 3_000));
+        assertFalse(topVerUpFut2.isDone());
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodesMeta0 = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodesMeta0 == null) {
+                return false;
+            }
+
+            return dataNodesMeta0.revisionScaleUpFutures().size() == 1;
+        },
+                3_000));
+
+        DistributionZoneManager.DataNodes dataNodesMeta = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+        CompletableFuture<Void> revision2Fut = 
dataNodesMeta.revisionScaleUpFutures().get((long) dataNodesRevision0);
+
+        assertFalse(revision2Fut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, threeNodes, true, 
dataNodesRevision0, dataNodesRevision0 + 1);
+
+        assertTrue(waitForCondition(() -> revision2Fut.isDone(),
+                3_000));
+
+        assertEquals(threeNodes, dataNodesUpFut0.get());
+        assertEquals(threeNodes, dataNodesUpFut1.get());
+        assertEquals(threeNodes, dataNodesUpFut2.get());
+        assertFalse(dataNodesUpFut3.isDone());
+
+        assertTrue(distributionZoneManager.topVerFutures().size() == 1);
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+
+        LOG.info("Topology with removed nodes.");
+
+        CompletableFuture<Set<String>> dataNodesDownFut0 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut1 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut2 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 5);
+        CompletableFuture<Set<String>> dataNodesDownFut3 = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 6);
+
+        assertTrue(waitForCondition(() -> 
distributionZoneManager.topVerFutures().size() == 4,
+                3_000));
+
+        CompletableFuture<Void> topVerDownFut0 = 
distributionZoneManager.topVerFutures().get(4L);
+        CompletableFuture<Void> topVerDownFut1 = 
distributionZoneManager.topVerFutures().get(5L);
+        CompletableFuture<Void> topVerDownFut2 = 
distributionZoneManager.topVerFutures().get(6L);
+
+        assertFalse(topVerDownFut0.isDone());
+        assertFalse(topVerDownFut1.isDone());
+        assertFalse(topVerDownFut2.isDone());
+
+        int topVer1 = 5;
+
+        int dataNodesRevision1 = dataNodesRevision0 + 2;
+
+        Set<String> twoNodes = Set.of("node0", "node1");
+
+        topologyWatchListenerOnUpdate(twoNodes, topVer1, dataNodesRevision1);
+
+        assertTrue(waitForCondition(() -> topVerDownFut0.isDone(), 3_000));
+        assertTrue(waitForCondition(() -> topVerDownFut1.isDone(), 3_000));
+        assertFalse(waitForCondition(() -> topVerDownFut2.isDone(), 3_000));
+
+        assertTrue(waitForCondition(() -> 
dataNodesMeta.revisionScaleDownFutures().size() == 1, 3_000));
+
+        CompletableFuture<Void> revision5Fut = 
dataNodesMeta.revisionScaleDownFutures().get((long) dataNodesRevision1);
+
+        assertFalse(revision5Fut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, twoNodes, false, 
dataNodesRevision1, dataNodesRevision1 + 1);
+
+        assertTrue(waitForCondition(() -> revision5Fut.isDone(), 3_000));
+
+        assertEquals(twoNodes, dataNodesDownFut0.get());
+        assertEquals(twoNodes, dataNodesDownFut1.get());
+        assertEquals(twoNodes, dataNodesDownFut2.get());
+        assertFalse(dataNodesDownFut3.isDone());
+
+        assertEquals(2, distributionZoneManager.topVerFutures().size());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+        int topVer2 = 20;
+
+        int dataNodesRevision2 = dataNodesRevision1 + 2;
+
+        return new TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                topVer2,
+                dataNodesRevision2,
+                topVerUpFut2,
+                topVerDownFut2,
+                dataNodesMeta,
+                dataNodesUpFut3,
+                dataNodesDownFut3
+        );
+    }
+
+    @Test
+    void testScaleUpAndThenScaleDown() throws Exception {
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 5);
+
+        AtomicReference<CompletableFuture<Void>> topVerFut = new 
AtomicReference<>();
+
+        assertTrue(waitForCondition(() -> {
+            topVerFut.set(distributionZoneManager.topVerFutures().get(5L));
+
+            return topVerFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(topVerFut.get().isDone());
+
+        topologyWatchListenerOnUpdate(Set.of("node0", "node1"), 100, 10);
+
+        assertTrue(waitForCondition(() -> topVerFut.get().isDone(),
+                3_000));
+
+        AtomicReference<CompletableFuture<Void>> revisionFut = new 
AtomicReference<>();
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodes = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodes == null) {
+                return false;
+            }
+
+            revisionFut.set(dataNodes.revisionScaleUpFutures().get(10L));
+
+            return revisionFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(revisionFut.get().isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, Set.of("node0", 
"node1"), true, 10, 11);
+
+        assertTrue(waitForCondition(() -> revisionFut.get().isDone(),
+                3_000));
+
+        assertEquals(Set.of("node0", "node1"), dataNodesFut.get());
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+
+        dataNodesFut = distributionZoneManager.getDataNodes(DEFAULT_ZONE_ID, 
106);
+
+        topVerFut.set(null);
+
+        assertTrue(waitForCondition(() -> {
+            topVerFut.set(distributionZoneManager.topVerFutures().get(106L));
+
+            return topVerFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(topVerFut.get().isDone());
+
+        topologyWatchListenerOnUpdate(Set.of("node0"), 200, 12);
+
+        assertTrue(waitForCondition(() -> topVerFut.get().isDone(),
+                3_000));
+
+        revisionFut.set(null);
+
+        assertTrue(waitForCondition(() -> {
+            DistributionZoneManager.DataNodes dataNodes = 
distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID);
+
+            if (dataNodes == null) {
+                return false;
+            }
+
+            revisionFut.set(dataNodes.revisionScaleDownFutures().get(12L));
+
+            return revisionFut.get() != null;
+        },
+                3_000));
+
+        assertFalse(revisionFut.get().isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, Set.of("node0"), 
false, 12, 13);
+
+        assertTrue(waitForCondition(() -> revisionFut.get().isDone(),
+                3_000));
+
+        assertEquals(Set.of("node0"), dataNodesFut.get());
+
+        assertTrue(distributionZoneManager.topVerFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleUpFutures().isEmpty());
+        
assertTrue(distributionZoneManager.dataNodes().get(DEFAULT_ZONE_ID).revisionScaleDownFutures().isEmpty());
+    }
+
+    @Test
+    void testAwaitingScaleUpOnly() throws Exception {
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(Integer.MAX_VALUE).dataNodesAutoAdjustScaleDown(Integer.MAX_VALUE).build())
+                .get(3, TimeUnit.SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                .dataNodesAutoAdjustScaleUp(0)
+                                
.dataNodesAutoAdjustScaleDown(Integer.MAX_VALUE)
+                                .build()
+                )
+                .get(3, TimeUnit.SECONDS);
+
+        int zoneId = distributionZoneManager.getZoneId("zone1");
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.getDataNodes(zoneId, 1);
+
+        CompletableFuture<Void> topVerUpFut = 
distributionZoneManager.topVerFutures().get(1L);
+
+        assertFalse(topVerUpFut.isDone());
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        topologyWatchListenerOnUpdate(nodes0, 1, 1);
+
+        dataNodesWatchListenerOnUpdate(zoneId, nodes0, true, 1, 2);
+
+        dataNodesFut.get(3, TimeUnit.SECONDS);
+
+        dataNodesFut = distributionZoneManager.getDataNodes(zoneId, 2);
+
+        Set<String> nodes1 = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(nodes1, 2, 2);
+
+        dataNodesFut.get(3, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testAwaitingScaleDownOnly() throws Exception {

Review Comment:
   Added asserts for other zones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to