alievmirza commented on code in PR #1391:
URL: https://github.com/apache/ignite-3/pull/1391#discussion_r1043165197
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
/** {@inheritDoc} */
@Override
public void start() {
-
+ zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
}
+
+ private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long
revision) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ byte[] logicalTopologyBytes;
+
+ Set<ClusterNode> clusterNodes;
+
+ //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18087
+ try {
+ clusterNodes = cmgManager.logicalTopology().get().nodes();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ // Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
+ // so we do not react on a stale events
+ CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
+
+ Set<String> nodesConsistentIds =
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+ var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
Review Comment:
fixed
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
/** {@inheritDoc} */
@Override
public void start() {
-
+ zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
}
+
+ private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long
revision) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ byte[] logicalTopologyBytes;
Review Comment:
done
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
/** {@inheritDoc} */
@Override
public void start() {
-
Review Comment:
thanks!
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -17,40 +17,79 @@
package org.apache.ignite.internal.distributionzones;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static org.apache.ignite.internal.metastorage.client.Operations.ops;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListChange;
+import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneRenameException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.CompoundCondition;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.Update;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
/**
* Distribution zones manager.
*/
public class DistributionZoneManager implements IgniteComponent {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(DistributionZoneManager.class);
+
/** Distribution zone configuration. */
private final DistributionZonesConfiguration zonesConfiguration;
+ /** Meta Storage manager. */
+ private final MetaStorageManager metaStorageManager;
+
+ /* Cluster Management manager. */
+ private final ClusterManagementGroupManager cmgManager;
+
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/**
* Creates a new distribution zone manager.
*
* @param zonesConfiguration Distribution zones configuration.
+ * @param metaStorageManager Meta Storage manager.
+ * @param cmgManager Cluster management group manager.
*/
- public DistributionZoneManager(DistributionZonesConfiguration
zonesConfiguration) {
+ public DistributionZoneManager(
+ DistributionZonesConfiguration zonesConfiguration,
+ MetaStorageManager metaStorageManager,
+ ClusterManagementGroupManager cmgManager
+ ) {
this.zonesConfiguration = zonesConfiguration;
+
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]