sergeyuttsel commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1163050578


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    long revision = newEntry.revision();
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;
+                    assert revision > 0;
+
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
+                    if (!addedNodes.isEmpty()) {
+                        lastScaleUpRevision = revision;
+                    }
+
+                    if (!removedNodes.isEmpty()) {
+                        lastScaleDownRevision = revision;
+                    }
+
+                    // The topology version must be updated after 
lastScaleUpRevision and lastScaleDownRevision are updated.
+                    // This is necessary so that, when topology version 
waiters are notified that the topology version has been updated

Review Comment:
   I've rephrased.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to