sanpwc commented on code in PR #2500:
URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1316767338


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -350,7 +350,7 @@ public CompletableFuture<Void> dropTable(DropTableParams 
params) {
 
             Arrays.stream(schema.indexes())
                     .filter(index -> index.tableId() == table.id())
-                    .forEach(index -> updateEntries.add(new 
DropIndexEntry(index.id(), index.tableId(), table.name())));
+                    .forEach(index -> updateEntries.add(new 
DropIndexEntry(index.id(), index.tableId())));

Review Comment:
   Seems that given action and all similar should be covered by busyLock. Is it 
covered?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java:
##########
@@ -129,50 +121,6 @@ void afterEach() throws Exception {
         IgniteUtils.closeAll(closeables);
     }
 
-    /**
-     * Check unsupported column type change.
-     */
-    @Test
-    public void testChangeColumnType() throws Exception {

Review Comment:
   We do not need such tests any longer, or we have them somewhere else?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java:
##########
@@ -1229,30 +1222,9 @@ private void createTableWithData(List<IgniteImpl> nodes, 
String name, int replic
     private void waitForIndex(Collection<IgniteImpl> nodes, String indexName) 
throws InterruptedException {
         // FIXME: Wait for the index to be created on all nodes,
         //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to 
the PK index.
-
-        Stream<TablesConfiguration> partialTablesConfiguration = 
Stream.empty();
-
-        if (!partialNodes.isEmpty()) {
-            partialTablesConfiguration = partialNodes.stream()
-                    .flatMap(it -> it.startedComponents().stream())
-                    .filter(ConfigurationManager.class::isInstance)
-                    .map(c -> ((ConfigurationManager) 
c).configurationRegistry().getConfiguration(TablesConfiguration.KEY))
-                    .filter(Objects::nonNull)
-                    .findAny()
-                    .map(Stream::of)
-                    .orElseThrow();
-        }
-
-        Stream<TablesConfiguration> nodesTablesConfigurations = nodes.stream()
-                .filter(Objects::nonNull)
-                .map(node -> 
node.clusterConfiguration().getConfiguration(TablesConfiguration.KEY));
-
-        List<TablesConfiguration> tablesConfigurations = 
Stream.concat(nodesTablesConfigurations, partialTablesConfiguration)
-                .collect(Collectors.toList());
-
         assertTrue(waitForCondition(
-                () -> tablesConfigurations.stream()
-                        .map(cfg -> cfg.indexes().get(indexName.toUpperCase()))
+                () -> nodes.stream()

Review Comment:
   Well, it's not related to you changes. However this method seems useless. 
Having index in catalog doesn't mean that it's up and running. Catalog may even 
didn't send corresponding event. 



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java:
##########
@@ -486,14 +431,17 @@ private void addColumnInternal(Ignite node, String 
tableName, ColumnDefinition c
      * @param node Ignite node.
      * @param tableName Table name.
      */
-    protected void addColumnIfNotExists(Ignite node, String tableName) {
-        ColumnDefinition col = SchemaBuilders.column("valStrNew", 
ColumnType.string()).asNullable(true)
-                .withDefaultValue("default").build();
-
+    private static void addColumnIfNotExists(Ignite node, String tableName) {
         try {
-            addColumnInternal(node, tableName, col);
-        } catch (ColumnAlreadyExistsException ex) {
-            log.info("Column already exists [naem={}]", col.name());
+            addColumn(node, tableName);
+        } catch (ColumnAlreadyExistsException e) {
+            log.info("Column already exists", e);

Review Comment:
   The way how given method is used, e.g. in testAddColumn is confusing. Within 
test env we should assert that ColumnAlreadyExistsException was thrown if there 
was column with given name and assert that no exceptions were thrown if there 
wasn't one. Logging "Column already exists" within tests scenarios in 
ItTablesApiTest is useless.
   It seems trivial to fix it, could you please do the stuff? 



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java:
##########
@@ -166,7 +167,8 @@ private static boolean solePartitionIsEmpty(IgniteImpl 
node) {
     }
 
     private static MvPartitionStorage solePartitionStorage(IgniteImpl node) {
-        TableImpl table = (TableImpl) node.tables().table(TABLE_NAME);
+        // We use this api because there is no waiting for schemas to sync.

Review Comment:
   Not sure, that I got it. Sounds like FIXME with sync ticket. Is it a tmp one?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java:
##########
@@ -603,25 +585,26 @@ protected IgniteSql igniteSql() {
     }
 
     /**
-     * Returns the index ID from the configuration, {@code null} if there is 
no index configuration.
+     * Returns the index ID from the catalog, {@code null} if there is no 
index.
      *
      * @param node Node.
      * @param indexName Index name.
      */
     static @Nullable Integer indexId(Ignite node, String indexName) {
-        TableIndexConfiguration indexConfig = getIndexConfiguration(node, 
indexName);
+        CatalogIndexDescriptor indexDescriptor = getIndexDescriptor(node, 
indexName);
 
-        return indexConfig == null ? null : indexConfig.id().value();
+        return indexDescriptor == null ? null : indexDescriptor.id();
     }
 
     /**
-     * Returns table configuration of the given table at the given node, or 
{@code null} if no such table exists.
+     * Returns table descriptor of the given table at the given node, or 
{@code null} if no such table exists.
      *
      * @param node Node.
      * @param tableName Table name.
-     * @return Table configuration.
      */
-    private static @Nullable TableConfiguration getTableConfiguration(Ignite 
node, String tableName) {
-        return 
getTablesConfiguration(node).tables().get(tableName.toUpperCase());
+    private static @Nullable CatalogTableDescriptor getTableDescriptor(Ignite 
node, String tableName) {
+        IgniteImpl nodeImpl = (IgniteImpl) node;
+
+        return getTable(nodeImpl.catalogManager(), tableName, 
nodeImpl.clock().nowLong());

Review Comment:
   Well, I didn't dig deeply. Seems that you widely use 
`nodeImpl.clock().nowLong()` so I have a questions here:
   Are you sure that you use **same** timestamp in case of reads that are 
expected to be consistent? E.g. you read table and index, or just retrieve 
multiple tables.



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java:
##########
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema.configuration;
-
-import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.validation.ValidationContext;
-import org.apache.ignite.configuration.validation.ValidationIssue;
-import org.apache.ignite.configuration.validation.Validator;
-
-/**
- * Table schema configuration validator implementation.
- */
-public class TableValidatorImpl implements Validator<TableValidator, 
NamedListView<TableView>> {
-    /** Static instance. */
-    public static final TableValidatorImpl INSTANCE = new TableValidatorImpl();
-
-    /** {@inheritDoc} */
-    @Override
-    public void validate(TableValidator annotation, 
ValidationContext<NamedListView<TableView>> ctx) {
-        TablesView tablesConfig = ctx.getNewRoot(TablesConfiguration.KEY);

Review Comment:
   Not sure whether I've already asked you or not. We do have corresponding 
validation in catalog, right? Or if not, we do have a ticket, correct?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -219,17 +200,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     private static final int TX_STATE_STORAGE_FLUSH_DELAY = 1000;
     private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER = 
() -> TX_STATE_STORAGE_FLUSH_DELAY;
 
-    /**
-     * If this property is set to {@code true} then an attempt to get the 
configuration property directly from Meta storage will be skipped,
-     * and the local property will be returned.
-     * TODO: IGNITE-16774 This property and overall approach, access 
configuration directly through Meta storage,
-     * TODO: will be removed after fix of the issue.
-     */
-    private final boolean getMetadataLocallyOnly = 
IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");

Review Comment:
   Why it's removed? We should do it within IGNITE-16774. And by the way, given 
property still used in test and only in test 
   ```
   @WithSystemProperty(key = "IGNITE_GET_METADATA_LOCALLY_ONLY", value = "true")
   public class ItNoSyncMetadataTest extends ClusterPerClassIntegrationTest {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());

Review Comment:
   Who listens such events now? Should it listen catalog events instead?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2740,47 +2255,47 @@ private PartitionUpdateHandlers 
createPartitionUpdateHandlers(
         return findTableImplByName(startedTables.values(), name);
     }
 
-    private @Nullable CatalogTableDescriptor getTableDescriptor(int id) {
-        TableView tableView = findTableView(tablesCfg.value(), id);
+    private CatalogTableDescriptor getTableDescriptor(int tableId, int 
catalogVersion) {
+        CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, 
catalogVersion);
+
+        assert tableDescriptor != null : "tableId=" + tableId + ", 
catalogVersion=" + catalogVersion;
 
-        return tableView == null ? null : toTableDescriptor(tableView);
+        return tableDescriptor;
     }
 
-    private @Nullable CatalogZoneDescriptor getZoneDescriptor(int zoneId, int 
catalogVersion) {
-        return catalogService.zone(zoneId, catalogVersion);
+    private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor 
tableDescriptor, int catalogVersion) {
+        CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(tableDescriptor.zoneId(), catalogVersion);
+
+        assert zoneDescriptor != null :
+                "tableId=" + tableDescriptor.id() + ", zoneId=" + 
tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion;
+
+        return zoneDescriptor;
     }
 
     private static @Nullable TableImpl 
findTableImplByName(Collection<TableImpl> tables, String name) {
         return tables.stream().filter(table -> 
table.name().equals(name)).findAny().orElse(null);
     }
 
-    /**
-     * Fires table creation events so that indexes can be correctly created at 
IndexManager startup.
-     *
-     * <p>NOTE: This is a temporary solution that must be get 
rid/remake/change.
-     */
-    // TODO: IGNITE-19499 Need to get rid/remake/change
-    private void fireCreateTablesOnManagerStart() {
+    private void startTables() {
         CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
 
         assert recoveryFinishFuture.isDone();
 
+        int catalogVersion = catalogService.latestCatalogVersion();
         long causalityToken = recoveryFinishFuture.join();
 
-        List<CompletableFuture<?>> fireEventFutures = new ArrayList<>();
+        List<CompletableFuture<?>> startTableFutures = new ArrayList<>();
 
-        for (TableView tableView : tablesCfg.tables().value()) {
-            fireEventFutures.add(fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableView.id())));
+        for (CatalogTableDescriptor tableDescriptor : 
catalogService.tables(catalogVersion)) {

Review Comment:
   Same as for DZM. We should also cleanup tables that were removed. Fill free 
to add new ticket with corresponding TODO.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            int zoneId = tableDescriptor.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
+
+            // Check if the table already has assignments in the vault.
+            // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
+            if (partitionAssignments(vaultManager, tableId, 0) != null) {

Review Comment:
   This logic is incorrect, could you please add todo with 
https://issues.apache.org/jira/browse/IGNITE-20210 here?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java:
##########
@@ -608,6 +600,12 @@ private class Node {
         /** The future have to be complete after the node start and all Meta 
storage watches are deployd. */
         private CompletableFuture<Void> deployWatchesFut;
 
+        /** Hybrid clock. */
+        private final HybridClock clock = new HybridClockImpl();

Review Comment:
   I agree that sharing clock is a bug, however I don't think that in this 
particular case it is sharing. Seems that Kirill just moved clock instantiation 
from the constructor to the field initialization. In both cases it's node 
specific. 



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -198,7 +197,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0;
 
     /** Table id. */
-    private final int tblId = 1;
+    private static final int tblId = 1;

Review Comment:
   UPPER case. 



##########
modules/table/build.gradle:
##########
@@ -66,6 +66,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-metastorage')))
     testImplementation(testFixtures(project(':ignite-distribution-zones')))
     testImplementation(testFixtures(project(':ignite-catalog')))
+    testImplementation(testFixtures(project(':ignite-vault')))

Review Comment:
   Why?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            int zoneId = tableDescriptor.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
+
+            // Check if the table already has assignments in the vault.
+            // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
+            if (partitionAssignments(vaultManager, tableId, 0) != null) {
+                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
+            } else {
+                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, zoneId)
+                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
+                                dataNodes,
+                                zoneDescriptor.partitions(),
+                                zoneDescriptor.replicas()
+                        ));
+            }
+
+            return createTableLocally(
+                    causalityToken,
+                    tableDescriptor,
+                    zoneDescriptor,
+                    assignmentsFuture,
+                    catalogVersion
+            ).whenComplete((v, e) -> {
+                if (e == null) {
+                    for (var listener : assignmentsChangeListeners) {
+                        listener.accept(this);
+                    }
+                }
+            }).thenCompose(ignored -> 
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));

Review Comment:
   > @sanpwc wdyt?
   > Also, @sanpwc, does the order look correct? or maybe we should write to 
metastorage prior to notification?
   
   I think yes. Generally speaking I believe that notification flow logic 
should be reworked because we have catalog with corresponding catalog events 
now. Let me repeat aforementioned questions here: 
   1. Who listens table specific events events now?
   2. Should it listen catalog events instead?
   
   I also agree that it should be covered within separate ticket if we will 
agree on necessity of such changes.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -245,37 +222,36 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Hybrid clock. */
     private final HybridClock clock = new HybridClockImpl();
 
+    /** Catalog vault. */
+    private VaultManager catalogVault;

Review Comment:
   First of all, why do you guys need vault here? MS doesn't use it to store 
key projections any longer. Seems that it's only used for persisting applied 
revision which is also effectively deprecated. 



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1463,408 +1416,51 @@ private void dropTableLocally(long causalityToken, 
CatalogTableDescriptor tableD
     }
 
     private Set<Assignment> calculateAssignments(TablePartitionId 
tablePartitionId) {
-        CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tablePartitionId.tableId());
+        int catalogVersion = catalogService.latestCatalogVersion();
 
-        assert tableDescriptor != null : tablePartitionId;
+        CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tablePartitionId.tableId(), catalogVersion);
 
         return AffinityUtils.calculateAssignmentForPartition(
                 // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 we 
must use distribution zone keys here
                 
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
                 tablePartitionId.partitionId(),
-                getZoneDescriptor(tableDescriptor.zoneId(), 
catalogService.latestCatalogVersion()).replicas()
+                getZoneDescriptor(tableDescriptor, catalogVersion).replicas()
         );
     }
 
-    /**
-     * Creates a new table with the given {@code name} asynchronously. If a 
table with the same name already exists, a future will be
-     * completed with {@link TableAlreadyExistsException}.
-     *
-     * @param name Table name.
-     * @param zoneName Distribution zone name.
-     * @param tableInitChange Table changer.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableAlreadyExistsException
-     */
-    public CompletableFuture<Table> createTableAsync(String name, String 
zoneName, Consumer<TableChange> tableInitChange) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return createTableAsyncInternal(name, zoneName, tableInitChange);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #createTableAsync(String, String, Consumer)} for details. */
-    private CompletableFuture<Table> createTableAsyncInternal(
-            String name,
-            String zoneName,
-            Consumer<TableChange> tableInitChange
-    ) {
-        CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
-        tableAsyncInternal(name)
-                .handle((tbl, tblEx) -> {
-                    if (tbl != null) {
-                        tblFut.completeExceptionally(new 
TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name));
-                    } else if (tblEx != null) {
-                        tblFut.completeExceptionally(tblEx);
-                    } else {
-                        if (!busyLock.enterBusy()) {
-                            NodeStoppingException nodeStoppingException = new 
NodeStoppingException();
-
-                            
tblFut.completeExceptionally(nodeStoppingException);
-
-                            throw new IgniteException(nodeStoppingException);
-                        }
-
-                        try {
-                            // TODO: IGNITE-19499 Should listen to event 
CreateTableEventParameters and get the zone ID from it
-                            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zoneName, clock.nowLong());
-
-                            if (zoneDescriptor == null) {
-                                tblFut.completeExceptionally(new 
DistributionZoneNotFoundException(zoneName));
-
-                                return null;
-                            }
-
-                            cmgMgr.logicalTopology()
-                                    .handle((cmgTopology, e) -> {
-                                        if (e == null) {
-                                            if (!busyLock.enterBusy()) {
-                                                NodeStoppingException 
nodeStoppingException = new NodeStoppingException();
-
-                                                
tblFut.completeExceptionally(nodeStoppingException);
-
-                                                throw new 
IgniteException(nodeStoppingException);
-                                            }
-
-                                            try {
-                                                
changeTablesConfigurationOnTableCreate(
-                                                        name,
-                                                        zoneDescriptor.id(),
-                                                        tableInitChange,
-                                                        tblFut
-                                                );
-                                            } finally {
-                                                busyLock.leaveBusy();
-                                            }
-                                        } else {
-                                            tblFut.completeExceptionally(e);
-                                        }
-
-                                        return null;
-                                    });
-                        } catch (Throwable t) {
-                            tblFut.completeExceptionally(t);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    }
-
-                    return null;
-                });
-
-        return tblFut;
-    }
-
-    /**
-     * Creates a new table in {@link TablesConfiguration}.
-     *
-     * @param name Table name.
-     * @param zoneId Distribution zone id.
-     * @param tableInitChange Table changer.
-     * @param tblFut Future representing pending completion of the table 
creation.
-     */
-    private void changeTablesConfigurationOnTableCreate(
-            String name,
-            int zoneId,
-            Consumer<TableChange> tableInitChange,
-            CompletableFuture<Table> tblFut
-    ) {
-        tablesCfg.change(tablesChange -> {
-            incrementTablesGeneration(tablesChange);
-
-            tablesChange.changeTables(tablesListChange -> {
-                if (tablesListChange.get(name) != null) {
-                    throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, 
name);
-                }
-
-                tablesListChange.create(name, (tableChange) -> {
-                    tableInitChange.accept(tableChange);
-
-                    tableChange.changeZoneId(zoneId);
-
-                    var extConfCh = ((ExtendedTableChange) tableChange);
-
-                    int tableId = tablesChange.globalIdCounter() + 1;
-
-                    extConfCh.changeId(tableId);
-
-                    tablesChange.changeGlobalIdCounter(tableId);
-
-                    extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
-
-                    tableCreateFuts.put(extConfCh.id(), tblFut);
-                });
-            });
-        }).exceptionally(t -> {
-            Throwable ex = getRootCause(t);
-
-            if (ex instanceof TableAlreadyExistsException) {
-                tblFut.completeExceptionally(ex);
-            } else {
-                LOG.debug("Unable to create table [name={}]", ex, name);
-
-                tblFut.completeExceptionally(ex);
-
-                tableCreateFuts.values().removeIf(fut -> fut == tblFut);
-            }
-
-            return null;
-        });
-    }
-
-    private static void incrementTablesGeneration(TablesChange tablesChange) {
-        tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 
1);
-    }
-
-    /**
-     * Alters a cluster table. If an appropriate table does not exist, a 
future will be completed with {@link TableNotFoundException}.
-     *
-     * @param name Table name.
-     * @param tableChange Table changer.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableNotFoundException
-     */
-    public CompletableFuture<Void> alterTableAsync(String name, 
Function<TableChange, Boolean> tableChange) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return alterTableAsyncInternal(name, tableChange);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #alterTableAsync(String, Function)} for details. */
-    private CompletableFuture<Void> alterTableAsyncInternal(String name, 
Function<TableChange, Boolean> tableChange) {
-        CompletableFuture<Void> tblFut = new CompletableFuture<>();
-
-        tableAsync(name).thenAccept(tbl -> {
-            if (tbl == null) {
-                tblFut.completeExceptionally(new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            } else {
-                tablesCfg.tables().change(ch -> {
-                    if (ch.get(name) == null) {
-                        throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, 
name);
-                    }
-
-                    ch.update(name, tblCh -> {
-                        if (!tableChange.apply(tblCh)) {
-                            return;
-                        }
-
-                        ExtendedTableChange exTblChange = 
(ExtendedTableChange) tblCh;
-
-                        exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
-                    });
-                }).whenComplete((res, t) -> {
-                    if (t != null) {
-                        Throwable ex = getRootCause(t);
-
-                        if (ex instanceof TableNotFoundException) {
-                            tblFut.completeExceptionally(ex);
-                        } else {
-                            LOG.debug("Unable to modify table [name={}]", ex, 
name);
-
-                            tblFut.completeExceptionally(ex);
-                        }
-                    } else {
-                        tblFut.complete(res);
-                    }
-                });
-            }
-        }).exceptionally(th -> {
-            tblFut.completeExceptionally(th);
-
-            return null;
-        });
-
-        return tblFut;
-    }
-
-    /**
-     * Gets a cause exception for a client.
-     *
-     * @param t Exception wrapper.
-     * @return A root exception which will be acceptable to throw for public 
API.
-     */
-    //TODO: IGNITE-16051 Implement exception converter for public API.
-    private IgniteException getRootCause(Throwable t) {
-        Throwable ex;
-
-        if (t instanceof CompletionException) {
-            if (t.getCause() instanceof ConfigurationChangeException) {
-                ex = t.getCause().getCause();
-            } else {
-                ex = t.getCause();
-            }
-
-        } else {
-            ex = t;
-        }
-
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19539
-        return (ex instanceof IgniteException) ? (IgniteException) ex : 
ExceptionUtils.wrap(ex);
-    }
-
-    /**
-     * Drops a table with the name specified. If appropriate table does not be 
found, a future will be completed with
-     * {@link TableNotFoundException}.
-     *
-     * @param name Table name.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableNotFoundException
-     */
-    public CompletableFuture<Void> dropTableAsync(String name) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return dropTableAsyncInternal(name);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #dropTableAsync(String)} for details. */
-    private CompletableFuture<Void> dropTableAsyncInternal(String name) {
-        return tableAsyncInternal(name).thenCompose(tbl -> {
-            // In case of drop it's an optimization that allows not to fire 
drop-change-closure if there's no such
-            // distributed table and the local config has lagged behind.
-            if (tbl == null) {
-                return failedFuture(new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            }
-
-            return tablesCfg
-                    .change(chg -> {
-                        incrementTablesGeneration(chg);
-
-                        chg
-                                .changeTables(tblChg -> {
-                                    if (tblChg.get(name) == null) {
-                                        throw new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
-                                    }
-
-                                    tblChg.delete(name);
-                                });
-                    })
-                    .exceptionally(t -> {
-                        Throwable ex = getRootCause(t);
-
-                        if (!(ex instanceof TableNotFoundException)) {
-                            LOG.debug("Unable to drop table [name={}]", ex, 
name);
-                        }
-
-                        throw new CompletionException(ex);
-                    });
-        });
-    }
-
-    /** {@inheritDoc} */
     @Override
     public List<Table> tables() {
         return join(tablesAsync());
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<List<Table>> tablesAsync() {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return tablesAsyncInternal();
-        } finally {
-            busyLock.leaveBusy();
-        }
+        return inBusyLockAsync(busyLock, this::tablesAsyncInternalBusy);
     }
 
-    /**
-     * Internal method for getting table.
-     *
-     * @return Future representing pending completion of the operation.
-     */
-    private CompletableFuture<List<Table>> tablesAsyncInternal() {
-        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), 
ioExecutor)
-                .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
-                    var tableFuts = new CompletableFuture[tableIds.size()];
-
-                    var i = 0;
+    private CompletableFuture<List<Table>> tablesAsyncInternalBusy() {
+        HybridTimestamp now = clock.now();
 
-                    for (int tblId : tableIds) {
-                        tableFuts[i++] = tableAsyncInternal(tblId, false);
-                    }
-
-                    return allOf(tableFuts).thenApply(unused -> 
inBusyLock(busyLock, () -> {
-                        var tables = new ArrayList<Table>(tableIds.size());
-
-                        for (var fut : tableFuts) {
-                            var table = fut.join();
-
-                            if (table != null) {
-                                tables.add((Table) table);
-                            }
-                        }
+        return anyOf(schemaSyncService.waitForMetadataCompleteness(now), 
stopManagerFuture)

Review Comment:
   No, why would we? Probably I am missing something. SchemaSyncService is not 
the only actor that we are touching here, TableManager internals are also 
involved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to