alievmirza commented on code in PR #1391:
URL: https://github.com/apache/ignite-3/pull/1391#discussion_r1043164943


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
     /** {@inheritDoc} */
     @Override
     public void start() {
-
+        zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
 
     }
+
+    private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
+        @Override
+        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long 
revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            byte[] logicalTopologyBytes;
+
+            Set<ClusterNode> clusterNodes;
+
+            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18087
+            try {
+                clusterNodes = cmgManager.logicalTopology().get().nodes();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new IgniteInternalException(e);
+            }
+
+            // Update data nodes for a zone only if the revision of the event 
is newer than value in that trigger key,
+            // so we do not react on a stale events
+            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
+
+            Set<String> nodesConsistentIds = 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+            logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+            var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
+
+            metaStorageManager.invoke(iif).thenAccept(res -> {
+                if (res.getAsBoolean()) {
+                    LOG.info("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, nodesConsistentIds);
+                } else {
+                    LOG.info("Failed to update zones' dataNodes value [zoneId 
= {}]", zoneId);

Review Comment:
   Changed to debig



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
     /** {@inheritDoc} */
     @Override
     public void start() {
-
+        zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
 
     }
+
+    private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
+        @Override
+        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long 
revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            byte[] logicalTopologyBytes;
+
+            Set<ClusterNode> clusterNodes;
+
+            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18087
+            try {
+                clusterNodes = cmgManager.logicalTopology().get().nodes();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new IgniteInternalException(e);
+            }
+
+            // Update data nodes for a zone only if the revision of the event 
is newer than value in that trigger key,
+            // so we do not react on a stale events
+            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
+
+            Set<String> nodesConsistentIds = 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+            logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+            var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
+
+            metaStorageManager.invoke(iif).thenAccept(res -> {
+                if (res.getAsBoolean()) {
+                    LOG.info("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, nodesConsistentIds);
+                } else {
+                    LOG.info("Failed to update zones' dataNodes value [zoneId 
= {}]", zoneId);

Review Comment:
   Changed to debug



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to