korlov42 commented on code in PR #1134:
URL: https://github.com/apache/ignite-3/pull/1134#discussion_r994485435


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +80,115 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
+        for (String tblName : tablesCfg.tables().value().namedListKeys()) {
+            ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName));
+            UUID tblId = tblCfg.id().value();
+
+            Map<Integer, byte[]> schemas = collectAllSchemas(tblId);
+
+            byte[] serialized;
+
+            if (schemas.size() > 1) {
+                for (Map.Entry<Integer, byte[]> ent : schemas.entrySet()) {
+                    serialized = ent.getValue();
+
+                    SchemaDescriptor desc = 
SchemaSerializerImpl.INSTANCE.deserialize(serialized);
+
+                    createSchema(0, tblId, tblName, desc).join();
+                }
+
+                registriesVv.complete(0);
+            } else {
+                serialized = schemas.get(INITIAL_SCHEMA_VERSION);
+
+                assert serialized != null;
             }
-        });
+        }
+
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            int verFromUpdate = tblCfg.schemaId();
 
-            UUID tblId = tblCfg.id().value();
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
+
+            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) 
!= null) {
+                return completedFuture(null);
+            }
+
+            byte[] curSchemaDesc = schemaById(tblId, verFromUpdate);
+
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, 
verFromUpdate - 1);
+                assert oldSchema != null;
 
-            String tableName = tblCfg.name().value();
+                NamedListView<ColumnView> oldCols = ctx.oldValue();
+                NamedListView<ColumnView> newCols = ctx.newValue();
 
-            SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+                schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+                        oldSchema,
+                        oldCols,
+                        schemaDescFromUpdate,
+                        newCols));
+            }
+
+            long causalityToken = ctx.storageRevision();
+
+            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
+
+            if (curSchemaDesc == null) {
+                try {
+                    byte[] serialized = 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+                    createSchemaFut.thenCompose(t -> metastorageMgr.put(

Review Comment:
   you need to CAS here instead of `put`



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +80,115 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
+        for (String tblName : tablesCfg.tables().value().namedListKeys()) {
+            ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName));
+            UUID tblId = tblCfg.id().value();
+
+            Map<Integer, byte[]> schemas = collectAllSchemas(tblId);
+
+            byte[] serialized;
+
+            if (schemas.size() > 1) {
+                for (Map.Entry<Integer, byte[]> ent : schemas.entrySet()) {
+                    serialized = ent.getValue();
+
+                    SchemaDescriptor desc = 
SchemaSerializerImpl.INSTANCE.deserialize(serialized);
+
+                    createSchema(0, tblId, tblName, desc).join();
+                }
+
+                registriesVv.complete(0);
+            } else {
+                serialized = schemas.get(INITIAL_SCHEMA_VERSION);
+
+                assert serialized != null;
             }
-        });
+        }
+
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            int verFromUpdate = tblCfg.schemaId();
 
-            UUID tblId = tblCfg.id().value();
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
+
+            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) 
!= null) {
+                return completedFuture(null);
+            }
+
+            byte[] curSchemaDesc = schemaById(tblId, verFromUpdate);

Review Comment:
   why do we still trying to search schema in MS?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to