rpuch commented on code in PR #2518:
URL: https://github.com/apache/ignite-3/pull/2518#discussion_r1312624057


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java:
##########
@@ -145,12 +149,8 @@ private CompletableFuture<?> rejectionDueToMetadataLagTriggered() {
         return rejectionTriggered;
     }
 
-    private void putToTableAt(int nodeIndex) {
-        cluster.node(nodeIndex)
-                .tables()
-                .table(TABLE_NAME)
-                .keyValueView()
-                .put(null, Tuple.create().set("key", 1), Tuple.create().set("val", "one"));
+    private void putToTableAt(Table table) {

Review Comment:
   The `At` suffix made sense when the parameter was an index. Now I suggest dropping it so that the method name becomes `putToTable()` (or even `putTo()`).



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java:
##########
@@ -363,4 +367,8 @@ protected ResultSet<SqlRow> sql(Ignite node, String query, Object... args) {
         }
         return rs;
     }
+
+    private static BooleanSupplier tableSchemaVersionEqPredicate(TableImpl table0, int version) {

Review Comment:
   Let's rename `table0` to just `table`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2017,52 +1993,81 @@ public CompletableFuture<TableImpl> tableImplAsync(String name) {
     /**
      * Gets a table by name, if it was created before. Doesn't parse canonical name.
      *
-     * @param name Table name.
+     * @param tableName Table name.
      * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+    public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
 
         try {
-            return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
-                    .thenCompose(tableId -> inBusyLock(busyLock, () -> {
+            HybridTimestamp now = clock.now();
+
+            return schemaSyncService.waitForMetadataCompleteness(now)
+                    .thenComposeAsync(ignore -> {
+                        Integer tableId = tableNameToId(tableName);
+
                         if (tableId == null) {
+                            // Table isn't configured.
                             return completedFuture(null);
                         }
 
-                        return tableAsyncInternal(tableId, false);
-                    }));
+                        TableImpl table = latestTablesById().get(tableId);
+
+                        if (table != null) {
+                            // Table was published.
+                            return completedFuture(table);
+                        }
+
+                        // Wait for table initialization.
+                        return tableReadyFuture(tableId);
+                    }, ioExecutor);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     /**
-     * Internal method for getting table by id.
+     * Gets a table by id, if it was created before.
      *
-     * @param id Table id.
-     * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false}
-     *         otherwise.
-     * @return Future representing pending completion of the operation.
+     * @param tableId Table id.
+     * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) {
-        CompletableFuture<Boolean> tblCfgFut = checkConfiguration
-                ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor)
-                : completedFuture(true);
+    private CompletableFuture<TableImpl> tableAsyncInternal(int tableId) {
+        HybridTimestamp now = clock.now();
 
-        return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
-            if (!isCfg) {
-                return completedFuture(null);
-            }
+        return schemaSyncService.waitForMetadataCompleteness(now)
+                .thenComposeAsync(ignore -> {
+                    TableImpl table = latestTablesById().get(tableId);
 
-            TableImpl tbl = latestTablesById().get(id);
+                    if (table != null) {
+                        // Table was published.
+                        return completedFuture(table);
+                    }
 
-            if (tbl != null) {
-                return completedFuture(tbl);
-            }
+                    if (tablesCfg.tables().value().stream().noneMatch(tbl -> tbl.id() == tableId)) {
+                        // Table isn't configured.
+                        return completedFuture(null);
+                    }
+
+                    // Wait for table initialization.
+                    return tableReadyFuture(tableId);
+                }, ioExecutor);
+    }
 
+    /**
+     * Internal method for getting a table future, which is completed when the table will be published.

Review Comment:
   ```suggestion
        * Internal method for getting a table future, which is completed when the table is published.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2017,52 +1993,81 @@ public CompletableFuture<TableImpl> tableImplAsync(String name) {
     /**
      * Gets a table by name, if it was created before. Doesn't parse canonical name.
      *
-     * @param name Table name.
+     * @param tableName Table name.
      * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+    public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
 
         try {
-            return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
-                    .thenCompose(tableId -> inBusyLock(busyLock, () -> {
+            HybridTimestamp now = clock.now();
+
+            return schemaSyncService.waitForMetadataCompleteness(now)
+                    .thenComposeAsync(ignore -> {
+                        Integer tableId = tableNameToId(tableName);
+
                         if (tableId == null) {
+                            // Table isn't configured.
                             return completedFuture(null);
                         }
 
-                        return tableAsyncInternal(tableId, false);
-                    }));
+                        TableImpl table = latestTablesById().get(tableId);
+
+                        if (table != null) {
+                            // Table was published.
+                            return completedFuture(table);
+                        }
+
+                        // Wait for table initialization.
+                        return tableReadyFuture(tableId);
+                    }, ioExecutor);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     /**
-     * Internal method for getting table by id.
+     * Gets a table by id, if it was created before.
      *
-     * @param id Table id.
-     * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false}
-     *         otherwise.
-     * @return Future representing pending completion of the operation.
+     * @param tableId Table id.
+     * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) {
-        CompletableFuture<Boolean> tblCfgFut = checkConfiguration
-                ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor)
-                : completedFuture(true);
+    private CompletableFuture<TableImpl> tableAsyncInternal(int tableId) {
+        HybridTimestamp now = clock.now();
 
-        return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
-            if (!isCfg) {
-                return completedFuture(null);
-            }
+        return schemaSyncService.waitForMetadataCompleteness(now)
+                .thenComposeAsync(ignore -> {
+                    TableImpl table = latestTablesById().get(tableId);
 
-            TableImpl tbl = latestTablesById().get(id);
+                    if (table != null) {
+                        // Table was published.
+                        return completedFuture(table);
+                    }
 
-            if (tbl != null) {
-                return completedFuture(tbl);
-            }
+                    if (tablesCfg.tables().value().stream().noneMatch(tbl -> tbl.id() == tableId)) {

Review Comment:
   Why do we need to check the configuration here? We've already waited for metadata completeness, so the table should be in the catalog, in the configuration, and in `tablesByIdVv`.
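
   To illustrate the question, here is a minimal sketch of the lookup without the configuration check. It is not the real `TableManager` code: `TableLookupSketch`, `publish()`, `publishedTablesById` and the `String` table stand-in are hypothetical, and the already-created `metadataComplete` future stands in for `schemaSyncService.waitForMetadataCompleteness(now)`. It assumes that, once metadata is complete, every existing table is already in the published registry; if that assumption does not hold, the check may still be needed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the lookup in TableManager; not the real Ignite API. */
final class TableLookupSketch {
    /** Stand-in for tablesByIdVv: tables already published, keyed by id. */
    private final Map<Integer, String> publishedTablesById = new ConcurrentHashMap<>();

    /** Publishes a table (represented by its name) under the given id. */
    public void publish(int tableId, String table) {
        publishedTablesById.put(tableId, table);
    }

    /**
     * Resolves a table by id once metadata is complete.
     * Assumption: at that point an existing table is already in the registry,
     * so a miss means "no such table" and no configuration scan is performed.
     */
    public CompletableFuture<String> tableAsync(int tableId, CompletableFuture<Void> metadataComplete) {
        // The registry is read only after the metadata future completes.
        return metadataComplete.thenApply(ignore -> publishedTablesById.get(tableId));
    }
}
```

   With this shape, an unknown id simply yields a future completed with `null`, which is what the configuration check currently produces.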



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1837,18 +1821,20 @@ public CompletableFuture<List<Table>> tablesAsync() {
      * @return Future representing pending completion of the operation.
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
-        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor)
-                .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
-                    var tableFuts = new CompletableFuture[tableIds.size()];
+        return schemaSyncService.waitForMetadataCompleteness(clock.now())
+                .thenCompose(ignore -> inBusyLock(busyLock, () -> {
+                    NamedListView<TableView> configuredTables = tablesCfg.tables().value();

Review Comment:
   Ahem, now the method goes to the configuration. Wouldn't the approach 'take tables from `tablesByIdVv`, wait for each of them to become ready, and then return the tables' work? It seems it wouldn't require going to the configuration.
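
   A rough sketch of that approach, using only `java.util.concurrent`. All names here are hypothetical (`TablesFromRegistrySketch`, `register()`, `readyFuturesById`), tables are represented by plain strings, and the `metadataComplete` future stands in for `schemaSyncService.waitForMetadataCompleteness(clock.now())`. Busy-lock handling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for TableManager; not the real Ignite API. */
final class TablesFromRegistrySketch {
    /** Stand-in for tablesByIdVv: table id -> future completed once the table is ready. */
    private final ConcurrentNavigableMap<Integer, CompletableFuture<String>> readyFuturesById =
            new ConcurrentSkipListMap<>();

    /** Registers a known table together with its readiness future. */
    public void register(int tableId, CompletableFuture<String> readyFuture) {
        readyFuturesById.put(tableId, readyFuture);
    }

    /** Returns all known tables once metadata is complete and each table is ready. */
    public CompletableFuture<List<String>> tablesAsync(CompletableFuture<Void> metadataComplete) {
        return metadataComplete.thenCompose(ignore -> {
            // Snapshot of the known tables in id order; the configuration is not consulted.
            List<CompletableFuture<String>> futures = new ArrayList<>(readyFuturesById.values());

            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    // Every future is complete at this point, so join() does not block.
                    .thenApply(unused -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        });
    }
}
```

   The result future completes only after both the metadata future and every per-table readiness future have completed, so callers never see a table that is not ready yet.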


