alievmirza commented on code in PR #1391:
URL: https://github.com/apache/ignite-3/pull/1391#discussion_r1043165197


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
     /** {@inheritDoc} */
     @Override
     public void start() {
-
+        zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
 
     }
+
+    private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
+        @Override
+        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long 
revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            byte[] logicalTopologyBytes;
+
+            Set<ClusterNode> clusterNodes;
+
+            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18087
+            try {
+                clusterNodes = cmgManager.logicalTopology().get().nodes();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new IgniteInternalException(e);
+            }
+
+            // Update data nodes for a zone only if the revision of the event 
is newer than the value in that trigger key,
+            // so that we do not react to stale events.
+            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
+
+            Set<String> nodesConsistentIds = 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+            logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+            var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));

Review Comment:
   fixed



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
     /** {@inheritDoc} */
     @Override
     public void start() {
-
+        zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
 
     }
+
+    private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
+        @Override
+        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+            updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long 
revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            byte[] logicalTopologyBytes;

Review Comment:
   done



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
     /** {@inheritDoc} */
     @Override
     public void start() {
-

Review Comment:
   thanks! 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -17,40 +17,79 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static org.apache.ignite.internal.metastorage.client.Operations.ops;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
 
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedListChange;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneRenameException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.CompoundCondition;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.Update;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * Distribution zones manager.
  */
 public class DistributionZoneManager implements IgniteComponent {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneManager.class);
+
     /** Distribution zone configuration. */
     private final DistributionZonesConfiguration zonesConfiguration;
 
+    /** Meta Storage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Cluster management group manager. */
+    private final ClusterManagementGroupManager cmgManager;
+
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /**
      * Creates a new distribution zone manager.
      *
      * @param zonesConfiguration Distribution zones configuration.
+     * @param metaStorageManager Meta Storage manager.
+     * @param cmgManager Cluster management group manager.
      */
-    public DistributionZoneManager(DistributionZonesConfiguration 
zonesConfiguration) {
+    public DistributionZoneManager(
+            DistributionZonesConfiguration zonesConfiguration,
+            MetaStorageManager metaStorageManager,
+            ClusterManagementGroupManager cmgManager
+    ) {
         this.zonesConfiguration = zonesConfiguration;
+

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to