AMashenkov commented on code in PR #2500:
URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1313085975


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            int zoneId = tableDescriptor.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
+
+            // Check if the table already has assignments in the vault.
+            // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
+            if (partitionAssignments(vaultManager, tableId, 0) != null) {
+                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
+            } else {
+                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, zoneId)
+                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
+                                dataNodes,
+                                zoneDescriptor.partitions(),
+                                zoneDescriptor.replicas()
+                        ));
+            }
+
+            return createTableLocally(
+                    causalityToken,
+                    tableDescriptor,
+                    zoneDescriptor,
+                    assignmentsFuture,
+                    catalogVersion
+            ).whenComplete((v, e) -> {
+                if (e == null) {
+                    for (var listener : assignmentsChangeListeners) {
+                        listener.accept(this);
+                    }
+                }
+            }).thenCompose(ignored -> 
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));

Review Comment:
   BTW, we do nothing in case of error.
   Also, @sanpwc, does the order look correct? or maybe we should write to 
metastorage prior to notification?
   
   I'm ok to make a separate fix if needed. I'm just curious and this place 
cause questions.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            int zoneId = tableDescriptor.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
+
+            // Check if the table already has assignments in the vault.
+            // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
+            if (partitionAssignments(vaultManager, tableId, 0) != null) {
+                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
+            } else {
+                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, zoneId)
+                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
+                                dataNodes,
+                                zoneDescriptor.partitions(),
+                                zoneDescriptor.replicas()
+                        ));
+            }
+
+            return createTableLocally(
+                    causalityToken,
+                    tableDescriptor,
+                    zoneDescriptor,
+                    assignmentsFuture,
+                    catalogVersion
+            ).whenComplete((v, e) -> {
+                if (e == null) {
+                    for (var listener : assignmentsChangeListeners) {
+                        listener.accept(this);
+                    }
+                }
+            }).thenCompose(ignored -> 
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));

Review Comment:
   BTW, we do nothing in case of error.
   Also, @sanpwc, does the order look correct? or maybe we should write to 
metastorage prior to notification?
   
   I'm ok to make a separate fix if needed. I'm just curious and this place 
causes questions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to