sanpwc commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1053470125


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   It's generally better to initialize state in one place only, so please 
move `= Collections.emptySet()` to the constructor.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   Here and in other places, please use `withCause` so that the traceId is 
not regenerated.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();
+
+    /** Watch listener id to unregister the watch listener on {@link DistributionZoneManager#stop()}. */
+    private Long watchListenerId;

Review Comment:
   Same as above: this variable should be thread-safe. It may be set and 
read from different threads.
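
To illustrate the concern, here is a minimal sketch, not the actual `DistributionZoneManager` code. `ListenerIdHolder`, `registerWatch()` and the id `42L` are hypothetical stand-ins. The id is written by whichever thread completes the registration future and read later by the stopping thread, so the field is declared `volatile`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder used only for illustration. */
final class ListenerIdHolder {
    /** Written by the thread that completes the future, read by the stopping thread. */
    private volatile Long watchListenerId;

    /** Stand-in for an asynchronous registration call that returns the listener id. */
    CompletableFuture<Long> registerWatch() {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    /** Stores the id once registration completes, without blocking the caller. */
    CompletableFuture<Void> start() {
        return registerWatch().thenAccept(id -> watchListenerId = id);
    }

    /** Returns the id, or null if registration has not completed yet. */
    Long listenerIdForStop() {
        return watchListenerId;
    }
}
```

An `AtomicReference<Long>` would work as well; `volatile` is enough here because the field is only assigned, never updated based on its previous value.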



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();

Review Comment:
   Same as above. It's not valid to call `get()` on futures in such cases. 
   There's no point in continuing the review until this is fixed. 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(UNEXPECTED_ERR, e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetaStorage(zoneId, toBytes(logicalTopology), vaultAppliedRevision);
+                    });
+        }
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     */
+    private void registerMetaStorageWatchListener() {
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Returns applied vault revision.
+     *
+     * @return Applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();

Review Comment:
   We should never call `get()` or `join()` on futures in such cases. This 
is an important one.
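
One possible shape for a non-blocking version is sketched below. It is an illustration under assumptions, not the fix that was actually applied: `AsyncVaultRead`, `read(...)`, the key names and the stored value `7` are hypothetical, and a plain `CompletableFuture<byte[]>` stands in for the vault API. The point is that the method returns a future and composes the two reads with `thenApply` and `thenCompose` instead of waiting on either of them.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/** Hypothetical example: composing asynchronous reads instead of blocking on get(). */
final class AsyncVaultRead {
    /** Stand-in for an async key-value read; completes with null when the key is absent. */
    static CompletableFuture<byte[]> read(String key) {
        byte[] value = "applied_revision".equals(key)
                ? ByteBuffer.allocate(Long.BYTES).putLong(7L).array()
                : null;
        return CompletableFuture.completedFuture(value);
    }

    /** Decodes a big-endian long from its 8-byte representation. */
    static long bytesToLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Returns the applied revision as a future; a missing key maps to 0. */
    static CompletableFuture<Long> appliedRevision() {
        return read("applied_revision")
                .thenApply(value -> value == null ? 0L : bytesToLong(value));
    }

    /** Chains the second read after the first; the caller receives a future. */
    static CompletableFuture<String> initDataNodes() {
        return appliedRevision().thenCompose(revision ->
                read("logical_topology").thenApply(topology ->
                        "revision=" + revision + ", topologyPresent=" + (topology != null)));
    }
}
```

The caller can then chain further work on the returned future, or hand it to whatever component coordinates startup, so no thread is parked waiting for the read to finish.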



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot 
newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   This variable should be thread-safe: it may be initialized in one thread 
(the node startup thread) and read in the watch thread, so visibility is not 
guaranteed.
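
A minimal sketch of one way to address this, assuming the set is always replaced as a whole and never mutated in place. `TopologyHolder` and its methods are hypothetical names used only for illustration. A write to a `volatile` field happens-before any later read of that field, so the watch thread sees the set published by the startup thread.

```java
import java.util.Set;

/** Hypothetical holder used only for illustration. */
final class TopologyHolder {
    /** Replaced as a whole on every update; volatile makes the new reference visible to other threads. */
    private volatile Set<String> logicalTopology;

    TopologyHolder() {
        // State is initialized in one place, the constructor.
        this.logicalTopology = Set.of();
    }

    /** Publishes an immutable snapshot so that readers never observe a set being modified. */
    void onTopologyUpdate(Set<String> newTopology) {
        logicalTopology = Set.copyOf(newTopology);
    }

    /** Returns the most recently published snapshot. */
    Set<String> topology() {
        return logicalTopology;
    }
}
```

If the old and new sets had to be compared and swapped atomically, `volatile` alone would not be enough and something like `AtomicReference` or a lock would be needed.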



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org