denis-chudov commented on a change in pull request #704:
URL: https://github.com/apache/ignite-3/pull/704#discussion_r820099130



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -465,13 +487,8 @@ private void createTableLocally(
             long causalityToken,
             String name,
             UUID tblId,
-            List<List<ClusterNode>> assignment,

Review comment:
       there is an old javadoc for assignments

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
     /** {@inheritDoc} */
     @Override
     public void start() {
-        tablesCfg.tables()
-                .listenElements(new ConfigurationNamedListListener<>() {
-                    @Override
-                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.newValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.newValue()).id();
-
-                            fireEvent(TableEvent.CREATE,
-                                    new 
TableEventParameters(ctx.storageRevision(), tblId, tblName),
-                                    new NodeStoppingException()
-                            );
-
-                            return CompletableFuture.failedFuture(new 
NodeStoppingException());
-                        }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
 
-                        try {
-                            onTableCreateInternal(ctx);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
+                long causalityToken = schemasCtx.storageRevision();
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+                ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
 
-                    /**
-                     * Method for handle a table configuration event.
-                     *
-                     * @param ctx Configuration event.
-                     */
-                    private void 
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
-                        String tblName = ctx.newValue().name();
-                        UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
-                        // Empty assignments might be a valid case if tables 
are created from within cluster init HOCON
-                        // configuration, which is not supported now.
-                        assert ((ExtendedTableView) 
ctx.newValue()).assignments() != null :
-                                IgniteStringFormatter.format("Table [id={}, 
name={}] has empty assignments.", tblId, tblName);
-
-                        // TODO: IGNITE-16369 Listener with any placeholder 
should be used instead.
-                        ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName)).schemas()
-                                .listenElements(new 
ConfigurationNamedListListener<>() {
-                                    @Override
-                                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                                        long causalityToken = 
schemasCtx.storageRevision();
-
-                                        if (!busyLock.enterBusy()) {
-                                            fireEvent(
-                                                    TableEvent.ALTER,
-                                                    new 
TableEventParameters(causalityToken, tblId, tblName),
-                                                    new NodeStoppingException()
-                                            );
-
-                                            return 
CompletableFuture.failedFuture(new NodeStoppingException());
-                                        }
+                UUID tblId = tblCfg.id().value();
 
-                                        try {
-                                            // Avoid calling listener 
immediately after the listener completes to create the current table.
-                                            // FIXME: 
https://issues.apache.org/jira/browse/IGNITE-16369
-                                            if (ctx.storageRevision() != 
schemasCtx.storageRevision()) {
-                                                return 
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
-                                                    TableImpl table = 
tablesById.get(tblId);
+                String tblName = tblCfg.name().value();
 
-                                                    ((SchemaRegistryImpl) 
table.schemaView())
-                                                            
.onSchemaRegistered(
-                                                                    
SchemaSerializerImpl.INSTANCE.deserialize(
-                                                                            
(schemasCtx.newValue().schema())));
+                SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
 
-                                                    
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
-                                                            table), null);
+                if (!busyLock.enterBusy()) {
+                    if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+                        fireEvent(
+                                TableEvent.ALTER,
+                                new TableEventParameters(causalityToken, 
tblId, tblName),
+                                new NodeStoppingException()
+                        );
+                    }
 
-                                                });
-                                            }
+                    return CompletableFuture.failedFuture(new 
NodeStoppingException());
+                }
 
-                                            return 
CompletableFuture.completedFuture(null);
-                                        } catch (Exception e) {
-                                            fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, tblId,
-                                                    tblName), e);
+                try {
+                    tablesByIdVv.update(causalityToken, tablesById -> {
+                        TableImpl table = tablesById.get(tblId);
 
-                                            return 
CompletableFuture.failedFuture(e);
-                                        } finally {
-                                            busyLock.leaveBusy();
-                                        }
-                                    }
-                                });
-
-                        ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName)).assignments()
-                                .listen(assignmentsCtx -> {
-                                    if (!busyLock.enterBusy()) {
-                                        return 
CompletableFuture.failedFuture(new NodeStoppingException());
-                                    }
-
-                                    try {
-                                        // Avoid calling listener immediately 
after the listener completes to create the current table.
-                                        // FIXME: 
https://issues.apache.org/jira/browse/IGNITE-16369
-                                        if (ctx.storageRevision() == 
assignmentsCtx.storageRevision()) {
-                                            return 
CompletableFuture.completedFuture(null);
-                                        } else {
-                                            return 
updateAssignmentInternal(assignmentsCtx.storageRevision(), tblId, 
assignmentsCtx);
-                                        }
-                                    } finally {
-                                        busyLock.leaveBusy();
-                                    }
-                                });
-
-                        createTableLocally(
-                                ctx.storageRevision(),
-                                tblName,
-                                tblId,
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(((ExtendedTableView) ctx.newValue()).assignments()),
-                                
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView) 
ctx.newValue()).schemas()
-                                        
.get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
-                        );
-                    }
+                        ((SchemaRegistryImpl) 
table.schemaView()).onSchemaRegistered(schemaDescriptor);
 
-                    private CompletableFuture<?> updateAssignmentInternal(
-                            long causalityToken,
-                            UUID tblId,
-                            ConfigurationNotificationEvent<byte[]> 
assignmentsCtx
-                    ) {
-                        List<List<ClusterNode>> oldAssignments =
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
-                        List<List<ClusterNode>> newAssignments =
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(assignmentsCtx.newValue());
-
-                        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[oldAssignments.size()];
-
-                        // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
-                        // TODO: Until IGNITE-15554 is implemented it's safe 
to iterate over partitions and replicas cause there will
-                        // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
-                        for (int i = 0; i < oldAssignments.size(); i++) {
-                            int partId = i;
-
-                            List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(partId);
-                            List<ClusterNode> newPartitionAssignment = 
newAssignments.get(partId);
-
-                            var toAdd = new HashSet<>(newPartitionAssignment);
-
-                            toAdd.removeAll(oldPartitionAssignment);
-
-                            // Create new raft nodes according to new 
assignments.
-                            futures[i] = 
tablesByIdVv.get(causalityToken).thenCompose(tablesById -> {
-                                InternalTable internalTable = 
tablesById.get(tblId).internalTable();
-
-                                try {
-                                    return raftMgr.updateRaftGroup(
-                                            raftGroupName(tblId, partId),
-                                            newPartitionAssignment,
-                                            toAdd,
-                                            () -> new PartitionListener(tblId,
-                                                    new 
VersionedRowStore(internalTable.storage().getOrCreatePartition(partId),
-                                                            txManager))
-                                    ).thenAccept(
-                                            updatedRaftGroupService -> 
((InternalTableImpl) internalTable)
-                                                    
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
-                                    ).exceptionally(th -> {
-                                        LOG.error("Failed to update raft 
groups one the node", th);
-
-                                        return null;
-                                    });
-                                } catch (NodeStoppingException e) {
-                                    throw new AssertionError("Loza was stopped 
before Table manager", e);
-                                }
-                            });
+                        if (schemaDescriptor.version() != 
INITIAL_SCHEMA_VERSION) {
+                            fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, table), null);
                         }
 
-                        return CompletableFuture.allOf(futures);
+                        return tablesById;
+                    }, th -> {
+                        throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot create a schema 
for table"
+                                + " [tableId={}, schemaVer={}]", tblId, 
schemaDescriptor.version()), th);
+                    });
+
+                    return CompletableFuture.completedFuture(null);
+                } catch (Exception e) {
+                    if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+                        fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, tblId, tblName), e);
                     }
 
-                    @Override
-                    public CompletableFuture<?> onRename(String oldName, 
String newName, ConfigurationNotificationEvent<TableView> ctx) {
-                        // TODO: IGNITE-15485 Support table rename operation.
+                    return CompletableFuture.failedFuture(e);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(assignmentsCtx -> {
+            long causalityToken = assignmentsCtx.storageRevision();
 
-                    @Override
-                    public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.oldValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.oldValue()).id();
+            ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) 
assignmentsCtx.config(TableConfiguration.class);
 
-                            fireEvent(
-                                    TableEvent.DROP,
-                                    new 
TableEventParameters(ctx.storageRevision(), tblId, tblName),
-                                    new NodeStoppingException()
-                            );
+            UUID tblId = tblCfg.id().value();
 
-                            return CompletableFuture.failedFuture(new 
NodeStoppingException());
-                        }
+            if (!busyLock.enterBusy()) {
+                return CompletableFuture.failedFuture(new 
NodeStoppingException());
+            }
 
-                        try {
-                            dropTableLocally(
-                                    ctx.storageRevision(),
-                                    ctx.oldValue().name(),
-                                    ((ExtendedTableView) ctx.oldValue()).id(),
-                                    (List<List<ClusterNode>>) 
ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
-                            );
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
+            try {
+                return updateAssignmentInternal(causalityToken, tblId, 
assignmentsCtx);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        });
 
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        tablesCfg.tables().listenElements(new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
+                if (!busyLock.enterBusy()) {
+                    String tblName = ctx.newValue().name();
+                    UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
+
+                    fireEvent(TableEvent.CREATE,
+                            new TableEventParameters(ctx.storageRevision(), 
tblId, tblName),
+                            new NodeStoppingException()
+                    );
+
+                    return CompletableFuture.failedFuture(new 
NodeStoppingException());
+                }
+
+                try {
+                    onTableCreateInternal(ctx);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            /**
+             * Method for handle a table configuration event.
+             *
+             * @param ctx Configuration event.
+             */
+            private void 
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
+                String tblName = ctx.newValue().name();
+                UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
+
+                // Empty assignments might be a valid case if tables are 
created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView) ctx.newValue()).assignments() != 
null :
+                        IgniteStringFormatter.format("Table [id={}, name={}] 
has empty assignments.", tblId, tblName);
+
+                createTableLocally(
+                        ctx.storageRevision(),
+                        tblName,
+                        tblId,
+                        ((ExtendedTableView) ctx.newValue()).partitions()

Review comment:
       excessive cast

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
     /** {@inheritDoc} */
     @Override
     public void start() {
-        tablesCfg.tables()
-                .listenElements(new ConfigurationNamedListListener<>() {
-                    @Override
-                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.newValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.newValue()).id();
-
-                            fireEvent(TableEvent.CREATE,
-                                    new 
TableEventParameters(ctx.storageRevision(), tblId, tblName),
-                                    new NodeStoppingException()
-                            );
-
-                            return CompletableFuture.failedFuture(new 
NodeStoppingException());
-                        }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
 
-                        try {
-                            onTableCreateInternal(ctx);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
+                long causalityToken = schemasCtx.storageRevision();
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+                ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
 
-                    /**
-                     * Method for handle a table configuration event.
-                     *
-                     * @param ctx Configuration event.
-                     */
-                    private void 
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
-                        String tblName = ctx.newValue().name();
-                        UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
-                        // Empty assignments might be a valid case if tables 
are created from within cluster init HOCON
-                        // configuration, which is not supported now.
-                        assert ((ExtendedTableView) 
ctx.newValue()).assignments() != null :
-                                IgniteStringFormatter.format("Table [id={}, 
name={}] has empty assignments.", tblId, tblName);
-
-                        // TODO: IGNITE-16369 Listener with any placeholder 
should be used instead.
-                        ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName)).schemas()
-                                .listenElements(new 
ConfigurationNamedListListener<>() {
-                                    @Override
-                                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                                        long causalityToken = 
schemasCtx.storageRevision();
-
-                                        if (!busyLock.enterBusy()) {
-                                            fireEvent(
-                                                    TableEvent.ALTER,
-                                                    new 
TableEventParameters(causalityToken, tblId, tblName),
-                                                    new NodeStoppingException()
-                                            );
-
-                                            return 
CompletableFuture.failedFuture(new NodeStoppingException());
-                                        }
+                UUID tblId = tblCfg.id().value();
 
-                                        try {
-                                            // Avoid calling listener 
immediately after the listener completes to create the current table.
-                                            // FIXME: 
https://issues.apache.org/jira/browse/IGNITE-16369
-                                            if (ctx.storageRevision() != 
schemasCtx.storageRevision()) {
-                                                return 
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
-                                                    TableImpl table = 
tablesById.get(tblId);
+                String tblName = tblCfg.name().value();
 
-                                                    ((SchemaRegistryImpl) 
table.schemaView())
-                                                            
.onSchemaRegistered(
-                                                                    
SchemaSerializerImpl.INSTANCE.deserialize(
-                                                                            
(schemasCtx.newValue().schema())));
+                SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
 
-                                                    
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
-                                                            table), null);
+                if (!busyLock.enterBusy()) {
+                    if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+                        fireEvent(
+                                TableEvent.ALTER,
+                                new TableEventParameters(causalityToken, 
tblId, tblName),
+                                new NodeStoppingException()
+                        );

Review comment:
       why don't we need to wait for versioned value updates?

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
     /** {@inheritDoc} */
     @Override
     public void start() {
-        tablesCfg.tables()
-                .listenElements(new ConfigurationNamedListListener<>() {
-                    @Override
-                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.newValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.newValue()).id();
-
-                            fireEvent(TableEvent.CREATE,
-                                    new 
TableEventParameters(ctx.storageRevision(), tblId, tblName),
-                                    new NodeStoppingException()
-                            );
-
-                            return CompletableFuture.failedFuture(new 
NodeStoppingException());
-                        }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
 
-                        try {
-                            onTableCreateInternal(ctx);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
+                long causalityToken = schemasCtx.storageRevision();
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+                ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);

Review comment:
       seems the cast is excessive

##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
     /** {@inheritDoc} */
     @Override
     public void start() {
-        tablesCfg.tables()
-                .listenElements(new ConfigurationNamedListListener<>() {
-                    @Override
-                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.newValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.newValue()).id();
-
-                            fireEvent(TableEvent.CREATE,
-                                    new 
TableEventParameters(ctx.storageRevision(), tblId, tblName),
-                                    new NodeStoppingException()
-                            );
-
-                            return CompletableFuture.failedFuture(new 
NodeStoppingException());
-                        }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
 
-                        try {
-                            onTableCreateInternal(ctx);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
+                long causalityToken = schemasCtx.storageRevision();
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+                ExtendedTableConfiguration tblCfg = 
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
 
-                    /**
-                     * Method for handle a table configuration event.
-                     *
-                     * @param ctx Configuration event.
-                     */
-                    private void 
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
-                        String tblName = ctx.newValue().name();
-                        UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
-                        // Empty assignments might be a valid case if tables 
are created from within cluster init HOCON
-                        // configuration, which is not supported now.
-                        assert ((ExtendedTableView) 
ctx.newValue()).assignments() != null :
-                                IgniteStringFormatter.format("Table [id={}, 
name={}] has empty assignments.", tblId, tblName);
-
-                        // TODO: IGNITE-16369 Listener with any placeholder 
should be used instead.
-                        ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName)).schemas()
-                                .listenElements(new 
ConfigurationNamedListListener<>() {
-                                    @Override
-                                    public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                                        long causalityToken = 
schemasCtx.storageRevision();
-
-                                        if (!busyLock.enterBusy()) {
-                                            fireEvent(
-                                                    TableEvent.ALTER,
-                                                    new 
TableEventParameters(causalityToken, tblId, tblName),
-                                                    new NodeStoppingException()
-                                            );
-
-                                            return 
CompletableFuture.failedFuture(new NodeStoppingException());
-                                        }
+                UUID tblId = tblCfg.id().value();
 
-                                        try {
-                                            // Avoid calling listener 
immediately after the listener completes to create the current table.
-                                            // FIXME: 
https://issues.apache.org/jira/browse/IGNITE-16369
-                                            if (ctx.storageRevision() != 
schemasCtx.storageRevision()) {
-                                                return 
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
-                                                    TableImpl table = 
tablesById.get(tblId);
+                String tblName = tblCfg.name().value();
 
-                                                    ((SchemaRegistryImpl) 
table.schemaView())
-                                                            
.onSchemaRegistered(
-                                                                    
SchemaSerializerImpl.INSTANCE.deserialize(
-                                                                            
(schemasCtx.newValue().schema())));
+                SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
 
-                                                    
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
-                                                            table), null);
+                if (!busyLock.enterBusy()) {
+                    if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+                        fireEvent(
+                                TableEvent.ALTER,
+                                new TableEventParameters(causalityToken, 
tblId, tblName),
+                                new NodeStoppingException()
+                        );
+                    }
 
-                                                });
-                                            }
+                    return CompletableFuture.failedFuture(new 
NodeStoppingException());
+                }
 
-                                            return 
CompletableFuture.completedFuture(null);
-                                        } catch (Exception e) {
-                                            fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, tblId,
-                                                    tblName), e);
+                try {
+                    tablesByIdVv.update(causalityToken, tablesById -> {
+                        TableImpl table = tablesById.get(tblId);
 
-                                            return 
CompletableFuture.failedFuture(e);
-                                        } finally {
-                                            busyLock.leaveBusy();
-                                        }
-                                    }
-                                });
-
-                        ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName)).assignments()
-                                .listen(assignmentsCtx -> {
-                                    if (!busyLock.enterBusy()) {
-                                        return 
CompletableFuture.failedFuture(new NodeStoppingException());
-                                    }
-
-                                    try {
-                                        // Avoid calling listener immediately 
after the listener completes to create the current table.
-                                        // FIXME: 
https://issues.apache.org/jira/browse/IGNITE-16369
-                                        if (ctx.storageRevision() == 
assignmentsCtx.storageRevision()) {
-                                            return 
CompletableFuture.completedFuture(null);
-                                        } else {
-                                            return 
updateAssignmentInternal(assignmentsCtx.storageRevision(), tblId, 
assignmentsCtx);
-                                        }
-                                    } finally {
-                                        busyLock.leaveBusy();
-                                    }
-                                });
-
-                        createTableLocally(
-                                ctx.storageRevision(),
-                                tblName,
-                                tblId,
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(((ExtendedTableView) ctx.newValue()).assignments()),
-                                
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView) 
ctx.newValue()).schemas()
-                                        
.get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
-                        );
-                    }
+                        ((SchemaRegistryImpl) 
table.schemaView()).onSchemaRegistered(schemaDescriptor);
 
-                    private CompletableFuture<?> updateAssignmentInternal(
-                            long causalityToken,
-                            UUID tblId,
-                            ConfigurationNotificationEvent<byte[]> 
assignmentsCtx
-                    ) {
-                        List<List<ClusterNode>> oldAssignments =
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
-                        List<List<ClusterNode>> newAssignments =
-                                (List<List<ClusterNode>>) 
ByteUtils.fromBytes(assignmentsCtx.newValue());
-
-                        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[oldAssignments.size()];
-
-                        // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
-                        // TODO: Until IGNITE-15554 is implemented it's safe 
to iterate over partitions and replicas cause there will
-                        // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
-                        for (int i = 0; i < oldAssignments.size(); i++) {
-                            int partId = i;
-
-                            List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(partId);
-                            List<ClusterNode> newPartitionAssignment = 
newAssignments.get(partId);
-
-                            var toAdd = new HashSet<>(newPartitionAssignment);
-
-                            toAdd.removeAll(oldPartitionAssignment);
-
-                            // Create new raft nodes according to new 
assignments.
-                            futures[i] = 
tablesByIdVv.get(causalityToken).thenCompose(tablesById -> {
-                                InternalTable internalTable = 
tablesById.get(tblId).internalTable();
-
-                                try {
-                                    return raftMgr.updateRaftGroup(
-                                            raftGroupName(tblId, partId),
-                                            newPartitionAssignment,
-                                            toAdd,
-                                            () -> new PartitionListener(tblId,
-                                                    new 
VersionedRowStore(internalTable.storage().getOrCreatePartition(partId),
-                                                            txManager))
-                                    ).thenAccept(
-                                            updatedRaftGroupService -> 
((InternalTableImpl) internalTable)
-                                                    
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
-                                    ).exceptionally(th -> {
-                                        LOG.error("Failed to update raft 
groups one the node", th);
-
-                                        return null;
-                                    });
-                                } catch (NodeStoppingException e) {
-                                    throw new AssertionError("Loza was stopped 
before Table manager", e);
-                                }
-                            });
+                        if (schemaDescriptor.version() != 
INITIAL_SCHEMA_VERSION) {
+                            fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, table), null);
                         }
 
-                        return CompletableFuture.allOf(futures);
+                        return tablesById;
+                    }, th -> {
+                        throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot create a schema 
for table"
+                                + " [tableId={}, schemaVer={}]", tblId, 
schemaDescriptor.version()), th);
+                    });
+
+                    return CompletableFuture.completedFuture(null);
+                } catch (Exception e) {
+                    if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+                        fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, tblId, tblName), e);
                     }
 
-                    @Override
-                    public CompletableFuture<?> onRename(String oldName, 
String newName, ConfigurationNotificationEvent<TableView> ctx) {
-                        // TODO: IGNITE-15485 Support table rename operation.
+                    return CompletableFuture.failedFuture(e);
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
 
-                        return CompletableFuture.completedFuture(null);
-                    }
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(assignmentsCtx -> {
+            long causalityToken = assignmentsCtx.storageRevision();
 
-                    @Override
-                    public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<TableView> ctx) {
-                        if (!busyLock.enterBusy()) {
-                            String tblName = ctx.oldValue().name();
-                            UUID tblId = ((ExtendedTableView) 
ctx.oldValue()).id();
+            ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration) 
assignmentsCtx.config(TableConfiguration.class);

Review comment:
       excessive cast




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to