zstan commented on code in PR #1134:
URL: https://github.com/apache/ignite-3/pull/1134#discussion_r994532013


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +80,115 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> 
registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).schemas().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
+        for (String tblName : tablesCfg.tables().value().namedListKeys()) {
+            ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) 
tablesCfg.tables().get(tblName));
+            UUID tblId = tblCfg.id().value();
+
+            Map<Integer, byte[]> schemas = collectAllSchemas(tblId);
+
+            byte[] serialized;
+
+            if (schemas.size() > 1) {
+                for (Map.Entry<Integer, byte[]> ent : schemas.entrySet()) {
+                    serialized = ent.getValue();
+
+                    SchemaDescriptor desc = 
SchemaSerializerImpl.INSTANCE.deserialize(serialized);
+
+                    createSchema(0, tblId, tblName, desc).join();
+                }
+
+                registriesVv.complete(0);
+            } else {
+                serialized = schemas.get(INITIAL_SCHEMA_VERSION);
+
+                assert serialized != null;
             }
-        });
+        }
+
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> 
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
 
-            ExtendedTableConfiguration tblCfg = 
schemasCtx.config(ExtendedTableConfiguration.class);
+            int verFromUpdate = tblCfg.schemaId();
 
-            UUID tblId = tblCfg.id().value();
+            UUID tblId = tblCfg.id();
+
+            String tableName = tblCfg.name();
+
+            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
+
+            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) 
!= null) {
+                return completedFuture(null);
+            }
+
+            byte[] curSchemaDesc = schemaById(tblId, verFromUpdate);
+
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, 
verFromUpdate - 1);
+                assert oldSchema != null;
 
-            String tableName = tblCfg.name().value();
+                NamedListView<ColumnView> oldCols = ctx.oldValue();
+                NamedListView<ColumnView> newCols = ctx.newValue();
 
-            SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+                schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+                        oldSchema,
+                        oldCols,
+                        schemaDescFromUpdate,
+                        newCols));
+            }
+
+            long causalityToken = ctx.storageRevision();
+
+            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
+
+            if (curSchemaDesc == null) {
+                try {
+                    byte[] serialized = 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+                    createSchemaFut.thenCompose(t -> metastorageMgr.put(

Review Comment:
   The listener is called on each node; I have some questions:
   1. There is no CAS at all; do you mean getAndPut with a further assertion?
   2. It seems we can't use CAS here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to