sanpwc commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1152768257


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -163,6 +176,30 @@ public class DistributionZoneManager implements 
IgniteComponent {
      */
     private final Map<Integer, ZoneState> zonesState;
 
+    /** Data nodes modification mutex. */
+    private final Object dataNodesMutex = new Object();
+
+    /** The last topology version which was observed by distribution zone 
manager. */
+    private long lastTopVer;

Review Comment:
   It seems that lastTopVer is not initialized in onStart, and the 
corresponding test is probably missing as well. The same applies to 
topVerAndScaleUpRevision, etc.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -917,24 +1213,69 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
 
-                    long revision = newEntry.revision();
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
+
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;
+                    assert revision > 0;
+
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
+                    synchronized (dataNodesMutex) {
+                        lastTopVer = topVer;
+
+                        //Associates topology version and scale up meta 
storage revision.
+                        if (!addedNodes.isEmpty()) {
+                            topVerAndScaleUpRevision.put(topVer, revision);
+
+                            topVerAndScaleUpRevision.headMap(topVer).clear();
+                        }
+
+                        //Associates topology version and scale down meta 
storage revision.
+                        if (!removedNodes.isEmpty()) {
+                            topVerAndScaleDownRevision.put(topVer, revision);
+
+                            topVerAndScaleDownRevision.headMap(topVer).clear();

Review Comment:
   So, topVerAndScaleDownRevision is a map of one element only?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -163,6 +176,30 @@ public class DistributionZoneManager implements 
IgniteComponent {
      */
     private final Map<Integer, ZoneState> zonesState;
 
+    /** Data nodes modification mutex. */
+    private final Object dataNodesMutex = new Object();
+
+    /** The last topology version which was observed by distribution zone 
manager. */
+    private long lastTopVer;
+
+    /**
+     * The map contains futures which are completed when zone manager observe 
appropriate logical topology version.
+     * Map (topology version -> future).
+     */
+    private final NavigableMap<Long, CompletableFuture<Void>> topVerFutures;
+
+    /**
+     * The map stores the correspondence between logical topology version and 
meta storage revision
+     * on which scale up timer was started.
+     */
+    private final NavigableMap<Long, Long> topVerAndScaleUpRevision;

Review Comment:
   Seems that cleanup is missing.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;

Review Comment:
   Why do you need the up0/down0 locals? I believe the atomics can be set 
directly.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {

Review Comment:
   Naming is still confusing.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.

Review Comment:
   From the javadoc it's not clear what the method returns if the requested 
topVer has already passed, e.g. the current topVer is 10 and we ask for 
topVer == 5.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {
+                        immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() 
== 0;
+                        immediateScaleDown0 = 
zone.dataNodesAutoAdjustScaleDown() == 0;
+
+                        break;
+                    }
+                }
+            }
+
+            immediateScaleUp.set(immediateScaleUp0);
+            immediateScaleDown.set(immediateScaleDown0);
+        });
+
+        CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);
+                }
+
+                CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+                if (immediateScaleUp.get()) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = 
topVerAndScaleUpRevision.ceilingEntry(topVer);

Review Comment:
   It's a bit confusing. You have a map with one element only
   ```
                               topVerAndScaleUpRevision.put(topVer, revision);
   
                               topVerAndScaleUpRevision.headMap(topVer).clear();
   ```
   and you read only one element, the last one
   `topVerAndScaleUpRevision.ceilingEntry(topVer);`
   
   Why? ))
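   
   To illustrate the concern, here is a minimal standalone sketch of the 
   put + headMap(topVer).clear() pattern. It assumes topology versions only 
   grow; HeadMapDemo and record are hypothetical names, not part of the PR.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical demo class; it only mirrors the map handling quoted above. */
class HeadMapDemo {
    /** Stores the pair, then drops every entry whose key is strictly below topVer. */
    static NavigableMap<Long, Long> record(NavigableMap<Long, Long> map, long topVer, long revision) {
        map.put(topVer, revision);
        map.headMap(topVer).clear();
        return map;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> map = new TreeMap<>();

        // Assumption: versions arrive in increasing order.
        record(map, 1, 100);
        record(map, 2, 105);
        record(map, 3, 111);

        System.out.println(map);                  // {3=111}
        System.out.println(map.ceilingEntry(2L)); // 3=111
        System.out.println(map.ceilingEntry(4L)); // null
    }
}
```

   Under that assumption the map never holds more than one entry, so 
   ceilingEntry can only return that entry or null.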
   



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {

Review Comment:
   I believe we already have a zone-by-id utility method; if not, please 
add one.
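
   If no such helper exists, something along these lines might do. This is 
   only a sketch: ZoneLookup, ZoneView, findZoneById and isScaleUpImmediate 
   are hypothetical stand-ins (ZoneView for DistributionZoneView, 
   IllegalArgumentException for DistributionZoneNotFoundException), and the 
   real configuration API may look different.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; the types are simplified stand-ins for the configuration views. */
class ZoneLookup {
    /** Minimal stand-in for a zone view: id plus the two auto-adjust timeouts. */
    static final class ZoneView {
        final int zoneId;
        final int scaleUp;
        final int scaleDown;

        ZoneView(int zoneId, int scaleUp, int scaleDown) {
            this.zoneId = zoneId;
            this.scaleUp = scaleUp;
            this.scaleDown = scaleDown;
        }
    }

    /** Returns the zone with the given id, or an empty Optional if it is not known locally. */
    static Optional<ZoneView> findZoneById(List<ZoneView> zones, int zoneId) {
        return zones.stream().filter(zone -> zone.zoneId == zoneId).findFirst();
    }

    /** A timeout of 0 means the scale up is applied immediately; a missing zone is reported explicitly. */
    static boolean isScaleUpImmediate(List<ZoneView> zones, int zoneId) {
        return findZoneById(zones, zoneId)
                .map(zone -> zone.scaleUp == 0)
                .orElseThrow(() -> new IllegalArgumentException("Zone not found: " + zoneId));
    }
}
```

   Returning an Optional forces the caller to decide what to do when the 
   zone is absent, instead of silently keeping the default (false) flags.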



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;

Review Comment:
   I'd rather add several private methods like 
   1. awaitTopologyVersion(topVer)
   2. isScaleUpImmediate(zoneId) / isScaleDownImmediate(zoneId)
   3. ...
   in order to use them inside topologyVersionDataNodes.
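   
   For example, awaitTopologyVersion could look roughly like the sketch 
   below. TopologyVersionTracker and onTopologyVersion are hypothetical 
   names, and completing all futures up to the observed version is my 
   assumption about the intended behavior, not something taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the version tracking inside DistributionZoneManager. */
class TopologyVersionTracker {
    private final Object mutex = new Object();

    private long lastTopVer;

    private final NavigableMap<Long, CompletableFuture<Void>> topVerFutures = new TreeMap<>();

    /** Completes once topVer has been observed; an already passed version completes immediately. */
    CompletableFuture<Void> awaitTopologyVersion(long topVer) {
        synchronized (mutex) {
            if (topVer <= lastTopVer) {
                return CompletableFuture.completedFuture(null);
            }

            return topVerFutures.computeIfAbsent(topVer, key -> new CompletableFuture<>());
        }
    }

    /** Assumed to be called by the topology watch listener with a growing version. */
    void onTopologyVersion(long topVer) {
        List<CompletableFuture<Void>> ready;

        synchronized (mutex) {
            lastTopVer = topVer;

            // All waiters for versions <= topVer are satisfied and can be removed.
            NavigableMap<Long, CompletableFuture<Void>> head = topVerFutures.headMap(topVer, true);

            ready = new ArrayList<>(head.values());

            head.clear();
        }

        // Complete outside the mutex so dependent stages do not run under the lock.
        ready.forEach(fut -> fut.complete(null));
    }
}
```

   With this shape the javadoc can state the contract for past versions in 
   one sentence, and topologyVersionDataNodes only has to compose the steps.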
   



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {

Review Comment:
   What if there's no zone with the given id on the local node? Is that 
possible?
   Please consider the following flow:
   1. Node A:: create zone1.
   2. Node B:: create table with zone = zone1.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {
+                        immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() 
== 0;
+                        immediateScaleDown0 = 
zone.dataNodesAutoAdjustScaleDown() == 0;
+
+                        break;
+                    }
+                }
+            }
+
+            immediateScaleUp.set(immediateScaleUp0);
+            immediateScaleDown.set(immediateScaleDown0);
+        });
+
+        CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);
+                }
+
+                CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+                if (immediateScaleUp.get()) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = 
topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+                    Long scaleUpRevision = null;
+
+                    if (scaleUpRevisionEntry != null) {
+                        scaleUpRevision = scaleUpRevisionEntry.getValue();
+                    }
+
+                    if (scaleUpRevision != null && zoneState.scaleUpRevision() 
< scaleUpRevision) {
+                        Map.Entry<Long, CompletableFuture<Void>> ceilingEntry =
+                                
zoneState.revisionScaleUpFutures().ceilingEntry(scaleUpRevision);
+
+                        if (ceilingEntry != null) {
+                            topVerScaleUpFut0 = ceilingEntry.getValue();
+                        }
+
+                        if (topVerScaleUpFut0 == null) {
+                            topVerScaleUpFut0 = new CompletableFuture<>();
+
+                            
zoneState.revisionScaleUpFutures().put(scaleUpRevision, topVerScaleUpFut0);
+                        }
+
+                        topVerScaleUpFut0.handle((ignored0, e) -> {
+                            if (e == null) {
+                                topVerScaleUpFut.complete(null);
+                            } else {
+                                topVerScaleUpFut.completeExceptionally(e);
+                            }
+
+                            return null;
+                        });
+                    } else {
+                        topVerScaleUpFut.complete(null);
+                    }
+                } else {
+                    topVerScaleUpFut.complete(null);
+                }
+            }
+        });
+
+        CompletableFuture<Void> topVerScaleDownFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {

Review Comment:
   The chaining is incorrect. 651::topVerFut.thenAccept will be executed before 
598::topVerFut.thenAccept, so immediateScaleDown will still hold its 
uninitialized value (false). By the way, why are there no failing tests 
showing that?
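
   One way to make the ordering explicit, whatever the current behavior is, 
   would be to return the flags as the result of the first stage, so that 
   later stages receive them as an argument rather than reading shared 
   atomics. A minimal sketch follows; ChainingSketch and ScaleFlags are 
   hypothetical, and the flag values and strings are placeholders.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: flags travel through the chain as stage results. */
class ChainingSketch {
    /** Immutable holder for the two "immediate" flags. */
    static final class ScaleFlags {
        final boolean up;
        final boolean down;

        ScaleFlags(boolean up, boolean down) {
            this.up = up;
            this.down = down;
        }
    }

    static CompletableFuture<String> dataNodes(CompletableFuture<Void> topVerFut) {
        // Stage 1: compute the flags (placeholder values here) and return them.
        CompletableFuture<ScaleFlags> flagsFut = topVerFut.thenApply(ignored -> new ScaleFlags(true, false));

        // Stages 2 and 3 depend on flagsFut, so they cannot run before stage 1 has produced a value.
        CompletableFuture<String> upFut = flagsFut.thenApply(flags -> flags.up ? "wait-up" : "skip-up");
        CompletableFuture<String> downFut = flagsFut.thenApply(flags -> flags.down ? "wait-down" : "skip-down");

        // The result is available only when both branches are done.
        return upFut.thenCombine(downFut, (up, down) -> up + "," + down);
    }
}
```

   This also removes the need for both the atomics and the up0/down0 locals.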



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
