tkalkirill commented on code in PR #2500: URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1319496620
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1463,408 +1416,51 @@ private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableD } private Set<Assignment> calculateAssignments(TablePartitionId tablePartitionId) { - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tablePartitionId.tableId()); + int catalogVersion = catalogService.latestCatalogVersion(); - assert tableDescriptor != null : tablePartitionId; + CatalogTableDescriptor tableDescriptor = getTableDescriptor(tablePartitionId.tableId(), catalogVersion); return AffinityUtils.calculateAssignmentForPartition( // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 we must use distribution zone keys here baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), tablePartitionId.partitionId(), - getZoneDescriptor(tableDescriptor.zoneId(), catalogService.latestCatalogVersion()).replicas() + getZoneDescriptor(tableDescriptor, catalogVersion).replicas() ); } - /** - * Creates a new table with the given {@code name} asynchronously. If a table with the same name already exists, a future will be - * completed with {@link TableAlreadyExistsException}. - * - * @param name Table name. - * @param zoneName Distribution zone name. - * @param tableInitChange Table changer. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableAlreadyExistsException - */ - public CompletableFuture<Table> createTableAsync(String name, String zoneName, Consumer<TableChange> tableInitChange) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return createTableAsyncInternal(name, zoneName, tableInitChange); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #createTableAsync(String, String, Consumer)} for details. */ - private CompletableFuture<Table> createTableAsyncInternal( - String name, - String zoneName, - Consumer<TableChange> tableInitChange - ) { - CompletableFuture<Table> tblFut = new CompletableFuture<>(); - - tableAsyncInternal(name) - .handle((tbl, tblEx) -> { - if (tbl != null) { - tblFut.completeExceptionally(new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name)); - } else if (tblEx != null) { - tblFut.completeExceptionally(tblEx); - } else { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); - - tblFut.completeExceptionally(nodeStoppingException); - - throw new IgniteException(nodeStoppingException); - } - - try { - // TODO: IGNITE-19499 Should listen to event CreateTableEventParameters and get the zone ID from it - CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneName, clock.nowLong()); - - if (zoneDescriptor == null) { - tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName)); - - return null; - } - - cmgMgr.logicalTopology() - .handle((cmgTopology, e) -> { - if (e == null) { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); - - tblFut.completeExceptionally(nodeStoppingException); - - throw new IgniteException(nodeStoppingException); - } - - try { - changeTablesConfigurationOnTableCreate( - name, - zoneDescriptor.id(), - tableInitChange, - tblFut - ); - } finally { - busyLock.leaveBusy(); - } - } else { - tblFut.completeExceptionally(e); - } - - return null; - }); - } catch (Throwable t) { - tblFut.completeExceptionally(t); - } finally { - busyLock.leaveBusy(); - } - } - - return null; - }); - - return tblFut; - } - - /** - * Creates a new table in {@link TablesConfiguration}. - * - * @param name Table name. - * @param zoneId Distribution zone id. - * @param tableInitChange Table changer. - * @param tblFut Future representing pending completion of the table creation. - */ - private void changeTablesConfigurationOnTableCreate( - String name, - int zoneId, - Consumer<TableChange> tableInitChange, - CompletableFuture<Table> tblFut - ) { - tablesCfg.change(tablesChange -> { - incrementTablesGeneration(tablesChange); - - tablesChange.changeTables(tablesListChange -> { - if (tablesListChange.get(name) != null) { - throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name); - } - - tablesListChange.create(name, (tableChange) -> { - tableInitChange.accept(tableChange); - - tableChange.changeZoneId(zoneId); - - var extConfCh = ((ExtendedTableChange) tableChange); - - int tableId = tablesChange.globalIdCounter() + 1; - - extConfCh.changeId(tableId); - - tablesChange.changeGlobalIdCounter(tableId); - - extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION); - - tableCreateFuts.put(extConfCh.id(), tblFut); - }); - }); - }).exceptionally(t -> { - Throwable ex = getRootCause(t); - - if (ex instanceof TableAlreadyExistsException) { - tblFut.completeExceptionally(ex); - } else { - LOG.debug("Unable to create table [name={}]", ex, name); - - tblFut.completeExceptionally(ex); - - tableCreateFuts.values().removeIf(fut -> fut == tblFut); - } - - return null; - }); - } - - private static void incrementTablesGeneration(TablesChange tablesChange) { - tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1); - } - - /** - * Alters a cluster table. If an appropriate table does not exist, a future will be completed with {@link TableNotFoundException}. - * - * @param name Table name. - * @param tableChange Table changer. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableNotFoundException - */ - public CompletableFuture<Void> alterTableAsync(String name, Function<TableChange, Boolean> tableChange) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return alterTableAsyncInternal(name, tableChange); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #alterTableAsync(String, Function)} for details. */ - private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) { - CompletableFuture<Void> tblFut = new CompletableFuture<>(); - - tableAsync(name).thenAccept(tbl -> { - if (tbl == null) { - tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name)); - } else { - tablesCfg.tables().change(ch -> { - if (ch.get(name) == null) { - throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name); - } - - ch.update(name, tblCh -> { - if (!tableChange.apply(tblCh)) { - return; - } - - ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh; - - exTblChange.changeSchemaId(exTblChange.schemaId() + 1); - }); - }).whenComplete((res, t) -> { - if (t != null) { - Throwable ex = getRootCause(t); - - if (ex instanceof TableNotFoundException) { - tblFut.completeExceptionally(ex); - } else { - LOG.debug("Unable to modify table [name={}]", ex, name); - - tblFut.completeExceptionally(ex); - } - } else { - tblFut.complete(res); - } - }); - } - }).exceptionally(th -> { - tblFut.completeExceptionally(th); - - return null; - }); - - return tblFut; - } - - /** - * Gets a cause exception for a client. - * - * @param t Exception wrapper. - * @return A root exception which will be acceptable to throw for public API. - */ - //TODO: IGNITE-16051 Implement exception converter for public API. - private IgniteException getRootCause(Throwable t) { - Throwable ex; - - if (t instanceof CompletionException) { - if (t.getCause() instanceof ConfigurationChangeException) { - ex = t.getCause().getCause(); - } else { - ex = t.getCause(); - } - - } else { - ex = t; - } - - // TODO https://issues.apache.org/jira/browse/IGNITE-19539 - return (ex instanceof IgniteException) ? (IgniteException) ex : ExceptionUtils.wrap(ex); - } - - /** - * Drops a table with the name specified. If appropriate table does not be found, a future will be completed with - * {@link TableNotFoundException}. - * - * @param name Table name. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableNotFoundException - */ - public CompletableFuture<Void> dropTableAsync(String name) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return dropTableAsyncInternal(name); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #dropTableAsync(String)} for details. */ - private CompletableFuture<Void> dropTableAsyncInternal(String name) { - return tableAsyncInternal(name).thenCompose(tbl -> { - // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such - // distributed table and the local config has lagged behind. - if (tbl == null) { - return failedFuture(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name)); - } - - return tablesCfg - .change(chg -> { - incrementTablesGeneration(chg); - - chg - .changeTables(tblChg -> { - if (tblChg.get(name) == null) { - throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name); - } - - tblChg.delete(name); - }); - }) - .exceptionally(t -> { - Throwable ex = getRootCause(t); - - if (!(ex instanceof TableNotFoundException)) { - LOG.debug("Unable to drop table [name={}]", ex, name); - } - - throw new CompletionException(ex); - }); - }); - } - - /** {@inheritDoc} */ @Override public List<Table> tables() { return join(tablesAsync()); } - /** {@inheritDoc} */ @Override public CompletableFuture<List<Table>> tablesAsync() { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return tablesAsyncInternal(); - } finally { - busyLock.leaveBusy(); - } + return inBusyLockAsync(busyLock, this::tablesAsyncInternalBusy); } - /** - * Internal method for getting table. - * - * @return Future representing pending completion of the operation. - */ - private CompletableFuture<List<Table>> tablesAsyncInternal() { - return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor) - .thenCompose(tableIds -> inBusyLock(busyLock, () -> { - var tableFuts = new CompletableFuture[tableIds.size()]; - - var i = 0; + private CompletableFuture<List<Table>> tablesAsyncInternalBusy() { + HybridTimestamp now = clock.now(); - for (int tblId : tableIds) { - tableFuts[i++] = tableAsyncInternal(tblId, false); - } - - return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> { - var tables = new ArrayList<Table>(tableIds.size()); - - for (var fut : tableFuts) { - var table = fut.join(); - - if (table != null) { - tables.add((Table) table); - } - } + return anyOf(schemaSyncService.waitForMetadataCompleteness(now), stopManagerFuture) Review Comment: We discussed it in person and removed use busyLock at `TableManager#join`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org