sanpwc commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1157493102


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -937,24 +1269,65 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
 
-                    long revision = newEntry.revision();
+                    Set<String> newLogicalTopology = null;
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                    long revision = 0;
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
+
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
+
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
+
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;
+                    assert revision > 0;
+
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
+                    synchronized (dataNodesMutex) {
+                        lastTopVer = topVer;
+
+                        //Associates topology version and scale up meta 
storage revision.
+                        if (!addedNodes.isEmpty()) {

Review Comment:
   - Seems that lastScaleUp/DownRevision update is a bit misplaced. We already 
have `!added/remvoedNodes.isEmpty()` checks right in place where we schedule 
corresponding timers. So that I'd rather move `lastScaleUpRevision = revision;` 
and `lastScaleDownRevision = revision;` there.
   - I'd also move `lastTopVer = topVer` along with 
`topVerFuts.values().forEach(v -> v.complete(null));` after scheduleTimers 
calls. Thus we will have HB relation between setting lastScaleUp/DownRevision 
and lastTopVer update, so that it'll be possible to update 
lastScaleUp/DownRevision without synchronized (dataNodesMutex). Volatile will 
be enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to