sergeyuttsel commented on code in PR #1436:
URL: https://github.com/apache/ignite-3/pull/1436#discussion_r1048355394


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -36,19 +37,48 @@ class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key prefix for zones' logical topology nodes. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX = 
"distributionZones.logicalTopology";
+
+    /** Key prefix for zones' logical topology version. */
+    private static final String 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_PREFIX = 
"distributionZones.logicalTopologyVersion";

Review Comment:
   This is not a prefix; the constant holds the full key, so neither its name nor its Javadoc should say "prefix".



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -36,19 +37,48 @@ class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key prefix for zones' logical topology nodes. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX = 
"distributionZones.logicalTopology";
+
+    /** Key prefix for zones' logical topology version. */
+    private static final String 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_PREFIX = 
"distributionZones.logicalTopologyVersion";
+
     /** The key, needed for processing the event about zones' update was 
triggered only once. */
-    private static final ByteArray ZONES_CHANGE_TRIGGER_KEY = new 
ByteArray("distributionZones.change.trigger");
+    private static final ByteArray DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY = new 
ByteArray("distributionZones.change.trigger");
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
     static ByteArray zoneDataNodesKey(int zoneId) {
         return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
     }
 
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX}. */
+    private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY = 
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX);
+
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_PREFIX}. */
+    private static final ByteArray 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =

Review Comment:
   The "private static final" fields need to be moved to the top of the class.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -373,4 +404,92 @@ private void updateMetaStorageOnZoneDelete(int zoneId, 
long revision) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Updates {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and 
{@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()}
+     * in meta storage.
+     *
+     * @param newTopology Logical topology snapshot.
+     * @param topologyLeap Flag that indicates whether this updates was 
trigger by
+     *                     {@link 
LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
+     */
+    private void updateMetaStorageKeys(LogicalTopologySnapshot newTopology, 
boolean topologyLeap) {
+        Set<String> topologyFromCmg = 
newTopology.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+        Condition updateCondition;
+
+        if (topologyLeap) {
+            updateCondition = 
value(zonesLogicalTopologyVersionKey()).lt(ByteUtils.longToBytes(newTopology.version()));
+        } else {
+            // This condition may be stronger, as far as we receive topology 
events one by one.
+            updateCondition = 
value(zonesLogicalTopologyVersionKey()).eq(ByteUtils.longToBytes(newTopology.version()
 - 1));
+        }
+
+        If iff = If.iif(
+                updateCondition,
+                updateLogicalTopologyAndVersion(topologyFromCmg, 
newTopology.version()),
+                ops().yield(false)
+        );
+
+        metaStorageManager.invoke(iff).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug(
+                        "Distribution zones' logical topology and version keys 
were updated [topology = {}, version = {}]",
+                        Arrays.toString(topologyFromCmg.toArray()),
+                        newTopology.version()
+                );
+            } else {
+                LOG.debug(
+                        "Failed to update distribution zones' logical topology 
and version keys [topology = {}, version = {}]",
+                        Arrays.toString(topologyFromCmg.toArray()),
+                        newTopology.version()
+                );
+            }
+        });
+    }
+
+    /**
+     * Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
+     * {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} from 
meta storage on the start of {@link DistributionZoneManager}.
+     */
+    private void initMetaStorageKeysOnStart() {
+        logicalTopologyService.logicalTopologyOnLeader().thenAccept(snapshot 
-> {
+            long topologyVersionFromCmg = snapshot.version();
+
+            byte[] topVerFromMetastorage;
+
+            try {
+                topVerFromMetastorage = 
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);

Review Comment:
   Use IgniteInternalException (or another more specific exception) here instead of RuntimeException.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -36,19 +37,48 @@ class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key prefix for zones' logical topology nodes. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX = 
"distributionZones.logicalTopology";

Review Comment:
   It isn't a prefix. It should be named just DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY or
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to