kgusakov commented on code in PR #3931:
URL: https://github.com/apache/ignite-3/pull/3931#discussion_r1687198033
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void>
startAsync(ComponentContext componentContext) {
});
}
+ private CompletableFuture<Boolean>
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
+ if (!PartitionReplicaLifecycleManager.ENABLED) {
+ return completedFuture(false);
+ }
+
+ long causalityToken = parameters.causalityToken();
+ CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+ CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+ TableImpl table = createTableImpl(causalityToken, tableDescriptor,
zoneDescriptor);
+
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return schemaManager.schemaRegistry(causalityToken,
parameters.tableId()).thenAccept(table::schemaView);
+ }));
+
+ // NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
+ CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
nullCompletedFuture().thenComposeAsync((ignored) -> {
+ PartitionSet parts = new BitSetPartitionSet();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
+ parts.set(i);
+ }
+ }
+
+ return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
+ }, ioExecutor)));
+
+ CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+
+ CompletableFuture<?> createPartsFut =
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+ var startPartsFut = new
ArrayList<CompletableFuture<?>>();
+
+ for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+ if
(partitionReplicaLifecycleManager.hasLocalPartition(new
ZonePartitionId(zoneDescriptor.id(), i))) {
Review Comment:
Yep, we need to replace it by the discussed methods, I made todo
https://github.com/apache/ignite-3/blob/239dcbf3b458140540067c763c12cd1a6735f85c/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java#L581
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]