zstan commented on code in PR #1665:
URL: https://github.com/apache/ignite-3/pull/1665#discussion_r1108059926


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -117,119 +120,105 @@ private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<Named
         try {
             ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
 
-            int verFromUpdate = tblCfg.schemaId();
+            int newSchemaVersion = tblCfg.schemaId();
 
             UUID tblId = tblCfg.id();
 
-            String tableName = tblCfg.name();
-
-            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
-
-            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) 
!= null) {
+            if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
                 return completedFuture(null);
             }
 
-            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
-                SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, 
verFromUpdate - 1);
-
-                if (oldSchema == null) {
-                    byte[] serPrevSchema = schemaByVersion(tblId, 
verFromUpdate - 1);
-
-                    assert serPrevSchema != null;
-
-                    oldSchema = 
SchemaSerializerImpl.INSTANCE.deserialize(serPrevSchema);
-                }
+            SchemaDescriptor newSchema = 
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
 
-                
schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(oldSchema, 
schemaDescFromUpdate));
+            // This is intentionally a blocking call to enforce configuration 
listener execution order. Unfortunately it is not possible
+            // to execute this method asynchronously, because there is a race 
between this listener and completion of associated
+            // VersionedValues.
+            try {
+                setColumnMapping(newSchema, tblId);
+            } catch (ExecutionException | InterruptedException e) {
+                return failedFuture(e);
             }
 
             long causalityToken = ctx.storageRevision();
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
-
-            try {
-                final ByteArray key = schemaWithVerHistKey(tblId, 
verFromUpdate);
-
-                createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
-                        Conditions.notExists(key),
-                        Operations.put(key, 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
-                        Operations.noop()));
-            } catch (Throwable th) {
-                createSchemaFut.completeExceptionally(th);
-            }
-
-            createSchemaFut.whenComplete((ignore, th) -> {
-                if (th == null) {
-                    registriesVv.get(causalityToken).thenRun(() -> 
inBusyLock(busyLock,
-                            () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+            // Fire event early, because dependent listeners have to register 
VersionedValues' update futures
+            var eventParams = new SchemaEventParameters(causalityToken, tblId, 
newSchema);
+
+            fireEvent(SchemaEvent.CREATE, eventParams)
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOGGER.warn("Error when processing CREATE event", 
e);
+                        }
+                    });
+
+            return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(new 
IgniteInternalException(IgniteStringFormatter.format(
+                            "Cannot create a schema for the table [tblId={}, 
ver={}]", tblId, newSchemaVersion), e)
+                    );
                 }
-            });
 
-            return createSchemaFut;
+                return registerSchema(registries, tblId, tblCfg.name(), 
newSchema);
+            }));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    /**
-     * Create new schema locally.
-     *
-     * @param causalityToken Causality token.
-     * @param tableId Table id.
-     * @param tableName Table name.
-     * @param schemaDescriptor Schema descriptor.
-     * @return Create schema future.
-     */
-    private CompletableFuture<?> createSchema(long causalityToken, UUID 
tableId, String tableName, SchemaDescriptor schemaDescriptor) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+    private void setColumnMapping(SchemaDescriptor schema, UUID tableId) 
throws ExecutionException, InterruptedException {
+        if (schema.version() == INITIAL_SCHEMA_VERSION) {
+            return;
         }
 
-        try {
-            return createSchemaInternal(causalityToken, tableId, tableName, 
schemaDescriptor);
-        } finally {
-            busyLock.leaveBusy();
+        int prevVersion = schema.version() - 1;
+
+        SchemaDescriptor prevSchema = searchSchemaByVersion(tableId, 
prevVersion);
+
+        if (prevSchema == null) {
+            // This is intentionally a blocking call, because this method is 
used in a synchronous part of the configuration listener.
+            // See the call site for more details.
+            prevSchema = schemaByVersion(tableId, prevVersion).get();

Review Comment:
   who will cache in such a case ? probably i miss something but if we obtain 
descriptor from metastorage in next time we will call the metastorage too ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to