rpuch commented on code in PR #7322:
URL: https://github.com/apache/ignite-3/pull/7322#discussion_r2650830339


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -461,12 +482,34 @@ private CompletableFuture<Void> processZonesOnStart(long 
recoveryRevision, @Null
         var startedZones = new IntOpenHashSet();
         var startZoneFutures = new ArrayList<CompletableFuture<?>>();
 
+        Catalog nextCatalog = null;
+
         for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; 
ver--) {
+            Catalog catalog = catalogService.catalog(ver);
+            Catalog nextCatalog0 = nextCatalog;
+
             int ver0 = ver;
             catalogService.catalog(ver).zones().stream()
                     .filter(zone -> startedZones.add(zone.id()))
-                    .forEach(zoneDescriptor -> startZoneFutures.add(
-                            
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0, 
zoneDescriptor, true)));
+                    .forEach(zoneDescriptor -> {
+                        // Handle missed zone drop event.

Review Comment:
   This comment seems to belong inside the following `if` block.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -461,12 +482,34 @@ private CompletableFuture<Void> processZonesOnStart(long 
recoveryRevision, @Null
         var startedZones = new IntOpenHashSet();
         var startZoneFutures = new ArrayList<CompletableFuture<?>>();
 
+        Catalog nextCatalog = null;
+
         for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; 
ver--) {
+            Catalog catalog = catalogService.catalog(ver);
+            Catalog nextCatalog0 = nextCatalog;

Review Comment:
   A name ending in `0` is cryptic. I assume this variable was added because 
Java requires a local to be effectively final before a lambda can capture it. 
How about `finalNextCatalog`? That name would make the variable's purpose clear.
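
   To illustrate the point, here is a minimal sketch of the effectively-final 
   constraint using plain strings instead of `Catalog` objects. The class and 
   method names are hypothetical, and it assumes the loop reassigns the "next" 
   value at the end of each iteration, which the quoted hunk does not show.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Supplier;

   public class EffectivelyFinalSketch {
       /** Builds one supplier per version, each remembering the value seen on the previous iteration. */
       public static List<Supplier<String>> build(int latest, int earliest) {
           List<Supplier<String>> suppliers = new ArrayList<>();

           // Reassigned on every iteration, so a lambda cannot capture it directly.
           String next = null;

           for (int ver = latest; ver >= earliest; ver--) {
               String current = "catalog-" + ver;
               // Effectively final copy: assigned once per iteration, safe to capture.
               String finalNext = next;

               suppliers.add(() -> current + " <- " + finalNext);

               // Assumed update step; this is what makes 'next' not effectively final.
               next = current;
           }

           return suppliers;
       }

       public static void main(String[] args) {
           build(3, 1).forEach(s -> System.out.println(s.get()));
       }
   }
   ```

   Capturing `next` directly inside the lambda would be rejected by the 
   compiler, which is why the per-iteration copy exists and why a name such as 
   `finalNext` documents its role.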



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources 
zonePartitionResources(ZonePartitionId zonePartiti
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal -  has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, drop zone partition resources and unregister it 
from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> 
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; 
partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return 
falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as 
revision id here, since we only stop partitions that do not
+                                                    // active table-related 
resources.

Review Comment:
   ```suggestion
                                                       // have active 
table-related resources.
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources 
zonePartitionResources(ZonePartitionId zonePartiti
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal -  has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, drop zone partition resources and unregister it 
from within startedReplicationGroups.</li>

Review Comment:
   ```suggestion
        *     <li>Stop partition, destroy zone partition resources and 
unregister it from within startedReplicationGroups.</li>
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources 
zonePartitionResources(ZonePartitionId zonePartiti
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal -  has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, drop zone partition resources and unregister it 
from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> 
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; 
partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return 
falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as 
revision id here, since we only stop partitions that do not
+                                                    // active table-related 
resources.
+                                                    return 
stopAndDestroyPartition(zonePartitionId, -1)
+                                                            .thenCompose(v -> 
dropAssignments(zonePartitionId))
+                                                            .thenApply(v -> 
true);
+                                                })
+                                );
+                            }, partitionOperationsExecutor)
+                            .exceptionally(e -> {
+                                if (!hasCause(e, NodeStoppingException.class)) 
{
+                                    LOG.error(
+                                            "Unable to destroy zone partition 
[zonePartitionId={}]",
+                                            e,
+                                            zonePartitionId);
+                                }
+
+                                // In case of false, event will be returned to 
the destructionEventsQueue and thus removal will be retried
+                                // on next iteration of LWM change.
+                                return false;
+                            });
+
+            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+        }
+
+        // If there's a partition that still have non empty recourses e.g. 
non-empty txnStateStorage, event is returned
+        // back to destructionEventsQueue and thus will be re-processed on 
next lwm change.
+        allOf(partitionsEligibilityForRemovalFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(fs -> 
partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+                .thenAccept(anyFalse -> {
+                    if (anyFalse) {
+                        destructionEventsQueue.enqueue(event);
+                    } else {
+                        distributionZoneMgr.onDropZoneDestroy(zoneId, 
event.catalogVersion)
+                                .whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        LOG.error(
+                                                "Unable to destroy zone 
resources [zoneId={}]",
+                                                e,
+                                                zoneId);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    /**
+     * Checks whether partition could be removed. In order to match the 
condition zone partition should have zero table resources and

Review Comment:
   ```suggestion
        * Checks whether partition could be removed. In order to match the 
condition, a zone partition should have zero table resources and
   ```



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java:
##########
@@ -584,4 +640,104 @@ private IgniteImpl 
nodeNotHostingPartition(ReplicationGroupId replicationGroupId
                 .map(TestWrappers::unwrapIgniteImpl)
                 .orElseThrow();
     }
+
+    private static void 
verifyAssignmentKeysWereRemovedFromMetaStorage(IgniteImpl ignite, 
ZonePartitionId zonePartitionId)
+            throws InterruptedException {
+        MetaStorageManager metaStorage = 
unwrapIgniteImpl(ignite).metaStorageManager();
+
+        assertTrue(
+                waitForCondition(() -> {

Review Comment:
   Please rewrite with `Awaitility.await()` instead



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources 
zonePartitionResources(ZonePartitionId zonePartiti
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal -  has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, drop zone partition resources and unregister it 
from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> 
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; 
partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return 
falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as 
revision id here, since we only stop partitions that do not
+                                                    // active table-related 
resources.
+                                                    return 
stopAndDestroyPartition(zonePartitionId, -1)
+                                                            .thenCompose(v -> 
dropAssignments(zonePartitionId))
+                                                            .thenApply(v -> 
true);
+                                                })
+                                );
+                            }, partitionOperationsExecutor)
+                            .exceptionally(e -> {
+                                if (!hasCause(e, NodeStoppingException.class)) 
{
+                                    LOG.error(
+                                            "Unable to destroy zone partition 
[zonePartitionId={}]",
+                                            e,
+                                            zonePartitionId);
+                                }
+
+                                // In case of false, event will be returned to 
the destructionEventsQueue and thus removal will be retried
+                                // on next iteration of LWM change.
+                                return false;
+                            });
+
+            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+        }
+
+        // If there's a partition that still have non empty recourses e.g. 
non-empty txnStateStorage, event is returned
+        // back to destructionEventsQueue and thus will be re-processed on 
next lwm change.
+        allOf(partitionsEligibilityForRemovalFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(fs -> 
partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+                .thenAccept(anyFalse -> {
+                    if (anyFalse) {
+                        destructionEventsQueue.enqueue(event);
+                    } else {
+                        distributionZoneMgr.onDropZoneDestroy(zoneId, 
event.catalogVersion)
+                                .whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        LOG.error(
+                                                "Unable to destroy zone 
resources [zoneId={}]",
+                                                e,
+                                                zoneId);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    /**
+     * Checks whether partition could be removed. In order to match the 
condition zone partition should have zero table resources and
+     * empty txStateStorage.
+     */
+    private CompletableFuture<Boolean> isEligibleForDrop(ZonePartitionId 
zonePartitionId) {
+        ZonePartitionResources zonePartitionResources = 
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+        if (zonePartitionResources == null) {
+            return trueCompletedFuture();
+        }
+
+        try (var cursor = 
zonePartitionResources.txStatePartitionStorage().scan()) {
+            if (cursor.hasNext()) {
+                return falseCompletedFuture();
+            }
+        } catch (TxStateStorageRebalanceException e) {
+            return falseCompletedFuture();
+        }
+
+        // In order to simplify the logic of table resources cleanup, that is 
handled by TableManager and
+        // zone resources cleanup that is handled by 
PartitionReplicaLifecycleManager, on zone destruction event
+        // we await TableManager to finish its own job instead of stealing it.
+        // In words case if both tables and zone were destroyed on same them 
lwm, PartitionReplicaLifecycleManager will cleanup

Review Comment:
   ```suggestion
           // If both the tables and the zone were destroyed on the same lwm, 
PartitionReplicaLifecycleManager will clean up
   ```



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java:
##########
@@ -497,6 +544,15 @@ private static void verifyPartitionGetsRemovedFromDisk(
         }
     }
 
+    private static void verifyPartitionNonMvDataExistsOnDisk(IgniteImpl 
ignite, ZonePartitionId replicationGroupId) {
+        assertTrue(hasSomethingInTxStateStorage(ignite, replicationGroupId), 
"Tx state storage was unexpectedly destroyed");
+
+        assertTrue(partitionLogStorage(ignite, 
replicationGroupId).getLastLogIndex() > 0L, "Partition Raft log was 
unexpectedly removed.");

Review Comment:
   ```suggestion
           assertThat("Partition Raft log was unexpectedly removed.", 
partitionLogStorage(ignite, replicationGroupId).getLastLogIndex(), 
is(greaterThan(0L)));
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java:
##########
@@ -211,6 +212,24 @@ CompletableFuture<Void> 
removeTableResources(ZonePartitionId zonePartitionId, in
                 });
     }
 
+    /**
+     *  Returns future of true if there are no corresponding table-related 
resources, otherwise awaits replicaListenerFuture
+     *  and checks whether table replica processors, table raft processors and 
partition snapshot storages are present.
+     *  if any is present returns false, otherwise returns true.

Review Comment:
   ```suggestion
        *  if any is present, returns false, otherwise returns true.
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java:
##########
@@ -100,6 +102,23 @@ boolean hasReplicationGroupStarted(ZonePartitionId 
zonePartitionId) {
         return startedReplicationGroupIds.contains(zonePartitionId);
     }
 
+    /**
+     * Returns trueCompleted future if group was already started or awaits 
groups startup if it's in the middle of the start process,
+     * otherwise returns falseCompleted future.
+     */
+    CompletableFuture<Boolean> 
hasReplicationGroupStartedOrAwaitIfStarting(ZonePartitionId zonePartitionId) {
+        if (startedReplicationGroupIds.contains(zonePartitionId)) {
+            return trueCompletedFuture();
+        }
+
+        CompletableFuture<Void> groupStartingFuture = 
startingReplicationGroupIds.get(zonePartitionId);
+        if (groupStartingFuture != null) {

Review Comment:
   Is there a race between adding this future and checking for its existence 
here?
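
   To make the concern concrete: if "started" and "starting" live in two 
   separate collections, a caller that checks one and then the other can 
   observe a group in neither while it moves between them. The sketch below 
   is not the PR's implementation. It is a hypothetical alternative (all names 
   are invented, and group ids are plain integers) that keeps both states in a 
   single map so that one lookup answers the question.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   public class GroupStartTrackerSketch {
       // One map holds both states: an incomplete future means "starting",
       // a completed future means "started".
       private final ConcurrentMap<Integer, CompletableFuture<Void>> startFutures = new ConcurrentHashMap<>();

       /** Registers the group as starting and returns the future the starter must complete. */
       public CompletableFuture<Void> beginStart(int groupId) {
           return startFutures.computeIfAbsent(groupId, id -> new CompletableFuture<>());
       }

       /** Completes with true once a known group has started, or with false at once for an unknown group. */
       public CompletableFuture<Boolean> startedOrAwaitIfStarting(int groupId) {
           // Single atomic read: there is no gap between a "started" check and a "starting" check.
           CompletableFuture<Void> future = startFutures.get(groupId);

           if (future == null) {
               return CompletableFuture.completedFuture(false);
           }

           return future.thenApply(unused -> true);
       }

       public static void main(String[] args) {
           GroupStartTrackerSketch tracker = new GroupStartTrackerSketch();

           CompletableFuture<Void> start = tracker.beginStart(7);
           CompletableFuture<Boolean> waiter = tracker.startedOrAwaitIfStarting(7);

           start.complete(null);

           System.out.println(waiter.join());
       }
   }
   ```

   Even in this shape, a group registered just after the lookup is still 
   reported as not started, so the caller needs a retry path for a `false` 
   result.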



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -2009,6 +2055,174 @@ public ZonePartitionResources 
zonePartitionResources(ZonePartitionId zonePartiti
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal -  has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, drop zone partition resources and unregister it 
from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> 
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; 
partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return 
falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as 
revision id here, since we only stop partitions that do not
+                                                    // active table-related 
resources.
+                                                    return 
stopAndDestroyPartition(zonePartitionId, -1)
+                                                            .thenCompose(v -> 
dropAssignments(zonePartitionId))
+                                                            .thenApply(v -> 
true);
+                                                })
+                                );
+                            }, partitionOperationsExecutor)
+                            .exceptionally(e -> {
+                                if (!hasCause(e, NodeStoppingException.class)) 
{
+                                    LOG.error(
+                                            "Unable to destroy zone partition 
[zonePartitionId={}]",
+                                            e,
+                                            zonePartitionId);
+                                }
+
+                                // In case of false, event will be returned to 
the destructionEventsQueue and thus removal will be retried
+                                // on next iteration of LWM change.
+                                return false;
+                            });
+
+            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+        }
+
+        // If there's a partition that still have non empty recourses e.g. 
non-empty txnStateStorage, event is returned

Review Comment:
   ```suggestion
           // If there's a partition that still has non-empty resources, e.g. 
non-empty txnStateStorage, the event is returned
   ```
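
   For reference, the retry decision this comment describes can be sketched 
   in isolation with plain `CompletableFuture` calls. The class and method 
   names below are hypothetical. The sketch assumes, as in the quoted hunk, 
   that every per-partition future already maps failures to `false`, so 
   `allOf` never completes exceptionally.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class RequeueOnAnyFalseSketch {
       /** Completes with true if at least one per-partition result is false, meaning the event must be retried. */
       public static CompletableFuture<Boolean> needsRetry(List<CompletableFuture<Boolean>> partitionResults) {
           return CompletableFuture.allOf(partitionResults.toArray(new CompletableFuture[0]))
                   // Every future is complete at this point, so join() does not block.
                   .thenApply(unused -> partitionResults.stream().anyMatch(f -> !f.join()));
       }

       public static void main(String[] args) {
           CompletableFuture<Boolean> removed = CompletableFuture.completedFuture(true);
           CompletableFuture<Boolean> stillBusy = CompletableFuture.completedFuture(false);

           // One partition was not removed, so the event would go back to the queue.
           System.out.println(needsRetry(List.of(removed, stillBusy)).join());
       }
   }
   ```

   A `true` result corresponds to re-enqueueing the event, and a `false` 
   result corresponds to proceeding with zone-level cleanup.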



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
