korlov42 commented on code in PR #1134:
URL: https://github.com/apache/ignite-3/pull/1134#discussion_r987683750


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +87,76 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {

Review Comment:
   What about recovery after restart?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java:
##########
@@ -139,4 +147,103 @@ public static boolean equalSchemas(SchemaDescriptor exp, 
SchemaDescriptor actual
 
         return true;
     }
+
+    /**
+     * Forms schema history key.
+     *
+     * @param tblId Table id.
+     * @param ver Schema version.
+     * @return {@link ByteArray} representation.
+     */
+    public static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {
+        return ByteArray.fromString(tblId + SCHEMA_STORE_PREDICATE + ver);
+    }
+
+    /**
+     * Forms schema history predicate.
+     *
+     * @param tblId Table id.
+     * @return {@link ByteArray} representation.
+     */
+    public static ByteArray schemaHistPredicate(UUID tblId) {
+        return ByteArray.fromString(tblId + SCHEMA_STORE_PREDICATE);
+    }
+
+    /**
+     * Gets the latest version of the table schema which available in 
Metastore.
+     *
+     * @param tblId Table id.
+     * @param metastorageMgr Metastorage manager.
+     * @return The latest schema version.
+     */
+    public static int latestSchemaVersion(MetaStorageManager metastorageMgr, 
UUID tblId) {
+        try {
+            Cursor<Entry> cur = 
metastorageMgr.prefix(schemaHistPredicate(tblId));
+
+            int lastVer = INITIAL_SCHEMA_VERSION;
+
+            for (Entry ent : cur) {
+                String key = ent.key().toString();
+                int pos = key.indexOf(':');
+                assert pos != -1 : "Unexpected key: " + key;
+
+                key = key.substring(pos + 1);
+                int descVer = Integer.parseInt(key);
+
+                if (descVer > lastVer) {
+                    lastVer = descVer;
+                }
+            }
+
+            return lastVer;
+        } catch (NoSuchElementException e) {
+            assert false : "Table must exist. [tableId=" + tblId + ']';
+
+            return INITIAL_SCHEMA_VERSION;
+        } catch (NodeStoppingException e) {
+            throw new IgniteException(e.traceId(), e.code(), e.getMessage(), 
e);
+        }
+    }
+
+    /**
+     * Gets the latest serialized schema of the table which available in 
Metastore.
+     *
+     * @param tblId Table id.
+     * @param metastorageMgr Metastorage manager.
+     * @return The latest schema version or {@code null} if not found.
+     */
+    public static @Nullable byte[] schemaById(MetaStorageManager 
metastorageMgr, UUID tblId, int ver) {

Review Comment:
   ```suggestion
       public static byte @Nullable [] schemaById(MetaStorageManager 
metastorageMgr, UUID tblId, int ver) {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1226,50 +1205,20 @@ private CompletableFuture<Void> 
alterTableAsyncInternal(String name, Consumer<Ta
             if (tbl == null) {
                 tblFut.completeExceptionally(new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
             } else {
-                TableImpl tblImpl = (TableImpl) tbl;
-
                 tablesCfg.tables().change(ch -> {
                     if (ch.get(name) == null) {
                         throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, 
name);
                     }
 
                     ch.update(name, tblCh -> {
-                                tableChange.accept(tblCh);
-
-                                ((ExtendedTableChange) 
tblCh).changeSchemas(schemasCh ->
-                                        
schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
-                                            ExtendedTableView currTableView = 
(ExtendedTableView) tablesCfg.tables().get(name).value();
-
-                                            SchemaDescriptor descriptor;
-
-                                            //TODO IGNITE-15747 Remove 
try-catch and force configuration validation
-                                            // here to ensure a valid 
configuration passed to prepareSchemaDescriptor() method.
-                                            try {
-                                                descriptor = 
SchemaUtils.prepareSchemaDescriptor(
-                                                        ((ExtendedTableView) 
tblCh).schemas().size(),
-                                                        tblCh);
-
-                                                
descriptor.columnMapping(SchemaUtils.columnMapper(
-                                                        
tblImpl.schemaView().schema(currTableView.schemas().size()),
-                                                        
currTableView.columns(),
-                                                        descriptor,
-                                                        tblCh.columns()));
-                                            } catch (IllegalArgumentException 
ex) {
-                                                // Convert unexpected 
exceptions here,
-                                                // because validation actually 
happens later,
-                                                // when bulk configuration 
update is applied.
-                                                
ConfigurationValidationException e =
-                                                        new 
ConfigurationValidationException(ex.getMessage());
-
-                                                e.addSuppressed(ex);
-
-                                                throw e;
-                                            }
-
-                                            
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
-                                        }));
-                            }
-                    );
+                        tableChange.accept(tblCh);
+
+                        ExtendedTableChange exTblChange = 
(ExtendedTableChange) tblCh;
+
+                        int lastSchemaVer = 
latestSchemaVersion(metaStorageMgr, exTblChange.id());
+
+                        exTblChange.changeSchemaId(lastSchemaVer + 1);

Review Comment:
   ```suggestion
                           exTblChange.changeSchemaId(exTblChange.schemaId() + 
1);
   ```
   Can we do this? 



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
-            }
-        });
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
+
+            int verFromUpdate = tblCfg.schemaId();
+
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            byte[] curSchemaDesc = schemaById(metastorageMgr, tblId, 
verFromUpdate);
 
-            UUID tblId = tblCfg.id().value();
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId, 
verFromUpdate - 1);
+                assert oldSchemaSerialized != null;
+                SchemaDescriptor oldSchema = 
SchemaSerializerImpl.INSTANCE.deserialize(oldSchemaSerialized);
 
-            String tableName = tblCfg.name().value();
+                NamedListView<ColumnView> oldCols = ctx.oldValue();
+                NamedListView<ColumnView> newCols = ctx.newValue();
 
-            SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+                schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+                        oldSchema,
+                        oldCols,
+                        schemaDescFromUpdate,
+                        newCols));
+            }
+
+            long causalityToken = ctx.storageRevision();
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
 
             registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
-                    () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+                    () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+
+            if (curSchemaDesc == null) {
+                try {
+                    byte[] serialized = 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
+
+                    createSchemaFut.thenCompose(t -> metastorageMgr.put(
+                            schemaWithVerHistKey(tblId, verFromUpdate), 
serialized)
+                    ).thenApply(t -> t);

Review Comment:
   `.thenApply(t -> t)` doesn't make sense



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
-            }
-        });
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
+
+            int verFromUpdate = tblCfg.schemaId();
+
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            byte[] curSchemaDesc = schemaById(metastorageMgr, tblId, 
verFromUpdate);
 
-            UUID tblId = tblCfg.id().value();
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId, 
verFromUpdate - 1);

Review Comment:
   Can we get the previous schema from registry?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java:
##########
@@ -139,4 +147,103 @@ public static boolean equalSchemas(SchemaDescriptor exp, 
SchemaDescriptor actual
 
         return true;
     }
+
+    /**
+     * Forms schema history key.
+     *
+     * @param tblId Table id.
+     * @param ver Schema version.
+     * @return {@link ByteArray} representation.
+     */
+    public static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {

Review Comment:
   I believe these new methods should be internal to SchemaManager, and we 
must not expose such information.



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
-            }
-        });
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
+
+            int verFromUpdate = tblCfg.schemaId();
+
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            byte[] curSchemaDesc = schemaById(metastorageMgr, tblId, 
verFromUpdate);

Review Comment:
   Seems like we can avoid this read.



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
-            }
-        });
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
+
+            int verFromUpdate = tblCfg.schemaId();
+
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            byte[] curSchemaDesc = schemaById(metastorageMgr, tblId, 
verFromUpdate);
 
-            UUID tblId = tblCfg.id().value();
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId, 
verFromUpdate - 1);
+                assert oldSchemaSerialized != null;
+                SchemaDescriptor oldSchema = 
SchemaSerializerImpl.INSTANCE.deserialize(oldSchemaSerialized);
 
-            String tableName = tblCfg.name().value();
+                NamedListView<ColumnView> oldCols = ctx.oldValue();
+                NamedListView<ColumnView> newCols = ctx.newValue();
 
-            SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+                schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+                        oldSchema,
+                        oldCols,
+                        schemaDescFromUpdate,
+                        newCols));
+            }
+
+            long causalityToken = ctx.storageRevision();
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
 
             registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
-                    () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+                    () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+
+            if (curSchemaDesc == null) {
+                try {
+                    byte[] serialized = 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
+
+                    createSchemaFut.thenCompose(t -> metastorageMgr.put(
+                            schemaWithVerHistKey(tblId, verFromUpdate), 
serialized)
+                    ).thenApply(t -> t);

Review Comment:
   Actually, this whole try-catch block is problematic. Why do you write the 
schema after the operation is considered completed?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -281,59 +336,34 @@ private boolean checkSchemaVersion(UUID tblId, int 
schemaVer) {
     }
 
     /**
-     * Checks that the schema is configured in the Metasorage consensus.
+     * Try to found schema in cache.

Review Comment:
   ```suggestion
        * Try to find schema in cache.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to