sergeyuttsel commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1132038601


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -183,8 +221,89 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
      */
     private volatile Set<String> logicalTopology;
 
-    /** Watch listener. Needed to unregister it on {@link DistributionZoneManager#stop()}. */
-    private final WatchListener watchListener;
+    /** Watch listener for logical topology keys. */
+    private final WatchListener topologyWatchListener;
+
+    /** Watch listener for data nodes keys. */
+    private final WatchListener dataNodesWatchListener;
+
+    /**
+     * Contains data nodes and meta info.
+     */
+    static class DataNodes {
+        /** Data nodes. */
+        private Set<String> nodes;
+
+        /** Scale up metastorage revision of current nodes value. */
+        private long scaleUpRevision;
+
+        /** Scale down metastorage revision of current nodes value. */
+        private long scaleDownRevision;
+
+        /**
+         * The map contains futures which are completed when zone manager observe data nodes bound to appropriate scale up revision.
+         * Map (revision -> future).
+         */
+        private final NavigableMap<Long, CompletableFuture<Void>> revisionScaleUpFutures = new ConcurrentSkipListMap();
+
+        /**
+         * The map contains futures which are completed when zone manager observe data nodes bound to appropriate scale down revision.
+         * Map (revision -> future).
+         */
+        private final NavigableMap<Long, CompletableFuture<Void>> revisionScaleDownFutures = new ConcurrentSkipListMap();
+
+        DataNodes() {
+            nodes = emptySet();
+        }
+
+        DataNodes(Set<String> nodes, long scaleUpRevision, long scaleDownRevision) {
+            this.nodes = nodes;
+            this.scaleUpRevision = scaleUpRevision;
+            this.scaleDownRevision = scaleDownRevision;
+        }
+
+        Set<String> nodes() {
+            return nodes;
+        }
+
+        void nodes(Set<String> nodes) {
+            this.nodes = nodes;
+        }
+
+        long scaleUpRevision() {
+            return scaleUpRevision;
+        }
+
+        void scaleUpRevision(long scaleUpRevision) {
+            this.scaleUpRevision = scaleUpRevision;
+        }
+
+        long scaleDownRevision() {
+            return scaleDownRevision;
+        }
+
+        void scaleDownRevision(long scaleDownRevision) {
+            this.scaleDownRevision = scaleDownRevision;
+        }
+
+        NavigableMap<Long, CompletableFuture<Void>> revisionScaleUpFutures() {
+            return revisionScaleUpFutures;
+        }
+
+        NavigableMap<Long, CompletableFuture<Void>> revisionScaleDownFutures() {
+            return revisionScaleDownFutures;
+        }
+    }
+
+    @TestOnly
+    Map<Integer, DataNodes> dataNodes() {

Review Comment:
   Now I use it in a synchronized block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to