tkalkirill commented on code in PR #2518: URL: https://github.com/apache/ignite-3/pull/2518#discussion_r1312541699
########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java: ########## @@ -167,9 +170,11 @@ public void checkSchemasCorrectUpdate() throws Exception { table = (TableImpl) ignite2.tables().table(TABLE_NAME); TableImpl table0 = table; - assertTrue(waitForCondition(() -> table0.schemaView().schema().version() == 2, 5_000)); + assertTrue(waitForCondition(tableSchemaVersionEqPredicate(table0, 2), 5_000)); - table = (TableImpl) ignite1.tables().table(TABLE_NAME); + table = ((TableManager) ignite1.tables()).getTable(table.tableId()); + + assertFalse(waitForCondition(tableSchemaVersionEqPredicate(table, 2), 5_000)); Review Comment: I think the test can be fixed by using an API that does not perform schema synchronization; see this example: https://github.com/apache/ignite-3/pull/2500/commits/b3f4aa701239ff41638e0e35a2a4de6ec360e1e3 ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java: ########## @@ -78,23 +79,26 @@ void laggingSchemasPreventPartitionDataReplication() throws Exception { IgniteImpl nodeToInhibitMetaStorage = cluster.node(1); + Table notInhibitedNodeTable = cluster.node(notInhibitedNodeIndex).tables().table(TABLE_NAME); + TableImpl inhibitedNodeTable = (TableImpl) nodeToInhibitMetaStorage.tables().table(TABLE_NAME); + WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage); listenerInhibitor.startInhibit(); try { CompletableFuture<?> rejectionTriggered = rejectionDueToMetadataLagTriggered(); updateTableSchemaAt(notInhibitedNodeIndex); - putToTableAt(notInhibitedNodeIndex); + putToTableAt(notInhibitedNodeTable); assertThat("Did not see rejections due to lagging metadata", rejectionTriggered, willSucceedIn(10, TimeUnit.SECONDS)); - assertTrue(solePartitionIsEmpty(nodeToInhibitMetaStorage), "Something was written to the partition"); + assertTrue(solePartitionIsEmpty(inhibitedNodeTable), 
"Something was written to the partition"); Review Comment: I think that the test can be fixed using api without schema synchronization, see how I fixed it. https://github.com/apache/ignite-3/pull/2500/commits/b3f4aa701239ff41638e0e35a2a4de6ec360e1e3 ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2017,52 +1993,81 @@ public CompletableFuture<TableImpl> tableImplAsync(String name) { /** * Gets a table by name, if it was created before. Doesn't parse canonical name. * - * @param name Table name. + * @param tableName Table name. * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(String name) { + public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) { if (!busyLock.enterBusy()) { throw new IgniteException(new NodeStoppingException()); } try { - return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor) - .thenCompose(tableId -> inBusyLock(busyLock, () -> { + HybridTimestamp now = clock.now(); + + return schemaSyncService.waitForMetadataCompleteness(now) + .thenComposeAsync(ignore -> { + Integer tableId = tableNameToId(tableName); + if (tableId == null) { + // Table isn't configured. return completedFuture(null); } - return tableAsyncInternal(tableId, false); - })); + TableImpl table = latestTablesById().get(tableId); + + if (table != null) { + // Table was published. + return completedFuture(table); + } + + // Wait for table initialization. + return tableReadyFuture(tableId); + }, ioExecutor); } finally { busyLock.leaveBusy(); } } /** - * Internal method for getting table by id. + * Gets a table by id, if it was created before. * - * @param id Table id. - * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} - * otherwise. 
- * @return Future representing pending completion of the operation. + * @param tableId Table id. + * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) { - CompletableFuture<Boolean> tblCfgFut = checkConfiguration - ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor) - : completedFuture(true); + private CompletableFuture<TableImpl> tableAsyncInternal(int tableId) { + HybridTimestamp now = clock.now(); - return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> { - if (!isCfg) { - return completedFuture(null); - } + return schemaSyncService.waitForMetadataCompleteness(now) + .thenComposeAsync(ignore -> { + TableImpl table = latestTablesById().get(tableId); - TableImpl tbl = latestTablesById().get(id); + if (table != null) { + // Table was published. + return completedFuture(table); + } - if (tbl != null) { - return completedFuture(tbl); - } + if (tablesCfg.tables().value().stream().noneMatch(tbl -> tbl.id() == tableId)) { + // Table isn't configured. + return completedFuture(null); + } + + // Wait for table initialization. + return tableReadyFuture(tableId); + }, ioExecutor); + } + /** + * Internal method for getting a table future, which is completed when the table will be published. + * + * @param id Table id. + * @return Future representing pending completion of the operation. + */ + private CompletableFuture<TableImpl> tableReadyFuture(int id) { + if (!busyLock.enterBusy()) { Review Comment: I think it's better to complete the future with an error instead of throwing exceptions. 
See `org.apache.ignite.internal.util.IgniteUtils#inBusyLockAsync` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1864,29 +1850,19 @@ private CompletableFuture<List<Table>> tablesAsyncInternal() { } /** - * Collects a list of direct table ids. + * Return actual table id by given name or {@code null} if table doesn't exist. * - * @return A list of direct table ids. + * @param tableName Table name. + * @return Table id or {@code null} if not found. */ - private List<Integer> directTableIds() { - return configuredTablesCache.configuredTableIds(); - } - - /** - * Gets direct id of table with {@code tblName}. - * - * @param tblName Name of the table. - * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found. - */ - @Nullable - private Integer directTableId(String tblName) { + private @Nullable Integer tableNameToId(String tableName) { Review Comment: Why is there no schema synchronization here? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1837,18 +1821,20 @@ public CompletableFuture<List<Table>> tablesAsync() { * @return Future representing pending completion of the operation. */ private CompletableFuture<List<Table>> tablesAsyncInternal() { - return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor) - .thenCompose(tableIds -> inBusyLock(busyLock, () -> { - var tableFuts = new CompletableFuture[tableIds.size()]; + return schemaSyncService.waitForMetadataCompleteness(clock.now()) Review Comment: `TableManager#tables` may hang, and if you then try to stop the `TableManager`, the node stop will hang forever. See how I fixed this problem: 
https://github.com/apache/ignite-3/blob/c63c4530f00ff67895c734e9221eac1189849d78/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java#L1444C26-L1444C26 ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java: ########## @@ -809,6 +810,9 @@ private TableManager createTableManager(CompletableFuture<TableManager> tblManag when(vaultManager.get(any(ByteArray.class))).thenReturn(completedFuture(null)); when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(completedFuture(null)); + SchemaSyncService schemaSyncService = mock(SchemaSyncService.class); + when(schemaSyncService.waitForMetadataCompleteness(any(HybridTimestamp.class))).thenReturn(completedFuture(null)); + Review Comment: ```suggestion ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2017,52 +1993,81 @@ public CompletableFuture<TableImpl> tableImplAsync(String name) { /** * Gets a table by name, if it was created before. Doesn't parse canonical name. * - * @param name Table name. + * @param tableName Table name. * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(String name) { + public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) { if (!busyLock.enterBusy()) { throw new IgniteException(new NodeStoppingException()); } try { - return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor) - .thenCompose(tableId -> inBusyLock(busyLock, () -> { + HybridTimestamp now = clock.now(); + + return schemaSyncService.waitForMetadataCompleteness(now) Review Comment: Same problem with node stop. 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1864,29 +1850,19 @@ private CompletableFuture<List<Table>> tablesAsyncInternal() { } /** - * Collects a list of direct table ids. + * Return actual table id by given name or {@code null} if table doesn't exist. * - * @return A list of direct table ids. + * @param tableName Table name. + * @return Table id or {@code null} if not found. Review Comment: ```suggestion ``` ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java: ########## @@ -834,7 +838,7 @@ private TableManager createTableManager(CompletableFuture<TableManager> tblManag vaultManager, cmgMgr, distributionZoneManager, - mock(SchemaSyncService.class), + schemaSyncService, Review Comment: ```suggestion mock(SchemaSyncService.class, invocation -> completedFuture(null)), ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2089,26 +2094,18 @@ public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConf // This check is needed for the case when we have registered tablesListener, // but tablesByIdVv has already been completed, so listener would be triggered only for the next versioned value update. - tbl = latestTablesById().get(id); + TableImpl table = latestTablesById().get(id); - if (tbl != null) { + if (table != null) { assignmentsUpdatedVv.removeWhenComplete(tablesListener); - return completedFuture(tbl); + return completedFuture(table); } return getTblFut.whenComplete((unused, throwable) -> assignmentsUpdatedVv.removeWhenComplete(tablesListener)); Review Comment: Same problem with node stop. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2017,52 +1993,81 @@ public CompletableFuture<TableImpl> tableImplAsync(String name) { /** * Gets a table by name, if it was created before. 
Doesn't parse canonical name. * - * @param name Table name. + * @param tableName Table name. * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(String name) { + public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) { if (!busyLock.enterBusy()) { throw new IgniteException(new NodeStoppingException()); } try { - return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor) - .thenCompose(tableId -> inBusyLock(busyLock, () -> { + HybridTimestamp now = clock.now(); + + return schemaSyncService.waitForMetadataCompleteness(now) + .thenComposeAsync(ignore -> { + Integer tableId = tableNameToId(tableName); + if (tableId == null) { + // Table isn't configured. return completedFuture(null); } - return tableAsyncInternal(tableId, false); - })); + TableImpl table = latestTablesById().get(tableId); + + if (table != null) { + // Table was published. + return completedFuture(table); + } + + // Wait for table initialization. + return tableReadyFuture(tableId); + }, ioExecutor); } finally { busyLock.leaveBusy(); } } /** - * Internal method for getting table by id. + * Gets a table by id, if it was created before. * - * @param id Table id. - * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} - * otherwise. - * @return Future representing pending completion of the operation. + * @param tableId Table id. + * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) { - CompletableFuture<Boolean> tblCfgFut = checkConfiguration - ? 
supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor) - : completedFuture(true); + private CompletableFuture<TableImpl> tableAsyncInternal(int tableId) { + HybridTimestamp now = clock.now(); - return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> { - if (!isCfg) { - return completedFuture(null); - } + return schemaSyncService.waitForMetadataCompleteness(now) Review Comment: Same problem with node stop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org