sanpwc commented on code in PR #1426: URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1053470125
########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) { } }; + /** The logical topology on the last watch event. */ + private Set<String> logicalTopology = Collections.emptySet(); Review Comment: Commonly, it's better to instantiate the state in one place only, meaning that it's better to move `= Collections.emptySet()` to the constructor. ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null."); if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); + throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException()); Review Comment: Here and in other places, please use withCause in order not to regenerate traceId. ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) { } }; + /** The logical topology on the last watch event. */ + private Set<String> logicalTopology = Collections.emptySet(); + + /** Watch listener id to unregister the watch listener on {@link DistributionZoneManager#stop()}. */ + private Long watchListenerId; Review Comment: Same as above, this variable should be thread-safe. It's possible that you will try to set and read it from different threads. 
########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() { } }); } + + /** + * Initialises data nodes of distribution zones in meta storage + * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault. + */ + private void initDataNodesFromVaultManager() { + long vaultAppliedRevision = vaultAppliedRevision(); + + VaultEntry vaultEntry; + + try { + vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get(); Review Comment: Same, as above. It's not valid to call get() on futures in such cases. There's no sense in going further with review until you fix it. ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() { } }); } + + /** + * Initialises data nodes of distribution zones in meta storage + * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault. + */ + private void initDataNodesFromVaultManager() { + long vaultAppliedRevision = vaultAppliedRevision(); + + VaultEntry vaultEntry; + + try { + vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IgniteInternalException(UNEXPECTED_ERR, e); + } + + if (vaultEntry != null && vaultEntry.value() != null) { + logicalTopology = ByteUtils.fromBytes(vaultEntry.value()); + + zonesConfiguration.distributionZones().value().namedListKeys() + .forEach(zoneName -> { + int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value(); + + saveDataNodesToMetaStorage(zoneId, toBytes(logicalTopology), vaultAppliedRevision); + }); + } + } + + /** + * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event. 
+ */ + private void registerMetaStorageWatchListener() { + metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() { + @Override + public boolean onUpdate(@NotNull WatchEvent evt) { + if (!busyLock.enterBusy()) { + throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); + } + + try { + assert evt.single(); + + Entry newEntry = evt.entryEvent().newEntry(); + + Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value()); + + List<String> removedNodes = + logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList()); + + List<String> addedNodes = + newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList()); + + logicalTopology = newLogicalTopology; + + zonesConfiguration.distributionZones().value().namedListKeys() + .forEach(zoneName -> { + DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName); + + int autoAdjust = zoneCfg.dataNodesAutoAdjust().value(); + int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value(); + int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value(); + + Integer zoneId = zoneCfg.zoneId().value(); + + if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) { + //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer. + saveDataNodesToMetaStorage( + zoneId, newEntry.value(), newEntry.revision() + ); + } else { + if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) { + //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer. + saveDataNodesToMetaStorage( + zoneId, newEntry.value(), newEntry.revision() + ); + } + + if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) { + //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer. 
+ saveDataNodesToMetaStorage( + zoneId, newEntry.value(), newEntry.revision() + ); + } + } + }); + + return true; + } finally { + busyLock.leaveBusy(); + } + } + + @Override + public void onError(@NotNull Throwable e) { + LOG.warn("Unable to process logical topology event", e); + } + }) + .thenAccept(id -> watchListenerId = id); + } + + /** + * Returns applied vault revision. + * + * @return Applied vault revision. + */ + private long vaultAppliedRevision() { + try { + return vaultMgr.get(APPLIED_REV) + .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value())) + .get(); Review Comment: We should never-ever call get() or join() on futures in such cases. It's an important one. ########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java: ########## @@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) { } }; + /** The logical topology on the last watch event. */ + private Set<String> logicalTopology = Collections.emptySet(); Review Comment: This variable should be thread-safe: it's possible that you will initialize it in one thread (the node startup thread) and then try to read it in the watch thread, so visibility is broken. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org