sergeyuttsel commented on code in PR #2172:
URL: https://github.com/apache/ignite-3/pull/2172#discussion_r1224198380


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -73,6 +73,12 @@ public class DistributionZonesUtil {
     /** Key prefix for zones' logical topology nodes and logical topology 
version. */
     private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX = 
"distributionZones.logicalTopology.";
 
+    /** Key value for zones' nodes' attributes in vault. */
+    private static final String DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT = 
"vault.distributionZones.nodesAttributes";
+
+    /** Key prefix for zones' logical topology nodes in vault. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT = 
"vault.distributionZones.logicalTopology.nodes";

Review Comment:
   Maybe it will be better to use `"vault." + 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY`



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.ConfigurationModules;
+import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NettyBootstrapFactory;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
+ */
+@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
+@ExtendWith(ConfigurationExtension.class)
+public class ItIgniteDistributionZoneManagerNodeRestartTest extends 
BaseIgniteRestartTest {
+    private static final LogicalNode A = new LogicalNode(
+            new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+            Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+    );
+
+    private static final LogicalNode B = new LogicalNode(
+            new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+            Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+    );
+
+    private static final LogicalNode C = new LogicalNode(
+            new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+            Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+    );
+
+    private static final String ZONE_NAME = "zone1";
+
+    /**
+     * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
+     *
+     * @param idx Node index.
+     * @return Partial node.
+     */
+    private PartialNode startPartialNode(int idx) {
+        String name = testNodeName(testInfo, idx);
+
+        Path dir = workDir.resolve(name);
+
+        List<IgniteComponent> components = new ArrayList<>();
+
+        VaultManager vault = createVault(name, dir);
+
+        ConfigurationModules modules = loadConfigurationModules(log, 
Thread.currentThread().getContextClassLoader());
+
+        Path configFile = 
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
+        String configString = configurationString(idx);
+        try {
+            Files.writeString(configFile, configString);
+        } catch (IOException e) {
+            throw new NodeConfigWriteException("Failed to write config content 
to file.", e);
+        }
+
+        var localConfigurationGenerator = new ConfigurationTreeGenerator(
+                modules.local().rootKeys(),
+                modules.local().internalSchemaExtensions(),
+                modules.local().polymorphicSchemaExtensions()
+        );
+
+        var nodeCfgMgr = new ConfigurationManager(
+                modules.local().rootKeys(),
+                new LocalFileConfigurationStorage(configFile, 
localConfigurationGenerator),
+                localConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, 
modules.local().validators())
+        );
+
+        NetworkConfiguration networkConfiguration = 
nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
+
+        var nettyBootstrapFactory = new 
NettyBootstrapFactory(networkConfiguration, name);
+
+        var clusterSvc = new 
TestScaleCubeClusterServiceFactory().createClusterService(
+                name,
+                networkConfiguration,
+                nettyBootstrapFactory,
+                defaultSerializationRegistry(),
+                new VaultStateIds(vault)
+        );
+
+        var clusterStateStorage = new TestClusterStateStorage();
+
+        var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        var cmgManager = mock(ClusterManagementGroupManager.class);
+
+        when(cmgManager.logicalTopology()).thenAnswer(invocation -> 
completedFuture(logicalTopology.getLogicalTopology()));
+
+        var metaStorageMgr = StandaloneMetaStorageManager.create(
+                vault,
+                new TestRocksDbKeyValueStorage("test", 
workDir.resolve("metastorage"))
+        );
+
+        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
+
+        ConfigurationTreeGenerator distributedConfigurationGenerator = new 
ConfigurationTreeGenerator(
+                modules.distributed().rootKeys(),
+                modules.distributed().internalSchemaExtensions(),
+                modules.distributed().polymorphicSchemaExtensions()
+        );
+
+        var clusterCfgMgr = new ConfigurationManager(
+                modules.distributed().rootKeys(),
+                cfgStorage,
+                distributedConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator,
 modules.distributed().validators())
+        );
+
+        DistributionZonesConfiguration zonesConfiguration = 
clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        TablesConfiguration tablesConfiguration = 
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageMgr,
+                logicalTopologyService,
+                vault,
+                name
+        );
+
+        // Preparing the result map.
+
+        components.add(vault);
+        components.add(nodeCfgMgr);
+
+        // Start.
+
+        vault.start();
+        vault.putName(name).join();
+
+        nodeCfgMgr.start();
+
+        // Start the remaining components.
+        List<IgniteComponent> otherComponents = List.of(
+                nettyBootstrapFactory,
+                clusterSvc,
+                clusterStateStorage,
+                cmgManager,
+                metaStorageMgr,
+                clusterCfgMgr,
+                distributionZoneManager
+        );
+
+        for (IgniteComponent component : otherComponents) {
+            component.start();
+
+            components.add(component);
+        }
+
+        PartialNode partialNode = partialNode(
+                nodeCfgMgr,
+                clusterCfgMgr,
+                null,
+                components,
+                localConfigurationGenerator,
+                logicalTopology,
+                cfgStorage,
+                distributedConfigurationGenerator
+        );
+
+        partialNodes.add(partialNode);
+
+        return partialNode;
+    }
+
+    @Test
+    public void testNodeAttributesRestoredAfterRestart() throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().putNode(C);
+
+        distributionZoneManager.createZone(
+                new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                        .build()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                partialNode.logicalTopology().getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+
+        Map<String, Map<String, String>> nodeAttributesBeforeRestart = 
distributionZoneManager.nodesAttributes();
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        Map<String, Map<String, String>> nodeAttributesAfterRestart = 
distributionZoneManager.nodesAttributes();
+
+        assertEquals(3, nodeAttributesAfterRestart.size());
+
+        assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
+    }
+
+    @Test
+    public void testLogicalTopologyRestoredAfterRestart() throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().putNode(C);
+
+        distributionZoneManager.createZone(
+                new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                        .build()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                partialNode.logicalTopology().getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);

Review Comment:
   `Set.of(A, B, C).stream().map(ClusterNode::name).collect(Collectors.toSet())`
   Need to save it in the variable.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.ConfigurationModules;
+import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NettyBootstrapFactory;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
+ */
+@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
+@ExtendWith(ConfigurationExtension.class)
+public class ItIgniteDistributionZoneManagerNodeRestartTest extends 
BaseIgniteRestartTest {
+    private static final LogicalNode A = new LogicalNode(
+            new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+            Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+    );
+
+    private static final LogicalNode B = new LogicalNode(
+            new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+            Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+    );
+
+    private static final LogicalNode C = new LogicalNode(
+            new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+            Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+    );
+
+    private static final String ZONE_NAME = "zone1";
+
+    /**
+     * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
+     *
+     * @param idx Node index.
+     * @return Partial node.
+     */
+    private PartialNode startPartialNode(int idx) {
+        String name = testNodeName(testInfo, idx);
+
+        Path dir = workDir.resolve(name);
+
+        List<IgniteComponent> components = new ArrayList<>();
+
+        VaultManager vault = createVault(name, dir);
+
+        ConfigurationModules modules = loadConfigurationModules(log, 
Thread.currentThread().getContextClassLoader());
+
+        Path configFile = 
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
+        String configString = configurationString(idx);
+        try {
+            Files.writeString(configFile, configString);
+        } catch (IOException e) {
+            throw new NodeConfigWriteException("Failed to write config content 
to file.", e);
+        }
+
+        var localConfigurationGenerator = new ConfigurationTreeGenerator(
+                modules.local().rootKeys(),
+                modules.local().internalSchemaExtensions(),
+                modules.local().polymorphicSchemaExtensions()
+        );
+
+        var nodeCfgMgr = new ConfigurationManager(
+                modules.local().rootKeys(),
+                new LocalFileConfigurationStorage(configFile, 
localConfigurationGenerator),
+                localConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, 
modules.local().validators())
+        );
+
+        NetworkConfiguration networkConfiguration = 
nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
+
+        var nettyBootstrapFactory = new 
NettyBootstrapFactory(networkConfiguration, name);
+
+        var clusterSvc = new 
TestScaleCubeClusterServiceFactory().createClusterService(
+                name,
+                networkConfiguration,
+                nettyBootstrapFactory,
+                defaultSerializationRegistry(),
+                new VaultStateIds(vault)
+        );
+
+        var clusterStateStorage = new TestClusterStateStorage();
+
+        var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        var cmgManager = mock(ClusterManagementGroupManager.class);
+
+        when(cmgManager.logicalTopology()).thenAnswer(invocation -> 
completedFuture(logicalTopology.getLogicalTopology()));
+
+        var metaStorageMgr = StandaloneMetaStorageManager.create(
+                vault,
+                new TestRocksDbKeyValueStorage("test", 
workDir.resolve("metastorage"))
+        );
+
+        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
+
+        ConfigurationTreeGenerator distributedConfigurationGenerator = new 
ConfigurationTreeGenerator(
+                modules.distributed().rootKeys(),
+                modules.distributed().internalSchemaExtensions(),
+                modules.distributed().polymorphicSchemaExtensions()
+        );
+
+        var clusterCfgMgr = new ConfigurationManager(
+                modules.distributed().rootKeys(),
+                cfgStorage,
+                distributedConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator,
 modules.distributed().validators())
+        );
+
+        DistributionZonesConfiguration zonesConfiguration = 
clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        TablesConfiguration tablesConfiguration = 
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageMgr,
+                logicalTopologyService,
+                vault,
+                name
+        );
+
+        // Preparing the result map.
+
+        components.add(vault);
+        components.add(nodeCfgMgr);
+
+        // Start.
+
+        vault.start();
+        vault.putName(name).join();
+
+        nodeCfgMgr.start();
+
+        // Start the remaining components.
+        List<IgniteComponent> otherComponents = List.of(
+                nettyBootstrapFactory,
+                clusterSvc,
+                clusterStateStorage,
+                cmgManager,
+                metaStorageMgr,
+                clusterCfgMgr,
+                distributionZoneManager
+        );
+
+        for (IgniteComponent component : otherComponents) {
+            component.start();
+
+            components.add(component);
+        }
+
+        PartialNode partialNode = partialNode(
+                nodeCfgMgr,
+                clusterCfgMgr,
+                null,
+                components,
+                localConfigurationGenerator,
+                logicalTopology,
+                cfgStorage,
+                distributedConfigurationGenerator
+        );
+
+        partialNodes.add(partialNode);
+
+        return partialNode;
+    }
+
+    @Test
+    public void testNodeAttributesRestoredAfterRestart() throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().putNode(C);
+
+        distributionZoneManager.createZone(
+                new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                        .build()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                partialNode.logicalTopology().getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+
+        Map<String, Map<String, String>> nodeAttributesBeforeRestart = 
distributionZoneManager.nodesAttributes();
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        Map<String, Map<String, String>> nodeAttributesAfterRestart = 
distributionZoneManager.nodesAttributes();
+
+        assertEquals(3, nodeAttributesAfterRestart.size());
+
+        assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
+    }
+
+    @Test
+    public void testLogicalTopologyRestoredAfterRestart() throws Exception {

Review Comment:
   Do we need two separate tests? They have many of the same lines of code. 
Perhaps need to move them to a separate method.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.ConfigurationModules;
+import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NettyBootstrapFactory;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
+ */
+@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
+@ExtendWith(ConfigurationExtension.class)
+public class ItIgniteDistributionZoneManagerNodeRestartTest extends 
BaseIgniteRestartTest {
+    private static final LogicalNode A = new LogicalNode(
+            new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+            Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+    );
+
+    private static final LogicalNode B = new LogicalNode(
+            new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+            Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+    );
+
+    private static final LogicalNode C = new LogicalNode(
+            new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+            Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+    );
+
+    private static final String ZONE_NAME = "zone1";
+
+    /**
+     * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
+     *
+     * @param idx Node index.
+     * @return Partial node.
+     */
+    private PartialNode startPartialNode(int idx) {
+        String name = testNodeName(testInfo, idx);
+
+        Path dir = workDir.resolve(name);
+
+        List<IgniteComponent> components = new ArrayList<>();
+
+        VaultManager vault = createVault(name, dir);
+
+        ConfigurationModules modules = loadConfigurationModules(log, 
Thread.currentThread().getContextClassLoader());
+
+        Path configFile = 
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
+        String configString = configurationString(idx);
+        try {
+            Files.writeString(configFile, configString);
+        } catch (IOException e) {
+            throw new NodeConfigWriteException("Failed to write config content 
to file.", e);
+        }
+
+        var localConfigurationGenerator = new ConfigurationTreeGenerator(
+                modules.local().rootKeys(),
+                modules.local().internalSchemaExtensions(),
+                modules.local().polymorphicSchemaExtensions()
+        );
+
+        var nodeCfgMgr = new ConfigurationManager(
+                modules.local().rootKeys(),
+                new LocalFileConfigurationStorage(configFile, 
localConfigurationGenerator),
+                localConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, 
modules.local().validators())
+        );
+
+        NetworkConfiguration networkConfiguration = 
nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
+
+        var nettyBootstrapFactory = new 
NettyBootstrapFactory(networkConfiguration, name);
+
+        var clusterSvc = new 
TestScaleCubeClusterServiceFactory().createClusterService(
+                name,
+                networkConfiguration,
+                nettyBootstrapFactory,
+                defaultSerializationRegistry(),
+                new VaultStateIds(vault)
+        );
+
+        var clusterStateStorage = new TestClusterStateStorage();
+
+        var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        var cmgManager = mock(ClusterManagementGroupManager.class);
+
+        when(cmgManager.logicalTopology()).thenAnswer(invocation -> 
completedFuture(logicalTopology.getLogicalTopology()));
+
+        var metaStorageMgr = StandaloneMetaStorageManager.create(
+                vault,
+                new TestRocksDbKeyValueStorage("test", 
workDir.resolve("metastorage"))
+        );
+
+        var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
+
+        ConfigurationTreeGenerator distributedConfigurationGenerator = new 
ConfigurationTreeGenerator(
+                modules.distributed().rootKeys(),
+                modules.distributed().internalSchemaExtensions(),
+                modules.distributed().polymorphicSchemaExtensions()
+        );
+
+        var clusterCfgMgr = new ConfigurationManager(
+                modules.distributed().rootKeys(),
+                cfgStorage,
+                distributedConfigurationGenerator,
+                
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator,
 modules.distributed().validators())
+        );
+
+        DistributionZonesConfiguration zonesConfiguration = 
clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        TablesConfiguration tablesConfiguration = 
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageMgr,
+                logicalTopologyService,
+                vault,
+                name
+        );
+
+        // Preparing the result map.
+
+        components.add(vault);
+        components.add(nodeCfgMgr);
+
+        // Start.
+
+        vault.start();
+        vault.putName(name).join();
+
+        nodeCfgMgr.start();
+
+        // Start the remaining components.
+        List<IgniteComponent> otherComponents = List.of(
+                nettyBootstrapFactory,
+                clusterSvc,
+                clusterStateStorage,
+                cmgManager,
+                metaStorageMgr,
+                clusterCfgMgr,
+                distributionZoneManager
+        );
+
+        for (IgniteComponent component : otherComponents) {
+            component.start();
+
+            components.add(component);
+        }
+
+        PartialNode partialNode = partialNode(
+                nodeCfgMgr,
+                clusterCfgMgr,
+                null,
+                components,
+                localConfigurationGenerator,
+                logicalTopology,
+                cfgStorage,
+                distributedConfigurationGenerator
+        );
+
+        partialNodes.add(partialNode);
+
+        return partialNode;
+    }
+
+    @Test
+    public void testNodeAttributesRestoredAfterRestart() throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().putNode(C);
+
+        distributionZoneManager.createZone(
+                new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                        .build()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                partialNode.logicalTopology().getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+
+        Map<String, Map<String, String>> nodeAttributesBeforeRestart = 
distributionZoneManager.nodesAttributes();
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        Map<String, Map<String, String>> nodeAttributesAfterRestart = 
distributionZoneManager.nodesAttributes();
+
+        assertEquals(3, nodeAttributesAfterRestart.size());
+
+        assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
+    }
+
+    @Test
+    public void testLogicalTopologyRestoredAfterRestart() throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().putNode(C);
+
+        distributionZoneManager.createZone(

Review Comment:
   Why do you need to create a zone in this test?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -1340,6 +1349,26 @@ public void onError(Throwable e) {
         };
     }
 
+    /**
+     * Saves logical topology and nodes' attributes map to vault atomically in 
one batch.
+     * After restart it could be used to restore these fields.
+     *
+     * @param newLogicalTopology Logical topology.
+     */
+    private void 
saveLogicalTopologyNodeAttributesToVault(Set<NodeWithAttributes> 
newLogicalTopology) {

Review Comment:
   It's not only save to vault but also update local state of `logicalTopology` 
and `nodesAttributes`. Maybe need to rename it or update these fields outside 
from this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to