rpuch commented on code in PR #7207:
URL: https://github.com/apache/ignite-3/pull/7207#discussion_r2610149704


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -700,17 +700,47 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
 
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
-            var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
 
             ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
                     zonePartitionId,
                     partitionCount,
                     storageIndexTracker
             );
 
+            var eventParams = new LocalBeforeReplicaStartEventParameters(
+                    zonePartitionId,
+                    revision,
+                    onRecovery,
+                    zoneResources.txStatePartitionStorageIsInRebalanceState()
+            );
+
             startedReplicationGroups.beforeStartingGroup(zonePartitionId);
 
             return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
+                    .thenCompose(v -> {
+                        if (eventParams.anyStorageIsInRebalanceState()) {
+                            // We must destroy protocol storages first. If we do so, then, as MV and TX state storages sync Raft log
+                            // before being flushed, there is a guarantee that, after a possible crash, we will either see some storage
+                            // still in the rebalance state (and hence we'll repeat the destruction on the next start), or the Raft log
+                            // destruction will be persisted (and we'll just recover normally).
+                            try {
+                                replicaMgr.destroyReplicationProtocolStorages(zonePartitionId, isVolatileZone);
+                            } catch (NodeStoppingException e) {
+                                return failedFuture(e);
+                            }
+
+                            CompletableFuture<Void> clearTxStateStorage = zoneResources.txStatePartitionStorage().clear();
+
+                            CompletableFuture<?>[] registeredCleanupFutures = eventParams.cleanupActions().stream()
+                                    .map(Supplier::get)
+                                    .toArray(CompletableFuture[]::new);
+                            CompletableFuture<Void> clearMvStorages = allOf(registeredCleanupFutures);
+
+                            return allOf(clearTxStateStorage, clearMvStorages);

Review Comment:
   `allOf()` accepts varargs, so it seems it was designed to accept an arbitrary number of futures, *even when that number is known beforehand*. It is convenient to unite two futures as `allOf(fut1, fut2)`, and the resulting code looks obvious.
   
   With `thenAcceptBoth` and friends, the best thing I was able to come up with is `fut1.runAfterBoth(fut2, () -> {})`, which seems less clear and also forces us to provide a no-op `Runnable`.
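   
   For illustration, a minimal runnable sketch of the two variants side by side (`fut1`/`fut2` are toy futures here, not the PR code):
   
   ```java
   import static java.util.concurrent.CompletableFuture.allOf;
   
   import java.util.concurrent.CompletableFuture;
   
   class TwoFuturesSketch {
       public static void main(String[] args) {
           CompletableFuture<Void> fut1 = CompletableFuture.runAsync(() -> {});
           CompletableFuture<Void> fut2 = CompletableFuture.runAsync(() -> {});
   
           // allOf() takes varargs, so exactly two futures are just a special case.
           CompletableFuture<Void> viaAllOf = allOf(fut1, fut2);
   
           // runAfterBoth() avoids the vararg array but needs a no-op Runnable.
           CompletableFuture<Void> viaRunAfterBoth = fut1.runAfterBoth(fut2, () -> {});
   
           viaAllOf.join();
           viaRunAfterBoth.join();
       }
   }
   ```
   
   Both variants complete exceptionally if either input future fails, so the difference is purely one of readability.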
   
   Did you have something different in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
