sergeyuttsel commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060559895
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
}
try {
- long topologyVersionFromCmg = snapshot.version();
-
- byte[] topVerFromMetastorage;
+
metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry
-> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
- try {
- topVerFromMetastorage =
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
- } catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(UNEXPECTED_ERR, e);
- }
+ try {
+ long topologyVersionFromCmg = snapshot.version();
- if (topVerFromMetastorage == null ||
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
- Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+ byte[] topVerFromMetaStorage = topVerEntry.value();
- Condition topologyVersionCondition = topVerFromMetastorage
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+ if (topVerFromMetaStorage == null ||
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+ Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
- If iff = If.iif(topologyVersionCondition,
- updateLogicalTopologyAndVersion(topologyFromCmg,
topologyVersionFromCmg),
- ops().yield(false)
- );
+ Condition topologyVersionCondition =
topVerFromMetaStorage == null
+ ?
notExists(zonesLogicalTopologyVersionKey()) :
+
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
- metaStorageManager.invoke(iff).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug(
- "Distribution zones' logical topology and
version keys were initialised [topology = {}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- } else {
- LOG.debug(
- "Failed to initialize distribution zones'
logical topology "
- + "and version keys [topology =
{}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
+ If iff = If.iif(topologyVersionCondition,
+
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+ ops().yield(false)
);
+
+ metaStorageManager.invoke(iff).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug(
+ "Distribution zones' logical
topology and version keys were initialised "
+ + "[topology = {}, version
= {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ } else {
+ LOG.debug(
+ "Failed to initialize distribution
zones' logical topology "
+ + "and version keys
[topology = {}, version = {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ }
+ });
}
- });
- }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+
} finally {
busyLock.leaveBusy();
}
});
}
+
+ /**
+ * Initialises data nodes of distribution zones in meta storage
+ * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+ */
+ private void initDataNodesFromVaultManager() {
+ vaultMgr.get(APPLIED_REV)
+ .thenApply(appliedRevision -> appliedRevision == null ? 0L :
bytesToLong(appliedRevision.value()))
+ .thenAccept(vaultAppliedRevision -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
+
+ try {
+ vaultMgr.get(zonesLogicalTopologyKey())
+ .thenAccept(vaultEntry -> {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ if (vaultEntry != null &&
vaultEntry.value() != null) {
+ logicalTopology =
ByteUtils.fromBytes(vaultEntry.value());
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ int zoneId =
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+ });
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
+
+ /**
+ * Registers {@link WatchListener} which updates data nodes of
distribution zones on logical topology changing event.
+ *
+ * @return Future representing pending completion of the operation.
+ */
+ private CompletableFuture<?> registerMetaStorageWatchListener() {
+ return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new
WatchListener() {
+ @Override
+ public boolean onUpdate(@NotNull WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ assert evt.single();
+
+ Entry newEntry = evt.entryEvent().newEntry();
+
+ Set<String> newLogicalTopology =
ByteUtils.fromBytes(newEntry.value());
+
+ List<String> removedNodes =
+ logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toList());
+
+ List<String> addedNodes =
+ newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toList());
+
+ logicalTopology = newLogicalTopology;
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ DistributionZoneConfiguration zoneCfg
= zonesConfiguration.distributionZones().get(zoneName);
+
+ int autoAdjust =
zoneCfg.dataNodesAutoAdjust().value();
+ int autoAdjustScaleDown =
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+ int autoAdjustScaleUp =
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+ Integer zoneId =
zoneCfg.zoneId().value();
+
+ if ((!addedNodes.isEmpty() ||
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18134 Create
scheduler with dataNodesAutoAdjust timer.
+ saveDataNodesToMetaStorage(
+ zoneId, newEntry.value(),
newEntry.revision()
+ );
+ } else {
+ if (!addedNodes.isEmpty() &&
autoAdjustScaleUp != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18121 Create
scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+
+ if (!removedNodes.isEmpty() &&
autoAdjustScaleDown != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18132 Create
scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+ }
+ });
+
+ return true;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void onError(@NotNull Throwable e) {
+ LOG.warn("Unable to process logical topology event",
e);
+ }
+ })
+ .thenAccept(id -> watchListenerId = id);
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param dataNodes Data nodes of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long
revision) {
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+ var iif = If.iif(triggerKeyCondition(revision),
dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+ metaStorageManager.invoke(iif).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+ } else {
+ LOG.debug("Failed to delete zones' dataNodes key [zoneId =
{}]", zoneId);
+ }
+ });
+ }
+
+ /**
+ * Unwraps distribution zone exceptions from {@link
ConfigurationChangeException} if it is possible.
+ */
+ private static Throwable unwrapDistributionZoneException(Throwable e) {
Review Comment:
Yes, I think it's better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org