sk0x50 commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060339037


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof 
DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof 
DistributionZoneAlreadyExistsException, e.toString());
     }
 
     @Test
-    public void testDropZoneIfNotExists() {
+    public void testDropZoneIfNotExists() throws NodeStoppingException {

Review Comment:
   This test does not throw `NodeStoppingException`. Please check and update 
the test in accordance. (here and below)



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
             }
 
             try {
-                long topologyVersionFromCmg = snapshot.version();
-
-                byte[] topVerFromMetastorage;
+                
metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry 
-> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException());
+                    }
 
-                try {
-                    topVerFromMetastorage = 
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
-                }
+                    try {
+                        long topologyVersionFromCmg = snapshot.version();
 
-                if (topVerFromMetastorage == null || 
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
-                    Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                        byte[] topVerFromMetaStorage = topVerEntry.value();
 
-                    Condition topologyVersionCondition = topVerFromMetastorage 
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-                            
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+                        if (topVerFromMetaStorage == null || 
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+                            Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-                    If iff = If.iif(topologyVersionCondition,
-                            updateLogicalTopologyAndVersion(topologyFromCmg, 
topologyVersionFromCmg),
-                            ops().yield(false)
-                    );
+                            Condition topologyVersionCondition = 
topVerFromMetaStorage == null
+                                    ? 
notExists(zonesLogicalTopologyVersionKey()) :
+                                    
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
 
-                    metaStorageManager.invoke(iff).thenAccept(res -> {
-                        if (res.getAsBoolean()) {
-                            LOG.debug(
-                                    "Distribution zones' logical topology and 
version keys were initialised [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
-                            );
-                        } else {
-                            LOG.debug(
-                                    "Failed to initialize distribution zones' 
logical topology "
-                                            + "and version keys [topology = 
{}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
+                            If iff = If.iif(topologyVersionCondition,
+                                    
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+                                    ops().yield(false)
                             );
+
+                            metaStorageManager.invoke(iff).thenAccept(res -> {
+                                if (res.getAsBoolean()) {
+                                    LOG.debug(
+                                            "Distribution zones' logical 
topology and version keys were initialised "
+                                                    + "[topology = {}, version 
= {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                } else {
+                                    LOG.debug(
+                                            "Failed to initialize distribution 
zones' logical topology "
+                                                    + "and version keys 
[topology = {}, version = {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                }
+                            });
                         }
-                    });
-                }
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+
             } finally {
                 busyLock.leaveBusy();
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        vaultMgr.get(APPLIED_REV)
+                .thenApply(appliedRevision -> appliedRevision == null ? 0L : 
bytesToLong(appliedRevision.value()))
+                .thenAccept(vaultAppliedRevision -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException());
+                    }
+
+                    try {
+                        vaultMgr.get(zonesLogicalTopologyKey())
+                                .thenAccept(vaultEntry -> {
+                                    if (!busyLock.enterBusy()) {
+                                        throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                                    }
+
+                                    try {
+                                        if (vaultEntry != null && 
vaultEntry.value() != null) {
+                                            logicalTopology = 
ByteUtils.fromBytes(vaultEntry.value());
+
+                                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                                    .forEach(zoneName -> {
+                                                        int zoneId = 
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                                                        
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+                                                    });
+                                        }
+                                    } finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                });
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of 
distribution zones on logical topology changing event.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<?> registerMetaStorageWatchListener() {
+        return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new 
WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = 
ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg 
= zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = 
zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = 
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = 
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = 
zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || 
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create 
scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), 
newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && 
autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create 
scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && 
autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create 
scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", 
e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long 
revision) {
+        Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), 
dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+            } else {
+                LOG.debug("Failed to delete zones' dataNodes key [zoneId = 
{}]", zoneId);
+            }
+        });
+    }
+
+    /**
+     * Unwraps distribution zone exceptions from {@link 
ConfigurationChangeException} if it is possible.
+     */
+    private static Throwable unwrapDistributionZoneException(Throwable e) {

Review Comment:
   This method looks weird to me, especially `e.getCause().getCause()` part. I 
would propose something like the following:
   
   ```
       private static Throwable unwrapDistributionZoneException(Throwable e, 
Class<? extends Throwable>... expectedClz) {
           Throwable ret = unwrapDistributionZoneExceptionRecursively(e, 
expectedClz);
   
           return ret != null ? ret : e;
       }
   
       private static Throwable 
unwrapDistributionZoneExceptionRecursively(Throwable e, Class<? extends 
Throwable>... expectedClz) {
           if ((e instanceof CompletionException || e instanceof 
ConfigurationChangeException) && e.getCause() != null) {
               return unwrapDistributionZoneExceptionRecursively(e.getCause(), 
expectedClz);
           }
   
           for (Class<?> expected : expectedClz) {
               if (expected.isAssignableFrom(e.getClass()))
                   return e;
           }
   
           return null;
       }
   ```
   
   By the way, it allows you clearly to define expected types of exceptions:
   ```
       public CompletableFuture<Void> 
createZone(DistributionZoneConfigurationParameters distributionZoneCfg) {
           ....
               })).whenComplete((res, e) -> {
                   if (e != null) {
                       
fut.completeExceptionally(unwrapDistributionZoneException(e, 
DistributionZoneAlreadyExistsException.class, 
ConfigurationValidationException.class));
                   } else {
                       fut.complete(null);
                   }
               });
   ```
   
   What do you think?



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof 
DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof 
DistributionZoneAlreadyExistsException, e.toString());

Review Comment:
   I like the approach that always requires a message, it looks a lot 
friendlier.
   ```
       assertTrue("Expected exception was not thrown.", e != null);
       assertTrue("Unexpected type of exception (requires 
DistributionZoneAlreadyExistsException)", e.getCause() instanceof 
DistributionZoneAlreadyExistsException);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to