sanpwc commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1154136650


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -500,6 +545,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {
+                        immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() 
== 0;
+                        immediateScaleDown0 = 
zone.dataNodesAutoAdjustScaleDown() == 0;
+
+                        break;
+                    }
+                }
+            }
+
+            immediateScaleUp.set(immediateScaleUp0);
+            immediateScaleDown.set(immediateScaleDown0);
+        });
+
+        CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);
+                }
+
+                CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+                if (immediateScaleUp.get()) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = 
topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+                    Long scaleUpRevision = null;
+
+                    if (scaleUpRevisionEntry != null) {
+                        scaleUpRevision = scaleUpRevisionEntry.getValue();
+                    }
+
+                    if (scaleUpRevision != null && zoneState.scaleUpRevision() 
< scaleUpRevision) {
+                        Map.Entry<Long, CompletableFuture<Void>> ceilingEntry =
+                                
zoneState.revisionScaleUpFutures().ceilingEntry(scaleUpRevision);
+
+                        if (ceilingEntry != null) {
+                            topVerScaleUpFut0 = ceilingEntry.getValue();
+                        }
+
+                        if (topVerScaleUpFut0 == null) {
+                            topVerScaleUpFut0 = new CompletableFuture<>();
+
+                            
zoneState.revisionScaleUpFutures().put(scaleUpRevision, topVerScaleUpFut0);
+                        }
+
+                        topVerScaleUpFut0.handle((ignored0, e) -> {
+                            if (e == null) {
+                                topVerScaleUpFut.complete(null);
+                            } else {
+                                topVerScaleUpFut.completeExceptionally(e);
+                            }
+
+                            return null;
+                        });
+                    } else {
+                        topVerScaleUpFut.complete(null);
+                    }
+                } else {
+                    topVerScaleUpFut.complete(null);
+                }
+            }
+        });
+
+        CompletableFuture<Void> topVerScaleDownFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            CompletableFuture<Void> topVerScaleDownFut0 = null;
+
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);
+                }
+
+                if (immediateScaleDown.get()) {
+                    Map.Entry<Long, Long> scaleDownRevisionEntry = 
topVerAndScaleDownRevision.ceilingEntry(topVer);
+
+                    Long scaleDownRevision = null;
+
+                    if (scaleDownRevisionEntry != null) {
+                        scaleDownRevision = scaleDownRevisionEntry.getValue();
+                    }
+
+                    if (scaleDownRevision != null && 
zoneState.scaleDownRevision() < scaleDownRevision) {
+                        Map.Entry<Long, CompletableFuture<Void>> ceilingEntry =
+                                
zoneState.revisionScaleDownFutures().ceilingEntry(scaleDownRevision);
+
+                        if (ceilingEntry != null) {
+                            topVerScaleDownFut0 = ceilingEntry.getValue();
+                        }
+
+                        if (topVerScaleDownFut0 == null) {
+                            topVerScaleDownFut0 = new CompletableFuture<>();
+
+                            
zoneState.revisionScaleDownFutures().put(scaleDownRevision, 
topVerScaleDownFut0);
+                        }
+
+                        topVerScaleDownFut0.handle((ignored0, e) -> {
+                            if (e == null) {
+                                topVerScaleDownFut.complete(null);
+                            } else {
+                                topVerScaleDownFut.completeExceptionally(e);
+                            }
+
+                            return null;
+                        });
+                    } else {
+                        topVerScaleDownFut.complete(null);

Review Comment:
   What does this clause means? Please add the javadoc explaining why it's 
possible.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -500,6 +545,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version in 
topologyWatchListener.
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
than wait for writing started nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
than wait for writing stopped nodes
+     * corresponding to the topology version to data nodes into the meta 
storage .
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionDataNodes(int zoneId, 
long topVer) {
+        CompletableFuture<Set<String>> dataNodesFut = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> topVerFut;
+
+        synchronized (dataNodesMutex) {
+            if (topVer > lastTopVer) {
+                topVerFut = topVerFutures.computeIfAbsent(topVer, key -> new 
CompletableFuture<>());
+            } else {
+                topVerFut = completedFuture(null);
+            }
+        }
+
+        var immediateScaleUp = new AtomicBoolean();
+        var immediateScaleDown = new AtomicBoolean();
+
+        topVerFut = topVerFut.thenAccept(ignored -> {
+            boolean immediateScaleUp0 = false;
+            boolean immediateScaleDown0 = false;
+
+            if (zoneId == DEFAULT_ZONE_ID) {
+                immediateScaleUp0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().value()
 == 0;
+                immediateScaleDown0 = 
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().value()
 == 0;
+            } else {
+                NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                        zonesConfiguration.distributionZones();
+
+                for (int i = 0; i < zones.value().size(); i++) {
+                    DistributionZoneView zone = zones.value().get(i);
+
+                    if (zone.zoneId() == zoneId) {
+                        immediateScaleUp0 = zone.dataNodesAutoAdjustScaleUp() 
== 0;
+                        immediateScaleDown0 = 
zone.dataNodesAutoAdjustScaleDown() == 0;
+
+                        break;
+                    }
+                }
+            }
+
+            immediateScaleUp.set(immediateScaleUp0);
+            immediateScaleDown.set(immediateScaleDown0);
+        });
+
+        CompletableFuture<Void> topVerScaleUpFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);
+                }
+
+                CompletableFuture<Void> topVerScaleUpFut0 = null;
+
+                if (immediateScaleUp.get()) {
+                    Map.Entry<Long, Long> scaleUpRevisionEntry = 
topVerAndScaleUpRevision.ceilingEntry(topVer);
+
+                    Long scaleUpRevision = null;
+
+                    if (scaleUpRevisionEntry != null) {
+                        scaleUpRevision = scaleUpRevisionEntry.getValue();
+                    }
+
+                    if (scaleUpRevision != null && zoneState.scaleUpRevision() 
< scaleUpRevision) {
+                        Map.Entry<Long, CompletableFuture<Void>> ceilingEntry =
+                                
zoneState.revisionScaleUpFutures().ceilingEntry(scaleUpRevision);
+
+                        if (ceilingEntry != null) {
+                            topVerScaleUpFut0 = ceilingEntry.getValue();
+                        }
+
+                        if (topVerScaleUpFut0 == null) {
+                            topVerScaleUpFut0 = new CompletableFuture<>();
+
+                            
zoneState.revisionScaleUpFutures().put(scaleUpRevision, topVerScaleUpFut0);
+                        }
+
+                        topVerScaleUpFut0.handle((ignored0, e) -> {
+                            if (e == null) {
+                                topVerScaleUpFut.complete(null);
+                            } else {
+                                topVerScaleUpFut.completeExceptionally(e);
+                            }
+
+                            return null;
+                        });
+                    } else {
+                        topVerScaleUpFut.complete(null);
+                    }
+                } else {
+                    topVerScaleUpFut.complete(null);
+                }
+            }
+        });
+
+        CompletableFuture<Void> topVerScaleDownFut = new CompletableFuture<>();
+
+        topVerFut.thenAccept(ignored -> {
+            CompletableFuture<Void> topVerScaleDownFut0 = null;
+
+            synchronized (dataNodesMutex) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState == null) {
+                    throw new DistributionZoneNotFoundException(zoneId);

Review Comment:
   How it's possible to specify incorrect zoneId. It might be an assertion, but 
not DistributionZoneNotFoundException because we have zone existence validation 
on table create/altering etc. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to