rpuch commented on code in PR #7322:
URL: https://github.com/apache/ignite-3/pull/7322#discussion_r2650830339
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -461,12 +482,34 @@ private CompletableFuture<Void> processZonesOnStart(long
recoveryRevision, @Null
var startedZones = new IntOpenHashSet();
var startZoneFutures = new ArrayList<CompletableFuture<?>>();
+ Catalog nextCatalog = null;
+
for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion;
ver--) {
+ Catalog catalog = catalogService.catalog(ver);
+ Catalog nextCatalog0 = nextCatalog;
+
int ver0 = ver;
catalogService.catalog(ver).zones().stream()
.filter(zone -> startedZones.add(zone.id()))
- .forEach(zoneDescriptor -> startZoneFutures.add(
-
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0,
zoneDescriptor, true)));
+ .forEach(zoneDescriptor -> {
+ // Handle missed zone drop event.
Review Comment:
This comment seems to belong inside the body of the following `if` block.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -461,12 +482,34 @@ private CompletableFuture<Void> processZonesOnStart(long
recoveryRevision, @Null
var startedZones = new IntOpenHashSet();
var startZoneFutures = new ArrayList<CompletableFuture<?>>();
+ Catalog nextCatalog = null;
+
for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion;
ver--) {
+ Catalog catalog = catalogService.catalog(ver);
+ Catalog nextCatalog0 = nextCatalog;
Review Comment:
A name ending in `0` is always cryptic. I guess this variable was added here
because Java requires variables captured by a lambda to be effectively final. How
about `finalNextCatalog`? It would highlight the intention of this variable.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources
zonePartitionResources(ZonePartitionId zonePartiti
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+ private void onZoneDrop(DropZoneEventParameters parameters) {
+ inBusyLock(busyLock, () -> {
+ int eventCatalogVersion = parameters.catalogVersion();
+ int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+ int zoneId = parameters.zoneId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+ assert zoneDescriptor != null : "Unexpected null zone descriptor
for zoneId=" + zoneId + ", catalogVersion "
+ + catalogVersionWithZonePresent;
+
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ eventCatalogVersion,
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ });
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
+ private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+ if (!busyLock.enterBusy()) {
+ return falseCompletedFuture();
+ }
+
+ try {
+ int newEarliestCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+ // Run zone destruction fully asynchronously.
+ destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+ .forEach(this::removeZonePartitionsIfPossible);
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * For each partition performs following actions.
+ *
+ * <ol>
+ * <li>Check whether it's started or await if it's starting.</li>
+ * <li>Check whether the partition is eligible for removal - has zero
table resources and empty txStateStorage.</li>
+ * <li>Stop partition, drop zone partition resources and unregister it
from within startedReplicationGroups.</li>
+ * <li>Remove partition assignments from meta storage.</li>
+ * </ol>
+ */
+ private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+ int zoneId = event.zoneId();
+ int partitionsCount = event.partitions();
+
+ List<CompletableFuture<Boolean>>
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+ for (int partitionIndex = 0; partitionIndex < partitionsCount;
partitionIndex++) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionIndex);
+
+ CompletableFuture<Boolean> partitionRemovalFuture =
+
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+ .thenComposeAsync(started -> {
+ if (!started) {
+ return falseCompletedFuture();
+ }
+
+ return inBusyLockAsync(busyLock, () ->
+ isEligibleForDrop(zonePartitionId)
+ .thenCompose(eligible -> {
+ if (!eligible) {
+ return
falseCompletedFuture();
+ }
+
+ // It's safe to use -1 as
revision id here, since we only stop partitions that do not
+ // active table-related
resources.
Review Comment:
```suggestion
// have active
table-related resources.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources
zonePartitionResources(ZonePartitionId zonePartiti
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+ private void onZoneDrop(DropZoneEventParameters parameters) {
+ inBusyLock(busyLock, () -> {
+ int eventCatalogVersion = parameters.catalogVersion();
+ int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+ int zoneId = parameters.zoneId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+ assert zoneDescriptor != null : "Unexpected null zone descriptor
for zoneId=" + zoneId + ", catalogVersion "
+ + catalogVersionWithZonePresent;
+
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ eventCatalogVersion,
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ });
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
+ private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+ if (!busyLock.enterBusy()) {
+ return falseCompletedFuture();
+ }
+
+ try {
+ int newEarliestCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+ // Run zone destruction fully asynchronously.
+ destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+ .forEach(this::removeZonePartitionsIfPossible);
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * For each partition performs following actions.
+ *
+ * <ol>
+ * <li>Check whether it's started or await if it's starting.</li>
+ * <li>Check whether the partition is eligible for removal - has zero
table resources and empty txStateStorage.</li>
+ * <li>Stop partition, drop zone partition resources and unregister it
from within startedReplicationGroups.</li>
Review Comment:
```suggestion
* <li>Stop partition, destroy zone partition resources and
unregister it from within startedReplicationGroups.</li>
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources
zonePartitionResources(ZonePartitionId zonePartiti
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+ private void onZoneDrop(DropZoneEventParameters parameters) {
+ inBusyLock(busyLock, () -> {
+ int eventCatalogVersion = parameters.catalogVersion();
+ int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+ int zoneId = parameters.zoneId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+ assert zoneDescriptor != null : "Unexpected null zone descriptor
for zoneId=" + zoneId + ", catalogVersion "
+ + catalogVersionWithZonePresent;
+
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ eventCatalogVersion,
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ });
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
+ private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+ if (!busyLock.enterBusy()) {
+ return falseCompletedFuture();
+ }
+
+ try {
+ int newEarliestCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+ // Run zone destruction fully asynchronously.
+ destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+ .forEach(this::removeZonePartitionsIfPossible);
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * For each partition performs following actions.
+ *
+ * <ol>
+ * <li>Check whether it's started or await if it's starting.</li>
+ * <li>Check whether the partition is eligible for removal - has zero
table resources and empty txStateStorage.</li>
+ * <li>Stop partition, drop zone partition resources and unregister it
from within startedReplicationGroups.</li>
+ * <li>Remove partition assignments from meta storage.</li>
+ * </ol>
+ */
+ private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+ int zoneId = event.zoneId();
+ int partitionsCount = event.partitions();
+
+ List<CompletableFuture<Boolean>>
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+ for (int partitionIndex = 0; partitionIndex < partitionsCount;
partitionIndex++) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionIndex);
+
+ CompletableFuture<Boolean> partitionRemovalFuture =
+
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+ .thenComposeAsync(started -> {
+ if (!started) {
+ return falseCompletedFuture();
+ }
+
+ return inBusyLockAsync(busyLock, () ->
+ isEligibleForDrop(zonePartitionId)
+ .thenCompose(eligible -> {
+ if (!eligible) {
+ return
falseCompletedFuture();
+ }
+
+ // It's safe to use -1 as
revision id here, since we only stop partitions that do not
+ // active table-related
resources.
+ return
stopAndDestroyPartition(zonePartitionId, -1)
+ .thenCompose(v ->
dropAssignments(zonePartitionId))
+ .thenApply(v ->
true);
+ })
+ );
+ }, partitionOperationsExecutor)
+ .exceptionally(e -> {
+ if (!hasCause(e, NodeStoppingException.class))
{
+ LOG.error(
+ "Unable to destroy zone partition
[zonePartitionId={}]",
+ e,
+ zonePartitionId);
+ }
+
+ // In case of false, event will be returned to
the destructionEventsQueue and thus removal will be retried
+ // on next iteration of LWM change.
+ return false;
+ });
+
+ partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+ }
+
+ // If there's a partition that still have non empty recourses e.g.
non-empty txnStateStorage, event is returned
+ // back to destructionEventsQueue and thus will be re-processed on
next lwm change.
+ allOf(partitionsEligibilityForRemovalFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(fs ->
partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+ .thenAccept(anyFalse -> {
+ if (anyFalse) {
+ destructionEventsQueue.enqueue(event);
+ } else {
+ distributionZoneMgr.onDropZoneDestroy(zoneId,
event.catalogVersion)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Unable to destroy zone
resources [zoneId={}]",
+ e,
+ zoneId);
+ }
+ });
+ }
+ });
+ }
+
+ /**
+ * Checks whether partition could be removed. In order to match the
condition zone partition should have zero table resources and
Review Comment:
```suggestion
* Checks whether partition could be removed. In order to match the
condition, a zone partition should have zero table resources and
```
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java:
##########
@@ -584,4 +640,104 @@ private IgniteImpl
nodeNotHostingPartition(ReplicationGroupId replicationGroupId
.map(TestWrappers::unwrapIgniteImpl)
.orElseThrow();
}
+
+ private static void
verifyAssignmentKeysWereRemovedFromMetaStorage(IgniteImpl ignite,
ZonePartitionId zonePartitionId)
+ throws InterruptedException {
+ MetaStorageManager metaStorage =
unwrapIgniteImpl(ignite).metaStorageManager();
+
+ assertTrue(
+ waitForCondition(() -> {
Review Comment:
Please rewrite this with `Awaitility.await()` instead of `waitForCondition()`.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources
zonePartitionResources(ZonePartitionId zonePartiti
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+ private void onZoneDrop(DropZoneEventParameters parameters) {
+ inBusyLock(busyLock, () -> {
+ int eventCatalogVersion = parameters.catalogVersion();
+ int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+ int zoneId = parameters.zoneId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+ assert zoneDescriptor != null : "Unexpected null zone descriptor
for zoneId=" + zoneId + ", catalogVersion "
+ + catalogVersionWithZonePresent;
+
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ eventCatalogVersion,
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ });
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
+ private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+ if (!busyLock.enterBusy()) {
+ return falseCompletedFuture();
+ }
+
+ try {
+ int newEarliestCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+ // Run zone destruction fully asynchronously.
+ destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+ .forEach(this::removeZonePartitionsIfPossible);
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * For each partition performs following actions.
+ *
+ * <ol>
+ * <li>Check whether it's started or await if it's starting.</li>
+ * <li>Check whether the partition is eligible for removal - has zero
table resources and empty txStateStorage.</li>
+ * <li>Stop partition, drop zone partition resources and unregister it
from within startedReplicationGroups.</li>
+ * <li>Remove partition assignments from meta storage.</li>
+ * </ol>
+ */
+ private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+ int zoneId = event.zoneId();
+ int partitionsCount = event.partitions();
+
+ List<CompletableFuture<Boolean>>
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+ for (int partitionIndex = 0; partitionIndex < partitionsCount;
partitionIndex++) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionIndex);
+
+ CompletableFuture<Boolean> partitionRemovalFuture =
+
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+ .thenComposeAsync(started -> {
+ if (!started) {
+ return falseCompletedFuture();
+ }
+
+ return inBusyLockAsync(busyLock, () ->
+ isEligibleForDrop(zonePartitionId)
+ .thenCompose(eligible -> {
+ if (!eligible) {
+ return
falseCompletedFuture();
+ }
+
+ // It's safe to use -1 as
revision id here, since we only stop partitions that do not
+ // active table-related
resources.
+ return
stopAndDestroyPartition(zonePartitionId, -1)
+ .thenCompose(v ->
dropAssignments(zonePartitionId))
+ .thenApply(v ->
true);
+ })
+ );
+ }, partitionOperationsExecutor)
+ .exceptionally(e -> {
+ if (!hasCause(e, NodeStoppingException.class))
{
+ LOG.error(
+ "Unable to destroy zone partition
[zonePartitionId={}]",
+ e,
+ zonePartitionId);
+ }
+
+ // In case of false, event will be returned to
the destructionEventsQueue and thus removal will be retried
+ // on next iteration of LWM change.
+ return false;
+ });
+
+ partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+ }
+
+ // If there's a partition that still have non empty recourses e.g.
non-empty txnStateStorage, event is returned
+ // back to destructionEventsQueue and thus will be re-processed on
next lwm change.
+ allOf(partitionsEligibilityForRemovalFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(fs ->
partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+ .thenAccept(anyFalse -> {
+ if (anyFalse) {
+ destructionEventsQueue.enqueue(event);
+ } else {
+ distributionZoneMgr.onDropZoneDestroy(zoneId,
event.catalogVersion)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Unable to destroy zone
resources [zoneId={}]",
+ e,
+ zoneId);
+ }
+ });
+ }
+ });
+ }
+
+ /**
+ * Checks whether partition could be removed. In order to match the
condition zone partition should have zero table resources and
+ * empty txStateStorage.
+ */
+ private CompletableFuture<Boolean> isEligibleForDrop(ZonePartitionId
zonePartitionId) {
+ ZonePartitionResources zonePartitionResources =
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+ if (zonePartitionResources == null) {
+ return trueCompletedFuture();
+ }
+
+ try (var cursor =
zonePartitionResources.txStatePartitionStorage().scan()) {
+ if (cursor.hasNext()) {
+ return falseCompletedFuture();
+ }
+ } catch (TxStateStorageRebalanceException e) {
+ return falseCompletedFuture();
+ }
+
+ // In order to simplify the logic of table resources cleanup, that is
handled by TableManager and
+ // zone resources cleanup that is handled by
PartitionReplicaLifecycleManager, on zone destruction event
+ // we await TableManager to finish its own job instead of stealing it.
+ // In words case if both tables and zone were destroyed on same them
lwm, PartitionReplicaLifecycleManager will cleanup
Review Comment:
```suggestion
// If both the tables and the zone were destroyed on the same LWM,
PartitionReplicaLifecycleManager will clean up
```
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java:
##########
@@ -497,6 +544,15 @@ private static void verifyPartitionGetsRemovedFromDisk(
}
}
+ private static void verifyPartitionNonMvDataExistsOnDisk(IgniteImpl
ignite, ZonePartitionId replicationGroupId) {
+ assertTrue(hasSomethingInTxStateStorage(ignite, replicationGroupId),
"Tx state storage was unexpectedly destroyed");
+
+ assertTrue(partitionLogStorage(ignite,
replicationGroupId).getLastLogIndex() > 0L, "Partition Raft log was
unexpectedly removed.");
Review Comment:
```suggestion
assertThat("Partition Raft log was unexpectedly removed.",
partitionLogStorage(ignite, replicationGroupId).getLastLogIndex(), is(greaterThan(0L)));
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java:
##########
@@ -211,6 +212,24 @@ CompletableFuture<Void>
removeTableResources(ZonePartitionId zonePartitionId, in
});
}
+ /**
+ * Returns future of true if there are no corresponding table-related
resources, otherwise awaits replicaListenerFuture
+ * and checks whether table replica processors, table raft processors and
partition snapshot storages are present.
+ * if any is present returns false, otherwise returns true.
Review Comment:
```suggestion
* If any is present, returns false; otherwise returns true.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java:
##########
@@ -100,6 +102,23 @@ boolean hasReplicationGroupStarted(ZonePartitionId
zonePartitionId) {
return startedReplicationGroupIds.contains(zonePartitionId);
}
+ /**
+ * Returns trueCompleted future if group was already started or awaits
groups startup if it's in the middle of the start process,
+ * otherwise returns falseCompleted future.
+ */
+ CompletableFuture<Boolean>
hasReplicationGroupStartedOrAwaitIfStarting(ZonePartitionId zonePartitionId) {
+ if (startedReplicationGroupIds.contains(zonePartitionId)) {
+ return trueCompletedFuture();
+ }
+
+ CompletableFuture<Void> groupStartingFuture =
startingReplicationGroupIds.get(zonePartitionId);
+ if (groupStartingFuture != null) {
Review Comment:
Is there a race between adding this future and checking for its existence
here?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources
zonePartitionResources(ZonePartitionId zonePartiti
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+ private void onZoneDrop(DropZoneEventParameters parameters) {
+ inBusyLock(busyLock, () -> {
+ int eventCatalogVersion = parameters.catalogVersion();
+ int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+ int zoneId = parameters.zoneId();
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+ assert zoneDescriptor != null : "Unexpected null zone descriptor
for zoneId=" + zoneId + ", catalogVersion "
+ + catalogVersionWithZonePresent;
+
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ eventCatalogVersion,
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ });
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
+ private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+ if (!busyLock.enterBusy()) {
+ return falseCompletedFuture();
+ }
+
+ try {
+ int newEarliestCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+ // Run zone destruction fully asynchronously.
+ destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+ .forEach(this::removeZonePartitionsIfPossible);
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * For each partition performs following actions.
+ *
+ * <ol>
+ * <li>Check whether it's started or await if it's starting.</li>
+ * <li>Check whether the partition is eligible for removal - has zero
table resources and empty txStateStorage.</li>
+ * <li>Stop partition, drop zone partition resources and unregister it
from within startedReplicationGroups.</li>
+ * <li>Remove partition assignments from meta storage.</li>
+ * </ol>
+ */
+ private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+ int zoneId = event.zoneId();
+ int partitionsCount = event.partitions();
+
+ List<CompletableFuture<Boolean>>
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+ for (int partitionIndex = 0; partitionIndex < partitionsCount;
partitionIndex++) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionIndex);
+
+ CompletableFuture<Boolean> partitionRemovalFuture =
+
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+ .thenComposeAsync(started -> {
+ if (!started) {
+ return falseCompletedFuture();
+ }
+
+ return inBusyLockAsync(busyLock, () ->
+ isEligibleForDrop(zonePartitionId)
+ .thenCompose(eligible -> {
+ if (!eligible) {
+ return
falseCompletedFuture();
+ }
+
+ // It's safe to use -1 as
revision id here, since we only stop partitions that do not
+ // active table-related
resources.
+ return
stopAndDestroyPartition(zonePartitionId, -1)
+ .thenCompose(v ->
dropAssignments(zonePartitionId))
+ .thenApply(v ->
true);
+ })
+ );
+ }, partitionOperationsExecutor)
+ .exceptionally(e -> {
+ if (!hasCause(e, NodeStoppingException.class))
{
+ LOG.error(
+ "Unable to destroy zone partition
[zonePartitionId={}]",
+ e,
+ zonePartitionId);
+ }
+
+ // In case of false, event will be returned to
the destructionEventsQueue and thus removal will be retried
+ // on next iteration of LWM change.
+ return false;
+ });
+
+ partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+ }
+
+ // If there's a partition that still have non empty recourses e.g.
non-empty txnStateStorage, event is returned
Review Comment:
```suggestion
// If there's a partition that still has non-empty resources, e.g.
non-empty txnStateStorage, the event is returned
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]