sergeyuttsel commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060559895


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
             }
 
             try {
-                long topologyVersionFromCmg = snapshot.version();
-
-                byte[] topVerFromMetastorage;
+                metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
 
-                try {
-                    topVerFromMetastorage = 
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
-                }
+                    try {
+                        long topologyVersionFromCmg = snapshot.version();
 
-                if (topVerFromMetastorage == null || 
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
-                    Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                        byte[] topVerFromMetaStorage = topVerEntry.value();
 
-                    Condition topologyVersionCondition = topVerFromMetastorage 
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-                            
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+                        if (topVerFromMetaStorage == null || 
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+                            Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-                    If iff = If.iif(topologyVersionCondition,
-                            updateLogicalTopologyAndVersion(topologyFromCmg, 
topologyVersionFromCmg),
-                            ops().yield(false)
-                    );
+                            Condition topologyVersionCondition = 
topVerFromMetaStorage == null
+                                    ? 
notExists(zonesLogicalTopologyVersionKey()) :
+                                    
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
 
-                    metaStorageManager.invoke(iff).thenAccept(res -> {
-                        if (res.getAsBoolean()) {
-                            LOG.debug(
-                                    "Distribution zones' logical topology and 
version keys were initialised [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
-                            );
-                        } else {
-                            LOG.debug(
-                                    "Failed to initialize distribution zones' 
logical topology "
-                                            + "and version keys [topology = 
{}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
+                            If iff = If.iif(topologyVersionCondition,
+                                    
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+                                    ops().yield(false)
                             );
+
+                            metaStorageManager.invoke(iff).thenAccept(res -> {
+                                if (res.getAsBoolean()) {
+                                    LOG.debug(
+                                            "Distribution zones' logical 
topology and version keys were initialised "
+                                                    + "[topology = {}, version 
= {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                } else {
+                                    LOG.debug(
+                                            "Failed to initialize distribution 
zones' logical topology "
+                                                    + "and version keys 
[topology = {}, version = {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                }
+                            });
                         }
-                    });
-                }
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+
             } finally {
                 busyLock.leaveBusy();
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        vaultMgr.get(APPLIED_REV)
+                .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                .thenAccept(vaultAppliedRevision -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
+
+                    try {
+                        vaultMgr.get(zonesLogicalTopologyKey())
+                                .thenAccept(vaultEntry -> {
+                                    if (!busyLock.enterBusy()) {
+                                        throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                                    }
+
+                                    try {
+                                        if (vaultEntry != null && 
vaultEntry.value() != null) {
+                                            logicalTopology = 
ByteUtils.fromBytes(vaultEntry.value());
+
+                                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                                    .forEach(zoneName -> {
+                                                        int zoneId = 
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                                                        
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+                                                    });
+                                        }
+                                    } finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                });
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<?> registerMetaStorageWatchListener() {
+        return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = 
ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg 
= zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = 
zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = 
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = 
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = 
zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || 
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), 
newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && 
autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && 
autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long revision) {
+        Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+            } else {
+                LOG.debug("Failed to delete zones' dataNodes key [zoneId = {}]", zoneId);
+            }
+        });
+    }
+
+    /**
+     * Unwraps distribution zone exceptions from {@link ConfigurationChangeException} if it is possible.
+     */
+    private static Throwable unwrapDistributionZoneException(Throwable e) {

Review Comment:
   Yes, I think it's better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to