ibessonov commented on code in PR #7207:
URL: https://github.com/apache/ignite-3/pull/7207#discussion_r2610047614
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -700,17 +700,47 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
-            var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
             ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
                     zonePartitionId,
                     partitionCount,
                     storageIndexTracker
             );
+            var eventParams = new LocalBeforeReplicaStartEventParameters(
+                    zonePartitionId,
+                    revision,
+                    onRecovery,
+                    zoneResources.txStatePartitionStorageIsInRebalanceState()
+            );
+
             startedReplicationGroups.beforeStartingGroup(zonePartitionId);
             return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
+                    .thenCompose(v -> {
+                        if (eventParams.anyStorageIsInRebalanceState()) {
+                            // We must destroy protocol storages first. If we do so, then, as MV and TX state storages sync Raft log
+                            // before being flushed, there is a guarantee that, after a possible crash, we will either see some storage
+                            // still in the rebalance state (and hence we'll repeat the destruction on the next start), or the Raft log
+                            // destruction will be persisted (and we'll just recover normally).
+                            try {
+                                replicaMgr.destroyReplicationProtocolStorages(zonePartitionId, isVolatileZone);
+                            } catch (NodeStoppingException e) {
+                                return failedFuture(e);
+                            }
+
+                            CompletableFuture<Void> clearTxStateStorage = zoneResources.txStatePartitionStorage().clear();
+
+                            CompletableFuture<?>[] registeedCleanupFutures = eventParams.cleanupActions().stream()
+                                    .map(Supplier::get)
+                                    .toArray(CompletableFuture[]::new);
+                            CompletableFuture<Void> clearMvStorages = allOf(registeedCleanupFutures);
Review Comment:
```suggestion
                            CompletableFuture<?>[] registeredCleanupFutures = eventParams.cleanupActions().stream()
                                    .map(Supplier::get)
                                    .toArray(CompletableFuture[]::new);
                            CompletableFuture<Void> clearMvStorages = allOf(registeredCleanupFutures);
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -608,42 +610,77 @@ private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalPartitionRepli
     private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
             ZonePartitionId zonePartitionId,
             Set<TableViewInternal> zoneTables,
-            boolean onRecovery
+            LocalBeforeReplicaStartEventParameters event
     ) {
         int partitionIndex = zonePartitionId.partitionId();
         PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
-        CompletableFuture<?>[] futures = zoneTables.stream()
+        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
                 .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                    return getOrCreatePartitionStorages(tbl, singlePartitionIdSet)
-                            .thenRunAsync(() -> inBusyLock(busyLock, () -> {
-                                localPartsByTableId.compute(
-                                        tbl.tableId(),
-                                        (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
-                                );
-
-                                lowWatermark.getLowWatermarkSafe(lwm ->
-                                        registerIndexesToTable(
-                                                tbl,
-                                                catalogService,
-                                                singlePartitionIdSet,
-                                                tbl.schemaView(),
-                                                lwm
-                                        )
-                                );
-
-                                preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId, onRecovery);
-                            }), ioExecutor)
+                    return createPartitionStoragesIfNotCreated(tbl, singlePartitionIdSet)
Review Comment:
Maybe `...IfAbsent` would look nicer; I'm not a fan of the word "create" appearing twice in the same name.
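For reference, the JDK already uses this naming pattern, e.g. `Map.computeIfAbsent`, which is roughly the flavor I have in mind (a tiny self-contained illustration, not code from this PR):
```java
import java.util.HashMap;
import java.util.Map;

class IfAbsentNamingExample {
    public static void main(String[] args) {
        Map<Integer, String> storagesByPartition = new HashMap<>();

        // computeIfAbsent creates the value only when no mapping exists yet,
        // i.e. the same "create if not already created" semantics under a shorter name.
        storagesByPartition.computeIfAbsent(0, partitionIndex -> "storage-for-partition-" + partitionIndex);
        storagesByPartition.computeIfAbsent(0, partitionIndex -> "this supplier is not invoked again");

        System.out.println(storagesByPartition); // {0=storage-for-partition-0}
    }
}
```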
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -608,42 +610,77 @@ private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalPartitionRepli
     private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
             ZonePartitionId zonePartitionId,
             Set<TableViewInternal> zoneTables,
-            boolean onRecovery
+            LocalBeforeReplicaStartEventParameters event
    ) {
         int partitionIndex = zonePartitionId.partitionId();
         PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
-        CompletableFuture<?>[] futures = zoneTables.stream()
+        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
                 .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                    return getOrCreatePartitionStorages(tbl, singlePartitionIdSet)
-                            .thenRunAsync(() -> inBusyLock(busyLock, () -> {
-                                localPartsByTableId.compute(
-                                        tbl.tableId(),
-                                        (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
-                                );
-
-                                lowWatermark.getLowWatermarkSafe(lwm ->
-                                        registerIndexesToTable(
-                                                tbl,
-                                                catalogService,
-                                                singlePartitionIdSet,
-                                                tbl.schemaView(),
-                                                lwm
-                                        )
-                                );
-
-                                preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId, onRecovery);
-                            }), ioExecutor)
+                    return createPartitionStoragesIfNotCreated(tbl, singlePartitionIdSet)
                             // If the table is already closed, it's not a problem (probably the node is stopping).
                             .exceptionally(ignoreTableClosedException());
                 }))
-                .toArray(CompletableFuture[]::new);
+                .collect(toList());
+
+        return CompletableFutures.allOf(storageCreationFutures)
+                .thenRunAsync(() -> scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event), ioExecutor)
+                // If a table is already closed, it's not a problem (probably the node is stopping).
+                .exceptionally(ignoreTableClosedException())
+                .thenCompose(unused -> {
+                    CompletableFuture<?>[] futures = zoneTables.stream()
+                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                return runAsync(() -> inBusyLock(busyLock, () -> {
+                                    localPartsByTableId.compute(
+                                            tbl.tableId(),
+                                            (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
+                                    );
+
+                                    lowWatermark.getLowWatermarkSafe(lwm ->
+                                            registerIndexesToTable(
+                                                    tbl,
+                                                    catalogService,
+                                                    singlePartitionIdSet,
+                                                    tbl.schemaView(),
+                                                    lwm
+                                            )
+                                    );
+
+                                    preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId, event.onRecovery());
+                                }), ioExecutor)
+                                        // If the table is already closed, it's not a problem (probably the node is stopping).
+                                        .exceptionally(ignoreTableClosedException());
+                            }))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(futures);
+                });
+    }
+
+    private static void scheduleMvPartitionsCleanupIfNeeded(
+            Set<TableViewInternal> zoneTables,
+            int partitionIndex,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
+                .map(table -> table.internalTable().storage().getMvPartition(partitionIndex))
+                .filter(Objects::nonNull)
+                .anyMatch(partitionStorage -> partitionStorage.lastAppliedIndex() == MvPartitionStorage.REBALANCE_IN_PROGRESS);
+
+        if (anyMvPartitionStorageIsInRebalanceState) {
+            event.registerStorageInRebalanceState();
+        }
-        return allOf(futures);
+        event.addCleanupAction(() -> {
+            CompletableFuture<?>[] clearFutures = zoneTables.stream()
+                    .map(table -> table.internalTable().storage().clearPartition(partitionIndex))
+                    .toArray(CompletableFuture[]::new);
+            return allOf(clearFutures);
+        });
Review Comment:
Should it be under the same `if` as `event.registerStorageInRebalanceState();`? I got a little confused by this code, not going to lie.
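To make the question concrete, here is a minimal sketch of the rearrangement I mean, reusing the names from the diff above; whether the cleanup action is intended to run only after an interrupted rebalance is exactly the part I'm unsure about:
```java
// Hypothetical variant: register the cleanup action only when a rebalance was actually interrupted.
if (anyMvPartitionStorageIsInRebalanceState) {
    event.registerStorageInRebalanceState();

    event.addCleanupAction(() -> {
        CompletableFuture<?>[] clearFutures = zoneTables.stream()
                .map(table -> table.internalTable().storage().clearPartition(partitionIndex))
                .toArray(CompletableFuture[]::new);
        return allOf(clearFutures);
    });
}
```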
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -700,17 +700,47 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
-            var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
             ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
                     zonePartitionId,
                     partitionCount,
                     storageIndexTracker
             );
+            var eventParams = new LocalBeforeReplicaStartEventParameters(
+                    zonePartitionId,
+                    revision,
+                    onRecovery,
+                    zoneResources.txStatePartitionStorageIsInRebalanceState()
+            );
+
             startedReplicationGroups.beforeStartingGroup(zonePartitionId);
             return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
+                    .thenCompose(v -> {
+                        if (eventParams.anyStorageIsInRebalanceState()) {
+                            // We must destroy protocol storages first. If we do so, then, as MV and TX state storages sync Raft log
+                            // before being flushed, there is a guarantee that, after a possible crash, we will either see some storage
+                            // still in the rebalance state (and hence we'll repeat the destruction on the next start), or the Raft log
+                            // destruction will be persisted (and we'll just recover normally).
+                            try {
+                                replicaMgr.destroyReplicationProtocolStorages(zonePartitionId, isVolatileZone);
+                            } catch (NodeStoppingException e) {
+                                return failedFuture(e);
+                            }
+
+                            CompletableFuture<Void> clearTxStateStorage = zoneResources.txStatePartitionStorage().clear();
+
+                            CompletableFuture<?>[] registeedCleanupFutures = eventParams.cleanupActions().stream()
+                                    .map(Supplier::get)
+                                    .toArray(CompletableFuture[]::new);
+                            CompletableFuture<Void> clearMvStorages = allOf(registeedCleanupFutures);
+
+                            return allOf(clearTxStateStorage, clearMvStorages);
Review Comment:
Just a "shower thought". Why don't we use something like `thenAcceptBoth`
that's designed to combine two futures, but instead use `allOf` that's designed
to combine an arbitrary number of futures, if we know in advance that there's
only two of them...
I'm not asking to change anything, just sharing my ideas
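To illustrate, a tiny self-contained sketch of the two variants; the completed futures here are just stand-ins for `clearTxStateStorage` and `clearMvStorages` from the diff:
```java
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;

class CombineTwoFuturesExample {
    public static void main(String[] args) {
        CompletableFuture<Void> clearTxStateStorage = completedFuture(null);
        CompletableFuture<Void> clearMvStorages = completedFuture(null);

        // Variant from the diff: allOf() accepts any number of futures.
        CompletableFuture<Void> viaAllOf = allOf(clearTxStateStorage, clearMvStorages);

        // The "shower thought": thenAcceptBoth() is specifically for combining exactly two futures.
        CompletableFuture<Void> viaThenAcceptBoth =
                clearTxStateStorage.thenAcceptBoth(clearMvStorages, (ignored1, ignored2) -> { });

        viaAllOf.join();
        viaThenAcceptBoth.join();
    }
}
```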
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]