sk0x50 commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060339037
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
).get(5, TimeUnit.SECONDS);
+ CompletableFuture<Void> fut;
+
+ fut = distributionZoneManager.createZone(
+ new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+ );
+
try {
- distributionZoneManager.createZone(
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
- ).get(5, TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
assertTrue(e != null);
- assertTrue(e.getCause().getCause() instanceof
DistributionZoneAlreadyExistsException, e.toString());
+ assertTrue(e.getCause() instanceof
DistributionZoneAlreadyExistsException, e.toString());
}
@Test
- public void testDropZoneIfNotExists() {
+ public void testDropZoneIfNotExists() throws NodeStoppingException {
Review Comment:
This test does not throw `NodeStoppingException`. Please check and update
the test accordingly (here and below).
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
}
try {
- long topologyVersionFromCmg = snapshot.version();
-
- byte[] topVerFromMetastorage;
+
metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry
-> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
- try {
- topVerFromMetastorage =
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
- } catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(UNEXPECTED_ERR, e);
- }
+ try {
+ long topologyVersionFromCmg = snapshot.version();
- if (topVerFromMetastorage == null ||
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
- Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+ byte[] topVerFromMetaStorage = topVerEntry.value();
- Condition topologyVersionCondition = topVerFromMetastorage
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+ if (topVerFromMetaStorage == null ||
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+ Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
- If iff = If.iif(topologyVersionCondition,
- updateLogicalTopologyAndVersion(topologyFromCmg,
topologyVersionFromCmg),
- ops().yield(false)
- );
+ Condition topologyVersionCondition =
topVerFromMetaStorage == null
+ ?
notExists(zonesLogicalTopologyVersionKey()) :
+
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
- metaStorageManager.invoke(iff).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug(
- "Distribution zones' logical topology and
version keys were initialised [topology = {}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- } else {
- LOG.debug(
- "Failed to initialize distribution zones'
logical topology "
- + "and version keys [topology =
{}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
+ If iff = If.iif(topologyVersionCondition,
+
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+ ops().yield(false)
);
+
+ metaStorageManager.invoke(iff).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug(
+ "Distribution zones' logical
topology and version keys were initialised "
+ + "[topology = {}, version
= {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ } else {
+ LOG.debug(
+ "Failed to initialize distribution
zones' logical topology "
+ + "and version keys
[topology = {}, version = {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ }
+ });
}
- });
- }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+
} finally {
busyLock.leaveBusy();
}
});
}
+
+ /**
+ * Initialises data nodes of distribution zones in meta storage
+ * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+ */
+ private void initDataNodesFromVaultManager() {
+ vaultMgr.get(APPLIED_REV)
+ .thenApply(appliedRevision -> appliedRevision == null ? 0L :
bytesToLong(appliedRevision.value()))
+ .thenAccept(vaultAppliedRevision -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
+
+ try {
+ vaultMgr.get(zonesLogicalTopologyKey())
+ .thenAccept(vaultEntry -> {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ if (vaultEntry != null &&
vaultEntry.value() != null) {
+ logicalTopology =
ByteUtils.fromBytes(vaultEntry.value());
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ int zoneId =
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+ });
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
+
+ /**
+ * Registers {@link WatchListener} which updates data nodes of
distribution zones on logical topology changing event.
+ *
+ * @return Future representing pending completion of the operation.
+ */
+ private CompletableFuture<?> registerMetaStorageWatchListener() {
+ return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new
WatchListener() {
+ @Override
+ public boolean onUpdate(@NotNull WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ assert evt.single();
+
+ Entry newEntry = evt.entryEvent().newEntry();
+
+ Set<String> newLogicalTopology =
ByteUtils.fromBytes(newEntry.value());
+
+ List<String> removedNodes =
+ logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toList());
+
+ List<String> addedNodes =
+ newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toList());
+
+ logicalTopology = newLogicalTopology;
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ DistributionZoneConfiguration zoneCfg
= zonesConfiguration.distributionZones().get(zoneName);
+
+ int autoAdjust =
zoneCfg.dataNodesAutoAdjust().value();
+ int autoAdjustScaleDown =
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+ int autoAdjustScaleUp =
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+ Integer zoneId =
zoneCfg.zoneId().value();
+
+ if ((!addedNodes.isEmpty() ||
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18134 Create
scheduler with dataNodesAutoAdjust timer.
+ saveDataNodesToMetaStorage(
+ zoneId, newEntry.value(),
newEntry.revision()
+ );
+ } else {
+ if (!addedNodes.isEmpty() &&
autoAdjustScaleUp != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18121 Create
scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+
+ if (!removedNodes.isEmpty() &&
autoAdjustScaleDown != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18132 Create
scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+ }
+ });
+
+ return true;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void onError(@NotNull Throwable e) {
+ LOG.warn("Unable to process logical topology event",
e);
+ }
+ })
+ .thenAccept(id -> watchListenerId = id);
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param dataNodes Data nodes of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long
revision) {
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+ var iif = If.iif(triggerKeyCondition(revision),
dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+ metaStorageManager.invoke(iif).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+ } else {
+ LOG.debug("Failed to delete zones' dataNodes key [zoneId =
{}]", zoneId);
+ }
+ });
+ }
+
+ /**
+ * Unwraps distribution zone exceptions from {@link
ConfigurationChangeException} if it is possible.
+ */
+ private static Throwable unwrapDistributionZoneException(Throwable e) {
Review Comment:
This method looks weird to me, especially the `e.getCause().getCause()` part. I
would propose something like the following:
```
private static Throwable unwrapDistributionZoneException(Throwable e,
Class<? extends Throwable>... expectedClz) {
Throwable ret = unwrapDistributionZoneExceptionRecursively(e,
expectedClz);
return ret != null ? ret : e;
}
private static Throwable
unwrapDistributionZoneExceptionRecursively(Throwable e, Class<? extends
Throwable>... expectedClz) {
if ((e instanceof CompletionException || e instanceof
ConfigurationChangeException) && e.getCause() != null) {
return unwrapDistributionZoneExceptionRecursively(e.getCause(),
expectedClz);
}
for (Class<?> expected : expectedClz) {
if (expected.isAssignableFrom(e.getClass()))
return e;
}
return null;
}
```
By the way, it allows you to clearly define the expected types of exceptions:
```
public CompletableFuture<Void>
createZone(DistributionZoneConfigurationParameters distributionZoneCfg) {
....
})).whenComplete((res, e) -> {
if (e != null) {
fut.completeExceptionally(unwrapDistributionZoneException(e,
DistributionZoneAlreadyExistsException.class,
ConfigurationValidationException.class));
} else {
fut.complete(null);
}
});
```
What do you think?
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
).get(5, TimeUnit.SECONDS);
+ CompletableFuture<Void> fut;
+
+ fut = distributionZoneManager.createZone(
+ new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+ );
+
try {
- distributionZoneManager.createZone(
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
- ).get(5, TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
assertTrue(e != null);
- assertTrue(e.getCause().getCause() instanceof
DistributionZoneAlreadyExistsException, e.toString());
+ assertTrue(e.getCause() instanceof
DistributionZoneAlreadyExistsException, e.toString());
Review Comment:
I like the approach that always requires a message; it looks a lot
friendlier.
```
assertTrue("Expected exception was not thrown.", e != null);
assertTrue("Unexpected type of exception (requires
DistributionZoneAlreadyExistsException)", e.getCause() instanceof
DistributionZoneAlreadyExistsException);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]