zstan commented on code in PR #1665:
URL: https://github.com/apache/ignite-3/pull/1665#discussion_r1108059926
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -117,119 +120,105 @@ private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<Named
try {
ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
- int verFromUpdate = tblCfg.schemaId();
+ int newSchemaVersion = tblCfg.schemaId();
UUID tblId = tblCfg.id();
- String tableName = tblCfg.name();
-
- SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
-
- if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version())
!= null) {
+ if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
return completedFuture(null);
}
- if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
- SchemaDescriptor oldSchema = searchSchemaByVersion(tblId,
verFromUpdate - 1);
-
- if (oldSchema == null) {
- byte[] serPrevSchema = schemaByVersion(tblId,
verFromUpdate - 1);
-
- assert serPrevSchema != null;
-
- oldSchema =
SchemaSerializerImpl.INSTANCE.deserialize(serPrevSchema);
- }
+ SchemaDescriptor newSchema =
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
-
schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(oldSchema,
schemaDescFromUpdate));
+ // This is intentionally a blocking call to enforce configuration
listener execution order. Unfortunately it is not possible
+ // to execute this method asynchronously, because there is a race
between this listener and completion of associated
+ // VersionedValues.
+ try {
+ setColumnMapping(newSchema, tblId);
+ } catch (ExecutionException | InterruptedException e) {
+ return failedFuture(e);
}
long causalityToken = ctx.storageRevision();
- CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
-
- try {
- final ByteArray key = schemaWithVerHistKey(tblId,
verFromUpdate);
-
- createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
- Conditions.notExists(key),
- Operations.put(key,
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
- Operations.noop()));
- } catch (Throwable th) {
- createSchemaFut.completeExceptionally(th);
- }
-
- createSchemaFut.whenComplete((ignore, th) -> {
- if (th == null) {
- registriesVv.get(causalityToken).thenRun(() ->
inBusyLock(busyLock,
- () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+ // Fire event early, because dependent listeners have to register
VersionedValues' update futures
+ var eventParams = new SchemaEventParameters(causalityToken, tblId,
newSchema);
+
+ fireEvent(SchemaEvent.CREATE, eventParams)
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOGGER.warn("Error when processing CREATE event",
e);
+ }
+ });
+
+ return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(new
IgniteInternalException(IgniteStringFormatter.format(
+ "Cannot create a schema for the table [tblId={},
ver={}]", tblId, newSchemaVersion), e)
+ );
}
- });
- return createSchemaFut;
+ return registerSchema(registries, tblId, tblCfg.name(),
newSchema);
+ }));
} finally {
busyLock.leaveBusy();
}
}
- /**
- * Create new schema locally.
- *
- * @param causalityToken Causality token.
- * @param tableId Table id.
- * @param tableName Table name.
- * @param schemaDescriptor Schema descriptor.
- * @return Create schema future.
- */
- private CompletableFuture<?> createSchema(long causalityToken, UUID
tableId, String tableName, SchemaDescriptor schemaDescriptor) {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ private void setColumnMapping(SchemaDescriptor schema, UUID tableId)
throws ExecutionException, InterruptedException {
+ if (schema.version() == INITIAL_SCHEMA_VERSION) {
+ return;
}
- try {
- return createSchemaInternal(causalityToken, tableId, tableName,
schemaDescriptor);
- } finally {
- busyLock.leaveBusy();
+ int prevVersion = schema.version() - 1;
+
+ SchemaDescriptor prevSchema = searchSchemaByVersion(tableId,
prevVersion);
+
+ if (prevSchema == null) {
+ // This is intentionally a blocking call, because this method is
used in a synchronous part of the configuration listener.
+ // See the call site for more details.
+ prevSchema = schemaByVersion(tableId, prevVersion).get();
Review Comment:
Who will cache the descriptor in such a case? I am probably missing something, but if we
obtain the descriptor from the metastorage here, will we call the metastorage again next time too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]