kgusakov commented on code in PR #3931:
URL: https://github.com/apache/ignite-3/pull/3931#discussion_r1687198033


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -635,6 +645,133 @@ public CompletableFuture<Void> 
startAsync(ComponentContext componentContext) {
         });
     }
 
+    private CompletableFuture<Boolean> 
initTableFsmOnTableCreate(CreateTableEventParameters parameters) {
+        if (!PartitionReplicaLifecycleManager.ENABLED) {
+            return completedFuture(false);
+        }
+
+        long causalityToken = parameters.causalityToken();
+        CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
+        CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, parameters.catalogVersion());
+
+        TableImpl table =  createTableImpl(causalityToken, tableDescriptor, 
zoneDescriptor);
+
+        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return schemaManager.schemaRegistry(causalityToken, 
parameters.tableId()).thenAccept(table::schemaView);
+        }));
+
+        // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
+        CompletableFuture<?> localPartsUpdateFuture = 
localPartitionsVv.update(causalityToken,
+                (ignore, throwable) -> inBusyLock(busyLock, () -> 
nullCompletedFuture().thenComposeAsync((ignored) -> {
+                    PartitionSet parts = new BitSetPartitionSet();
+
+                    for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                        if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {
+                            parts.set(i);
+                        }
+                    }
+
+                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(parameters.tableId(), parts));
+                }, ioExecutor)));
+
+        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
+
+        CompletableFuture<?> createPartsFut = 
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+                        var startPartsFut = new 
ArrayList<CompletableFuture<?>>();
+
+                        for (int i = 0; i < zoneDescriptor.partitions(); i++) {
+                            if 
(partitionReplicaLifecycleManager.hasLocalPartition(new 
ZonePartitionId(zoneDescriptor.id(), i))) {

Review Comment:
   Yep, we need to replace it with the discussed methods. I added a TODO for that: 
https://github.com/apache/ignite-3/blob/239dcbf3b458140540067c763c12cd1a6735f85c/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java#L581



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to