sanpwc commented on code in PR #2500: URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1316767338
########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java: ##########
@@ -350,7 +350,7 @@ public CompletableFuture<Void> dropTable(DropTableParams params) { Arrays.stream(schema.indexes()) .filter(index -> index.tableId() == table.id()) - .forEach(index -> updateEntries.add(new DropIndexEntry(index.id(), index.tableId(), table.name()))); + .forEach(index -> updateEntries.add(new DropIndexEntry(index.id(), index.tableId())));

Review Comment:
It seems that this action and all similar ones should be covered by busyLock. Are they?

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java: ##########
@@ -129,50 +121,6 @@ void afterEach() throws Exception { IgniteUtils.closeAll(closeables); } - /** - * Check unsupported column type change. - */ - @Test - public void testChangeColumnType() throws Exception {

Review Comment:
Do we no longer need such tests, or do we have them somewhere else?

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java: ##########
@@ -1229,30 +1222,9 @@ private void createTableWithData(List<IgniteImpl> nodes, String name, int replic private void waitForIndex(Collection<IgniteImpl> nodes, String indexName) throws InterruptedException { // FIXME: Wait for the index to be created on all nodes, // this is a workaround for https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to the PK index.
- - Stream<TablesConfiguration> partialTablesConfiguration = Stream.empty(); - - if (!partialNodes.isEmpty()) { - partialTablesConfiguration = partialNodes.stream() - .flatMap(it -> it.startedComponents().stream()) - .filter(ConfigurationManager.class::isInstance) - .map(c -> ((ConfigurationManager) c).configurationRegistry().getConfiguration(TablesConfiguration.KEY)) - .filter(Objects::nonNull) - .findAny() - .map(Stream::of) - .orElseThrow(); - } - - Stream<TablesConfiguration> nodesTablesConfigurations = nodes.stream() - .filter(Objects::nonNull) - .map(node -> node.clusterConfiguration().getConfiguration(TablesConfiguration.KEY)); - - List<TablesConfiguration> tablesConfigurations = Stream.concat(nodesTablesConfigurations, partialTablesConfiguration) - .collect(Collectors.toList()); - assertTrue(waitForCondition( - () -> tablesConfigurations.stream() - .map(cfg -> cfg.indexes().get(indexName.toUpperCase())) + () -> nodes.stream()

Review Comment:
Well, it's not related to your changes. However, this method seems useless. Having an index in the catalog doesn't mean that it's up and running. The catalog may not even have sent the corresponding event.

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java: ##########
@@ -486,14 +431,17 @@ private void addColumnInternal(Ignite node, String tableName, ColumnDefinition c * @param node Ignite node. * @param tableName Table name.
*/ - protected void addColumnIfNotExists(Ignite node, String tableName) { - ColumnDefinition col = SchemaBuilders.column("valStrNew", ColumnType.string()).asNullable(true) - .withDefaultValue("default").build(); - + private static void addColumnIfNotExists(Ignite node, String tableName) { try { - addColumnInternal(node, tableName, col); - } catch (ColumnAlreadyExistsException ex) { - log.info("Column already exists [naem={}]", col.name()); + addColumn(node, tableName); + } catch (ColumnAlreadyExistsException e) { + log.info("Column already exists", e);

Review Comment:
The way this method is used, e.g. in testAddColumn, is confusing. In a test environment we should assert that ColumnAlreadyExistsException is thrown if a column with the given name already exists, and assert that no exception is thrown if it doesn't. Logging "Column already exists" within test scenarios in ItTablesApiTest is useless. It seems trivial to fix; could you please do it?

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java: ##########
@@ -166,7 +167,8 @@ private static boolean solePartitionIsEmpty(IgniteImpl node) { } private static MvPartitionStorage solePartitionStorage(IgniteImpl node) { - TableImpl table = (TableImpl) node.tables().table(TABLE_NAME); + // We use this api because there is no waiting for schemas to sync.

Review Comment:
Not sure that I got it. Sounds like a FIXME with a sync ticket. Is it a temporary one?

########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java: ##########
@@ -603,25 +585,26 @@ protected IgniteSql igniteSql() { } /** - * Returns the index ID from the configuration, {@code null} if there is no index configuration. + * Returns the index ID from the catalog, {@code null} if there is no index. * * @param node Node. * @param indexName Index name.
*/ static @Nullable Integer indexId(Ignite node, String indexName) { - TableIndexConfiguration indexConfig = getIndexConfiguration(node, indexName); + CatalogIndexDescriptor indexDescriptor = getIndexDescriptor(node, indexName); - return indexConfig == null ? null : indexConfig.id().value(); + return indexDescriptor == null ? null : indexDescriptor.id(); } /** - * Returns table configuration of the given table at the given node, or {@code null} if no such table exists. + * Returns table descriptor of the given table at the given node, or {@code null} if no such table exists. * * @param node Node. * @param tableName Table name. - * @return Table configuration. */ - private static @Nullable TableConfiguration getTableConfiguration(Ignite node, String tableName) { - return getTablesConfiguration(node).tables().get(tableName.toUpperCase()); + private static @Nullable CatalogTableDescriptor getTableDescriptor(Ignite node, String tableName) { + IgniteImpl nodeImpl = (IgniteImpl) node; + + return getTable(nodeImpl.catalogManager(), tableName, nodeImpl.clock().nowLong());

Review Comment:
Well, I didn't dig deeply. It seems that you use `nodeImpl.clock().nowLong()` widely, so I have a question here: are you sure that you use the **same** timestamp for reads that are expected to be consistent? E.g. when you read a table and an index, or retrieve multiple tables.

########## modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java: ##########
@@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.schema.configuration; - -import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.ignite.configuration.NamedListView; -import org.apache.ignite.configuration.validation.ValidationContext; -import org.apache.ignite.configuration.validation.ValidationIssue; -import org.apache.ignite.configuration.validation.Validator; - -/** - * Table schema configuration validator implementation. - */ -public class TableValidatorImpl implements Validator<TableValidator, NamedListView<TableView>> { - /** Static instance. */ - public static final TableValidatorImpl INSTANCE = new TableValidatorImpl(); - - /** {@inheritDoc} */ - @Override - public void validate(TableValidator annotation, ValidationContext<NamedListView<TableView>> ctx) { - TablesView tablesConfig = ctx.getNewRoot(TablesConfiguration.KEY); Review Comment: Not sure whether I've already asked you or not. We do have corresponding validation in catalog, right? Or if not, we do have a ticket, correct? 
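
To illustrate the earlier question about `nodeImpl.clock().nowLong()`, here is a minimal sketch of why related reads should share one timestamp. It is a toy model only: `VersionedValue` and all names in it are hypothetical stand-ins and not the Ignite catalog API.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: VersionedValue is a hypothetical stand-in, not the Ignite catalog API. */
public class ConsistentReadSketch {
    /** Keeps every version of a descriptor keyed by the timestamp at which it became visible. */
    public static final class VersionedValue<T> {
        private final TreeMap<Long, T> versions = new TreeMap<>();

        public void put(long activationTs, T value) {
            versions.put(activationTs, value);
        }

        /** Returns the version visible at {@code ts}, or null if nothing was visible yet. */
        public T at(long ts) {
            Map.Entry<Long, T> entry = versions.floorEntry(ts);

            return entry == null ? null : entry.getValue();
        }
    }

    public static void main(String[] args) {
        VersionedValue<String> table = new VersionedValue<>();
        VersionedValue<String> index = new VersionedValue<>();

        table.put(10, "table v1");
        index.put(10, "index v1");
        table.put(20, "table v2");
        index.put(20, "index v2");

        // Risky: each read takes its own "now", so a schema change can slip in between them.
        System.out.println(table.at(15) + " / " + index.at(25)); // table v1 / index v2

        // Safer: capture the timestamp once and reuse it for every related read.
        long ts = 15;
        System.out.println(table.at(ts) + " / " + index.at(ts)); // table v1 / index v1
    }
}
```

In the test helpers this would roughly mean reading the clock once per logical operation and passing that value down, instead of calling the clock inside each getter.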
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ##########
@@ -219,17 +200,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private static final int TX_STATE_STORAGE_FLUSH_DELAY = 1000; private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER = () -> TX_STATE_STORAGE_FLUSH_DELAY; - /** - * If this property is set to {@code true} then an attempt to get the configuration property directly from Meta storage will be skipped, - * and the local property will be returned. - * TODO: IGNITE-16774 This property and overall approach, access configuration directly through Meta storage, - * TODO: will be removed after fix of the issue. - */ - private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");

Review Comment:
Why is it removed? We should do that within IGNITE-16774. And by the way, this property is still used in a test, and only in a test:
```
@WithSystemProperty(key = "IGNITE_GET_METADATA_LOCALLY_ONLY", value = "true")
public class ItNoSyncMetadataTest extends ClusterPerClassIntegrationTest {
```

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ##########
@@ -1233,20 +1130,77 @@ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li return assignmentsChangeListeners.remove(listener); } + /** + * Creates local structures for a table. + * + * @param causalityToken Causality token. + * @param catalogVersion Catalog version on which the table was created. + * @param tableDescriptor Catalog table descriptor. + * @return Future that will be completed when local changes related to the table creation are applied.
+ */ + private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor) { + int tableId = tableDescriptor.id(); + + if (!busyLock.enterBusy()) { + fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId), new NodeStoppingException());

Review Comment:
Who listens to such events now? Should they listen to catalog events instead?

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ##########
@@ -2740,47 +2255,47 @@ private PartitionUpdateHandlers createPartitionUpdateHandlers( return findTableImplByName(startedTables.values(), name); } - private @Nullable CatalogTableDescriptor getTableDescriptor(int id) { - TableView tableView = findTableView(tablesCfg.value(), id); + private CatalogTableDescriptor getTableDescriptor(int tableId, int catalogVersion) { + CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, catalogVersion); + + assert tableDescriptor != null : "tableId=" + tableId + ", catalogVersion=" + catalogVersion; - return tableView == null ?
null : toTableDescriptor(tableView); + return tableDescriptor; } - private @Nullable CatalogZoneDescriptor getZoneDescriptor(int zoneId, int catalogVersion) { - return catalogService.zone(zoneId, catalogVersion); + private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) { + CatalogZoneDescriptor zoneDescriptor = catalogService.zone(tableDescriptor.zoneId(), catalogVersion); + + assert zoneDescriptor != null : + "tableId=" + tableDescriptor.id() + ", zoneId=" + tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion; + + return zoneDescriptor; } private static @Nullable TableImpl findTableImplByName(Collection<TableImpl> tables, String name) { return tables.stream().filter(table -> table.name().equals(name)).findAny().orElse(null); } - /** - * Fires table creation events so that indexes can be correctly created at IndexManager startup. - * - * <p>NOTE: This is a temporary solution that must be get rid/remake/change. - */ - // TODO: IGNITE-19499 Need to get rid/remake/change - private void fireCreateTablesOnManagerStart() { + private void startTables() { CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture(); assert recoveryFinishFuture.isDone(); + int catalogVersion = catalogService.latestCatalogVersion(); long causalityToken = recoveryFinishFuture.join(); - List<CompletableFuture<?>> fireEventFutures = new ArrayList<>(); + List<CompletableFuture<?>> startTableFutures = new ArrayList<>(); - for (TableView tableView : tablesCfg.tables().value()) { - fireEventFutures.add(fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableView.id()))); + for (CatalogTableDescriptor tableDescriptor : catalogService.tables(catalogVersion)) {

Review Comment:
Same as for DZM. We should also clean up tables that were removed. Feel free to add a new ticket with a corresponding TODO.
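
As a rough illustration of the cleanup mentioned above, the tables to destroy on startup could be found as a set difference between what exists locally and what the latest catalog version still contains. This is only a sketch under assumptions: `staleTableIds` and both input sets are hypothetical, and how the local IDs would actually be obtained in TableManager is not shown in this PR.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: none of these names exist in TableManager. */
public class StaleTableCleanupSketch {
    /** Returns IDs that have local structures but are absent from the latest catalog version. */
    public static Set<Integer> staleTableIds(Set<Integer> locallyStoredTableIds, Set<Integer> catalogTableIds) {
        Set<Integer> stale = new HashSet<>(locallyStoredTableIds);

        // Whatever the catalog no longer knows about was dropped while the node was down.
        stale.removeAll(catalogTableIds);

        return stale;
    }

    public static void main(String[] args) {
        Set<Integer> local = Set.of(1, 2, 3);
        Set<Integer> catalog = Set.of(1, 3, 4);

        // Table 2 exists locally but was removed from the catalog, so it should be cleaned up.
        System.out.println(staleTableIds(local, catalog)); // prints [2]
    }
}
```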
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1233,20 +1130,77 @@ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li return assignmentsChangeListeners.remove(listener); } + /** + * Creates local structures for a table. + * + * @param causalityToken Causality token. + * @param catalogVersion Catalog version on which the table was created. + * @param tableDescriptor Catalog table descriptor. + * @return Future that will be completed when local changes related to the table creation are applied. + */ + private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor) { + int tableId = tableDescriptor.id(); + + if (!busyLock.enterBusy()) { + fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId), new NodeStoppingException()); + + return failedFuture(new NodeStoppingException()); + } + + try { + int zoneId = tableDescriptor.zoneId(); + + CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); + + CompletableFuture<List<Set<Assignment>>> assignmentsFuture; + + // Check if the table already has assignments in the vault. + // So, it means, that it is a recovery process and we should use the vault assignments instead of calculation for the new ones. + if (partitionAssignments(vaultManager, tableId, 0) != null) { Review Comment: This logic is incorrect, could you please add todo with https://issues.apache.org/jira/browse/IGNITE-20210 here? ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java: ########## @@ -608,6 +600,12 @@ private class Node { /** The future have to be complete after the node start and all Meta storage watches are deployd. */ private CompletableFuture<Void> deployWatchesFut; + /** Hybrid clock. 
*/ + private final HybridClock clock = new HybridClockImpl(); Review Comment: I agree that sharing clock is a bug, however I don't think that in this particular case it is sharing. Seems that Kirill just moved clock instantiation from the constructor to the field initialization. In both cases it's node specific. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java: ########## @@ -198,7 +197,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0; /** Table id. */ - private final int tblId = 1; + private static final int tblId = 1; Review Comment: UPPER case. ########## modules/table/build.gradle: ########## @@ -66,6 +66,7 @@ dependencies { testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation(testFixtures(project(':ignite-distribution-zones'))) testImplementation(testFixtures(project(':ignite-catalog'))) + testImplementation(testFixtures(project(':ignite-vault'))) Review Comment: Why? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1233,20 +1130,77 @@ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li return assignmentsChangeListeners.remove(listener); } + /** + * Creates local structures for a table. + * + * @param causalityToken Causality token. + * @param catalogVersion Catalog version on which the table was created. + * @param tableDescriptor Catalog table descriptor. + * @return Future that will be completed when local changes related to the table creation are applied. 
+ */ + private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor) { + int tableId = tableDescriptor.id(); + + if (!busyLock.enterBusy()) { + fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId), new NodeStoppingException()); + + return failedFuture(new NodeStoppingException()); + } + + try { + int zoneId = tableDescriptor.zoneId(); + + CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); + + CompletableFuture<List<Set<Assignment>>> assignmentsFuture; + + // Check if the table already has assignments in the vault. + // So, it means, that it is a recovery process and we should use the vault assignments instead of calculation for the new ones. + if (partitionAssignments(vaultManager, tableId, 0) != null) { + assignmentsFuture = completedFuture(tableAssignments(vaultManager, tableId, zoneDescriptor.partitions())); + } else { + assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, zoneId) + .thenApply(dataNodes -> AffinityUtils.calculateAssignments( + dataNodes, + zoneDescriptor.partitions(), + zoneDescriptor.replicas() + )); + } + + return createTableLocally( + causalityToken, + tableDescriptor, + zoneDescriptor, + assignmentsFuture, + catalogVersion + ).whenComplete((v, e) -> { + if (e == null) { + for (var listener : assignmentsChangeListeners) { + listener.accept(this); + } + } + }).thenCompose(ignored -> writeTableAssignmentsToMetastore(tableId, assignmentsFuture));

Review Comment:
> @sanpwc wdyt?
> Also, @sanpwc, does the order look correct? or maybe we should write to metastorage prior to notification?

I think yes. Generally speaking, I believe that the notification flow logic should be reworked because we now have the catalog with corresponding catalog events. Let me repeat the aforementioned questions here:
1. Who listens to table-specific events now?
2. Should they listen to catalog events instead?

I also agree that it should be covered in a separate ticket if we agree on the necessity of such changes.

########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java: ##########
@@ -245,37 +222,36 @@ public class TableManagerTest extends IgniteAbstractTest { /** Hybrid clock. */ private final HybridClock clock = new HybridClockImpl(); + /** Catalog vault. */ + private VaultManager catalogVault;

Review Comment:
First of all, why do you guys need the vault here? MS doesn't use it to store key projections any longer. It seems that it's only used for persisting the applied revision, which is also effectively deprecated.

########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ##########
@@ -1463,408 +1416,51 @@ private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableD } private Set<Assignment> calculateAssignments(TablePartitionId tablePartitionId) { - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tablePartitionId.tableId()); + int catalogVersion = catalogService.latestCatalogVersion(); - assert tableDescriptor != null : tablePartitionId; + CatalogTableDescriptor tableDescriptor = getTableDescriptor(tablePartitionId.tableId(), catalogVersion); return AffinityUtils.calculateAssignmentForPartition( // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 we must use distribution zone keys here baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), tablePartitionId.partitionId(), - getZoneDescriptor(tableDescriptor.zoneId(), catalogService.latestCatalogVersion()).replicas() + getZoneDescriptor(tableDescriptor, catalogVersion).replicas() ); } - /** - * Creates a new table with the given {@code name} asynchronously. If a table with the same name already exists, a future will be - * completed with {@link TableAlreadyExistsException}. - * - * @param name Table name. - * @param zoneName Distribution zone name.
- * @param tableInitChange Table changer. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableAlreadyExistsException - */ - public CompletableFuture<Table> createTableAsync(String name, String zoneName, Consumer<TableChange> tableInitChange) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return createTableAsyncInternal(name, zoneName, tableInitChange); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #createTableAsync(String, String, Consumer)} for details. */ - private CompletableFuture<Table> createTableAsyncInternal( - String name, - String zoneName, - Consumer<TableChange> tableInitChange - ) { - CompletableFuture<Table> tblFut = new CompletableFuture<>(); - - tableAsyncInternal(name) - .handle((tbl, tblEx) -> { - if (tbl != null) { - tblFut.completeExceptionally(new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name)); - } else if (tblEx != null) { - tblFut.completeExceptionally(tblEx); - } else { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); - - tblFut.completeExceptionally(nodeStoppingException); - - throw new IgniteException(nodeStoppingException); - } - - try { - // TODO: IGNITE-19499 Should listen to event CreateTableEventParameters and get the zone ID from it - CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneName, clock.nowLong()); - - if (zoneDescriptor == null) { - tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName)); - - return null; - } - - cmgMgr.logicalTopology() - .handle((cmgTopology, e) -> { - if (e == null) { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); - - tblFut.completeExceptionally(nodeStoppingException); - - throw 
new IgniteException(nodeStoppingException); - } - - try { - changeTablesConfigurationOnTableCreate( - name, - zoneDescriptor.id(), - tableInitChange, - tblFut - ); - } finally { - busyLock.leaveBusy(); - } - } else { - tblFut.completeExceptionally(e); - } - - return null; - }); - } catch (Throwable t) { - tblFut.completeExceptionally(t); - } finally { - busyLock.leaveBusy(); - } - } - - return null; - }); - - return tblFut; - } - - /** - * Creates a new table in {@link TablesConfiguration}. - * - * @param name Table name. - * @param zoneId Distribution zone id. - * @param tableInitChange Table changer. - * @param tblFut Future representing pending completion of the table creation. - */ - private void changeTablesConfigurationOnTableCreate( - String name, - int zoneId, - Consumer<TableChange> tableInitChange, - CompletableFuture<Table> tblFut - ) { - tablesCfg.change(tablesChange -> { - incrementTablesGeneration(tablesChange); - - tablesChange.changeTables(tablesListChange -> { - if (tablesListChange.get(name) != null) { - throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name); - } - - tablesListChange.create(name, (tableChange) -> { - tableInitChange.accept(tableChange); - - tableChange.changeZoneId(zoneId); - - var extConfCh = ((ExtendedTableChange) tableChange); - - int tableId = tablesChange.globalIdCounter() + 1; - - extConfCh.changeId(tableId); - - tablesChange.changeGlobalIdCounter(tableId); - - extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION); - - tableCreateFuts.put(extConfCh.id(), tblFut); - }); - }); - }).exceptionally(t -> { - Throwable ex = getRootCause(t); - - if (ex instanceof TableAlreadyExistsException) { - tblFut.completeExceptionally(ex); - } else { - LOG.debug("Unable to create table [name={}]", ex, name); - - tblFut.completeExceptionally(ex); - - tableCreateFuts.values().removeIf(fut -> fut == tblFut); - } - - return null; - }); - } - - private static void incrementTablesGeneration(TablesChange tablesChange) { - 
tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1); - } - - /** - * Alters a cluster table. If an appropriate table does not exist, a future will be completed with {@link TableNotFoundException}. - * - * @param name Table name. - * @param tableChange Table changer. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableNotFoundException - */ - public CompletableFuture<Void> alterTableAsync(String name, Function<TableChange, Boolean> tableChange) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return alterTableAsyncInternal(name, tableChange); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #alterTableAsync(String, Function)} for details. */ - private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) { - CompletableFuture<Void> tblFut = new CompletableFuture<>(); - - tableAsync(name).thenAccept(tbl -> { - if (tbl == null) { - tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name)); - } else { - tablesCfg.tables().change(ch -> { - if (ch.get(name) == null) { - throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name); - } - - ch.update(name, tblCh -> { - if (!tableChange.apply(tblCh)) { - return; - } - - ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh; - - exTblChange.changeSchemaId(exTblChange.schemaId() + 1); - }); - }).whenComplete((res, t) -> { - if (t != null) { - Throwable ex = getRootCause(t); - - if (ex instanceof TableNotFoundException) { - tblFut.completeExceptionally(ex); - } else { - LOG.debug("Unable to modify table [name={}]", ex, name); - - tblFut.completeExceptionally(ex); - } - } else { - tblFut.complete(res); - } - }); - } - }).exceptionally(th -> { - 
tblFut.completeExceptionally(th); - - return null; - }); - - return tblFut; - } - - /** - * Gets a cause exception for a client. - * - * @param t Exception wrapper. - * @return A root exception which will be acceptable to throw for public API. - */ - //TODO: IGNITE-16051 Implement exception converter for public API. - private IgniteException getRootCause(Throwable t) { - Throwable ex; - - if (t instanceof CompletionException) { - if (t.getCause() instanceof ConfigurationChangeException) { - ex = t.getCause().getCause(); - } else { - ex = t.getCause(); - } - - } else { - ex = t; - } - - // TODO https://issues.apache.org/jira/browse/IGNITE-19539 - return (ex instanceof IgniteException) ? (IgniteException) ex : ExceptionUtils.wrap(ex); - } - - /** - * Drops a table with the name specified. If appropriate table does not be found, a future will be completed with - * {@link TableNotFoundException}. - * - * @param name Table name. - * @return Future representing pending completion of the operation. - * @throws IgniteException If an unspecified platform exception has happened internally. Is thrown when: - * <ul> - * <li>the node is stopping.</li> - * </ul> - * @see TableNotFoundException - */ - public CompletableFuture<Void> dropTableAsync(String name) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return dropTableAsyncInternal(name); - } finally { - busyLock.leaveBusy(); - } - } - - /** See {@link #dropTableAsync(String)} for details. */ - private CompletableFuture<Void> dropTableAsyncInternal(String name) { - return tableAsyncInternal(name).thenCompose(tbl -> { - // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such - // distributed table and the local config has lagged behind. 
- if (tbl == null) { - return failedFuture(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name)); - } - - return tablesCfg - .change(chg -> { - incrementTablesGeneration(chg); - - chg - .changeTables(tblChg -> { - if (tblChg.get(name) == null) { - throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name); - } - - tblChg.delete(name); - }); - }) - .exceptionally(t -> { - Throwable ex = getRootCause(t); - - if (!(ex instanceof TableNotFoundException)) { - LOG.debug("Unable to drop table [name={}]", ex, name); - } - - throw new CompletionException(ex); - }); - }); - } - - /** {@inheritDoc} */ @Override public List<Table> tables() { return join(tablesAsync()); } - /** {@inheritDoc} */ @Override public CompletableFuture<List<Table>> tablesAsync() { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - try { - return tablesAsyncInternal(); - } finally { - busyLock.leaveBusy(); - } + return inBusyLockAsync(busyLock, this::tablesAsyncInternalBusy); } - /** - * Internal method for getting table. - * - * @return Future representing pending completion of the operation. - */ - private CompletableFuture<List<Table>> tablesAsyncInternal() { - return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor) - .thenCompose(tableIds -> inBusyLock(busyLock, () -> { - var tableFuts = new CompletableFuture[tableIds.size()]; - - var i = 0; + private CompletableFuture<List<Table>> tablesAsyncInternalBusy() { + HybridTimestamp now = clock.now(); - for (int tblId : tableIds) { - tableFuts[i++] = tableAsyncInternal(tblId, false); - } - - return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> { - var tables = new ArrayList<Table>(tableIds.size()); - - for (var fut : tableFuts) { - var table = fut.join(); - - if (table != null) { - tables.add((Table) table); - } - } + return anyOf(schemaSyncService.waitForMetadataCompleteness(now), stopManagerFuture) Review Comment: No, why would we? Probably I am missing something. 
SchemaSyncService is not the only actor that we are touching here, TableManager internals are also involved.
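
For readers unfamiliar with the `anyOf(..., stopManagerFuture)` construct in the hunk above, here is a minimal standalone sketch of racing a wait against a stop signal using plain `CompletableFuture`. The class and method names are hypothetical, and how `stopManagerFuture` is actually completed in TableManager is an assumption here.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: mimics the shape of the diff, not the real TableManager code. */
public class StopAwareWaitSketch {
    /** Completes as soon as either the awaited future or the stop signal completes. */
    public static CompletableFuture<Object> waitOrStop(CompletableFuture<?> awaited, CompletableFuture<?> stopSignal) {
        return CompletableFuture.anyOf(awaited, stopSignal);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> metadataReady = new CompletableFuture<>();
        CompletableFuture<Void> stop = new CompletableFuture<>();

        CompletableFuture<Object> combined = waitOrStop(metadataReady, stop);

        // Simulate the component stopping before the metadata becomes available (assumed behaviour).
        stop.completeExceptionally(new IllegalStateException("node is stopping"));

        // The combined future fails too, so callers are not left waiting forever.
        System.out.println(combined.isCompletedExceptionally()); // prints true
    }
}
```

Note that this only unblocks the waiter; it does not protect any state touched after the wait, which is why the busy lock is still relevant for the TableManager internals.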