alievmirza commented on code in PR #1391:
URL: https://github.com/apache/ignite-3/pull/1391#discussion_r1043164943
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
/** {@inheritDoc} */
@Override
public void start() {
-
+ zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
}
+
+ private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long
revision) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ byte[] logicalTopologyBytes;
+
+ Set<ClusterNode> clusterNodes;
+
+ //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18087
+ try {
+ clusterNodes = cmgManager.logicalTopology().get().nodes();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ // Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
+ // so we do not react on a stale events
+ CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
+
+ Set<String> nodesConsistentIds =
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+ var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
+
+ metaStorageManager.invoke(iif).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.info("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, nodesConsistentIds);
+ } else {
+ LOG.info("Failed to update zones' dataNodes value [zoneId
= {}]", zoneId);
Review Comment:
Changed to debig
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -197,12 +235,114 @@ public CompletableFuture<Void> dropZone(String name) {
/** {@inheritDoc} */
@Override
public void start() {
-
+ zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
}
+
+ private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ updateMetaStorageOnZoneCreateOrUpdate(ctx.newValue().zoneId(),
ctx.storageRevision());
+
+ return completedFuture(null);
+ }
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void updateMetaStorageOnZoneCreateOrUpdate(int zoneId, long
revision) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ byte[] logicalTopologyBytes;
+
+ Set<ClusterNode> clusterNodes;
+
+ //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18087
+ try {
+ clusterNodes = cmgManager.logicalTopology().get().nodes();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ // Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
+ // so we do not react on a stale events
+ CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
+
+ Set<String> nodesConsistentIds =
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ logicalTopologyBytes = ByteUtils.toBytes(nodesConsistentIds);
+
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+
+ var iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
+
+ metaStorageManager.invoke(iif).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.info("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, nodesConsistentIds);
+ } else {
+ LOG.info("Failed to update zones' dataNodes value [zoneId
= {}]", zoneId);
Review Comment:
Changed to debug
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]